forkProcess(): watch out for ThreadRelocated
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static Capability *scheduleDoGC(Capability *cap, Task *task,
231                                 rtsBool force_major, 
232                                 void (*get_roots)(evac_fn));
233
234 static void unblockThread(Capability *cap, StgTSO *tso);
235 static rtsBool checkBlackHoles(Capability *cap);
236 static void AllRoots(evac_fn evac);
237
238 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
239
240 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
241                         rtsBool stop_at_atomically, StgPtr stop_here);
242
243 static void deleteThread (Capability *cap, StgTSO *tso);
244 static void deleteAllThreads (Capability *cap);
245
246 #ifdef DEBUG
247 static void printThreadBlockage(StgTSO *tso);
248 static void printThreadStatus(StgTSO *tso);
249 void printThreadQueue(StgTSO *tso);
250 #endif
251
252 #if defined(PARALLEL_HASKELL)
253 StgTSO * createSparkThread(rtsSpark spark);
254 StgTSO * activateSpark (rtsSpark spark);  
255 #endif
256
257 #ifdef DEBUG
258 static char *whatNext_strs[] = {
259   "(unknown)",
260   "ThreadRunGHC",
261   "ThreadInterpret",
262   "ThreadKilled",
263   "ThreadRelocated",
264   "ThreadComplete"
265 };
266 #endif
267
268 /* -----------------------------------------------------------------------------
269  * Putting a thread on the run queue: different scheduling policies
270  * -------------------------------------------------------------------------- */
271
272 STATIC_INLINE void
273 addToRunQueue( Capability *cap, StgTSO *t )
274 {
275 #if defined(PARALLEL_HASKELL)
276     if (RtsFlags.ParFlags.doFairScheduling) { 
277         // this does round-robin scheduling; good for concurrency
278         appendToRunQueue(cap,t);
279     } else {
280         // this does unfair scheduling; good for parallelism
281         pushOnRunQueue(cap,t);
282     }
283 #else
284     // this does round-robin scheduling; good for concurrency
285     appendToRunQueue(cap,t);
286 #endif
287 }
288
289 /* ---------------------------------------------------------------------------
290    Main scheduling loop.
291
292    We use round-robin scheduling, each thread returning to the
293    scheduler loop when one of these conditions is detected:
294
295       * out of heap space
296       * timer expires (thread yields)
297       * thread blocks
298       * thread ends
299       * stack overflow
300
301    GRAN version:
302      In a GranSim setup this loop iterates over the global event queue.
303      This revolves around the global event queue, which determines what 
304      to do next. Therefore, it's more complicated than either the 
305      concurrent or the parallel (GUM) setup.
306
307    GUM version:
308      GUM iterates over incoming messages.
309      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
310      and sends out a fish whenever it has nothing to do; in-between
311      doing the actual reductions (shared code below) it processes the
312      incoming messages and deals with delayed operations 
313      (see PendingFetches).
314      This is not the ugliest code you could imagine, but it's bloody close.
315
316    ------------------------------------------------------------------------ */
317
318 static Capability *
319 schedule (Capability *initialCapability, Task *task)
320 {
321   StgTSO *t;
322   Capability *cap;
323   StgThreadReturnCode ret;
324 #if defined(GRAN)
325   rtsEvent *event;
326 #elif defined(PARALLEL_HASKELL)
327   StgTSO *tso;
328   GlobalTaskId pe;
329   rtsBool receivedFinish = rtsFalse;
330 # if defined(DEBUG)
331   nat tp_size, sp_size; // stats only
332 # endif
333 #endif
334   nat prev_what_next;
335   rtsBool ready_to_gc;
336 #if defined(THREADED_RTS)
337   rtsBool first = rtsTrue;
338 #endif
339   
340   cap = initialCapability;
341
342   // Pre-condition: this task owns initialCapability.
343   // The sched_mutex is *NOT* held
344   // NB. on return, we still hold a capability.
345
346   IF_DEBUG(scheduler,
347            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
348                        task, initialCapability);
349       );
350
351   schedulePreLoop();
352
353   // -----------------------------------------------------------
354   // Scheduler loop starts here:
355
356 #if defined(PARALLEL_HASKELL)
357 #define TERMINATION_CONDITION        (!receivedFinish)
358 #elif defined(GRAN)
359 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
360 #else
361 #define TERMINATION_CONDITION        rtsTrue
362 #endif
363
364   while (TERMINATION_CONDITION) {
365
366 #if defined(GRAN)
367       /* Choose the processor with the next event */
368       CurrentProc = event->proc;
369       CurrentTSO = event->tso;
370 #endif
371
372 #if defined(THREADED_RTS)
373       if (first) {
374           // don't yield the first time, we want a chance to run this
375           // thread for a bit, even if there are others banging at the
376           // door.
377           first = rtsFalse;
378           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
379       } else {
380           // Yield the capability to higher-priority tasks if necessary.
381           yieldCapability(&cap, task);
382       }
383 #endif
384       
385 #if defined(THREADED_RTS)
386       schedulePushWork(cap,task);
387 #endif
388
389     // Check whether we have re-entered the RTS from Haskell without
390     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
391     // call).
392     if (cap->in_haskell) {
393           errorBelch("schedule: re-entered unsafely.\n"
394                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
395           stg_exit(EXIT_FAILURE);
396     }
397
398     // The interruption / shutdown sequence.
399     // 
400     // In order to cleanly shut down the runtime, we want to:
401     //   * make sure that all main threads return to their callers
402     //     with the state 'Interrupted'.
403     //   * clean up all OS threads assocated with the runtime
404     //   * free all memory etc.
405     //
406     // So the sequence for ^C goes like this:
407     //
408     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
409     //     arranges for some Capability to wake up
410     //
411     //   * all threads in the system are halted, and the zombies are
412     //     placed on the run queue for cleaning up.  We acquire all
413     //     the capabilities in order to delete the threads, this is
414     //     done by scheduleDoGC() for convenience (because GC already
415     //     needs to acquire all the capabilities).  We can't kill
416     //     threads involved in foreign calls.
417     // 
418     //   * sched_state := SCHED_INTERRUPTED
419     //
420     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
421     //
422     //   * sched_state := SCHED_SHUTTING_DOWN
423     //
424     //   * all workers exit when the run queue on their capability
425     //     drains.  All main threads will also exit when their TSO
426     //     reaches the head of the run queue and they can return.
427     //
428     //   * eventually all Capabilities will shut down, and the RTS can
429     //     exit.
430     //
431     //   * We might be left with threads blocked in foreign calls, 
432     //     we should really attempt to kill these somehow (TODO);
433     
434     switch (sched_state) {
435     case SCHED_RUNNING:
436         break;
437     case SCHED_INTERRUPTING:
438         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
439 #if defined(THREADED_RTS)
440         discardSparksCap(cap);
441 #endif
442         /* scheduleDoGC() deletes all the threads */
443         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
444         break;
445     case SCHED_INTERRUPTED:
446         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
447         break;
448     case SCHED_SHUTTING_DOWN:
449         IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
450         // If we are a worker, just exit.  If we're a bound thread
451         // then we will exit below when we've removed our TSO from
452         // the run queue.
453         if (task->tso == NULL && emptyRunQueue(cap)) {
454             return cap;
455         }
456         break;
457     default:
458         barf("sched_state: %d", sched_state);
459     }
460
461 #if defined(THREADED_RTS)
462     // If the run queue is empty, take a spark and turn it into a thread.
463     {
464         if (emptyRunQueue(cap)) {
465             StgClosure *spark;
466             spark = findSpark(cap);
467             if (spark != NULL) {
468                 IF_DEBUG(scheduler,
469                          sched_belch("turning spark of closure %p into a thread",
470                                      (StgClosure *)spark));
471                 createSparkThread(cap,spark);     
472             }
473         }
474     }
475 #endif // THREADED_RTS
476
477     scheduleStartSignalHandlers(cap);
478
479     // Only check the black holes here if we've nothing else to do.
480     // During normal execution, the black hole list only gets checked
481     // at GC time, to avoid repeatedly traversing this possibly long
482     // list each time around the scheduler.
483     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
484
485     scheduleCheckBlockedThreads(cap);
486
487     scheduleDetectDeadlock(cap,task);
488 #if defined(THREADED_RTS)
489     cap = task->cap;    // reload cap, it might have changed
490 #endif
491
492     // Normally, the only way we can get here with no threads to
493     // run is if a keyboard interrupt received during 
494     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
495     // Additionally, it is not fatal for the
496     // threaded RTS to reach here with no threads to run.
497     //
498     // win32: might be here due to awaitEvent() being abandoned
499     // as a result of a console event having been delivered.
500     if ( emptyRunQueue(cap) ) {
501 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
502         ASSERT(sched_state >= SCHED_INTERRUPTING);
503 #endif
504         continue; // nothing to do
505     }
506
507 #if defined(PARALLEL_HASKELL)
508     scheduleSendPendingMessages();
509     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
510         continue;
511
512 #if defined(SPARKS)
513     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
514 #endif
515
516     /* If we still have no work we need to send a FISH to get a spark
517        from another PE */
518     if (emptyRunQueue(cap)) {
519         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
520         ASSERT(rtsFalse); // should not happen at the moment
521     }
522     // from here: non-empty run queue.
523     //  TODO: merge above case with this, only one call processMessages() !
524     if (PacketsWaiting()) {  /* process incoming messages, if
525                                 any pending...  only in else
526                                 because getRemoteWork waits for
527                                 messages as well */
528         receivedFinish = processMessages();
529     }
530 #endif
531
532 #if defined(GRAN)
533     scheduleProcessEvent(event);
534 #endif
535
536     // 
537     // Get a thread to run
538     //
539     t = popRunQueue(cap);
540
541 #if defined(GRAN) || defined(PAR)
542     scheduleGranParReport(); // some kind of debuging output
543 #else
544     // Sanity check the thread we're about to run.  This can be
545     // expensive if there is lots of thread switching going on...
546     IF_DEBUG(sanity,checkTSO(t));
547 #endif
548
549 #if defined(THREADED_RTS)
550     // Check whether we can run this thread in the current task.
551     // If not, we have to pass our capability to the right task.
552     {
553         Task *bound = t->bound;
554       
555         if (bound) {
556             if (bound == task) {
557                 IF_DEBUG(scheduler,
558                          sched_belch("### Running thread %d in bound thread",
559                                      t->id));
560                 // yes, the Haskell thread is bound to the current native thread
561             } else {
562                 IF_DEBUG(scheduler,
563                          sched_belch("### thread %d bound to another OS thread",
564                                      t->id));
565                 // no, bound to a different Haskell thread: pass to that thread
566                 pushOnRunQueue(cap,t);
567                 continue;
568             }
569         } else {
570             // The thread we want to run is unbound.
571             if (task->tso) { 
572                 IF_DEBUG(scheduler,
573                          sched_belch("### this OS thread cannot run thread %d", t->id));
574                 // no, the current native thread is bound to a different
575                 // Haskell thread, so pass it to any worker thread
576                 pushOnRunQueue(cap,t);
577                 continue; 
578             }
579         }
580     }
581 #endif
582
583     cap->r.rCurrentTSO = t;
584     
585     /* context switches are initiated by the timer signal, unless
586      * the user specified "context switch as often as possible", with
587      * +RTS -C0
588      */
589     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
590         && !emptyThreadQueues(cap)) {
591         context_switch = 1;
592     }
593          
594 run_thread:
595
596     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
597                               (long)t->id, whatNext_strs[t->what_next]));
598
599 #if defined(PROFILING)
600     startHeapProfTimer();
601 #endif
602
603     // ----------------------------------------------------------------------
604     // Run the current thread 
605
606     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
607
608     prev_what_next = t->what_next;
609
610     errno = t->saved_errno;
611     cap->in_haskell = rtsTrue;
612
613     dirtyTSO(t);
614
615     recent_activity = ACTIVITY_YES;
616
617     switch (prev_what_next) {
618         
619     case ThreadKilled:
620     case ThreadComplete:
621         /* Thread already finished, return to scheduler. */
622         ret = ThreadFinished;
623         break;
624         
625     case ThreadRunGHC:
626     {
627         StgRegTable *r;
628         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
629         cap = regTableToCapability(r);
630         ret = r->rRet;
631         break;
632     }
633     
634     case ThreadInterpret:
635         cap = interpretBCO(cap);
636         ret = cap->r.rRet;
637         break;
638         
639     default:
640         barf("schedule: invalid what_next field");
641     }
642
643     cap->in_haskell = rtsFalse;
644
645     // The TSO might have moved, eg. if it re-entered the RTS and a GC
646     // happened.  So find the new location:
647     t = cap->r.rCurrentTSO;
648
649     // We have run some Haskell code: there might be blackhole-blocked
650     // threads to wake up now.
651     // Lock-free test here should be ok, we're just setting a flag.
652     if ( blackhole_queue != END_TSO_QUEUE ) {
653         blackholes_need_checking = rtsTrue;
654     }
655     
656     // And save the current errno in this thread.
657     // XXX: possibly bogus for SMP because this thread might already
658     // be running again, see code below.
659     t->saved_errno = errno;
660
661 #if defined(THREADED_RTS)
662     // If ret is ThreadBlocked, and this Task is bound to the TSO that
663     // blocked, we are in limbo - the TSO is now owned by whatever it
664     // is blocked on, and may in fact already have been woken up,
665     // perhaps even on a different Capability.  It may be the case
666     // that task->cap != cap.  We better yield this Capability
667     // immediately and return to normaility.
668     if (ret == ThreadBlocked) {
669         IF_DEBUG(scheduler,
670                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
671                              t->id, whatNext_strs[t->what_next]));
672         continue;
673     }
674 #endif
675
676     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
677
678     // ----------------------------------------------------------------------
679     
680     // Costs for the scheduler are assigned to CCS_SYSTEM
681 #if defined(PROFILING)
682     stopHeapProfTimer();
683     CCCS = CCS_SYSTEM;
684 #endif
685     
686 #if defined(THREADED_RTS)
687     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
688 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
689     IF_DEBUG(scheduler,debugBelch("sched: "););
690 #endif
691     
692     schedulePostRunThread();
693
694     ready_to_gc = rtsFalse;
695
696     switch (ret) {
697     case HeapOverflow:
698         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
699         break;
700
701     case StackOverflow:
702         scheduleHandleStackOverflow(cap,task,t);
703         break;
704
705     case ThreadYielding:
706         if (scheduleHandleYield(cap, t, prev_what_next)) {
707             // shortcut for switching between compiler/interpreter:
708             goto run_thread; 
709         }
710         break;
711
712     case ThreadBlocked:
713         scheduleHandleThreadBlocked(t);
714         break;
715
716     case ThreadFinished:
717         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
718         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
719         break;
720
721     default:
722       barf("schedule: invalid thread return code %d", (int)ret);
723     }
724
725     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
726     if (ready_to_gc) {
727       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
728     }
729   } /* end of while() */
730
731   IF_PAR_DEBUG(verbose,
732                debugBelch("== Leaving schedule() after having received Finish\n"));
733 }
734
735 /* ----------------------------------------------------------------------------
736  * Setting up the scheduler loop
737  * ------------------------------------------------------------------------- */
738
739 static void
740 schedulePreLoop(void)
741 {
742 #if defined(GRAN) 
743     /* set up first event to get things going */
744     /* ToDo: assign costs for system setup and init MainTSO ! */
745     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
746               ContinueThread, 
747               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
748     
749     IF_DEBUG(gran,
750              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
751                         CurrentTSO);
752              G_TSO(CurrentTSO, 5));
753     
754     if (RtsFlags.GranFlags.Light) {
755         /* Save current time; GranSim Light only */
756         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
757     }      
758 #endif
759 }
760
761 /* -----------------------------------------------------------------------------
762  * schedulePushWork()
763  *
764  * Push work to other Capabilities if we have some.
765  * -------------------------------------------------------------------------- */
766
767 #if defined(THREADED_RTS)
768 static void
769 schedulePushWork(Capability *cap USED_IF_THREADS, 
770                  Task *task      USED_IF_THREADS)
771 {
772     Capability *free_caps[n_capabilities], *cap0;
773     nat i, n_free_caps;
774
775     // Check whether we have more threads on our run queue, or sparks
776     // in our pool, that we could hand to another Capability.
777     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
778         && sparkPoolSizeCap(cap) < 2) {
779         return;
780     }
781
782     // First grab as many free Capabilities as we can.
783     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
784         cap0 = &capabilities[i];
785         if (cap != cap0 && tryGrabCapability(cap0,task)) {
786             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
787                 // it already has some work, we just grabbed it at 
788                 // the wrong moment.  Or maybe it's deadlocked!
789                 releaseCapability(cap0);
790             } else {
791                 free_caps[n_free_caps++] = cap0;
792             }
793         }
794     }
795
796     // we now have n_free_caps free capabilities stashed in
797     // free_caps[].  Share our run queue equally with them.  This is
798     // probably the simplest thing we could do; improvements we might
799     // want to do include:
800     //
801     //   - giving high priority to moving relatively new threads, on 
802     //     the gournds that they haven't had time to build up a
803     //     working set in the cache on this CPU/Capability.
804     //
805     //   - giving low priority to moving long-lived threads
806
807     if (n_free_caps > 0) {
808         StgTSO *prev, *t, *next;
809         rtsBool pushed_to_all;
810
811         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
812
813         i = 0;
814         pushed_to_all = rtsFalse;
815
816         if (cap->run_queue_hd != END_TSO_QUEUE) {
817             prev = cap->run_queue_hd;
818             t = prev->link;
819             prev->link = END_TSO_QUEUE;
820             for (; t != END_TSO_QUEUE; t = next) {
821                 next = t->link;
822                 t->link = END_TSO_QUEUE;
823                 if (t->what_next == ThreadRelocated
824                     || t->bound == task) { // don't move my bound thread
825                     prev->link = t;
826                     prev = t;
827                 } else if (i == n_free_caps) {
828                     pushed_to_all = rtsTrue;
829                     i = 0;
830                     // keep one for us
831                     prev->link = t;
832                     prev = t;
833                 } else {
834                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
835                     appendToRunQueue(free_caps[i],t);
836                     if (t->bound) { t->bound->cap = free_caps[i]; }
837                     i++;
838                 }
839             }
840             cap->run_queue_tl = prev;
841         }
842
843         // If there are some free capabilities that we didn't push any
844         // threads to, then try to push a spark to each one.
845         if (!pushed_to_all) {
846             StgClosure *spark;
847             // i is the next free capability to push to
848             for (; i < n_free_caps; i++) {
849                 if (emptySparkPoolCap(free_caps[i])) {
850                     spark = findSpark(cap);
851                     if (spark != NULL) {
852                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
853                         newSpark(&(free_caps[i]->r), spark);
854                     }
855                 }
856             }
857         }
858
859         // release the capabilities
860         for (i = 0; i < n_free_caps; i++) {
861             task->cap = free_caps[i];
862             releaseCapability(free_caps[i]);
863         }
864     }
865     task->cap = cap; // reset to point to our Capability.
866 }
867 #endif
868
869 /* ----------------------------------------------------------------------------
870  * Start any pending signal handlers
871  * ------------------------------------------------------------------------- */
872
873 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
874 static void
875 scheduleStartSignalHandlers(Capability *cap)
876 {
877     if (signals_pending()) { // safe outside the lock
878         startSignalHandlers(cap);
879     }
880 }
881 #else
882 static void
883 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
884 {
885 }
886 #endif
887
888 /* ----------------------------------------------------------------------------
889  * Check for blocked threads that can be woken up.
890  * ------------------------------------------------------------------------- */
891
892 static void
893 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
894 {
895 #if !defined(THREADED_RTS)
896     //
897     // Check whether any waiting threads need to be woken up.  If the
898     // run queue is empty, and there are no other tasks running, we
899     // can wait indefinitely for something to happen.
900     //
901     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
902     {
903         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
904     }
905 #endif
906 }
907
908
909 /* ----------------------------------------------------------------------------
910  * Check for threads blocked on BLACKHOLEs that can be woken up
911  * ------------------------------------------------------------------------- */
912 static void
913 scheduleCheckBlackHoles (Capability *cap)
914 {
915     if ( blackholes_need_checking ) // check without the lock first
916     {
917         ACQUIRE_LOCK(&sched_mutex);
918         if ( blackholes_need_checking ) {
919             checkBlackHoles(cap);
920             blackholes_need_checking = rtsFalse;
921         }
922         RELEASE_LOCK(&sched_mutex);
923     }
924 }
925
926 /* ----------------------------------------------------------------------------
927  * Detect deadlock conditions and attempt to resolve them.
928  * ------------------------------------------------------------------------- */
929
930 static void
931 scheduleDetectDeadlock (Capability *cap, Task *task)
932 {
933
934 #if defined(PARALLEL_HASKELL)
935     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
936     return;
937 #endif
938
939     /* 
940      * Detect deadlock: when we have no threads to run, there are no
941      * threads blocked, waiting for I/O, or sleeping, and all the
942      * other tasks are waiting for work, we must have a deadlock of
943      * some description.
944      */
945     if ( emptyThreadQueues(cap) )
946     {
947 #if defined(THREADED_RTS)
948         /* 
949          * In the threaded RTS, we only check for deadlock if there
950          * has been no activity in a complete timeslice.  This means
951          * we won't eagerly start a full GC just because we don't have
952          * any threads to run currently.
953          */
954         if (recent_activity != ACTIVITY_INACTIVE) return;
955 #endif
956
957         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
958
959         // Garbage collection can release some new threads due to
960         // either (a) finalizers or (b) threads resurrected because
961         // they are unreachable and will therefore be sent an
962         // exception.  Any threads thus released will be immediately
963         // runnable.
964         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
965
966         recent_activity = ACTIVITY_DONE_GC;
967         
968         if ( !emptyRunQueue(cap) ) return;
969
970 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
971         /* If we have user-installed signal handlers, then wait
972          * for signals to arrive rather then bombing out with a
973          * deadlock.
974          */
975         if ( anyUserHandlers() ) {
976             IF_DEBUG(scheduler, 
977                      sched_belch("still deadlocked, waiting for signals..."));
978
979             awaitUserSignals();
980
981             if (signals_pending()) {
982                 startSignalHandlers(cap);
983             }
984
985             // either we have threads to run, or we were interrupted:
986             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
987         }
988 #endif
989
990 #if !defined(THREADED_RTS)
991         /* Probably a real deadlock.  Send the current main thread the
992          * Deadlock exception.
993          */
994         if (task->tso) {
995             switch (task->tso->why_blocked) {
996             case BlockedOnSTM:
997             case BlockedOnBlackHole:
998             case BlockedOnException:
999             case BlockedOnMVar:
1000                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1001                 return;
1002             default:
1003                 barf("deadlock: main thread blocked in a strange way");
1004             }
1005         }
1006         return;
1007 #endif
1008     }
1009 }
1010
1011 /* ----------------------------------------------------------------------------
1012  * Process an event (GRAN only)
1013  * ------------------------------------------------------------------------- */
1014
1015 #if defined(GRAN)
1016 static StgTSO *
1017 scheduleProcessEvent(rtsEvent *event)
1018 {
1019     StgTSO *t;
1020
1021     if (RtsFlags.GranFlags.Light)
1022       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1023
1024     /* adjust time based on time-stamp */
1025     if (event->time > CurrentTime[CurrentProc] &&
1026         event->evttype != ContinueThread)
1027       CurrentTime[CurrentProc] = event->time;
1028     
1029     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1030     if (!RtsFlags.GranFlags.Light)
1031       handleIdlePEs();
1032
1033     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1034
1035     /* main event dispatcher in GranSim */
1036     switch (event->evttype) {
1037       /* Should just be continuing execution */
1038     case ContinueThread:
1039       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1040       /* ToDo: check assertion
1041       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1042              run_queue_hd != END_TSO_QUEUE);
1043       */
1044       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1045       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1046           procStatus[CurrentProc]==Fetching) {
1047         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1048               CurrentTSO->id, CurrentTSO, CurrentProc);
1049         goto next_thread;
1050       } 
1051       /* Ignore ContinueThreads for completed threads */
1052       if (CurrentTSO->what_next == ThreadComplete) {
1053         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1054               CurrentTSO->id, CurrentTSO, CurrentProc);
1055         goto next_thread;
1056       } 
1057       /* Ignore ContinueThreads for threads that are being migrated */
1058       if (PROCS(CurrentTSO)==Nowhere) { 
1059         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1060               CurrentTSO->id, CurrentTSO, CurrentProc);
1061         goto next_thread;
1062       }
1063       /* The thread should be at the beginning of the run queue */
1064       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1065         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1066               CurrentTSO->id, CurrentTSO, CurrentProc);
1067         break; // run the thread anyway
1068       }
1069       /*
1070       new_event(proc, proc, CurrentTime[proc],
1071                 FindWork,
1072                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1073       goto next_thread; 
1074       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1075       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1076
1077     case FetchNode:
1078       do_the_fetchnode(event);
1079       goto next_thread;             /* handle next event in event queue  */
1080       
1081     case GlobalBlock:
1082       do_the_globalblock(event);
1083       goto next_thread;             /* handle next event in event queue  */
1084       
1085     case FetchReply:
1086       do_the_fetchreply(event);
1087       goto next_thread;             /* handle next event in event queue  */
1088       
1089     case UnblockThread:   /* Move from the blocked queue to the tail of */
1090       do_the_unblock(event);
1091       goto next_thread;             /* handle next event in event queue  */
1092       
1093     case ResumeThread:  /* Move from the blocked queue to the tail of */
1094       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1095       event->tso->gran.blocktime += 
1096         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1097       do_the_startthread(event);
1098       goto next_thread;             /* handle next event in event queue  */
1099       
1100     case StartThread:
1101       do_the_startthread(event);
1102       goto next_thread;             /* handle next event in event queue  */
1103       
1104     case MoveThread:
1105       do_the_movethread(event);
1106       goto next_thread;             /* handle next event in event queue  */
1107       
1108     case MoveSpark:
1109       do_the_movespark(event);
1110       goto next_thread;             /* handle next event in event queue  */
1111       
1112     case FindWork:
1113       do_the_findwork(event);
1114       goto next_thread;             /* handle next event in event queue  */
1115       
1116     default:
1117       barf("Illegal event type %u\n", event->evttype);
1118     }  /* switch */
1119     
1120     /* This point was scheduler_loop in the old RTS */
1121
1122     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1123
1124     TimeOfLastEvent = CurrentTime[CurrentProc];
1125     TimeOfNextEvent = get_time_of_next_event();
1126     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1127     // CurrentTSO = ThreadQueueHd;
1128
1129     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1130                          TimeOfNextEvent));
1131
1132     if (RtsFlags.GranFlags.Light) 
1133       GranSimLight_leave_system(event, &ActiveTSO); 
1134
1135     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1136
1137     IF_DEBUG(gran, 
1138              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1139
1140     /* in a GranSim setup the TSO stays on the run queue */
1141     t = CurrentTSO;
1142     /* Take a thread from the run queue. */
1143     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1144
1145     IF_DEBUG(gran, 
1146              debugBelch("GRAN: About to run current thread, which is\n");
1147              G_TSO(t,5));
1148
1149     context_switch = 0; // turned on via GranYield, checking events and time slice
1150
1151     IF_DEBUG(gran, 
1152              DumpGranEvent(GR_SCHEDULE, t));
1153
1154     procStatus[CurrentProc] = Busy;
1155 }
1156 #endif // GRAN
1157
1158 /* ----------------------------------------------------------------------------
1159  * Send pending messages (PARALLEL_HASKELL only)
1160  * ------------------------------------------------------------------------- */
1161
1162 #if defined(PARALLEL_HASKELL)
1163 static StgTSO *
1164 scheduleSendPendingMessages(void)
1165 {
1166     StgSparkPool *pool;
1167     rtsSpark spark;
1168     StgTSO *t;
1169
1170 # if defined(PAR) // global Mem.Mgmt., omit for now
1171     if (PendingFetches != END_BF_QUEUE) {
1172         processFetches();
1173     }
1174 # endif
1175     
1176     if (RtsFlags.ParFlags.BufferTime) {
1177         // if we use message buffering, we must send away all message
1178         // packets which have become too old...
1179         sendOldBuffers(); 
1180     }
1181 }
1182 #endif
1183
1184 /* ----------------------------------------------------------------------------
1185  * Activate spark threads (PARALLEL_HASKELL only)
1186  * ------------------------------------------------------------------------- */
1187
1188 #if defined(PARALLEL_HASKELL)
1189 static void
1190 scheduleActivateSpark(void)
1191 {
1192 #if defined(SPARKS)
1193   ASSERT(emptyRunQueue());
1194 /* We get here if the run queue is empty and want some work.
1195    We try to turn a spark into a thread, and add it to the run queue,
1196    from where it will be picked up in the next iteration of the scheduler
1197    loop.
1198 */
1199
1200       /* :-[  no local threads => look out for local sparks */
1201       /* the spark pool for the current PE */
1202       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1203       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1204           pool->hd < pool->tl) {
1205         /* 
1206          * ToDo: add GC code check that we really have enough heap afterwards!!
1207          * Old comment:
1208          * If we're here (no runnable threads) and we have pending
1209          * sparks, we must have a space problem.  Get enough space
1210          * to turn one of those pending sparks into a
1211          * thread... 
1212          */
1213
1214         spark = findSpark(rtsFalse);            /* get a spark */
1215         if (spark != (rtsSpark) NULL) {
1216           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1217           IF_PAR_DEBUG(fish, // schedule,
1218                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1219                              tso->id, tso, advisory_thread_count));
1220
1221           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1222             IF_PAR_DEBUG(fish, // schedule,
1223                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1224                             spark));
1225             return rtsFalse; /* failed to generate a thread */
1226           }                  /* otherwise fall through & pick-up new tso */
1227         } else {
1228           IF_PAR_DEBUG(fish, // schedule,
1229                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1230                              spark_queue_len(pool)));
1231           return rtsFalse;  /* failed to generate a thread */
1232         }
1233         return rtsTrue;  /* success in generating a thread */
1234   } else { /* no more threads permitted or pool empty */
1235     return rtsFalse;  /* failed to generateThread */
1236   }
1237 #else
1238   tso = NULL; // avoid compiler warning only
1239   return rtsFalse;  /* dummy in non-PAR setup */
1240 #endif // SPARKS
1241 }
1242 #endif // PARALLEL_HASKELL
1243
1244 /* ----------------------------------------------------------------------------
1245  * Get work from a remote node (PARALLEL_HASKELL only)
1246  * ------------------------------------------------------------------------- */
1247     
1248 #if defined(PARALLEL_HASKELL)
1249 static rtsBool
1250 scheduleGetRemoteWork(rtsBool *receivedFinish)
1251 {
1252   ASSERT(emptyRunQueue());
1253
1254   if (RtsFlags.ParFlags.BufferTime) {
1255         IF_PAR_DEBUG(verbose, 
1256                 debugBelch("...send all pending data,"));
1257         {
1258           nat i;
1259           for (i=1; i<=nPEs; i++)
1260             sendImmediately(i); // send all messages away immediately
1261         }
1262   }
1263 # ifndef SPARKS
1264         //++EDEN++ idle() , i.e. send all buffers, wait for work
1265         // suppress fishing in EDEN... just look for incoming messages
1266         // (blocking receive)
1267   IF_PAR_DEBUG(verbose, 
1268                debugBelch("...wait for incoming messages...\n"));
1269   *receivedFinish = processMessages(); // blocking receive...
1270
1271         // and reenter scheduling loop after having received something
1272         // (return rtsFalse below)
1273
1274 # else /* activate SPARKS machinery */
1275 /* We get here, if we have no work, tried to activate a local spark, but still
1276    have no work. We try to get a remote spark, by sending a FISH message.
1277    Thread migration should be added here, and triggered when a sequence of 
1278    fishes returns without work. */
1279         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1280
1281       /* =8-[  no local sparks => look for work on other PEs */
1282         /*
1283          * We really have absolutely no work.  Send out a fish
1284          * (there may be some out there already), and wait for
1285          * something to arrive.  We clearly can't run any threads
1286          * until a SCHEDULE or RESUME arrives, and so that's what
1287          * we're hoping to see.  (Of course, we still have to
1288          * respond to other types of messages.)
1289          */
1290         rtsTime now = msTime() /*CURRENT_TIME*/;
1291         IF_PAR_DEBUG(verbose, 
1292                      debugBelch("--  now=%ld\n", now));
1293         IF_PAR_DEBUG(fish, // verbose,
1294              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1295                  (last_fish_arrived_at!=0 &&
1296                   last_fish_arrived_at+delay > now)) {
1297                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1298                      now, last_fish_arrived_at+delay, 
1299                      last_fish_arrived_at,
1300                      delay);
1301              });
1302   
1303         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1304             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1305           if (last_fish_arrived_at==0 ||
1306               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1307             /* outstandingFishes is set in sendFish, processFish;
1308                avoid flooding system with fishes via delay */
1309     next_fish_to_send_at = 0;  
1310   } else {
1311     /* ToDo: this should be done in the main scheduling loop to avoid the
1312              busy wait here; not so bad if fish delay is very small  */
1313     int iq = 0; // DEBUGGING -- HWL
1314     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1315     /* send a fish when ready, but process messages that arrive in the meantime */
1316     do {
1317       if (PacketsWaiting()) {
1318         iq++; // DEBUGGING
1319         *receivedFinish = processMessages();
1320       }
1321       now = msTime();
1322     } while (!*receivedFinish || now<next_fish_to_send_at);
1323     // JB: This means the fish could become obsolete, if we receive
1324     // work. Better check for work again? 
1325     // last line: while (!receivedFinish || !haveWork || now<...)
1326     // next line: if (receivedFinish || haveWork )
1327
1328     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1329       return rtsFalse;  // NB: this will leave scheduler loop
1330                         // immediately after return!
1331                           
1332     IF_PAR_DEBUG(fish, // verbose,
1333                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1334
1335   }
1336
1337     // JB: IMHO, this should all be hidden inside sendFish(...)
1338     /* pe = choosePE(); 
1339        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1340                 NEW_FISH_HUNGER);
1341
1342     // Global statistics: count no. of fishes
1343     if (RtsFlags.ParFlags.ParStats.Global &&
1344          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1345            globalParStats.tot_fish_mess++;
1346            }
1347     */ 
1348
1349   /* delayed fishes must have been sent by now! */
1350   next_fish_to_send_at = 0;  
1351   }
1352       
1353   *receivedFinish = processMessages();
1354 # endif /* SPARKS */
1355
1356  return rtsFalse;
1357  /* NB: this function always returns rtsFalse, meaning the scheduler
1358     loop continues with the next iteration; 
1359     rationale: 
1360       return code means success in finding work; we enter this function
1361       if there is no local work, thus have to send a fish which takes
1362       time until it arrives with work; in the meantime we should process
1363       messages in the main loop;
1364  */
1365 }
1366 #endif // PARALLEL_HASKELL
1367
1368 /* ----------------------------------------------------------------------------
1369  * PAR/GRAN: Report stats & debugging info(?)
1370  * ------------------------------------------------------------------------- */
1371
1372 #if defined(PAR) || defined(GRAN)
1373 static void
1374 scheduleGranParReport(void)
1375 {
1376   ASSERT(run_queue_hd != END_TSO_QUEUE);
1377
1378   /* Take a thread from the run queue, if we have work */
1379   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1380
1381     /* If this TSO has got its outport closed in the meantime, 
1382      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1383      * It has to be marked as TH_DEAD for this purpose.
1384      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1385
1386 JB: TODO: investigate wether state change field could be nuked
1387      entirely and replaced by the normal tso state (whatnext
1388      field). All we want to do is to kill tsos from outside.
1389      */
1390
1391     /* ToDo: write something to the log-file
1392     if (RTSflags.ParFlags.granSimStats && !sameThread)
1393         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1394
1395     CurrentTSO = t;
1396     */
1397     /* the spark pool for the current PE */
1398     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1399
1400     IF_DEBUG(scheduler, 
1401              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1402                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1403
1404     IF_PAR_DEBUG(fish,
1405              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1406                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1407
1408     if (RtsFlags.ParFlags.ParStats.Full && 
1409         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1410         (emitSchedule || // forced emit
1411          (t && LastTSO && t->id != LastTSO->id))) {
1412       /* 
1413          we are running a different TSO, so write a schedule event to log file
1414          NB: If we use fair scheduling we also have to write  a deschedule 
1415              event for LastTSO; with unfair scheduling we know that the
1416              previous tso has blocked whenever we switch to another tso, so
1417              we don't need it in GUM for now
1418       */
1419       IF_PAR_DEBUG(fish, // schedule,
1420                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1421
1422       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1423                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1424       emitSchedule = rtsFalse;
1425     }
1426 }     
1427 #endif
1428
1429 /* ----------------------------------------------------------------------------
1430  * After running a thread...
1431  * ------------------------------------------------------------------------- */
1432
1433 static void
1434 schedulePostRunThread(void)
1435 {
1436 #if defined(PAR)
1437     /* HACK 675: if the last thread didn't yield, make sure to print a 
1438        SCHEDULE event to the log file when StgRunning the next thread, even
1439        if it is the same one as before */
1440     LastTSO = t; 
1441     TimeOfLastYield = CURRENT_TIME;
1442 #endif
1443
1444   /* some statistics gathering in the parallel case */
1445
1446 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1447   switch (ret) {
1448     case HeapOverflow:
1449 # if defined(GRAN)
1450       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1451       globalGranStats.tot_heapover++;
1452 # elif defined(PAR)
1453       globalParStats.tot_heapover++;
1454 # endif
1455       break;
1456
1457      case StackOverflow:
1458 # if defined(GRAN)
1459       IF_DEBUG(gran, 
1460                DumpGranEvent(GR_DESCHEDULE, t));
1461       globalGranStats.tot_stackover++;
1462 # elif defined(PAR)
1463       // IF_DEBUG(par, 
1464       // DumpGranEvent(GR_DESCHEDULE, t);
1465       globalParStats.tot_stackover++;
1466 # endif
1467       break;
1468
1469     case ThreadYielding:
1470 # if defined(GRAN)
1471       IF_DEBUG(gran, 
1472                DumpGranEvent(GR_DESCHEDULE, t));
1473       globalGranStats.tot_yields++;
1474 # elif defined(PAR)
1475       // IF_DEBUG(par, 
1476       // DumpGranEvent(GR_DESCHEDULE, t);
1477       globalParStats.tot_yields++;
1478 # endif
1479       break; 
1480
1481     case ThreadBlocked:
1482 # if defined(GRAN)
1483       IF_DEBUG(scheduler,
1484                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1485                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1486                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1487                if (t->block_info.closure!=(StgClosure*)NULL)
1488                  print_bq(t->block_info.closure);
1489                debugBelch("\n"));
1490
1491       // ??? needed; should emit block before
1492       IF_DEBUG(gran, 
1493                DumpGranEvent(GR_DESCHEDULE, t)); 
1494       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1495       /*
1496         ngoq Dogh!
1497       ASSERT(procStatus[CurrentProc]==Busy || 
1498               ((procStatus[CurrentProc]==Fetching) && 
1499               (t->block_info.closure!=(StgClosure*)NULL)));
1500       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1501           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1502             procStatus[CurrentProc]==Fetching)) 
1503         procStatus[CurrentProc] = Idle;
1504       */
1505 # elif defined(PAR)
1506 //++PAR++  blockThread() writes the event (change?)
1507 # endif
1508     break;
1509
1510   case ThreadFinished:
1511     break;
1512
1513   default:
1514     barf("parGlobalStats: unknown return code");
1515     break;
1516     }
1517 #endif
1518 }
1519
1520 /* -----------------------------------------------------------------------------
1521  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1522  * -------------------------------------------------------------------------- */
1523
1524 static rtsBool
1525 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1526 {
1527     // did the task ask for a large block?
1528     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1529         // if so, get one and push it on the front of the nursery.
1530         bdescr *bd;
1531         lnat blocks;
1532         
1533         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1534         
1535         IF_DEBUG(scheduler,
1536                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1537                             (long)t->id, whatNext_strs[t->what_next], blocks));
1538         
1539         // don't do this if the nursery is (nearly) full, we'll GC first.
1540         if (cap->r.rCurrentNursery->link != NULL ||
1541             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1542                                                // if the nursery has only one block.
1543             
1544             ACQUIRE_SM_LOCK
1545             bd = allocGroup( blocks );
1546             RELEASE_SM_LOCK
1547             cap->r.rNursery->n_blocks += blocks;
1548             
1549             // link the new group into the list
1550             bd->link = cap->r.rCurrentNursery;
1551             bd->u.back = cap->r.rCurrentNursery->u.back;
1552             if (cap->r.rCurrentNursery->u.back != NULL) {
1553                 cap->r.rCurrentNursery->u.back->link = bd;
1554             } else {
1555 #if !defined(THREADED_RTS)
1556                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1557                        g0s0 == cap->r.rNursery);
1558 #endif
1559                 cap->r.rNursery->blocks = bd;
1560             }             
1561             cap->r.rCurrentNursery->u.back = bd;
1562             
1563             // initialise it as a nursery block.  We initialise the
1564             // step, gen_no, and flags field of *every* sub-block in
1565             // this large block, because this is easier than making
1566             // sure that we always find the block head of a large
1567             // block whenever we call Bdescr() (eg. evacuate() and
1568             // isAlive() in the GC would both have to do this, at
1569             // least).
1570             { 
1571                 bdescr *x;
1572                 for (x = bd; x < bd + blocks; x++) {
1573                     x->step = cap->r.rNursery;
1574                     x->gen_no = 0;
1575                     x->flags = 0;
1576                 }
1577             }
1578             
1579             // This assert can be a killer if the app is doing lots
1580             // of large block allocations.
1581             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1582             
1583             // now update the nursery to point to the new block
1584             cap->r.rCurrentNursery = bd;
1585             
1586             // we might be unlucky and have another thread get on the
1587             // run queue before us and steal the large block, but in that
1588             // case the thread will just end up requesting another large
1589             // block.
1590             pushOnRunQueue(cap,t);
1591             return rtsFalse;  /* not actually GC'ing */
1592         }
1593     }
1594     
1595     IF_DEBUG(scheduler,
1596              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1597                         (long)t->id, whatNext_strs[t->what_next]));
1598 #if defined(GRAN)
1599     ASSERT(!is_on_queue(t,CurrentProc));
1600 #elif defined(PARALLEL_HASKELL)
1601     /* Currently we emit a DESCHEDULE event before GC in GUM.
1602        ToDo: either add separate event to distinguish SYSTEM time from rest
1603        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1604     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1605         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1606                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1607         emitSchedule = rtsTrue;
1608     }
1609 #endif
1610       
1611     pushOnRunQueue(cap,t);
1612     return rtsTrue;
1613     /* actual GC is done at the end of the while loop in schedule() */
1614 }
1615
1616 /* -----------------------------------------------------------------------------
1617  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1618  * -------------------------------------------------------------------------- */
1619
1620 static void
1621 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1622 {
1623     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1624                                   (long)t->id, whatNext_strs[t->what_next]));
1625     /* just adjust the stack for this thread, then pop it back
1626      * on the run queue.
1627      */
1628     { 
1629         /* enlarge the stack */
1630         StgTSO *new_t = threadStackOverflow(cap, t);
1631         
1632         /* The TSO attached to this Task may have moved, so update the
1633          * pointer to it.
1634          */
1635         if (task->tso == t) {
1636             task->tso = new_t;
1637         }
1638         pushOnRunQueue(cap,new_t);
1639     }
1640 }
1641
1642 /* -----------------------------------------------------------------------------
1643  * Handle a thread that returned to the scheduler with ThreadYielding
1644  * -------------------------------------------------------------------------- */
1645
1646 static rtsBool
1647 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1648 {
1649     // Reset the context switch flag.  We don't do this just before
1650     // running the thread, because that would mean we would lose ticks
1651     // during GC, which can lead to unfair scheduling (a thread hogs
1652     // the CPU because the tick always arrives during GC).  This way
1653     // penalises threads that do a lot of allocation, but that seems
1654     // better than the alternative.
1655     context_switch = 0;
1656     
1657     /* put the thread back on the run queue.  Then, if we're ready to
1658      * GC, check whether this is the last task to stop.  If so, wake
1659      * up the GC thread.  getThread will block during a GC until the
1660      * GC is finished.
1661      */
1662     IF_DEBUG(scheduler,
1663              if (t->what_next != prev_what_next) {
1664                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1665                             (long)t->id, whatNext_strs[t->what_next]);
1666              } else {
1667                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1668                             (long)t->id, whatNext_strs[t->what_next]);
1669              }
1670         );
1671     
1672     IF_DEBUG(sanity,
1673              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1674              checkTSO(t));
1675     ASSERT(t->link == END_TSO_QUEUE);
1676     
1677     // Shortcut if we're just switching evaluators: don't bother
1678     // doing stack squeezing (which can be expensive), just run the
1679     // thread.
1680     if (t->what_next != prev_what_next) {
1681         return rtsTrue;
1682     }
1683     
1684 #if defined(GRAN)
1685     ASSERT(!is_on_queue(t,CurrentProc));
1686       
1687     IF_DEBUG(sanity,
1688              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1689              checkThreadQsSanity(rtsTrue));
1690
1691 #endif
1692
1693     addToRunQueue(cap,t);
1694
1695 #if defined(GRAN)
1696     /* add a ContinueThread event to actually process the thread */
1697     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1698               ContinueThread,
1699               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1700     IF_GRAN_DEBUG(bq, 
1701                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1702                   G_EVENTQ(0);
1703                   G_CURR_THREADQ(0));
1704 #endif
1705     return rtsFalse;
1706 }
1707
1708 /* -----------------------------------------------------------------------------
1709  * Handle a thread that returned to the scheduler with ThreadBlocked
1710  * -------------------------------------------------------------------------- */
1711
1712 static void
1713 scheduleHandleThreadBlocked( StgTSO *t
1714 #if !defined(GRAN) && !defined(DEBUG)
1715     STG_UNUSED
1716 #endif
1717     )
1718 {
1719 #if defined(GRAN)
1720     IF_DEBUG(scheduler,
1721              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1722                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1723              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1724     
1725     // ??? needed; should emit block before
1726     IF_DEBUG(gran, 
1727              DumpGranEvent(GR_DESCHEDULE, t)); 
1728     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1729     /*
1730       ngoq Dogh!
1731       ASSERT(procStatus[CurrentProc]==Busy || 
1732       ((procStatus[CurrentProc]==Fetching) && 
1733       (t->block_info.closure!=(StgClosure*)NULL)));
1734       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1735       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1736       procStatus[CurrentProc]==Fetching)) 
1737       procStatus[CurrentProc] = Idle;
1738     */
1739 #elif defined(PAR)
1740     IF_DEBUG(scheduler,
1741              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1742                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1743     IF_PAR_DEBUG(bq,
1744                  
1745                  if (t->block_info.closure!=(StgClosure*)NULL) 
1746                  print_bq(t->block_info.closure));
1747     
1748     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1749     blockThread(t);
1750     
1751     /* whatever we schedule next, we must log that schedule */
1752     emitSchedule = rtsTrue;
1753     
1754 #else /* !GRAN */
1755
1756       // We don't need to do anything.  The thread is blocked, and it
1757       // has tidied up its stack and placed itself on whatever queue
1758       // it needs to be on.
1759
1760 #if !defined(THREADED_RTS)
1761     ASSERT(t->why_blocked != NotBlocked);
1762              // This might not be true under THREADED_RTS: we don't have
1763              // exclusive access to this TSO, so someone might have
1764              // woken it up by now.  This actually happens: try
1765              // conc023 +RTS -N2.
1766 #endif
1767
1768     IF_DEBUG(scheduler,
1769              debugBelch("--<< thread %d (%s) stopped: ", 
1770                         t->id, whatNext_strs[t->what_next]);
1771              printThreadBlockage(t);
1772              debugBelch("\n"));
1773     
1774     /* Only for dumping event to log file 
1775        ToDo: do I need this in GranSim, too?
1776        blockThread(t);
1777     */
1778 #endif
1779 }
1780
1781 /* -----------------------------------------------------------------------------
1782  * Handle a thread that returned to the scheduler with ThreadFinished
1783  * -------------------------------------------------------------------------- */
1784
1785 static rtsBool
1786 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1787 {
1788     /* Need to check whether this was a main thread, and if so,
1789      * return with the return value.
1790      *
1791      * We also end up here if the thread kills itself with an
1792      * uncaught exception, see Exception.cmm.
1793      */
1794     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1795                                   t->id, whatNext_strs[t->what_next]));
1796
1797 #if defined(GRAN)
1798       endThread(t, CurrentProc); // clean-up the thread
1799 #elif defined(PARALLEL_HASKELL)
1800       /* For now all are advisory -- HWL */
1801       //if(t->priority==AdvisoryPriority) ??
1802       advisory_thread_count--; // JB: Caution with this counter, buggy!
1803       
1804 # if defined(DIST)
1805       if(t->dist.priority==RevalPriority)
1806         FinishReval(t);
1807 # endif
1808     
1809 # if defined(EDENOLD)
1810       // the thread could still have an outport... (BUG)
1811       if (t->eden.outport != -1) {
1812       // delete the outport for the tso which has finished...
1813         IF_PAR_DEBUG(eden_ports,
1814                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1815                               t->eden.outport, t->id));
1816         deleteOPT(t);
1817       }
1818       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1819       if (t->eden.epid != -1) {
1820         IF_PAR_DEBUG(eden_ports,
1821                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1822                            t->id, t->eden.epid));
1823         removeTSOfromProcess(t);
1824       }
1825 # endif 
1826
1827 # if defined(PAR)
1828       if (RtsFlags.ParFlags.ParStats.Full &&
1829           !RtsFlags.ParFlags.ParStats.Suppressed) 
1830         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1831
1832       //  t->par only contains statistics: left out for now...
1833       IF_PAR_DEBUG(fish,
1834                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1835                               t->id,t,t->par.sparkname));
1836 # endif
1837 #endif // PARALLEL_HASKELL
1838
1839       //
1840       // Check whether the thread that just completed was a bound
1841       // thread, and if so return with the result.  
1842       //
1843       // There is an assumption here that all thread completion goes
1844       // through this point; we need to make sure that if a thread
1845       // ends up in the ThreadKilled state, that it stays on the run
1846       // queue so it can be dealt with here.
1847       //
1848
1849       if (t->bound) {
1850
1851           if (t->bound != task) {
1852 #if !defined(THREADED_RTS)
1853               // Must be a bound thread that is not the topmost one.  Leave
1854               // it on the run queue until the stack has unwound to the
1855               // point where we can deal with this.  Leaving it on the run
1856               // queue also ensures that the garbage collector knows about
1857               // this thread and its return value (it gets dropped from the
1858               // all_threads list so there's no other way to find it).
1859               appendToRunQueue(cap,t);
1860               return rtsFalse;
1861 #else
1862               // this cannot happen in the threaded RTS, because a
1863               // bound thread can only be run by the appropriate Task.
1864               barf("finished bound thread that isn't mine");
1865 #endif
1866           }
1867
1868           ASSERT(task->tso == t);
1869
1870           if (t->what_next == ThreadComplete) {
1871               if (task->ret) {
1872                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1873                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1874               }
1875               task->stat = Success;
1876           } else {
1877               if (task->ret) {
1878                   *(task->ret) = NULL;
1879               }
1880               if (sched_state >= SCHED_INTERRUPTING) {
1881                   task->stat = Interrupted;
1882               } else {
1883                   task->stat = Killed;
1884               }
1885           }
1886 #ifdef DEBUG
1887           removeThreadLabel((StgWord)task->tso->id);
1888 #endif
1889           return rtsTrue; // tells schedule() to return
1890       }
1891
1892       return rtsFalse;
1893 }
1894
1895 /* -----------------------------------------------------------------------------
1896  * Perform a heap census, if PROFILING
1897  * -------------------------------------------------------------------------- */
1898
1899 static rtsBool
1900 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1901 {
1902 #if defined(PROFILING)
1903     // When we have +RTS -i0 and we're heap profiling, do a census at
1904     // every GC.  This lets us get repeatable runs for debugging.
1905     if (performHeapProfile ||
1906         (RtsFlags.ProfFlags.profileInterval==0 &&
1907          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1908
1909         // checking black holes is necessary before GC, otherwise
1910         // there may be threads that are unreachable except by the
1911         // blackhole queue, which the GC will consider to be
1912         // deadlocked.
1913         scheduleCheckBlackHoles(&MainCapability);
1914
1915         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1916         GarbageCollect(GetRoots, rtsTrue);
1917
1918         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1919         heapCensus();
1920
1921         performHeapProfile = rtsFalse;
1922         return rtsTrue;  // true <=> we already GC'd
1923     }
1924 #endif
1925     return rtsFalse;
1926 }
1927
1928 /* -----------------------------------------------------------------------------
1929  * Perform a garbage collection if necessary
1930  * -------------------------------------------------------------------------- */
1931
1932 static Capability *
1933 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1934               rtsBool force_major, void (*get_roots)(evac_fn))
1935 {
1936     StgTSO *t;
1937 #ifdef THREADED_RTS
1938     static volatile StgWord waiting_for_gc;
1939     rtsBool was_waiting;
1940     nat i;
1941 #endif
1942
1943 #ifdef THREADED_RTS
1944     // In order to GC, there must be no threads running Haskell code.
1945     // Therefore, the GC thread needs to hold *all* the capabilities,
1946     // and release them after the GC has completed.  
1947     //
1948     // This seems to be the simplest way: previous attempts involved
1949     // making all the threads with capabilities give up their
1950     // capabilities and sleep except for the *last* one, which
1951     // actually did the GC.  But it's quite hard to arrange for all
1952     // the other tasks to sleep and stay asleep.
1953     //
1954         
1955     was_waiting = cas(&waiting_for_gc, 0, 1);
1956     if (was_waiting) {
1957         do {
1958             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1959             if (cap) yieldCapability(&cap,task);
1960         } while (waiting_for_gc);
1961         return cap;  // NOTE: task->cap might have changed here
1962     }
1963
1964     for (i=0; i < n_capabilities; i++) {
1965         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1966         if (cap != &capabilities[i]) {
1967             Capability *pcap = &capabilities[i];
1968             // we better hope this task doesn't get migrated to
1969             // another Capability while we're waiting for this one.
1970             // It won't, because load balancing happens while we have
1971             // all the Capabilities, but even so it's a slightly
1972             // unsavoury invariant.
1973             task->cap = pcap;
1974             context_switch = 1;
1975             waitForReturnCapability(&pcap, task);
1976             if (pcap != &capabilities[i]) {
1977                 barf("scheduleDoGC: got the wrong capability");
1978             }
1979         }
1980     }
1981
1982     waiting_for_gc = rtsFalse;
1983 #endif
1984
1985     /* Kick any transactions which are invalid back to their
1986      * atomically frames.  When next scheduled they will try to
1987      * commit, this commit will fail and they will retry.
1988      */
1989     { 
1990         StgTSO *next;
1991
1992         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1993             if (t->what_next == ThreadRelocated) {
1994                 next = t->link;
1995             } else {
1996                 next = t->global_link;
1997                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1998                     if (!stmValidateNestOfTransactions (t -> trec)) {
1999                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2000                         
2001                         // strip the stack back to the
2002                         // ATOMICALLY_FRAME, aborting the (nested)
2003                         // transaction, and saving the stack of any
2004                         // partially-evaluated thunks on the heap.
2005                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2006                         
2007 #ifdef REG_R1
2008                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2009 #endif
2010                     }
2011                 }
2012             }
2013         }
2014     }
2015     
2016     // so this happens periodically:
2017     if (cap) scheduleCheckBlackHoles(cap);
2018     
2019     IF_DEBUG(scheduler, printAllThreads());
2020
2021     /*
2022      * We now have all the capabilities; if we're in an interrupting
2023      * state, then we should take the opportunity to delete all the
2024      * threads in the system.
2025      */
2026     if (sched_state >= SCHED_INTERRUPTING) {
2027         deleteAllThreads(&capabilities[0]);
2028         sched_state = SCHED_INTERRUPTED;
2029     }
2030
2031     /* everybody back, start the GC.
2032      * Could do it in this thread, or signal a condition var
2033      * to do it in another thread.  Either way, we need to
2034      * broadcast on gc_pending_cond afterward.
2035      */
2036 #if defined(THREADED_RTS)
2037     IF_DEBUG(scheduler,sched_belch("doing GC"));
2038 #endif
2039     GarbageCollect(get_roots, force_major);
2040     
2041 #if defined(THREADED_RTS)
2042     // release our stash of capabilities.
2043     for (i = 0; i < n_capabilities; i++) {
2044         if (cap != &capabilities[i]) {
2045             task->cap = &capabilities[i];
2046             releaseCapability(&capabilities[i]);
2047         }
2048     }
2049     if (cap) {
2050         task->cap = cap;
2051     } else {
2052         task->cap = NULL;
2053     }
2054 #endif
2055
2056 #if defined(GRAN)
2057     /* add a ContinueThread event to continue execution of current thread */
2058     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2059               ContinueThread,
2060               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2061     IF_GRAN_DEBUG(bq, 
2062                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2063                   G_EVENTQ(0);
2064                   G_CURR_THREADQ(0));
2065 #endif /* GRAN */
2066
2067     return cap;
2068 }
2069
2070 /* ---------------------------------------------------------------------------
2071  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2072  * used by Control.Concurrent for error checking.
2073  * ------------------------------------------------------------------------- */
2074  
2075 StgBool
2076 rtsSupportsBoundThreads(void)
2077 {
2078 #if defined(THREADED_RTS)
2079   return rtsTrue;
2080 #else
2081   return rtsFalse;
2082 #endif
2083 }
2084
2085 /* ---------------------------------------------------------------------------
2086  * isThreadBound(tso): check whether tso is bound to an OS thread.
2087  * ------------------------------------------------------------------------- */
2088  
2089 StgBool
2090 isThreadBound(StgTSO* tso USED_IF_THREADS)
2091 {
2092 #if defined(THREADED_RTS)
2093   return (tso->bound != NULL);
2094 #endif
2095   return rtsFalse;
2096 }
2097
2098 /* ---------------------------------------------------------------------------
2099  * Singleton fork(). Do not copy any running threads.
2100  * ------------------------------------------------------------------------- */
2101
2102 #if !defined(mingw32_HOST_OS)
2103 #define FORKPROCESS_PRIMOP_SUPPORTED
2104 #endif
2105
2106 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2107 static void 
2108 deleteThread_(Capability *cap, StgTSO *tso);
2109 #endif
2110 StgInt
2111 forkProcess(HsStablePtr *entry
2112 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2113             STG_UNUSED
2114 #endif
2115            )
2116 {
2117 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2118     Task *task;
2119     pid_t pid;
2120     StgTSO* t,*next;
2121     Capability *cap;
2122     
2123 #if defined(THREADED_RTS)
2124     if (RtsFlags.ParFlags.nNodes > 1) {
2125         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2126         stg_exit(EXIT_FAILURE);
2127     }
2128 #endif
2129
2130     IF_DEBUG(scheduler,sched_belch("forking!"));
2131     
2132     // ToDo: for SMP, we should probably acquire *all* the capabilities
2133     cap = rts_lock();
2134     
2135     pid = fork();
2136     
2137     if (pid) { // parent
2138         
2139         // just return the pid
2140         rts_unlock(cap);
2141         return pid;
2142         
2143     } else { // child
2144         
2145         // Now, all OS threads except the thread that forked are
2146         // stopped.  We need to stop all Haskell threads, including
2147         // those involved in foreign calls.  Also we need to delete
2148         // all Tasks, because they correspond to OS threads that are
2149         // now gone.
2150
2151         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2152             if (t->what_next == ThreadRelocated) {
2153                 next = t->link;
2154             } else {
2155                 next = t->global_link;
2156                 // don't allow threads to catch the ThreadKilled
2157                 // exception, but we do want to raiseAsync() because these
2158                 // threads may be evaluating thunks that we need later.
2159                 deleteThread_(cap,t);
2160             }
2161         }
2162         
2163         // Empty the run queue.  It seems tempting to let all the
2164         // killed threads stay on the run queue as zombies to be
2165         // cleaned up later, but some of them correspond to bound
2166         // threads for which the corresponding Task does not exist.
2167         cap->run_queue_hd = END_TSO_QUEUE;
2168         cap->run_queue_tl = END_TSO_QUEUE;
2169
2170         // Any suspended C-calling Tasks are no more, their OS threads
2171         // don't exist now:
2172         cap->suspended_ccalling_tasks = NULL;
2173
2174         // Empty the all_threads list.  Otherwise, the garbage
2175         // collector may attempt to resurrect some of these threads.
2176         all_threads = END_TSO_QUEUE;
2177
2178         // Wipe the task list, except the current Task.
2179         ACQUIRE_LOCK(&sched_mutex);
2180         for (task = all_tasks; task != NULL; task=task->all_link) {
2181             if (task != cap->running_task) {
2182                 discardTask(task);
2183             }
2184         }
2185         RELEASE_LOCK(&sched_mutex);
2186
2187 #if defined(THREADED_RTS)
2188         // Wipe our spare workers list, they no longer exist.  New
2189         // workers will be created if necessary.
2190         cap->spare_workers = NULL;
2191         cap->returning_tasks_hd = NULL;
2192         cap->returning_tasks_tl = NULL;
2193 #endif
2194
2195         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2196         rts_checkSchedStatus("forkProcess",cap);
2197         
2198         rts_unlock(cap);
2199         hs_exit();                      // clean up and exit
2200         stg_exit(EXIT_SUCCESS);
2201     }
2202 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2203     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2204     return -1;
2205 #endif
2206 }
2207
2208 /* ---------------------------------------------------------------------------
2209  * Delete all the threads in the system
2210  * ------------------------------------------------------------------------- */
2211    
2212 static void
2213 deleteAllThreads ( Capability *cap )
2214 {
2215   StgTSO* t, *next;
2216   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2217   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2218       if (t->what_next == ThreadRelocated) {
2219           next = t->link;
2220       } else {
2221           next = t->global_link;
2222           deleteThread(cap,t);
2223       }
2224   }      
2225
2226   // The run queue now contains a bunch of ThreadKilled threads.  We
2227   // must not throw these away: the main thread(s) will be in there
2228   // somewhere, and the main scheduler loop has to deal with it.
2229   // Also, the run queue is the only thing keeping these threads from
2230   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2231
2232 #if !defined(THREADED_RTS)
2233   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2234   ASSERT(sleeping_queue == END_TSO_QUEUE);
2235 #endif
2236 }
2237
2238 /* -----------------------------------------------------------------------------
2239    Managing the suspended_ccalling_tasks list.
2240    Locks required: sched_mutex
2241    -------------------------------------------------------------------------- */
2242
2243 STATIC_INLINE void
2244 suspendTask (Capability *cap, Task *task)
2245 {
2246     ASSERT(task->next == NULL && task->prev == NULL);
2247     task->next = cap->suspended_ccalling_tasks;
2248     task->prev = NULL;
2249     if (cap->suspended_ccalling_tasks) {
2250         cap->suspended_ccalling_tasks->prev = task;
2251     }
2252     cap->suspended_ccalling_tasks = task;
2253 }
2254
2255 STATIC_INLINE void
2256 recoverSuspendedTask (Capability *cap, Task *task)
2257 {
2258     if (task->prev) {
2259         task->prev->next = task->next;
2260     } else {
2261         ASSERT(cap->suspended_ccalling_tasks == task);
2262         cap->suspended_ccalling_tasks = task->next;
2263     }
2264     if (task->next) {
2265         task->next->prev = task->prev;
2266     }
2267     task->next = task->prev = NULL;
2268 }
2269
2270 /* ---------------------------------------------------------------------------
2271  * Suspending & resuming Haskell threads.
2272  * 
2273  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2274  * its capability before calling the C function.  This allows another
2275  * task to pick up the capability and carry on running Haskell
2276  * threads.  It also means that if the C call blocks, it won't lock
2277  * the whole system.
2278  *
2279  * The Haskell thread making the C call is put to sleep for the
2280  * duration of the call, on the susepended_ccalling_threads queue.  We
2281  * give out a token to the task, which it can use to resume the thread
2282  * on return from the C function.
2283  * ------------------------------------------------------------------------- */
2284    
2285 void *
2286 suspendThread (StgRegTable *reg)
2287 {
2288   Capability *cap;
2289   int saved_errno = errno;
2290   StgTSO *tso;
2291   Task *task;
2292
2293   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2294    */
2295   cap = regTableToCapability(reg);
2296
2297   task = cap->running_task;
2298   tso = cap->r.rCurrentTSO;
2299
2300   IF_DEBUG(scheduler,
2301            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2302
2303   // XXX this might not be necessary --SDM
2304   tso->what_next = ThreadRunGHC;
2305
2306   threadPaused(cap,tso);
2307
2308   if(tso->blocked_exceptions == NULL)  {
2309       tso->why_blocked = BlockedOnCCall;
2310       tso->blocked_exceptions = END_TSO_QUEUE;
2311   } else {
2312       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2313   }
2314
2315   // Hand back capability
2316   task->suspended_tso = tso;
2317
2318   ACQUIRE_LOCK(&cap->lock);
2319
2320   suspendTask(cap,task);
2321   cap->in_haskell = rtsFalse;
2322   releaseCapability_(cap);
2323   
2324   RELEASE_LOCK(&cap->lock);
2325
2326 #if defined(THREADED_RTS)
2327   /* Preparing to leave the RTS, so ensure there's a native thread/task
2328      waiting to take over.
2329   */
2330   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2331 #endif
2332
2333   errno = saved_errno;
2334   return task;
2335 }
2336
2337 StgRegTable *
2338 resumeThread (void *task_)
2339 {
2340     StgTSO *tso;
2341     Capability *cap;
2342     int saved_errno = errno;
2343     Task *task = task_;
2344
2345     cap = task->cap;
2346     // Wait for permission to re-enter the RTS with the result.
2347     waitForReturnCapability(&cap,task);
2348     // we might be on a different capability now... but if so, our
2349     // entry on the suspended_ccalling_tasks list will also have been
2350     // migrated.
2351
2352     // Remove the thread from the suspended list
2353     recoverSuspendedTask(cap,task);
2354
2355     tso = task->suspended_tso;
2356     task->suspended_tso = NULL;
2357     tso->link = END_TSO_QUEUE;
2358     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2359     
2360     if (tso->why_blocked == BlockedOnCCall) {
2361         awakenBlockedQueue(cap,tso->blocked_exceptions);
2362         tso->blocked_exceptions = NULL;
2363     }
2364     
2365     /* Reset blocking status */
2366     tso->why_blocked  = NotBlocked;
2367     
2368     cap->r.rCurrentTSO = tso;
2369     cap->in_haskell = rtsTrue;
2370     errno = saved_errno;
2371
2372     /* We might have GC'd, mark the TSO dirty again */
2373     dirtyTSO(tso);
2374
2375     IF_DEBUG(sanity, checkTSO(tso));
2376
2377     return &cap->r;
2378 }
2379
2380 /* ---------------------------------------------------------------------------
2381  * Comparing Thread ids.
2382  *
2383  * This is used from STG land in the implementation of the
2384  * instances of Eq/Ord for ThreadIds.
2385  * ------------------------------------------------------------------------ */
2386
2387 int
2388 cmp_thread(StgPtr tso1, StgPtr tso2) 
2389
2390   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2391   StgThreadID id2 = ((StgTSO *)tso2)->id;
2392  
2393   if (id1 < id2) return (-1);
2394   if (id1 > id2) return 1;
2395   return 0;
2396 }
2397
2398 /* ---------------------------------------------------------------------------
2399  * Fetching the ThreadID from an StgTSO.
2400  *
2401  * This is used in the implementation of Show for ThreadIds.
2402  * ------------------------------------------------------------------------ */
2403 int
2404 rts_getThreadId(StgPtr tso) 
2405 {
2406   return ((StgTSO *)tso)->id;
2407 }
2408
2409 #ifdef DEBUG
2410 void
2411 labelThread(StgPtr tso, char *label)
2412 {
2413   int len;
2414   void *buf;
2415
2416   /* Caveat: Once set, you can only set the thread name to "" */
2417   len = strlen(label)+1;
2418   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2419   strncpy(buf,label,len);
2420   /* Update will free the old memory for us */
2421   updateThreadLabel(((StgTSO *)tso)->id,buf);
2422 }
2423 #endif /* DEBUG */
2424
2425 /* ---------------------------------------------------------------------------
2426    Create a new thread.
2427
2428    The new thread starts with the given stack size.  Before the
2429    scheduler can run, however, this thread needs to have a closure
2430    (and possibly some arguments) pushed on its stack.  See
2431    pushClosure() in Schedule.h.
2432
2433    createGenThread() and createIOThread() (in SchedAPI.h) are
2434    convenient packaged versions of this function.
2435
2436    currently pri (priority) is only used in a GRAN setup -- HWL
2437    ------------------------------------------------------------------------ */
2438 #if defined(GRAN)
2439 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2440 StgTSO *
2441 createThread(nat size, StgInt pri)
2442 #else
2443 StgTSO *
2444 createThread(Capability *cap, nat size)
2445 #endif
2446 {
2447     StgTSO *tso;
2448     nat stack_size;
2449
2450     /* sched_mutex is *not* required */
2451
2452     /* First check whether we should create a thread at all */
2453 #if defined(PARALLEL_HASKELL)
2454     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2455     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2456         threadsIgnored++;
2457         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2458                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2459         return END_TSO_QUEUE;
2460     }
2461     threadsCreated++;
2462 #endif
2463
2464 #if defined(GRAN)
2465     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2466 #endif
2467
2468     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2469
2470     /* catch ridiculously small stack sizes */
2471     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2472         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2473     }
2474
2475     stack_size = size - TSO_STRUCT_SIZEW;
2476     
2477     tso = (StgTSO *)allocateLocal(cap, size);
2478     TICK_ALLOC_TSO(stack_size, 0);
2479
2480     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2481 #if defined(GRAN)
2482     SET_GRAN_HDR(tso, ThisPE);
2483 #endif
2484
2485     // Always start with the compiled code evaluator
2486     tso->what_next = ThreadRunGHC;
2487
2488     tso->why_blocked  = NotBlocked;
2489     tso->blocked_exceptions = NULL;
2490     tso->flags = TSO_DIRTY;
2491     
2492     tso->saved_errno = 0;
2493     tso->bound = NULL;
2494     
2495     tso->stack_size     = stack_size;
2496     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2497                           - TSO_STRUCT_SIZEW;
2498     tso->sp             = (P_)&(tso->stack) + stack_size;
2499
2500     tso->trec = NO_TREC;
2501     
2502 #ifdef PROFILING
2503     tso->prof.CCCS = CCS_MAIN;
2504 #endif
2505     
2506   /* put a stop frame on the stack */
2507     tso->sp -= sizeofW(StgStopFrame);
2508     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2509     tso->link = END_TSO_QUEUE;
2510     
2511   // ToDo: check this
2512 #if defined(GRAN)
2513     /* uses more flexible routine in GranSim */
2514     insertThread(tso, CurrentProc);
2515 #else
2516     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2517      * from its creation
2518      */
2519 #endif
2520     
2521 #if defined(GRAN) 
2522     if (RtsFlags.GranFlags.GranSimStats.Full) 
2523         DumpGranEvent(GR_START,tso);
2524 #elif defined(PARALLEL_HASKELL)
2525     if (RtsFlags.ParFlags.ParStats.Full) 
2526         DumpGranEvent(GR_STARTQ,tso);
2527     /* HACk to avoid SCHEDULE 
2528        LastTSO = tso; */
2529 #endif
2530     
2531     /* Link the new thread on the global thread list.
2532      */
2533     ACQUIRE_LOCK(&sched_mutex);
2534     tso->id = next_thread_id++;  // while we have the mutex
2535     tso->global_link = all_threads;
2536     all_threads = tso;
2537     RELEASE_LOCK(&sched_mutex);
2538     
2539 #if defined(DIST)
2540     tso->dist.priority = MandatoryPriority; //by default that is...
2541 #endif
2542     
2543 #if defined(GRAN)
2544     tso->gran.pri = pri;
2545 # if defined(DEBUG)
2546     tso->gran.magic = TSO_MAGIC; // debugging only
2547 # endif
2548     tso->gran.sparkname   = 0;
2549     tso->gran.startedat   = CURRENT_TIME; 
2550     tso->gran.exported    = 0;
2551     tso->gran.basicblocks = 0;
2552     tso->gran.allocs      = 0;
2553     tso->gran.exectime    = 0;
2554     tso->gran.fetchtime   = 0;
2555     tso->gran.fetchcount  = 0;
2556     tso->gran.blocktime   = 0;
2557     tso->gran.blockcount  = 0;
2558     tso->gran.blockedat   = 0;
2559     tso->gran.globalsparks = 0;
2560     tso->gran.localsparks  = 0;
2561     if (RtsFlags.GranFlags.Light)
2562         tso->gran.clock  = Now; /* local clock */
2563     else
2564         tso->gran.clock  = 0;
2565     
2566     IF_DEBUG(gran,printTSO(tso));
2567 #elif defined(PARALLEL_HASKELL)
2568 # if defined(DEBUG)
2569     tso->par.magic = TSO_MAGIC; // debugging only
2570 # endif
2571     tso->par.sparkname   = 0;
2572     tso->par.startedat   = CURRENT_TIME; 
2573     tso->par.exported    = 0;
2574     tso->par.basicblocks = 0;
2575     tso->par.allocs      = 0;
2576     tso->par.exectime    = 0;
2577     tso->par.fetchtime   = 0;
2578     tso->par.fetchcount  = 0;
2579     tso->par.blocktime   = 0;
2580     tso->par.blockcount  = 0;
2581     tso->par.blockedat   = 0;
2582     tso->par.globalsparks = 0;
2583     tso->par.localsparks  = 0;
2584 #endif
2585     
2586 #if defined(GRAN)
2587     globalGranStats.tot_threads_created++;
2588     globalGranStats.threads_created_on_PE[CurrentProc]++;
2589     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2590     globalGranStats.tot_sq_probes++;
2591 #elif defined(PARALLEL_HASKELL)
2592     // collect parallel global statistics (currently done together with GC stats)
2593     if (RtsFlags.ParFlags.ParStats.Global &&
2594         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2595         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2596         globalParStats.tot_threads_created++;
2597     }
2598 #endif 
2599     
2600 #if defined(GRAN)
2601     IF_GRAN_DEBUG(pri,
2602                   sched_belch("==__ schedule: Created TSO %d (%p);",
2603                               CurrentProc, tso, tso->id));
2604 #elif defined(PARALLEL_HASKELL)
2605     IF_PAR_DEBUG(verbose,
2606                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2607                              (long)tso->id, tso, advisory_thread_count));
2608 #else
2609     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2610                                    (long)tso->id, (long)tso->stack_size));
2611 #endif    
2612     return tso;
2613 }
2614
2615 #if defined(PAR)
2616 /* RFP:
2617    all parallel thread creation calls should fall through the following routine.
2618 */
2619 StgTSO *
2620 createThreadFromSpark(rtsSpark spark) 
2621 { StgTSO *tso;
2622   ASSERT(spark != (rtsSpark)NULL);
2623 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2624   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2625   { threadsIgnored++;
2626     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2627           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2628     return END_TSO_QUEUE;
2629   }
2630   else
2631   { threadsCreated++;
2632     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2633     if (tso==END_TSO_QUEUE)     
2634       barf("createSparkThread: Cannot create TSO");
2635 #if defined(DIST)
2636     tso->priority = AdvisoryPriority;
2637 #endif
2638     pushClosure(tso,spark);
2639     addToRunQueue(tso);
2640     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2641   }
2642   return tso;
2643 }
2644 #endif
2645
2646 /*
2647   Turn a spark into a thread.
2648   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2649 */
2650 #if 0
2651 StgTSO *
2652 activateSpark (rtsSpark spark) 
2653 {
2654   StgTSO *tso;
2655
2656   tso = createSparkThread(spark);
2657   if (RtsFlags.ParFlags.ParStats.Full) {   
2658     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2659       IF_PAR_DEBUG(verbose,
2660                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2661                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2662   }
2663   // ToDo: fwd info on local/global spark to thread -- HWL
2664   // tso->gran.exported =  spark->exported;
2665   // tso->gran.locked =   !spark->global;
2666   // tso->gran.sparkname = spark->name;
2667
2668   return tso;
2669 }
2670 #endif
2671
2672 /* ---------------------------------------------------------------------------
2673  * scheduleThread()
2674  *
2675  * scheduleThread puts a thread on the end  of the runnable queue.
2676  * This will usually be done immediately after a thread is created.
2677  * The caller of scheduleThread must create the thread using e.g.
2678  * createThread and push an appropriate closure
2679  * on this thread's stack before the scheduler is invoked.
2680  * ------------------------------------------------------------------------ */
2681
2682 void
2683 scheduleThread(Capability *cap, StgTSO *tso)
2684 {
2685     // The thread goes at the *end* of the run-queue, to avoid possible
2686     // starvation of any threads already on the queue.
2687     appendToRunQueue(cap,tso);
2688 }
2689
2690 Capability *
2691 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2692 {
2693     Task *task;
2694
2695     // We already created/initialised the Task
2696     task = cap->running_task;
2697
2698     // This TSO is now a bound thread; make the Task and TSO
2699     // point to each other.
2700     tso->bound = task;
2701
2702     task->tso = tso;
2703     task->ret = ret;
2704     task->stat = NoStatus;
2705
2706     appendToRunQueue(cap,tso);
2707
2708     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2709
2710 #if defined(GRAN)
2711     /* GranSim specific init */
2712     CurrentTSO = m->tso;                // the TSO to run
2713     procStatus[MainProc] = Busy;        // status of main PE
2714     CurrentProc = MainProc;             // PE to run it on
2715 #endif
2716
2717     cap = schedule(cap,task);
2718
2719     ASSERT(task->stat != NoStatus);
2720     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2721
2722     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2723     return cap;
2724 }
2725
2726 /* ----------------------------------------------------------------------------
2727  * Starting Tasks
2728  * ------------------------------------------------------------------------- */
2729
2730 #if defined(THREADED_RTS)
2731 void
2732 workerStart(Task *task)
2733 {
2734     Capability *cap;
2735
2736     // See startWorkerTask().
2737     ACQUIRE_LOCK(&task->lock);
2738     cap = task->cap;
2739     RELEASE_LOCK(&task->lock);
2740
2741     // set the thread-local pointer to the Task:
2742     taskEnter(task);
2743
2744     // schedule() runs without a lock.
2745     cap = schedule(cap,task);
2746
2747     // On exit from schedule(), we have a Capability.
2748     releaseCapability(cap);
2749     taskStop(task);
2750 }
2751 #endif
2752
2753 /* ---------------------------------------------------------------------------
2754  * initScheduler()
2755  *
2756  * Initialise the scheduler.  This resets all the queues - if the
2757  * queues contained any threads, they'll be garbage collected at the
2758  * next pass.
2759  *
2760  * ------------------------------------------------------------------------ */
2761
2762 void 
2763 initScheduler(void)
2764 {
2765 #if defined(GRAN)
2766   nat i;
2767   for (i=0; i<=MAX_PROC; i++) {
2768     run_queue_hds[i]      = END_TSO_QUEUE;
2769     run_queue_tls[i]      = END_TSO_QUEUE;
2770     blocked_queue_hds[i]  = END_TSO_QUEUE;
2771     blocked_queue_tls[i]  = END_TSO_QUEUE;
2772     ccalling_threadss[i]  = END_TSO_QUEUE;
2773     blackhole_queue[i]    = END_TSO_QUEUE;
2774     sleeping_queue        = END_TSO_QUEUE;
2775   }
2776 #elif !defined(THREADED_RTS)
2777   blocked_queue_hd  = END_TSO_QUEUE;
2778   blocked_queue_tl  = END_TSO_QUEUE;
2779   sleeping_queue    = END_TSO_QUEUE;
2780 #endif
2781
2782   blackhole_queue   = END_TSO_QUEUE;
2783   all_threads       = END_TSO_QUEUE;
2784
2785   context_switch = 0;
2786   sched_state    = SCHED_RUNNING;
2787
2788   RtsFlags.ConcFlags.ctxtSwitchTicks =
2789       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2790       
2791 #if defined(THREADED_RTS)
2792   /* Initialise the mutex and condition variables used by
2793    * the scheduler. */
2794   initMutex(&sched_mutex);
2795 #endif
2796   
2797   ACQUIRE_LOCK(&sched_mutex);
2798
2799   /* A capability holds the state a native thread needs in
2800    * order to execute STG code. At least one capability is
2801    * floating around (only THREADED_RTS builds have more than one).
2802    */
2803   initCapabilities();
2804
2805   initTaskManager();
2806
2807 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2808   initSparkPools();
2809 #endif
2810
2811 #if defined(THREADED_RTS)
2812   /*
2813    * Eagerly start one worker to run each Capability, except for
2814    * Capability 0.  The idea is that we're probably going to start a
2815    * bound thread on Capability 0 pretty soon, so we don't want a
2816    * worker task hogging it.
2817    */
2818   { 
2819       nat i;
2820       Capability *cap;
2821       for (i = 1; i < n_capabilities; i++) {
2822           cap = &capabilities[i];
2823           ACQUIRE_LOCK(&cap->lock);
2824           startWorkerTask(cap, workerStart);
2825           RELEASE_LOCK(&cap->lock);
2826       }
2827   }
2828 #endif
2829
2830   RELEASE_LOCK(&sched_mutex);
2831 }
2832
2833 void
2834 exitScheduler( void )
2835 {
2836     Task *task = NULL;
2837
2838 #if defined(THREADED_RTS)
2839     ACQUIRE_LOCK(&sched_mutex);
2840     task = newBoundTask();
2841     RELEASE_LOCK(&sched_mutex);
2842 #endif
2843
2844     // If we haven't killed all the threads yet, do it now.
2845     if (sched_state < SCHED_INTERRUPTED) {
2846         sched_state = SCHED_INTERRUPTING;
2847         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2848     }
2849     sched_state = SCHED_SHUTTING_DOWN;
2850
2851 #if defined(THREADED_RTS)
2852     { 
2853         nat i;
2854         
2855         for (i = 0; i < n_capabilities; i++) {
2856             shutdownCapability(&capabilities[i], task);
2857         }
2858         boundTaskExiting(task);
2859         stopTaskManager();
2860     }
2861 #endif
2862 }
2863
2864 /* ---------------------------------------------------------------------------
2865    Where are the roots that we know about?
2866
2867         - all the threads on the runnable queue
2868         - all the threads on the blocked queue
2869         - all the threads on the sleeping queue
2870         - all the thread currently executing a _ccall_GC
2871         - all the "main threads"
2872      
2873    ------------------------------------------------------------------------ */
2874
2875 /* This has to be protected either by the scheduler monitor, or by the
2876         garbage collection monitor (probably the latter).
2877         KH @ 25/10/99
2878 */
2879
2880 void
2881 GetRoots( evac_fn evac )
2882 {
2883     nat i;
2884     Capability *cap;
2885     Task *task;
2886
2887 #if defined(GRAN)
2888     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2889         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2890             evac((StgClosure **)&run_queue_hds[i]);
2891         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2892             evac((StgClosure **)&run_queue_tls[i]);
2893         
2894         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2895             evac((StgClosure **)&blocked_queue_hds[i]);
2896         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2897             evac((StgClosure **)&blocked_queue_tls[i]);
2898         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2899             evac((StgClosure **)&ccalling_threads[i]);
2900     }
2901
2902     markEventQueue();
2903
2904 #else /* !GRAN */
2905
2906     for (i = 0; i < n_capabilities; i++) {
2907         cap = &capabilities[i];
2908         evac((StgClosure **)&cap->run_queue_hd);
2909         evac((StgClosure **)&cap->run_queue_tl);
2910         
2911         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2912              task=task->next) {
2913             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2914             evac((StgClosure **)&task->suspended_tso);
2915         }
2916     }
2917     
2918 #if !defined(THREADED_RTS)
2919     evac((StgClosure **)(void *)&blocked_queue_hd);
2920     evac((StgClosure **)(void *)&blocked_queue_tl);
2921     evac((StgClosure **)(void *)&sleeping_queue);
2922 #endif 
2923 #endif
2924
2925     // evac((StgClosure **)&blackhole_queue);
2926
2927 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2928     markSparkQueue(evac);
2929 #endif
2930     
2931 #if defined(RTS_USER_SIGNALS)
2932     // mark the signal handlers (signals should be already blocked)
2933     markSignalHandlers(evac);
2934 #endif
2935 }
2936
2937 /* -----------------------------------------------------------------------------
2938    performGC
2939
2940    This is the interface to the garbage collector from Haskell land.
2941    We provide this so that external C code can allocate and garbage
2942    collect when called from Haskell via _ccall_GC.
2943
2944    It might be useful to provide an interface whereby the programmer
2945    can specify more roots (ToDo).
2946    
2947    This needs to be protected by the GC condition variable above.  KH.
2948    -------------------------------------------------------------------------- */
2949
2950 static void (*extra_roots)(evac_fn);
2951
2952 static void
2953 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2954 {
2955     Task *task = myTask();
2956
2957     if (task == NULL) {
2958         ACQUIRE_LOCK(&sched_mutex);
2959         task = newBoundTask();
2960         RELEASE_LOCK(&sched_mutex);
2961         scheduleDoGC(NULL,task,force_major, get_roots);
2962         boundTaskExiting(task);
2963     } else {
2964         scheduleDoGC(NULL,task,force_major, get_roots);
2965     }
2966 }
2967
2968 void
2969 performGC(void)
2970 {
2971     performGC_(rtsFalse, GetRoots);
2972 }
2973
2974 void
2975 performMajorGC(void)
2976 {
2977     performGC_(rtsTrue, GetRoots);
2978 }
2979
2980 static void
2981 AllRoots(evac_fn evac)
2982 {
2983     GetRoots(evac);             // the scheduler's roots
2984     extra_roots(evac);          // the user's roots
2985 }
2986
2987 void
2988 performGCWithRoots(void (*get_roots)(evac_fn))
2989 {
2990     extra_roots = get_roots;
2991     performGC_(rtsFalse, AllRoots);
2992 }
2993
2994 /* -----------------------------------------------------------------------------
2995    Stack overflow
2996
2997    If the thread has reached its maximum stack size, then raise the
2998    StackOverflow exception in the offending thread.  Otherwise
2999    relocate the TSO into a larger chunk of memory and adjust its stack
3000    size appropriately.
3001    -------------------------------------------------------------------------- */
3002
3003 static StgTSO *
3004 threadStackOverflow(Capability *cap, StgTSO *tso)
3005 {
3006   nat new_stack_size, stack_words;
3007   lnat new_tso_size;
3008   StgPtr new_sp;
3009   StgTSO *dest;
3010
3011   IF_DEBUG(sanity,checkTSO(tso));
3012   if (tso->stack_size >= tso->max_stack_size) {
3013
3014     IF_DEBUG(gc,
3015              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3016                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3017              /* If we're debugging, just print out the top of the stack */
3018              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3019                                               tso->sp+64)));
3020
3021     /* Send this thread the StackOverflow exception */
3022     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3023     return tso;
3024   }
3025
3026   /* Try to double the current stack size.  If that takes us over the
3027    * maximum stack size for this thread, then use the maximum instead.
3028    * Finally round up so the TSO ends up as a whole number of blocks.
3029    */
3030   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3031   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3032                                        TSO_STRUCT_SIZE)/sizeof(W_);
3033   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3034   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3035
3036   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3037
3038   dest = (StgTSO *)allocate(new_tso_size);
3039   TICK_ALLOC_TSO(new_stack_size,0);
3040
3041   /* copy the TSO block and the old stack into the new area */
3042   memcpy(dest,tso,TSO_STRUCT_SIZE);
3043   stack_words = tso->stack + tso->stack_size - tso->sp;
3044   new_sp = (P_)dest + new_tso_size - stack_words;
3045   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3046
3047   /* relocate the stack pointers... */
3048   dest->sp         = new_sp;
3049   dest->stack_size = new_stack_size;
3050         
3051   /* Mark the old TSO as relocated.  We have to check for relocated
3052    * TSOs in the garbage collector and any primops that deal with TSOs.
3053    *
3054    * It's important to set the sp value to just beyond the end
3055    * of the stack, so we don't attempt to scavenge any part of the
3056    * dead TSO's stack.
3057    */
3058   tso->what_next = ThreadRelocated;
3059   tso->link = dest;
3060   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3061   tso->why_blocked = NotBlocked;
3062
3063   IF_PAR_DEBUG(verbose,
3064                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3065                      tso->id, tso, tso->stack_size);
3066                /* If we're debugging, just print out the top of the stack */
3067                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3068                                                 tso->sp+64)));
3069   
3070   IF_DEBUG(sanity,checkTSO(tso));
3071 #if 0
3072   IF_DEBUG(scheduler,printTSO(dest));
3073 #endif
3074
3075   return dest;
3076 }
3077
3078 /* ---------------------------------------------------------------------------
3079    Wake up a queue that was blocked on some resource.
3080    ------------------------------------------------------------------------ */
3081
3082 #if defined(GRAN)
3083 STATIC_INLINE void
3084 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3085 {
3086 }
3087 #elif defined(PARALLEL_HASKELL)
3088 STATIC_INLINE void
3089 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3090 {
3091   /* write RESUME events to log file and
3092      update blocked and fetch time (depending on type of the orig closure) */
3093   if (RtsFlags.ParFlags.ParStats.Full) {
3094     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3095                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3096                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3097     if (emptyRunQueue())
3098       emitSchedule = rtsTrue;
3099
3100     switch (get_itbl(node)->type) {
3101         case FETCH_ME_BQ:
3102           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3103           break;
3104         case RBH:
3105         case FETCH_ME:
3106         case BLACKHOLE_BQ:
3107           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3108           break;
3109 #ifdef DIST
3110         case MVAR:
3111           break;
3112 #endif    
3113         default:
3114           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3115         }
3116       }
3117 }
3118 #endif
3119
3120 #if defined(GRAN)
3121 StgBlockingQueueElement *
3122 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3123 {
3124     StgTSO *tso;
3125     PEs node_loc, tso_loc;
3126
3127     node_loc = where_is(node); // should be lifted out of loop
3128     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3129     tso_loc = where_is((StgClosure *)tso);
3130     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3131       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3132       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3133       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3134       // insertThread(tso, node_loc);
3135       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3136                 ResumeThread,
3137                 tso, node, (rtsSpark*)NULL);
3138       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3139       // len_local++;
3140       // len++;
3141     } else { // TSO is remote (actually should be FMBQ)
3142       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3143                                   RtsFlags.GranFlags.Costs.gunblocktime +
3144                                   RtsFlags.GranFlags.Costs.latency;
3145       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3146                 UnblockThread,
3147                 tso, node, (rtsSpark*)NULL);
3148       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3149       // len++;
3150     }
3151     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3152     IF_GRAN_DEBUG(bq,
3153                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3154                           (node_loc==tso_loc ? "Local" : "Global"), 
3155                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3156     tso->block_info.closure = NULL;
3157     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3158                              tso->id, tso));
3159 }
3160 #elif defined(PARALLEL_HASKELL)
3161 StgBlockingQueueElement *
3162 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3163 {
3164     StgBlockingQueueElement *next;
3165
3166     switch (get_itbl(bqe)->type) {
3167     case TSO:
3168       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3169       /* if it's a TSO just push it onto the run_queue */
3170       next = bqe->link;
3171       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3172       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3173       threadRunnable();
3174       unblockCount(bqe, node);
3175       /* reset blocking status after dumping event */
3176       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3177       break;
3178
3179     case BLOCKED_FETCH:
3180       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3181       next = bqe->link;
3182       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3183       PendingFetches = (StgBlockedFetch *)bqe;
3184       break;
3185
3186 # if defined(DEBUG)
3187       /* can ignore this case in a non-debugging setup; 
3188          see comments on RBHSave closures above */
3189     case CONSTR:
3190       /* check that the closure is an RBHSave closure */
3191       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3192              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3193              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3194       break;
3195
3196     default:
3197       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3198            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3199            (StgClosure *)bqe);
3200 # endif
3201     }
3202   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3203   return next;
3204 }
3205 #endif
3206
3207 StgTSO *
3208 unblockOne(Capability *cap, StgTSO *tso)
3209 {
3210   StgTSO *next;
3211
3212   ASSERT(get_itbl(tso)->type == TSO);
3213   ASSERT(tso->why_blocked != NotBlocked);
3214   tso->why_blocked = NotBlocked;
3215   next = tso->link;
3216   tso->link = END_TSO_QUEUE;
3217
3218   // We might have just migrated this TSO to our Capability:
3219   if (tso->bound) {
3220       tso->bound->cap = cap;
3221   }
3222
3223   appendToRunQueue(cap,tso);
3224
3225   // we're holding a newly woken thread, make sure we context switch
3226   // quickly so we can migrate it if necessary.
3227   context_switch = 1;
3228   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3229   return next;
3230 }
3231
3232
3233 #if defined(GRAN)
3234 void 
3235 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3236 {
3237   StgBlockingQueueElement *bqe;
3238   PEs node_loc;
3239   nat len = 0; 
3240
3241   IF_GRAN_DEBUG(bq, 
3242                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3243                       node, CurrentProc, CurrentTime[CurrentProc], 
3244                       CurrentTSO->id, CurrentTSO));
3245
3246   node_loc = where_is(node);
3247
3248   ASSERT(q == END_BQ_QUEUE ||
3249          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3250          get_itbl(q)->type == CONSTR); // closure (type constructor)
3251   ASSERT(is_unique(node));
3252
3253   /* FAKE FETCH: magically copy the node to the tso's proc;
3254      no Fetch necessary because in reality the node should not have been 
3255      moved to the other PE in the first place
3256   */
3257   if (CurrentProc!=node_loc) {
3258     IF_GRAN_DEBUG(bq, 
3259                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3260                         node, node_loc, CurrentProc, CurrentTSO->id, 
3261                         // CurrentTSO, where_is(CurrentTSO),
3262                         node->header.gran.procs));
3263     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3264     IF_GRAN_DEBUG(bq, 
3265                   debugBelch("## new bitmask of node %p is %#x\n",
3266                         node, node->header.gran.procs));
3267     if (RtsFlags.GranFlags.GranSimStats.Global) {
3268       globalGranStats.tot_fake_fetches++;
3269     }
3270   }
3271
3272   bqe = q;
3273   // ToDo: check: ASSERT(CurrentProc==node_loc);
3274   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3275     //next = bqe->link;
3276     /* 
3277        bqe points to the current element in the queue
3278        next points to the next element in the queue
3279     */
3280     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3281     //tso_loc = where_is(tso);
3282     len++;
3283     bqe = unblockOne(bqe, node);
3284   }
3285
3286   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3287      the closure to make room for the anchor of the BQ */
3288   if (bqe!=END_BQ_QUEUE) {
3289     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3290     /*
3291     ASSERT((info_ptr==&RBH_Save_0_info) ||
3292            (info_ptr==&RBH_Save_1_info) ||
3293            (info_ptr==&RBH_Save_2_info));
3294     */
3295     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3296     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3297     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3298
3299     IF_GRAN_DEBUG(bq,
3300                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3301                         node, info_type(node)));
3302   }
3303
3304   /* statistics gathering */
3305   if (RtsFlags.GranFlags.GranSimStats.Global) {
3306     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3307     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3308     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3309     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3310   }
3311   IF_GRAN_DEBUG(bq,
3312                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3313                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3314 }
3315 #elif defined(PARALLEL_HASKELL)
3316 void 
3317 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3318 {
3319   StgBlockingQueueElement *bqe;
3320
3321   IF_PAR_DEBUG(verbose, 
3322                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3323                      node, mytid));
3324 #ifdef DIST  
3325   //RFP
3326   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3327     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3328     return;
3329   }
3330 #endif
3331   
3332   ASSERT(q == END_BQ_QUEUE ||
3333          get_itbl(q)->type == TSO ||           
3334          get_itbl(q)->type == BLOCKED_FETCH || 
3335          get_itbl(q)->type == CONSTR); 
3336
3337   bqe = q;
3338   while (get_itbl(bqe)->type==TSO || 
3339          get_itbl(bqe)->type==BLOCKED_FETCH) {
3340     bqe = unblockOne(bqe, node);
3341   }
3342 }
3343
3344 #else   /* !GRAN && !PARALLEL_HASKELL */
3345
3346 void
3347 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3348 {
3349     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3350                              // Exception.cmm
3351     while (tso != END_TSO_QUEUE) {
3352         tso = unblockOne(cap,tso);
3353     }
3354 }
3355 #endif
3356
3357 /* ---------------------------------------------------------------------------
3358    Interrupt execution
3359    - usually called inside a signal handler so it mustn't do anything fancy.   
3360    ------------------------------------------------------------------------ */
3361
3362 void
3363 interruptStgRts(void)
3364 {
3365     sched_state = SCHED_INTERRUPTING;
3366     context_switch = 1;
3367 #if defined(THREADED_RTS)
3368     prodAllCapabilities();
3369 #endif
3370 }
3371
3372 /* -----------------------------------------------------------------------------
3373    Unblock a thread
3374
3375    This is for use when we raise an exception in another thread, which
3376    may be blocked.
3377    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3378    -------------------------------------------------------------------------- */
3379
3380 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3381 /*
3382   NB: only the type of the blocking queue is different in GranSim and GUM
3383       the operations on the queue-elements are the same
3384       long live polymorphism!
3385
3386   Locks: sched_mutex is held upon entry and exit.
3387
3388 */
3389 static void
3390 unblockThread(Capability *cap, StgTSO *tso)
3391 {
3392   StgBlockingQueueElement *t, **last;
3393
3394   switch (tso->why_blocked) {
3395
3396   case NotBlocked:
3397     return;  /* not blocked */
3398
3399   case BlockedOnSTM:
3400     // Be careful: nothing to do here!  We tell the scheduler that the thread
3401     // is runnable and we leave it to the stack-walking code to abort the 
3402     // transaction while unwinding the stack.  We should perhaps have a debugging
3403     // test to make sure that this really happens and that the 'zombie' transaction
3404     // does not get committed.
3405     goto done;
3406
3407   case BlockedOnMVar:
3408     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3409     {
3410       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3411       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3412
3413       last = (StgBlockingQueueElement **)&mvar->head;
3414       for (t = (StgBlockingQueueElement *)mvar->head; 
3415            t != END_BQ_QUEUE; 
3416            last = &t->link, last_tso = t, t = t->link) {
3417         if (t == (StgBlockingQueueElement *)tso) {
3418           *last = (StgBlockingQueueElement *)tso->link;
3419           if (mvar->tail == tso) {
3420             mvar->tail = (StgTSO *)last_tso;
3421           }
3422           goto done;
3423         }
3424       }
3425       barf("unblockThread (MVAR): TSO not found");
3426     }
3427
3428   case BlockedOnBlackHole:
3429     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3430     {
3431       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3432
3433       last = &bq->blocking_queue;
3434       for (t = bq->blocking_queue; 
3435            t != END_BQ_QUEUE; 
3436            last = &t->link, t = t->link) {
3437         if (t == (StgBlockingQueueElement *)tso) {
3438           *last = (StgBlockingQueueElement *)tso->link;
3439           goto done;
3440         }
3441       }
3442       barf("unblockThread (BLACKHOLE): TSO not found");
3443     }
3444
3445   case BlockedOnException:
3446     {
3447       StgTSO *target  = tso->block_info.tso;
3448
3449       ASSERT(get_itbl(target)->type == TSO);
3450
3451       if (target->what_next == ThreadRelocated) {
3452           target = target->link;
3453           ASSERT(get_itbl(target)->type == TSO);
3454       }
3455
3456       ASSERT(target->blocked_exceptions != NULL);
3457
3458       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3459       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3460            t != END_BQ_QUEUE; 
3461            last = &t->link, t = t->link) {
3462         ASSERT(get_itbl(t)->type == TSO);
3463         if (t == (StgBlockingQueueElement *)tso) {
3464           *last = (StgBlockingQueueElement *)tso->link;
3465           goto done;
3466         }
3467       }
3468       barf("unblockThread (Exception): TSO not found");
3469     }
3470
3471   case BlockedOnRead:
3472   case BlockedOnWrite:
3473 #if defined(mingw32_HOST_OS)
3474   case BlockedOnDoProc:
3475 #endif
3476     {
3477       /* take TSO off blocked_queue */
3478       StgBlockingQueueElement *prev = NULL;
3479       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3480            prev = t, t = t->link) {
3481         if (t == (StgBlockingQueueElement *)tso) {
3482           if (prev == NULL) {
3483             blocked_queue_hd = (StgTSO *)t->link;
3484             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3485               blocked_queue_tl = END_TSO_QUEUE;
3486             }
3487           } else {
3488             prev->link = t->link;
3489             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3490               blocked_queue_tl = (StgTSO *)prev;
3491             }
3492           }
3493 #if defined(mingw32_HOST_OS)
3494           /* (Cooperatively) signal that the worker thread should abort
3495            * the request.
3496            */
3497           abandonWorkRequest(tso->block_info.async_result->reqID);
3498 #endif
3499           goto done;
3500         }
3501       }
3502       barf("unblockThread (I/O): TSO not found");
3503     }
3504
3505   case BlockedOnDelay:
3506     {
3507       /* take TSO off sleeping_queue */
3508       StgBlockingQueueElement *prev = NULL;
3509       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3510            prev = t, t = t->link) {
3511         if (t == (StgBlockingQueueElement *)tso) {
3512           if (prev == NULL) {
3513             sleeping_queue = (StgTSO *)t->link;
3514           } else {
3515             prev->link = t->link;
3516           }
3517           goto done;
3518         }
3519       }
3520       barf("unblockThread (delay): TSO not found");
3521     }
3522
3523   default:
3524     barf("unblockThread");
3525   }
3526
3527  done:
3528   tso->link = END_TSO_QUEUE;
3529   tso->why_blocked = NotBlocked;
3530   tso->block_info.closure = NULL;
3531   pushOnRunQueue(cap,tso);
3532 }
3533 #else
3534 static void
3535 unblockThread(Capability *cap, StgTSO *tso)
3536 {
3537   StgTSO *t, **last;
3538   
3539   /* To avoid locking unnecessarily. */
3540   if (tso->why_blocked == NotBlocked) {
3541     return;
3542   }
3543
3544   switch (tso->why_blocked) {
3545
3546   case BlockedOnSTM:
3547     // Be careful: nothing to do here!  We tell the scheduler that the thread
3548     // is runnable and we leave it to the stack-walking code to abort the 
3549     // transaction while unwinding the stack.  We should perhaps have a debugging
3550     // test to make sure that this really happens and that the 'zombie' transaction
3551     // does not get committed.
3552     goto done;
3553
3554   case BlockedOnMVar:
3555     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3556     {
3557       StgTSO *last_tso = END_TSO_QUEUE;
3558       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3559
3560       last = &mvar->head;
3561       for (t = mvar->head; t != END_TSO_QUEUE; 
3562            last = &t->link, last_tso = t, t = t->link) {
3563         if (t == tso) {
3564           *last = tso->link;
3565           if (mvar->tail == tso) {
3566             mvar->tail = last_tso;
3567           }
3568           goto done;
3569         }
3570       }
3571       barf("unblockThread (MVAR): TSO not found");
3572     }
3573
3574   case BlockedOnBlackHole:
3575     {
3576       last = &blackhole_queue;
3577       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3578            last = &t->link, t = t->link) {
3579         if (t == tso) {
3580           *last = tso->link;
3581           goto done;
3582         }
3583       }
3584       barf("unblockThread (BLACKHOLE): TSO not found");
3585     }
3586
3587   case BlockedOnException:
3588     {
3589       StgTSO *target  = tso->block_info.tso;
3590
3591       ASSERT(get_itbl(target)->type == TSO);
3592
3593       while (target->what_next == ThreadRelocated) {
3594           target = target->link;
3595           ASSERT(get_itbl(target)->type == TSO);
3596       }
3597       
3598       ASSERT(target->blocked_exceptions != NULL);
3599
3600       last = &target->blocked_exceptions;
3601       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3602            last = &t->link, t = t->link) {
3603         ASSERT(get_itbl(t)->type == TSO);
3604         if (t == tso) {
3605           *last = tso->link;
3606           goto done;
3607         }
3608       }
3609       barf("unblockThread (Exception): TSO not found");
3610     }
3611
3612 #if !defined(THREADED_RTS)
3613   case BlockedOnRead:
3614   case BlockedOnWrite:
3615 #if defined(mingw32_HOST_OS)
3616   case BlockedOnDoProc:
3617 #endif
3618     {
3619       StgTSO *prev = NULL;
3620       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3621            prev = t, t = t->link) {
3622         if (t == tso) {
3623           if (prev == NULL) {
3624             blocked_queue_hd = t->link;
3625             if (blocked_queue_tl == t) {
3626               blocked_queue_tl = END_TSO_QUEUE;
3627             }
3628           } else {
3629             prev->link = t->link;
3630             if (blocked_queue_tl == t) {
3631               blocked_queue_tl = prev;
3632             }
3633           }
3634 #if defined(mingw32_HOST_OS)
3635           /* (Cooperatively) signal that the worker thread should abort
3636            * the request.
3637            */
3638           abandonWorkRequest(tso->block_info.async_result->reqID);
3639 #endif
3640           goto done;
3641         }
3642       }
3643       barf("unblockThread (I/O): TSO not found");
3644     }
3645
3646   case BlockedOnDelay:
3647     {
3648       StgTSO *prev = NULL;
3649       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3650            prev = t, t = t->link) {
3651         if (t == tso) {
3652           if (prev == NULL) {
3653             sleeping_queue = t->link;
3654           } else {
3655             prev->link = t->link;
3656           }
3657           goto done;
3658         }
3659       }
3660       barf("unblockThread (delay): TSO not found");
3661     }
3662 #endif
3663
3664   default:
3665     barf("unblockThread");
3666   }
3667
3668  done:
3669   tso->link = END_TSO_QUEUE;
3670   tso->why_blocked = NotBlocked;
3671   tso->block_info.closure = NULL;
3672   appendToRunQueue(cap,tso);
3673
3674   // We might have just migrated this TSO to our Capability:
3675   if (tso->bound) {
3676       tso->bound->cap = cap;
3677   }
3678 }
3679 #endif
3680
3681 /* -----------------------------------------------------------------------------
3682  * checkBlackHoles()
3683  *
3684  * Check the blackhole_queue for threads that can be woken up.  We do
3685  * this periodically: before every GC, and whenever the run queue is
3686  * empty.
3687  *
3688  * An elegant solution might be to just wake up all the blocked
3689  * threads with awakenBlockedQueue occasionally: they'll go back to
3690  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3691  * doesn't give us a way to tell whether we've actually managed to
3692  * wake up any threads, so we would be busy-waiting.
3693  *
3694  * -------------------------------------------------------------------------- */
3695
3696 static rtsBool
3697 checkBlackHoles (Capability *cap)
3698 {
3699     StgTSO **prev, *t;
3700     rtsBool any_woke_up = rtsFalse;
3701     StgHalfWord type;
3702
3703     // blackhole_queue is global:
3704     ASSERT_LOCK_HELD(&sched_mutex);
3705
3706     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3707
3708     // ASSUMES: sched_mutex
3709     prev = &blackhole_queue;
3710     t = blackhole_queue;
3711     while (t != END_TSO_QUEUE) {
3712         ASSERT(t->why_blocked == BlockedOnBlackHole);
3713         type = get_itbl(t->block_info.closure)->type;
3714         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3715             IF_DEBUG(sanity,checkTSO(t));
3716             t = unblockOne(cap, t);
3717             // urk, the threads migrate to the current capability
3718             // here, but we'd like to keep them on the original one.
3719             *prev = t;
3720             any_woke_up = rtsTrue;
3721         } else {
3722             prev = &t->link;
3723             t = t->link;
3724         }
3725     }
3726
3727     return any_woke_up;
3728 }
3729
3730 /* -----------------------------------------------------------------------------
3731  * raiseAsync()
3732  *
3733  * The following function implements the magic for raising an
3734  * asynchronous exception in an existing thread.
3735  *
3736  * We first remove the thread from any queue on which it might be
3737  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3738  *
3739  * We strip the stack down to the innermost CATCH_FRAME, building
3740  * thunks in the heap for all the active computations, so they can 
3741  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3742  * an application of the handler to the exception, and push it on
3743  * the top of the stack.
3744  * 
3745  * How exactly do we save all the active computations?  We create an
3746  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3747  * AP_STACKs pushes everything from the corresponding update frame
3748  * upwards onto the stack.  (Actually, it pushes everything up to the
3749  * next update frame plus a pointer to the next AP_STACK object.
3750  * Entering the next AP_STACK object pushes more onto the stack until we
3751  * reach the last AP_STACK object - at which point the stack should look
3752  * exactly as it did when we killed the TSO and we can continue
3753  * execution by entering the closure on top of the stack.
3754  *
3755  * We can also kill a thread entirely - this happens if either (a) the 
3756  * exception passed to raiseAsync is NULL, or (b) there's no
3757  * CATCH_FRAME on the stack.  In either case, we strip the entire
3758  * stack and replace the thread with a zombie.
3759  *
3760  * ToDo: in THREADED_RTS mode, this function is only safe if either
3761  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3762  * one Capability), or (b) we own the Capability that the TSO is
3763  * currently blocked on or on the run queue of.
3764  *
3765  * -------------------------------------------------------------------------- */
3766  
3767 void
3768 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3769 {
3770     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3771 }
3772
3773 void
3774 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3775 {
3776     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3777 }
3778
3779 static void
3780 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3781             rtsBool stop_at_atomically, StgPtr stop_here)
3782 {
3783     StgRetInfoTable *info;
3784     StgPtr sp, frame;
3785     nat i;
3786   
3787     // Thread already dead?
3788     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3789         return;
3790     }
3791
3792     IF_DEBUG(scheduler, 
3793              sched_belch("raising exception in thread %ld.", (long)tso->id));
3794     
3795     // Remove it from any blocking queues
3796     unblockThread(cap,tso);
3797
3798     // mark it dirty; we're about to change its stack.
3799     dirtyTSO(tso);
3800
3801     sp = tso->sp;
3802     
3803     // The stack freezing code assumes there's a closure pointer on
3804     // the top of the stack, so we have to arrange that this is the case...
3805     //
3806     if (sp[0] == (W_)&stg_enter_info) {
3807         sp++;
3808     } else {
3809         sp--;
3810         sp[0] = (W_)&stg_dummy_ret_closure;
3811     }
3812
3813     frame = sp + 1;
3814     while (stop_here == NULL || frame < stop_here) {
3815
3816         // 1. Let the top of the stack be the "current closure"
3817         //
3818         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3819         // CATCH_FRAME.
3820         //
3821         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3822         // current closure applied to the chunk of stack up to (but not
3823         // including) the update frame.  This closure becomes the "current
3824         // closure".  Go back to step 2.
3825         //
3826         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3827         // top of the stack applied to the exception.
3828         // 
3829         // 5. If it's a STOP_FRAME, then kill the thread.
3830         // 
3831         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3832         // transaction
3833        
3834         info = get_ret_itbl((StgClosure *)frame);
3835
3836         switch (info->i.type) {
3837
3838         case UPDATE_FRAME:
3839         {
3840             StgAP_STACK * ap;
3841             nat words;
3842             
3843             // First build an AP_STACK consisting of the stack chunk above the
3844             // current update frame, with the top word on the stack as the
3845             // fun field.
3846             //
3847             words = frame - sp - 1;
3848             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3849             
3850             ap->size = words;
3851             ap->fun  = (StgClosure *)sp[0];
3852             sp++;
3853             for(i=0; i < (nat)words; ++i) {
3854                 ap->payload[i] = (StgClosure *)*sp++;
3855             }
3856             
3857             SET_HDR(ap,&stg_AP_STACK_info,
3858                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3859             TICK_ALLOC_UP_THK(words+1,0);
3860             
3861             IF_DEBUG(scheduler,
3862                      debugBelch("sched: Updating ");
3863                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3864                      debugBelch(" with ");
3865                      printObj((StgClosure *)ap);
3866                 );
3867
3868             // Replace the updatee with an indirection
3869             //
3870             // Warning: if we're in a loop, more than one update frame on
3871             // the stack may point to the same object.  Be careful not to
3872             // overwrite an IND_OLDGEN in this case, because we'll screw
3873             // up the mutable lists.  To be on the safe side, don't
3874             // overwrite any kind of indirection at all.  See also
3875             // threadSqueezeStack in GC.c, where we have to make a similar
3876             // check.
3877             //
3878             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3879                 // revert the black hole
3880                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3881                                (StgClosure *)ap);
3882             }
3883             sp += sizeofW(StgUpdateFrame) - 1;
3884             sp[0] = (W_)ap; // push onto stack
3885             frame = sp + 1;
3886             continue; //no need to bump frame
3887         }
3888
3889         case STOP_FRAME:
3890             // We've stripped the entire stack, the thread is now dead.
3891             tso->what_next = ThreadKilled;
3892             tso->sp = frame + sizeofW(StgStopFrame);
3893             return;
3894
3895         case CATCH_FRAME:
3896             // If we find a CATCH_FRAME, and we've got an exception to raise,
3897             // then build the THUNK raise(exception), and leave it on
3898             // top of the CATCH_FRAME ready to enter.
3899             //
3900         {
3901 #ifdef PROFILING
3902             StgCatchFrame *cf = (StgCatchFrame *)frame;
3903 #endif
3904             StgThunk *raise;
3905             
3906             if (exception == NULL) break;
3907
3908             // we've got an exception to raise, so let's pass it to the
3909             // handler in this frame.
3910             //
3911             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3912             TICK_ALLOC_SE_THK(1,0);
3913             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3914             raise->payload[0] = exception;
3915             
3916             // throw away the stack from Sp up to the CATCH_FRAME.
3917             //
3918             sp = frame - 1;
3919             
3920             /* Ensure that async excpetions are blocked now, so we don't get
3921              * a surprise exception before we get around to executing the
3922              * handler.
3923              */
3924             if (tso->blocked_exceptions == NULL) {
3925                 tso->blocked_exceptions = END_TSO_QUEUE;
3926             }
3927
3928             /* Put the newly-built THUNK on top of the stack, ready to execute
3929              * when the thread restarts.
3930              */
3931             sp[0] = (W_)raise;
3932             sp[-1] = (W_)&stg_enter_info;
3933             tso->sp = sp-1;
3934             tso->what_next = ThreadRunGHC;
3935             IF_DEBUG(sanity, checkTSO(tso));
3936             return;
3937         }
3938             
3939         case ATOMICALLY_FRAME:
3940             if (stop_at_atomically) {
3941                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3942                 stmCondemnTransaction(cap, tso -> trec);
3943 #ifdef REG_R1
3944                 tso->sp = frame;
3945 #else
3946                 // R1 is not a register: the return convention for IO in
3947                 // this case puts the return value on the stack, so we
3948                 // need to set up the stack to return to the atomically
3949                 // frame properly...
3950                 tso->sp = frame - 2;
3951                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3952                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3953 #endif
3954                 tso->what_next = ThreadRunGHC;
3955                 return;
3956             }
3957             // Not stop_at_atomically... fall through and abort the
3958             // transaction.
3959             
3960         case CATCH_RETRY_FRAME:
3961             // IF we find an ATOMICALLY_FRAME then we abort the
3962             // current transaction and propagate the exception.  In
3963             // this case (unlike ordinary exceptions) we do not care
3964             // whether the transaction is valid or not because its
3965             // possible validity cannot have caused the exception
3966             // and will not be visible after the abort.
3967             IF_DEBUG(stm,
3968                      debugBelch("Found atomically block delivering async exception\n"));
3969             StgTRecHeader *trec = tso -> trec;
3970             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3971             stmAbortTransaction(cap, trec);
3972             tso -> trec = outer;
3973             break;
3974             
3975         default:
3976             break;
3977         }
3978
3979         // move on to the next stack frame
3980         frame += stack_frame_sizeW((StgClosure *)frame);
3981     }
3982
3983     // if we got here, then we stopped at stop_here
3984     ASSERT(stop_here != NULL);
3985 }
3986
3987 /* -----------------------------------------------------------------------------
3988    Deleting threads
3989
3990    This is used for interruption (^C) and forking, and corresponds to
3991    raising an exception but without letting the thread catch the
3992    exception.
3993    -------------------------------------------------------------------------- */
3994
3995 static void 
3996 deleteThread (Capability *cap, StgTSO *tso)
3997 {
3998   if (tso->why_blocked != BlockedOnCCall &&
3999       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4000       raiseAsync(cap,tso,NULL);
4001   }
4002 }
4003
4004 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4005 static void 
4006 deleteThread_(Capability *cap, StgTSO *tso)
4007 { // for forkProcess only:
4008   // like deleteThread(), but we delete threads in foreign calls, too.
4009
4010     if (tso->why_blocked == BlockedOnCCall ||
4011         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4012         unblockOne(cap,tso);
4013         tso->what_next = ThreadKilled;
4014     } else {
4015         deleteThread(cap,tso);
4016     }
4017 }
4018 #endif
4019
4020 /* -----------------------------------------------------------------------------
4021    raiseExceptionHelper
4022    
4023    This function is called by the raise# primitve, just so that we can
4024    move some of the tricky bits of raising an exception from C-- into
4025    C.  Who knows, it might be a useful re-useable thing here too.
4026    -------------------------------------------------------------------------- */
4027
4028 StgWord
4029 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4030 {
4031     Capability *cap = regTableToCapability(reg);
4032     StgThunk *raise_closure = NULL;
4033     StgPtr p, next;
4034     StgRetInfoTable *info;
4035     //
4036     // This closure represents the expression 'raise# E' where E
4037     // is the exception raise.  It is used to overwrite all the
4038     // thunks which are currently under evaluataion.
4039     //
4040
4041     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4042     // LDV profiling: stg_raise_info has THUNK as its closure
4043     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4044     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
4045     // 1 does not cause any problem unless profiling is performed.
4046     // However, when LDV profiling goes on, we need to linearly scan
4047     // small object pool, where raise_closure is stored, so we should
4048     // use MIN_UPD_SIZE.
4049     //
4050     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4051     //                                 sizeofW(StgClosure)+1);
4052     //
4053
4054     //
4055     // Walk up the stack, looking for the catch frame.  On the way,
4056     // we update any closures pointed to from update frames with the
4057     // raise closure that we just built.
4058     //
4059     p = tso->sp;
4060     while(1) {
4061         info = get_ret_itbl((StgClosure *)p);
4062         next = p + stack_frame_sizeW((StgClosure *)p);
4063         switch (info->i.type) {
4064             
4065         case UPDATE_FRAME:
4066             // Only create raise_closure if we need to.
4067             if (raise_closure == NULL) {
4068                 raise_closure = 
4069                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4070                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4071                 raise_closure->payload[0] = exception;
4072             }
4073             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4074             p = next;
4075             continue;
4076
4077         case ATOMICALLY_FRAME:
4078             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4079             tso->sp = p;
4080             return ATOMICALLY_FRAME;
4081             
4082         case CATCH_FRAME:
4083             tso->sp = p;
4084             return CATCH_FRAME;
4085
4086         case CATCH_STM_FRAME:
4087             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4088             tso->sp = p;
4089             return CATCH_STM_FRAME;
4090             
4091         case STOP_FRAME:
4092             tso->sp = p;
4093             return STOP_FRAME;
4094
4095         case CATCH_RETRY_FRAME:
4096         default:
4097             p = next; 
4098             continue;
4099         }
4100     }
4101 }
4102
4103
4104 /* -----------------------------------------------------------------------------
4105    findRetryFrameHelper
4106
4107    This function is called by the retry# primitive.  It traverses the stack
4108    leaving tso->sp referring to the frame which should handle the retry.  
4109
4110    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4111    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4112
4113    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4114    despite the similar implementation.
4115
4116    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4117    not be created within memory transactions.
4118    -------------------------------------------------------------------------- */
4119
4120 StgWord
4121 findRetryFrameHelper (StgTSO *tso)
4122 {
4123   StgPtr           p, next;
4124   StgRetInfoTable *info;
4125
4126   p = tso -> sp;
4127   while (1) {
4128     info = get_ret_itbl((StgClosure *)p);
4129     next = p + stack_frame_sizeW((StgClosure *)p);
4130     switch (info->i.type) {
4131       
4132     case ATOMICALLY_FRAME:
4133       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4134       tso->sp = p;
4135       return ATOMICALLY_FRAME;
4136       
4137     case CATCH_RETRY_FRAME:
4138       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4139       tso->sp = p;
4140       return CATCH_RETRY_FRAME;
4141       
4142     case CATCH_STM_FRAME:
4143     default:
4144       ASSERT(info->i.type != CATCH_FRAME);
4145       ASSERT(info->i.type != STOP_FRAME);
4146       p = next; 
4147       continue;
4148     }
4149   }
4150 }
4151
4152 /* -----------------------------------------------------------------------------
4153    resurrectThreads is called after garbage collection on the list of
4154    threads found to be garbage.  Each of these threads will be woken
4155    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4156    on an MVar, or NonTermination if the thread was blocked on a Black
4157    Hole.
4158
4159    Locks: assumes we hold *all* the capabilities.
4160    -------------------------------------------------------------------------- */
4161
4162 void
4163 resurrectThreads (StgTSO *threads)
4164 {
4165     StgTSO *tso, *next;
4166     Capability *cap;
4167
4168     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4169         next = tso->global_link;
4170         tso->global_link = all_threads;
4171         all_threads = tso;
4172         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4173         
4174         // Wake up the thread on the Capability it was last on for a
4175         // bound thread, or last_free_capability otherwise.
4176         if (tso->bound) {
4177             cap = tso->bound->cap;
4178         } else {
4179             cap = last_free_capability;
4180         }
4181         
4182         switch (tso->why_blocked) {
4183         case BlockedOnMVar:
4184         case BlockedOnException:
4185             /* Called by GC - sched_mutex lock is currently held. */
4186             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4187             break;
4188         case BlockedOnBlackHole:
4189             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4190             break;
4191         case BlockedOnSTM:
4192             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4193             break;
4194         case NotBlocked:
4195             /* This might happen if the thread was blocked on a black hole
4196              * belonging to a thread that we've just woken up (raiseAsync
4197              * can wake up threads, remember...).
4198              */
4199             continue;
4200         default:
4201             barf("resurrectThreads: thread blocked in a strange way");
4202         }
4203     }
4204 }
4205
4206 /* ----------------------------------------------------------------------------
4207  * Debugging: why is a thread blocked
4208  * [Also provides useful information when debugging threaded programs
4209  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4210    ------------------------------------------------------------------------- */
4211
4212 #if DEBUG
4213 static void
4214 printThreadBlockage(StgTSO *tso)
4215 {
4216   switch (tso->why_blocked) {
4217   case BlockedOnRead:
4218     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4219     break;
4220   case BlockedOnWrite:
4221     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4222     break;
4223 #if defined(mingw32_HOST_OS)
4224     case BlockedOnDoProc:
4225     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4226     break;
4227 #endif
4228   case BlockedOnDelay:
4229     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4230     break;
4231   case BlockedOnMVar:
4232     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4233     break;
4234   case BlockedOnException:
4235     debugBelch("is blocked on delivering an exception to thread %d",
4236             tso->block_info.tso->id);
4237     break;
4238   case BlockedOnBlackHole:
4239     debugBelch("is blocked on a black hole");
4240     break;
4241   case NotBlocked:
4242     debugBelch("is not blocked");
4243     break;
4244 #if defined(PARALLEL_HASKELL)
4245   case BlockedOnGA:
4246     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4247             tso->block_info.closure, info_type(tso->block_info.closure));
4248     break;
4249   case BlockedOnGA_NoSend:
4250     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4251             tso->block_info.closure, info_type(tso->block_info.closure));
4252     break;
4253 #endif
4254   case BlockedOnCCall:
4255     debugBelch("is blocked on an external call");
4256     break;
4257   case BlockedOnCCall_NoUnblockExc:
4258     debugBelch("is blocked on an external call (exceptions were already blocked)");
4259     break;
4260   case BlockedOnSTM:
4261     debugBelch("is blocked on an STM operation");
4262     break;
4263   default:
4264     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4265          tso->why_blocked, tso->id, tso);
4266   }
4267 }
4268
4269 void
4270 printThreadStatus(StgTSO *t)
4271 {
4272     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4273     {
4274       void *label = lookupThreadLabel(t->id);
4275       if (label) debugBelch("[\"%s\"] ",(char *)label);
4276     }
4277     if (t->what_next == ThreadRelocated) {
4278         debugBelch("has been relocated...\n");
4279     } else {
4280         switch (t->what_next) {
4281         case ThreadKilled:
4282             debugBelch("has been killed");
4283             break;
4284         case ThreadComplete:
4285             debugBelch("has completed");
4286             break;
4287         default:
4288             printThreadBlockage(t);
4289         }
4290         debugBelch("\n");
4291     }
4292 }
4293
4294 void
4295 printAllThreads(void)
4296 {
4297   StgTSO *t, *next;
4298   nat i;
4299   Capability *cap;
4300
4301 # if defined(GRAN)
4302   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4303   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4304                        time_string, rtsFalse/*no commas!*/);
4305
4306   debugBelch("all threads at [%s]:\n", time_string);
4307 # elif defined(PARALLEL_HASKELL)
4308   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4309   ullong_format_string(CURRENT_TIME,
4310                        time_string, rtsFalse/*no commas!*/);
4311
4312   debugBelch("all threads at [%s]:\n", time_string);
4313 # else
4314   debugBelch("all threads:\n");
4315 # endif
4316
4317   for (i = 0; i < n_capabilities; i++) {
4318       cap = &capabilities[i];
4319       debugBelch("threads on capability %d:\n", cap->no);
4320       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4321           printThreadStatus(t);
4322       }
4323   }
4324
4325   debugBelch("other threads:\n");
4326   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4327       if (t->why_blocked != NotBlocked) {
4328           printThreadStatus(t);
4329       }
4330       if (t->what_next == ThreadRelocated) {
4331           next = t->link;
4332       } else {
4333           next = t->global_link;
4334       }
4335   }
4336 }
4337
4338 // useful from gdb
4339 void 
4340 printThreadQueue(StgTSO *t)
4341 {
4342     nat i = 0;
4343     for (; t != END_TSO_QUEUE; t = t->link) {
4344         printThreadStatus(t);
4345         i++;
4346     }
4347     debugBelch("%d threads on queue\n", i);
4348 }
4349
4350 /* 
4351    Print a whole blocking queue attached to node (debugging only).
4352 */
4353 # if defined(PARALLEL_HASKELL)
4354 void 
4355 print_bq (StgClosure *node)
4356 {
4357   StgBlockingQueueElement *bqe;
4358   StgTSO *tso;
4359   rtsBool end;
4360
4361   debugBelch("## BQ of closure %p (%s): ",
4362           node, info_type(node));
4363
4364   /* should cover all closures that may have a blocking queue */
4365   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4366          get_itbl(node)->type == FETCH_ME_BQ ||
4367          get_itbl(node)->type == RBH ||
4368          get_itbl(node)->type == MVAR);
4369     
4370   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4371
4372   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4373 }
4374
4375 /* 
4376    Print a whole blocking queue starting with the element bqe.
4377 */
4378 void 
4379 print_bqe (StgBlockingQueueElement *bqe)
4380 {
4381   rtsBool end;
4382
4383   /* 
4384      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4385   */
4386   for (end = (bqe==END_BQ_QUEUE);
4387        !end; // iterate until bqe points to a CONSTR
4388        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4389        bqe = end ? END_BQ_QUEUE : bqe->link) {
4390     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4391     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4392     /* types of closures that may appear in a blocking queue */
4393     ASSERT(get_itbl(bqe)->type == TSO ||           
4394            get_itbl(bqe)->type == BLOCKED_FETCH || 
4395            get_itbl(bqe)->type == CONSTR); 
4396     /* only BQs of an RBH end with an RBH_Save closure */
4397     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4398
4399     switch (get_itbl(bqe)->type) {
4400     case TSO:
4401       debugBelch(" TSO %u (%x),",
4402               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4403       break;
4404     case BLOCKED_FETCH:
4405       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4406               ((StgBlockedFetch *)bqe)->node, 
4407               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4408               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4409               ((StgBlockedFetch *)bqe)->ga.weight);
4410       break;
4411     case CONSTR:
4412       debugBelch(" %s (IP %p),",
4413               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4414                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4415                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4416                "RBH_Save_?"), get_itbl(bqe));
4417       break;
4418     default:
4419       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4420            info_type((StgClosure *)bqe)); // , node, info_type(node));
4421       break;
4422     }
4423   } /* for */
4424   debugBelch("\n");
4425 }
4426 # elif defined(GRAN)
4427 void 
4428 print_bq (StgClosure *node)
4429 {
4430   StgBlockingQueueElement *bqe;
4431   PEs node_loc, tso_loc;
4432   rtsBool end;
4433
4434   /* should cover all closures that may have a blocking queue */
4435   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4436          get_itbl(node)->type == FETCH_ME_BQ ||
4437          get_itbl(node)->type == RBH);
4438     
4439   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4440   node_loc = where_is(node);
4441
4442   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4443           node, info_type(node), node_loc);
4444
4445   /* 
4446      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4447   */
4448   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4449        !end; // iterate until bqe points to a CONSTR
4450        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4451     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4452     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4453     /* types of closures that may appear in a blocking queue */
4454     ASSERT(get_itbl(bqe)->type == TSO ||           
4455            get_itbl(bqe)->type == CONSTR); 
4456     /* only BQs of an RBH end with an RBH_Save closure */
4457     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4458
4459     tso_loc = where_is((StgClosure *)bqe);
4460     switch (get_itbl(bqe)->type) {
4461     case TSO:
4462       debugBelch(" TSO %d (%p) on [PE %d],",
4463               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4464       break;
4465     case CONSTR:
4466       debugBelch(" %s (IP %p),",
4467               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4468                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4469                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4470                "RBH_Save_?"), get_itbl(bqe));
4471       break;
4472     default:
4473       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4474            info_type((StgClosure *)bqe), node, info_type(node));
4475       break;
4476     }
4477   } /* for */
4478   debugBelch("\n");
4479 }
4480 # endif
4481
4482 #if defined(PARALLEL_HASKELL)
4483 static nat
4484 run_queue_len(void)
4485 {
4486     nat i;
4487     StgTSO *tso;
4488     
4489     for (i=0, tso=run_queue_hd; 
4490          tso != END_TSO_QUEUE;
4491          i++, tso=tso->link) {
4492         /* nothing */
4493     }
4494         
4495     return i;
4496 }
4497 #endif
4498
4499 void
4500 sched_belch(char *s, ...)
4501 {
4502     va_list ap;
4503     va_start(ap,s);
4504 #ifdef THREADED_RTS
4505     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4506 #elif defined(PARALLEL_HASKELL)
4507     debugBelch("== ");
4508 #else
4509     debugBelch("sched: ");
4510 #endif
4511     vdebugBelch(s, ap);
4512     debugBelch("\n");
4513     va_end(ap);
4514 }
4515
4516 #endif /* DEBUG */