performGC_(): don't use the existing Task, always grab a new one
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
208 static void scheduleCheckBlackHoles (Capability *cap);
209 static void scheduleDetectDeadlock (Capability *cap, Task *task);
210 #if defined(GRAN)
211 static StgTSO *scheduleProcessEvent(rtsEvent *event);
212 #endif
213 #if defined(PARALLEL_HASKELL)
214 static StgTSO *scheduleSendPendingMessages(void);
215 static void scheduleActivateSpark(void);
216 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
217 #endif
218 #if defined(PAR) || defined(GRAN)
219 static void scheduleGranParReport(void);
220 #endif
221 static void schedulePostRunThread(void);
222 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
223 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
224                                          StgTSO *t);
225 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
226                                     nat prev_what_next );
227 static void scheduleHandleThreadBlocked( StgTSO *t );
228 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
229                                              StgTSO *t );
230 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
231 static Capability *scheduleDoGC(Capability *cap, Task *task,
232                                 rtsBool force_major, 
233                                 void (*get_roots)(evac_fn));
234
235 static void unblockThread(Capability *cap, StgTSO *tso);
236 static rtsBool checkBlackHoles(Capability *cap);
237 static void AllRoots(evac_fn evac);
238
239 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
240
241 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
242                         rtsBool stop_at_atomically, StgPtr stop_here);
243
244 static void deleteThread (Capability *cap, StgTSO *tso);
245 static void deleteAllThreads (Capability *cap);
246
247 #ifdef DEBUG
248 static void printThreadBlockage(StgTSO *tso);
249 static void printThreadStatus(StgTSO *tso);
250 void printThreadQueue(StgTSO *tso);
251 #endif
252
253 #if defined(PARALLEL_HASKELL)
254 StgTSO * createSparkThread(rtsSpark spark);
255 StgTSO * activateSpark (rtsSpark spark);  
256 #endif
257
258 #ifdef DEBUG
259 static char *whatNext_strs[] = {
260   "(unknown)",
261   "ThreadRunGHC",
262   "ThreadInterpret",
263   "ThreadKilled",
264   "ThreadRelocated",
265   "ThreadComplete"
266 };
267 #endif
268
269 /* -----------------------------------------------------------------------------
270  * Putting a thread on the run queue: different scheduling policies
271  * -------------------------------------------------------------------------- */
272
273 STATIC_INLINE void
274 addToRunQueue( Capability *cap, StgTSO *t )
275 {
276 #if defined(PARALLEL_HASKELL)
277     if (RtsFlags.ParFlags.doFairScheduling) { 
278         // this does round-robin scheduling; good for concurrency
279         appendToRunQueue(cap,t);
280     } else {
281         // this does unfair scheduling; good for parallelism
282         pushOnRunQueue(cap,t);
283     }
284 #else
285     // this does round-robin scheduling; good for concurrency
286     appendToRunQueue(cap,t);
287 #endif
288 }
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    GRAN version:
303      In a GranSim setup this loop iterates over the global event queue.
304      This revolves around the global event queue, which determines what 
305      to do next. Therefore, it's more complicated than either the 
306      concurrent or the parallel (GUM) setup.
307
308    GUM version:
309      GUM iterates over incoming messages.
310      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
311      and sends out a fish whenever it has nothing to do; in-between
312      doing the actual reductions (shared code below) it processes the
313      incoming messages and deals with delayed operations 
314      (see PendingFetches).
315      This is not the ugliest code you could imagine, but it's bloody close.
316
317    ------------------------------------------------------------------------ */
318
319 static Capability *
320 schedule (Capability *initialCapability, Task *task)
321 {
322   StgTSO *t;
323   Capability *cap;
324   StgThreadReturnCode ret;
325 #if defined(GRAN)
326   rtsEvent *event;
327 #elif defined(PARALLEL_HASKELL)
328   StgTSO *tso;
329   GlobalTaskId pe;
330   rtsBool receivedFinish = rtsFalse;
331 # if defined(DEBUG)
332   nat tp_size, sp_size; // stats only
333 # endif
334 #endif
335   nat prev_what_next;
336   rtsBool ready_to_gc;
337 #if defined(THREADED_RTS)
338   rtsBool first = rtsTrue;
339 #endif
340   
341   cap = initialCapability;
342
343   // Pre-condition: this task owns initialCapability.
344   // The sched_mutex is *NOT* held
345   // NB. on return, we still hold a capability.
346
347   IF_DEBUG(scheduler,
348            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
349                        task, initialCapability);
350       );
351
352   schedulePreLoop();
353
354   // -----------------------------------------------------------
355   // Scheduler loop starts here:
356
357 #if defined(PARALLEL_HASKELL)
358 #define TERMINATION_CONDITION        (!receivedFinish)
359 #elif defined(GRAN)
360 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
361 #else
362 #define TERMINATION_CONDITION        rtsTrue
363 #endif
364
365   while (TERMINATION_CONDITION) {
366
367 #if defined(GRAN)
368       /* Choose the processor with the next event */
369       CurrentProc = event->proc;
370       CurrentTSO = event->tso;
371 #endif
372
373 #if defined(THREADED_RTS)
374       if (first) {
375           // don't yield the first time, we want a chance to run this
376           // thread for a bit, even if there are others banging at the
377           // door.
378           first = rtsFalse;
379           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
380       } else {
381           // Yield the capability to higher-priority tasks if necessary.
382           yieldCapability(&cap, task);
383       }
384 #endif
385       
386 #if defined(THREADED_RTS)
387       schedulePushWork(cap,task);
388 #endif
389
390     // Check whether we have re-entered the RTS from Haskell without
391     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
392     // call).
393     if (cap->in_haskell) {
394           errorBelch("schedule: re-entered unsafely.\n"
395                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
396           stg_exit(EXIT_FAILURE);
397     }
398
399     // The interruption / shutdown sequence.
400     // 
401     // In order to cleanly shut down the runtime, we want to:
402     //   * make sure that all main threads return to their callers
403     //     with the state 'Interrupted'.
404     //   * clean up all OS threads assocated with the runtime
405     //   * free all memory etc.
406     //
407     // So the sequence for ^C goes like this:
408     //
409     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
410     //     arranges for some Capability to wake up
411     //
412     //   * all threads in the system are halted, and the zombies are
413     //     placed on the run queue for cleaning up.  We acquire all
414     //     the capabilities in order to delete the threads, this is
415     //     done by scheduleDoGC() for convenience (because GC already
416     //     needs to acquire all the capabilities).  We can't kill
417     //     threads involved in foreign calls.
418     // 
419     //   * sched_state := SCHED_INTERRUPTED
420     //
421     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
422     //
423     //   * sched_state := SCHED_SHUTTING_DOWN
424     //
425     //   * all workers exit when the run queue on their capability
426     //     drains.  All main threads will also exit when their TSO
427     //     reaches the head of the run queue and they can return.
428     //
429     //   * eventually all Capabilities will shut down, and the RTS can
430     //     exit.
431     //
432     //   * We might be left with threads blocked in foreign calls, 
433     //     we should really attempt to kill these somehow (TODO);
434     
435     switch (sched_state) {
436     case SCHED_RUNNING:
437         break;
438     case SCHED_INTERRUPTING:
439         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
440 #if defined(THREADED_RTS)
441         discardSparksCap(cap);
442 #endif
443         /* scheduleDoGC() deletes all the threads */
444         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
445         break;
446     case SCHED_INTERRUPTED:
447         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
448         break;
449     case SCHED_SHUTTING_DOWN:
450         IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
451         // If we are a worker, just exit.  If we're a bound thread
452         // then we will exit below when we've removed our TSO from
453         // the run queue.
454         if (task->tso == NULL && emptyRunQueue(cap)) {
455             return cap;
456         }
457         break;
458     default:
459         barf("sched_state: %d", sched_state);
460     }
461
462 #if defined(THREADED_RTS)
463     // If the run queue is empty, take a spark and turn it into a thread.
464     {
465         if (emptyRunQueue(cap)) {
466             StgClosure *spark;
467             spark = findSpark(cap);
468             if (spark != NULL) {
469                 IF_DEBUG(scheduler,
470                          sched_belch("turning spark of closure %p into a thread",
471                                      (StgClosure *)spark));
472                 createSparkThread(cap,spark);     
473             }
474         }
475     }
476 #endif // THREADED_RTS
477
478     scheduleStartSignalHandlers(cap);
479
480     // Only check the black holes here if we've nothing else to do.
481     // During normal execution, the black hole list only gets checked
482     // at GC time, to avoid repeatedly traversing this possibly long
483     // list each time around the scheduler.
484     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
485
486     scheduleCheckWakeupThreads(cap);
487
488     scheduleCheckBlockedThreads(cap);
489
490     scheduleDetectDeadlock(cap,task);
491 #if defined(THREADED_RTS)
492     cap = task->cap;    // reload cap, it might have changed
493 #endif
494
495     // Normally, the only way we can get here with no threads to
496     // run is if a keyboard interrupt received during 
497     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
498     // Additionally, it is not fatal for the
499     // threaded RTS to reach here with no threads to run.
500     //
501     // win32: might be here due to awaitEvent() being abandoned
502     // as a result of a console event having been delivered.
503     if ( emptyRunQueue(cap) ) {
504 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
505         ASSERT(sched_state >= SCHED_INTERRUPTING);
506 #endif
507         continue; // nothing to do
508     }
509
510 #if defined(PARALLEL_HASKELL)
511     scheduleSendPendingMessages();
512     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
513         continue;
514
515 #if defined(SPARKS)
516     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
517 #endif
518
519     /* If we still have no work we need to send a FISH to get a spark
520        from another PE */
521     if (emptyRunQueue(cap)) {
522         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
523         ASSERT(rtsFalse); // should not happen at the moment
524     }
525     // from here: non-empty run queue.
526     //  TODO: merge above case with this, only one call processMessages() !
527     if (PacketsWaiting()) {  /* process incoming messages, if
528                                 any pending...  only in else
529                                 because getRemoteWork waits for
530                                 messages as well */
531         receivedFinish = processMessages();
532     }
533 #endif
534
535 #if defined(GRAN)
536     scheduleProcessEvent(event);
537 #endif
538
539     // 
540     // Get a thread to run
541     //
542     t = popRunQueue(cap);
543
544 #if defined(GRAN) || defined(PAR)
545     scheduleGranParReport(); // some kind of debuging output
546 #else
547     // Sanity check the thread we're about to run.  This can be
548     // expensive if there is lots of thread switching going on...
549     IF_DEBUG(sanity,checkTSO(t));
550 #endif
551
552 #if defined(THREADED_RTS)
553     // Check whether we can run this thread in the current task.
554     // If not, we have to pass our capability to the right task.
555     {
556         Task *bound = t->bound;
557       
558         if (bound) {
559             if (bound == task) {
560                 IF_DEBUG(scheduler,
561                          sched_belch("### Running thread %d in bound thread",
562                                      t->id));
563                 // yes, the Haskell thread is bound to the current native thread
564             } else {
565                 IF_DEBUG(scheduler,
566                          sched_belch("### thread %d bound to another OS thread",
567                                      t->id));
568                 // no, bound to a different Haskell thread: pass to that thread
569                 pushOnRunQueue(cap,t);
570                 continue;
571             }
572         } else {
573             // The thread we want to run is unbound.
574             if (task->tso) { 
575                 IF_DEBUG(scheduler,
576                          sched_belch("### this OS thread cannot run thread %d", t->id));
577                 // no, the current native thread is bound to a different
578                 // Haskell thread, so pass it to any worker thread
579                 pushOnRunQueue(cap,t);
580                 continue; 
581             }
582         }
583     }
584 #endif
585
586     cap->r.rCurrentTSO = t;
587     
588     /* context switches are initiated by the timer signal, unless
589      * the user specified "context switch as often as possible", with
590      * +RTS -C0
591      */
592     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
593         && !emptyThreadQueues(cap)) {
594         context_switch = 1;
595     }
596          
597 run_thread:
598
599     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
600                               (long)t->id, whatNext_strs[t->what_next]));
601
602 #if defined(PROFILING)
603     startHeapProfTimer();
604 #endif
605
606     // ----------------------------------------------------------------------
607     // Run the current thread 
608
609     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
610     ASSERT(t->cap == cap);
611
612     prev_what_next = t->what_next;
613
614     errno = t->saved_errno;
615     cap->in_haskell = rtsTrue;
616
617     dirtyTSO(t);
618
619     recent_activity = ACTIVITY_YES;
620
621     switch (prev_what_next) {
622         
623     case ThreadKilled:
624     case ThreadComplete:
625         /* Thread already finished, return to scheduler. */
626         ret = ThreadFinished;
627         break;
628         
629     case ThreadRunGHC:
630     {
631         StgRegTable *r;
632         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
633         cap = regTableToCapability(r);
634         ret = r->rRet;
635         break;
636     }
637     
638     case ThreadInterpret:
639         cap = interpretBCO(cap);
640         ret = cap->r.rRet;
641         break;
642         
643     default:
644         barf("schedule: invalid what_next field");
645     }
646
647     cap->in_haskell = rtsFalse;
648
649     // The TSO might have moved, eg. if it re-entered the RTS and a GC
650     // happened.  So find the new location:
651     t = cap->r.rCurrentTSO;
652
653     // We have run some Haskell code: there might be blackhole-blocked
654     // threads to wake up now.
655     // Lock-free test here should be ok, we're just setting a flag.
656     if ( blackhole_queue != END_TSO_QUEUE ) {
657         blackholes_need_checking = rtsTrue;
658     }
659     
660     // And save the current errno in this thread.
661     // XXX: possibly bogus for SMP because this thread might already
662     // be running again, see code below.
663     t->saved_errno = errno;
664
665 #if defined(THREADED_RTS)
666     // If ret is ThreadBlocked, and this Task is bound to the TSO that
667     // blocked, we are in limbo - the TSO is now owned by whatever it
668     // is blocked on, and may in fact already have been woken up,
669     // perhaps even on a different Capability.  It may be the case
670     // that task->cap != cap.  We better yield this Capability
671     // immediately and return to normaility.
672     if (ret == ThreadBlocked) {
673         IF_DEBUG(scheduler,
674                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
675                              t->id, whatNext_strs[t->what_next]));
676         continue;
677     }
678 #endif
679
680     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
681     ASSERT(t->cap == cap);
682
683     // ----------------------------------------------------------------------
684     
685     // Costs for the scheduler are assigned to CCS_SYSTEM
686 #if defined(PROFILING)
687     stopHeapProfTimer();
688     CCCS = CCS_SYSTEM;
689 #endif
690     
691 #if defined(THREADED_RTS)
692     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
693 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
694     IF_DEBUG(scheduler,debugBelch("sched: "););
695 #endif
696     
697     schedulePostRunThread();
698
699     ready_to_gc = rtsFalse;
700
701     switch (ret) {
702     case HeapOverflow:
703         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
704         break;
705
706     case StackOverflow:
707         scheduleHandleStackOverflow(cap,task,t);
708         break;
709
710     case ThreadYielding:
711         if (scheduleHandleYield(cap, t, prev_what_next)) {
712             // shortcut for switching between compiler/interpreter:
713             goto run_thread; 
714         }
715         break;
716
717     case ThreadBlocked:
718         scheduleHandleThreadBlocked(t);
719         break;
720
721     case ThreadFinished:
722         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
723         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
724         break;
725
726     default:
727       barf("schedule: invalid thread return code %d", (int)ret);
728     }
729
730     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
731     if (ready_to_gc) {
732       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
733     }
734   } /* end of while() */
735
736   IF_PAR_DEBUG(verbose,
737                debugBelch("== Leaving schedule() after having received Finish\n"));
738 }
739
740 /* ----------------------------------------------------------------------------
741  * Setting up the scheduler loop
742  * ------------------------------------------------------------------------- */
743
744 static void
745 schedulePreLoop(void)
746 {
747 #if defined(GRAN) 
748     /* set up first event to get things going */
749     /* ToDo: assign costs for system setup and init MainTSO ! */
750     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
751               ContinueThread, 
752               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
753     
754     IF_DEBUG(gran,
755              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
756                         CurrentTSO);
757              G_TSO(CurrentTSO, 5));
758     
759     if (RtsFlags.GranFlags.Light) {
760         /* Save current time; GranSim Light only */
761         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
762     }      
763 #endif
764 }
765
766 /* -----------------------------------------------------------------------------
767  * schedulePushWork()
768  *
769  * Push work to other Capabilities if we have some.
770  * -------------------------------------------------------------------------- */
771
772 #if defined(THREADED_RTS)
773 static void
774 schedulePushWork(Capability *cap USED_IF_THREADS, 
775                  Task *task      USED_IF_THREADS)
776 {
777     Capability *free_caps[n_capabilities], *cap0;
778     nat i, n_free_caps;
779
780     // migration can be turned off with +RTS -qg
781     if (!RtsFlags.ParFlags.migrate) return;
782
783     // Check whether we have more threads on our run queue, or sparks
784     // in our pool, that we could hand to another Capability.
785     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
786         && sparkPoolSizeCap(cap) < 2) {
787         return;
788     }
789
790     // First grab as many free Capabilities as we can.
791     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
792         cap0 = &capabilities[i];
793         if (cap != cap0 && tryGrabCapability(cap0,task)) {
794             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
795                 // it already has some work, we just grabbed it at 
796                 // the wrong moment.  Or maybe it's deadlocked!
797                 releaseCapability(cap0);
798             } else {
799                 free_caps[n_free_caps++] = cap0;
800             }
801         }
802     }
803
804     // we now have n_free_caps free capabilities stashed in
805     // free_caps[].  Share our run queue equally with them.  This is
806     // probably the simplest thing we could do; improvements we might
807     // want to do include:
808     //
809     //   - giving high priority to moving relatively new threads, on 
810     //     the gournds that they haven't had time to build up a
811     //     working set in the cache on this CPU/Capability.
812     //
813     //   - giving low priority to moving long-lived threads
814
815     if (n_free_caps > 0) {
816         StgTSO *prev, *t, *next;
817         rtsBool pushed_to_all;
818
819         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
820
821         i = 0;
822         pushed_to_all = rtsFalse;
823
824         if (cap->run_queue_hd != END_TSO_QUEUE) {
825             prev = cap->run_queue_hd;
826             t = prev->link;
827             prev->link = END_TSO_QUEUE;
828             for (; t != END_TSO_QUEUE; t = next) {
829                 next = t->link;
830                 t->link = END_TSO_QUEUE;
831                 if (t->what_next == ThreadRelocated
832                     || t->bound == task // don't move my bound thread
833                     || tsoLocked(t)) {  // don't move a locked thread
834                     prev->link = t;
835                     prev = t;
836                 } else if (i == n_free_caps) {
837                     pushed_to_all = rtsTrue;
838                     i = 0;
839                     // keep one for us
840                     prev->link = t;
841                     prev = t;
842                 } else {
843                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
844                     appendToRunQueue(free_caps[i],t);
845                     if (t->bound) { t->bound->cap = free_caps[i]; }
846                     t->cap = free_caps[i];
847                     i++;
848                 }
849             }
850             cap->run_queue_tl = prev;
851         }
852
853         // If there are some free capabilities that we didn't push any
854         // threads to, then try to push a spark to each one.
855         if (!pushed_to_all) {
856             StgClosure *spark;
857             // i is the next free capability to push to
858             for (; i < n_free_caps; i++) {
859                 if (emptySparkPoolCap(free_caps[i])) {
860                     spark = findSpark(cap);
861                     if (spark != NULL) {
862                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
863                         newSpark(&(free_caps[i]->r), spark);
864                     }
865                 }
866             }
867         }
868
869         // release the capabilities
870         for (i = 0; i < n_free_caps; i++) {
871             task->cap = free_caps[i];
872             releaseCapability(free_caps[i]);
873         }
874     }
875     task->cap = cap; // reset to point to our Capability.
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880  * Start any pending signal handlers
881  * ------------------------------------------------------------------------- */
882
883 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
884 static void
885 scheduleStartSignalHandlers(Capability *cap)
886 {
887     if (signals_pending()) { // safe outside the lock
888         startSignalHandlers(cap);
889     }
890 }
891 #else
892 static void
893 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
894 {
895 }
896 #endif
897
898 /* ----------------------------------------------------------------------------
899  * Check for blocked threads that can be woken up.
900  * ------------------------------------------------------------------------- */
901
902 static void
903 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
904 {
905 #if !defined(THREADED_RTS)
906     //
907     // Check whether any waiting threads need to be woken up.  If the
908     // run queue is empty, and there are no other tasks running, we
909     // can wait indefinitely for something to happen.
910     //
911     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
912     {
913         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
914     }
915 #endif
916 }
917
918
919 /* ----------------------------------------------------------------------------
920  * Check for threads woken up by other Capabilities
921  * ------------------------------------------------------------------------- */
922
923 static void
924 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
925 {
926 #if defined(THREADED_RTS)
927     // Any threads that were woken up by other Capabilities get
928     // appended to our run queue.
929     if (!emptyWakeupQueue(cap)) {
930         ACQUIRE_LOCK(&cap->lock);
931         if (emptyRunQueue(cap)) {
932             cap->run_queue_hd = cap->wakeup_queue_hd;
933             cap->run_queue_tl = cap->wakeup_queue_tl;
934         } else {
935             cap->run_queue_tl->link = cap->wakeup_queue_hd;
936             cap->run_queue_tl = cap->wakeup_queue_tl;
937         }
938         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
939         RELEASE_LOCK(&cap->lock);
940     }
941 #endif
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Check for threads blocked on BLACKHOLEs that can be woken up
946  * ------------------------------------------------------------------------- */
947 static void
948 scheduleCheckBlackHoles (Capability *cap)
949 {
950     if ( blackholes_need_checking ) // check without the lock first
951     {
952         ACQUIRE_LOCK(&sched_mutex);
953         if ( blackholes_need_checking ) {
954             checkBlackHoles(cap);
955             blackholes_need_checking = rtsFalse;
956         }
957         RELEASE_LOCK(&sched_mutex);
958     }
959 }
960
961 /* ----------------------------------------------------------------------------
962  * Detect deadlock conditions and attempt to resolve them.
963  * ------------------------------------------------------------------------- */
964
965 static void
966 scheduleDetectDeadlock (Capability *cap, Task *task)
967 {
968
969 #if defined(PARALLEL_HASKELL)
970     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
971     return;
972 #endif
973
974     /* 
975      * Detect deadlock: when we have no threads to run, there are no
976      * threads blocked, waiting for I/O, or sleeping, and all the
977      * other tasks are waiting for work, we must have a deadlock of
978      * some description.
979      */
980     if ( emptyThreadQueues(cap) )
981     {
982 #if defined(THREADED_RTS)
983         /* 
984          * In the threaded RTS, we only check for deadlock if there
985          * has been no activity in a complete timeslice.  This means
986          * we won't eagerly start a full GC just because we don't have
987          * any threads to run currently.
988          */
989         if (recent_activity != ACTIVITY_INACTIVE) return;
990 #endif
991
992         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
993
994         // Garbage collection can release some new threads due to
995         // either (a) finalizers or (b) threads resurrected because
996         // they are unreachable and will therefore be sent an
997         // exception.  Any threads thus released will be immediately
998         // runnable.
999         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
1000
1001         recent_activity = ACTIVITY_DONE_GC;
1002         
1003         if ( !emptyRunQueue(cap) ) return;
1004
1005 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
1006         /* If we have user-installed signal handlers, then wait
1007          * for signals to arrive rather then bombing out with a
1008          * deadlock.
1009          */
1010         if ( anyUserHandlers() ) {
1011             IF_DEBUG(scheduler, 
1012                      sched_belch("still deadlocked, waiting for signals..."));
1013
1014             awaitUserSignals();
1015
1016             if (signals_pending()) {
1017                 startSignalHandlers(cap);
1018             }
1019
1020             // either we have threads to run, or we were interrupted:
1021             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1022         }
1023 #endif
1024
1025 #if !defined(THREADED_RTS)
1026         /* Probably a real deadlock.  Send the current main thread the
1027          * Deadlock exception.
1028          */
1029         if (task->tso) {
1030             switch (task->tso->why_blocked) {
1031             case BlockedOnSTM:
1032             case BlockedOnBlackHole:
1033             case BlockedOnException:
1034             case BlockedOnMVar:
1035                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1036                 return;
1037             default:
1038                 barf("deadlock: main thread blocked in a strange way");
1039             }
1040         }
1041         return;
1042 #endif
1043     }
1044 }
1045
1046 /* ----------------------------------------------------------------------------
1047  * Process an event (GRAN only)
1048  * ------------------------------------------------------------------------- */
1049
1050 #if defined(GRAN)
1051 static StgTSO *
1052 scheduleProcessEvent(rtsEvent *event)
1053 {
1054     StgTSO *t;
1055
1056     if (RtsFlags.GranFlags.Light)
1057       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1058
1059     /* adjust time based on time-stamp */
1060     if (event->time > CurrentTime[CurrentProc] &&
1061         event->evttype != ContinueThread)
1062       CurrentTime[CurrentProc] = event->time;
1063     
1064     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1065     if (!RtsFlags.GranFlags.Light)
1066       handleIdlePEs();
1067
1068     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1069
1070     /* main event dispatcher in GranSim */
1071     switch (event->evttype) {
1072       /* Should just be continuing execution */
1073     case ContinueThread:
1074       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1075       /* ToDo: check assertion
1076       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1077              run_queue_hd != END_TSO_QUEUE);
1078       */
1079       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1080       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1081           procStatus[CurrentProc]==Fetching) {
1082         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1083               CurrentTSO->id, CurrentTSO, CurrentProc);
1084         goto next_thread;
1085       } 
1086       /* Ignore ContinueThreads for completed threads */
1087       if (CurrentTSO->what_next == ThreadComplete) {
1088         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1089               CurrentTSO->id, CurrentTSO, CurrentProc);
1090         goto next_thread;
1091       } 
1092       /* Ignore ContinueThreads for threads that are being migrated */
1093       if (PROCS(CurrentTSO)==Nowhere) { 
1094         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1095               CurrentTSO->id, CurrentTSO, CurrentProc);
1096         goto next_thread;
1097       }
1098       /* The thread should be at the beginning of the run queue */
1099       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1100         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1101               CurrentTSO->id, CurrentTSO, CurrentProc);
1102         break; // run the thread anyway
1103       }
1104       /*
1105       new_event(proc, proc, CurrentTime[proc],
1106                 FindWork,
1107                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1108       goto next_thread; 
1109       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1110       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1111
1112     case FetchNode:
1113       do_the_fetchnode(event);
1114       goto next_thread;             /* handle next event in event queue  */
1115       
1116     case GlobalBlock:
1117       do_the_globalblock(event);
1118       goto next_thread;             /* handle next event in event queue  */
1119       
1120     case FetchReply:
1121       do_the_fetchreply(event);
1122       goto next_thread;             /* handle next event in event queue  */
1123       
1124     case UnblockThread:   /* Move from the blocked queue to the tail of */
1125       do_the_unblock(event);
1126       goto next_thread;             /* handle next event in event queue  */
1127       
1128     case ResumeThread:  /* Move from the blocked queue to the tail of */
1129       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1130       event->tso->gran.blocktime += 
1131         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1132       do_the_startthread(event);
1133       goto next_thread;             /* handle next event in event queue  */
1134       
1135     case StartThread:
1136       do_the_startthread(event);
1137       goto next_thread;             /* handle next event in event queue  */
1138       
1139     case MoveThread:
1140       do_the_movethread(event);
1141       goto next_thread;             /* handle next event in event queue  */
1142       
1143     case MoveSpark:
1144       do_the_movespark(event);
1145       goto next_thread;             /* handle next event in event queue  */
1146       
1147     case FindWork:
1148       do_the_findwork(event);
1149       goto next_thread;             /* handle next event in event queue  */
1150       
1151     default:
1152       barf("Illegal event type %u\n", event->evttype);
1153     }  /* switch */
1154     
1155     /* This point was scheduler_loop in the old RTS */
1156
1157     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1158
1159     TimeOfLastEvent = CurrentTime[CurrentProc];
1160     TimeOfNextEvent = get_time_of_next_event();
1161     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1162     // CurrentTSO = ThreadQueueHd;
1163
1164     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1165                          TimeOfNextEvent));
1166
1167     if (RtsFlags.GranFlags.Light) 
1168       GranSimLight_leave_system(event, &ActiveTSO); 
1169
1170     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1171
1172     IF_DEBUG(gran, 
1173              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1174
1175     /* in a GranSim setup the TSO stays on the run queue */
1176     t = CurrentTSO;
1177     /* Take a thread from the run queue. */
1178     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1179
1180     IF_DEBUG(gran, 
1181              debugBelch("GRAN: About to run current thread, which is\n");
1182              G_TSO(t,5));
1183
1184     context_switch = 0; // turned on via GranYield, checking events and time slice
1185
1186     IF_DEBUG(gran, 
1187              DumpGranEvent(GR_SCHEDULE, t));
1188
1189     procStatus[CurrentProc] = Busy;
1190 }
1191 #endif // GRAN
1192
1193 /* ----------------------------------------------------------------------------
1194  * Send pending messages (PARALLEL_HASKELL only)
1195  * ------------------------------------------------------------------------- */
1196
1197 #if defined(PARALLEL_HASKELL)
1198 static StgTSO *
1199 scheduleSendPendingMessages(void)
1200 {
1201     StgSparkPool *pool;
1202     rtsSpark spark;
1203     StgTSO *t;
1204
1205 # if defined(PAR) // global Mem.Mgmt., omit for now
1206     if (PendingFetches != END_BF_QUEUE) {
1207         processFetches();
1208     }
1209 # endif
1210     
1211     if (RtsFlags.ParFlags.BufferTime) {
1212         // if we use message buffering, we must send away all message
1213         // packets which have become too old...
1214         sendOldBuffers(); 
1215     }
1216 }
1217 #endif
1218
1219 /* ----------------------------------------------------------------------------
1220  * Activate spark threads (PARALLEL_HASKELL only)
1221  * ------------------------------------------------------------------------- */
1222
1223 #if defined(PARALLEL_HASKELL)
1224 static void
1225 scheduleActivateSpark(void)
1226 {
1227 #if defined(SPARKS)
1228   ASSERT(emptyRunQueue());
1229 /* We get here if the run queue is empty and want some work.
1230    We try to turn a spark into a thread, and add it to the run queue,
1231    from where it will be picked up in the next iteration of the scheduler
1232    loop.
1233 */
1234
1235       /* :-[  no local threads => look out for local sparks */
1236       /* the spark pool for the current PE */
1237       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1238       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1239           pool->hd < pool->tl) {
1240         /* 
1241          * ToDo: add GC code check that we really have enough heap afterwards!!
1242          * Old comment:
1243          * If we're here (no runnable threads) and we have pending
1244          * sparks, we must have a space problem.  Get enough space
1245          * to turn one of those pending sparks into a
1246          * thread... 
1247          */
1248
1249         spark = findSpark(rtsFalse);            /* get a spark */
1250         if (spark != (rtsSpark) NULL) {
1251           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1252           IF_PAR_DEBUG(fish, // schedule,
1253                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1254                              tso->id, tso, advisory_thread_count));
1255
1256           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1257             IF_PAR_DEBUG(fish, // schedule,
1258                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1259                             spark));
1260             return rtsFalse; /* failed to generate a thread */
1261           }                  /* otherwise fall through & pick-up new tso */
1262         } else {
1263           IF_PAR_DEBUG(fish, // schedule,
1264                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1265                              spark_queue_len(pool)));
1266           return rtsFalse;  /* failed to generate a thread */
1267         }
1268         return rtsTrue;  /* success in generating a thread */
1269   } else { /* no more threads permitted or pool empty */
1270     return rtsFalse;  /* failed to generateThread */
1271   }
1272 #else
1273   tso = NULL; // avoid compiler warning only
1274   return rtsFalse;  /* dummy in non-PAR setup */
1275 #endif // SPARKS
1276 }
1277 #endif // PARALLEL_HASKELL
1278
1279 /* ----------------------------------------------------------------------------
1280  * Get work from a remote node (PARALLEL_HASKELL only)
1281  * ------------------------------------------------------------------------- */
1282     
1283 #if defined(PARALLEL_HASKELL)
1284 static rtsBool
1285 scheduleGetRemoteWork(rtsBool *receivedFinish)
1286 {
1287   ASSERT(emptyRunQueue());
1288
1289   if (RtsFlags.ParFlags.BufferTime) {
1290         IF_PAR_DEBUG(verbose, 
1291                 debugBelch("...send all pending data,"));
1292         {
1293           nat i;
1294           for (i=1; i<=nPEs; i++)
1295             sendImmediately(i); // send all messages away immediately
1296         }
1297   }
1298 # ifndef SPARKS
1299         //++EDEN++ idle() , i.e. send all buffers, wait for work
1300         // suppress fishing in EDEN... just look for incoming messages
1301         // (blocking receive)
1302   IF_PAR_DEBUG(verbose, 
1303                debugBelch("...wait for incoming messages...\n"));
1304   *receivedFinish = processMessages(); // blocking receive...
1305
1306         // and reenter scheduling loop after having received something
1307         // (return rtsFalse below)
1308
1309 # else /* activate SPARKS machinery */
1310 /* We get here, if we have no work, tried to activate a local spark, but still
1311    have no work. We try to get a remote spark, by sending a FISH message.
1312    Thread migration should be added here, and triggered when a sequence of 
1313    fishes returns without work. */
1314         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1315
1316       /* =8-[  no local sparks => look for work on other PEs */
1317         /*
1318          * We really have absolutely no work.  Send out a fish
1319          * (there may be some out there already), and wait for
1320          * something to arrive.  We clearly can't run any threads
1321          * until a SCHEDULE or RESUME arrives, and so that's what
1322          * we're hoping to see.  (Of course, we still have to
1323          * respond to other types of messages.)
1324          */
1325         rtsTime now = msTime() /*CURRENT_TIME*/;
1326         IF_PAR_DEBUG(verbose, 
1327                      debugBelch("--  now=%ld\n", now));
1328         IF_PAR_DEBUG(fish, // verbose,
1329              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1330                  (last_fish_arrived_at!=0 &&
1331                   last_fish_arrived_at+delay > now)) {
1332                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1333                      now, last_fish_arrived_at+delay, 
1334                      last_fish_arrived_at,
1335                      delay);
1336              });
1337   
1338         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1339             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1340           if (last_fish_arrived_at==0 ||
1341               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1342             /* outstandingFishes is set in sendFish, processFish;
1343                avoid flooding system with fishes via delay */
1344     next_fish_to_send_at = 0;  
1345   } else {
1346     /* ToDo: this should be done in the main scheduling loop to avoid the
1347              busy wait here; not so bad if fish delay is very small  */
1348     int iq = 0; // DEBUGGING -- HWL
1349     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1350     /* send a fish when ready, but process messages that arrive in the meantime */
1351     do {
1352       if (PacketsWaiting()) {
1353         iq++; // DEBUGGING
1354         *receivedFinish = processMessages();
1355       }
1356       now = msTime();
1357     } while (!*receivedFinish || now<next_fish_to_send_at);
1358     // JB: This means the fish could become obsolete, if we receive
1359     // work. Better check for work again? 
1360     // last line: while (!receivedFinish || !haveWork || now<...)
1361     // next line: if (receivedFinish || haveWork )
1362
1363     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1364       return rtsFalse;  // NB: this will leave scheduler loop
1365                         // immediately after return!
1366                           
1367     IF_PAR_DEBUG(fish, // verbose,
1368                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1369
1370   }
1371
1372     // JB: IMHO, this should all be hidden inside sendFish(...)
1373     /* pe = choosePE(); 
1374        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1375                 NEW_FISH_HUNGER);
1376
1377     // Global statistics: count no. of fishes
1378     if (RtsFlags.ParFlags.ParStats.Global &&
1379          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1380            globalParStats.tot_fish_mess++;
1381            }
1382     */ 
1383
1384   /* delayed fishes must have been sent by now! */
1385   next_fish_to_send_at = 0;  
1386   }
1387       
1388   *receivedFinish = processMessages();
1389 # endif /* SPARKS */
1390
1391  return rtsFalse;
1392  /* NB: this function always returns rtsFalse, meaning the scheduler
1393     loop continues with the next iteration; 
1394     rationale: 
1395       return code means success in finding work; we enter this function
1396       if there is no local work, thus have to send a fish which takes
1397       time until it arrives with work; in the meantime we should process
1398       messages in the main loop;
1399  */
1400 }
1401 #endif // PARALLEL_HASKELL
1402
1403 /* ----------------------------------------------------------------------------
1404  * PAR/GRAN: Report stats & debugging info(?)
1405  * ------------------------------------------------------------------------- */
1406
1407 #if defined(PAR) || defined(GRAN)
1408 static void
1409 scheduleGranParReport(void)
1410 {
1411   ASSERT(run_queue_hd != END_TSO_QUEUE);
1412
1413   /* Take a thread from the run queue, if we have work */
1414   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1415
1416     /* If this TSO has got its outport closed in the meantime, 
1417      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1418      * It has to be marked as TH_DEAD for this purpose.
1419      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1420
1421 JB: TODO: investigate wether state change field could be nuked
1422      entirely and replaced by the normal tso state (whatnext
1423      field). All we want to do is to kill tsos from outside.
1424      */
1425
1426     /* ToDo: write something to the log-file
1427     if (RTSflags.ParFlags.granSimStats && !sameThread)
1428         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1429
1430     CurrentTSO = t;
1431     */
1432     /* the spark pool for the current PE */
1433     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1434
1435     IF_DEBUG(scheduler, 
1436              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1437                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1438
1439     IF_PAR_DEBUG(fish,
1440              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1441                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1442
1443     if (RtsFlags.ParFlags.ParStats.Full && 
1444         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1445         (emitSchedule || // forced emit
1446          (t && LastTSO && t->id != LastTSO->id))) {
1447       /* 
1448          we are running a different TSO, so write a schedule event to log file
1449          NB: If we use fair scheduling we also have to write  a deschedule 
1450              event for LastTSO; with unfair scheduling we know that the
1451              previous tso has blocked whenever we switch to another tso, so
1452              we don't need it in GUM for now
1453       */
1454       IF_PAR_DEBUG(fish, // schedule,
1455                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1456
1457       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1458                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1459       emitSchedule = rtsFalse;
1460     }
1461 }     
1462 #endif
1463
1464 /* ----------------------------------------------------------------------------
1465  * After running a thread...
1466  * ------------------------------------------------------------------------- */
1467
1468 static void
1469 schedulePostRunThread(void)
1470 {
1471 #if defined(PAR)
1472     /* HACK 675: if the last thread didn't yield, make sure to print a 
1473        SCHEDULE event to the log file when StgRunning the next thread, even
1474        if it is the same one as before */
1475     LastTSO = t; 
1476     TimeOfLastYield = CURRENT_TIME;
1477 #endif
1478
1479   /* some statistics gathering in the parallel case */
1480
1481 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1482   switch (ret) {
1483     case HeapOverflow:
1484 # if defined(GRAN)
1485       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1486       globalGranStats.tot_heapover++;
1487 # elif defined(PAR)
1488       globalParStats.tot_heapover++;
1489 # endif
1490       break;
1491
1492      case StackOverflow:
1493 # if defined(GRAN)
1494       IF_DEBUG(gran, 
1495                DumpGranEvent(GR_DESCHEDULE, t));
1496       globalGranStats.tot_stackover++;
1497 # elif defined(PAR)
1498       // IF_DEBUG(par, 
1499       // DumpGranEvent(GR_DESCHEDULE, t);
1500       globalParStats.tot_stackover++;
1501 # endif
1502       break;
1503
1504     case ThreadYielding:
1505 # if defined(GRAN)
1506       IF_DEBUG(gran, 
1507                DumpGranEvent(GR_DESCHEDULE, t));
1508       globalGranStats.tot_yields++;
1509 # elif defined(PAR)
1510       // IF_DEBUG(par, 
1511       // DumpGranEvent(GR_DESCHEDULE, t);
1512       globalParStats.tot_yields++;
1513 # endif
1514       break; 
1515
1516     case ThreadBlocked:
1517 # if defined(GRAN)
1518       IF_DEBUG(scheduler,
1519                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1520                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1521                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1522                if (t->block_info.closure!=(StgClosure*)NULL)
1523                  print_bq(t->block_info.closure);
1524                debugBelch("\n"));
1525
1526       // ??? needed; should emit block before
1527       IF_DEBUG(gran, 
1528                DumpGranEvent(GR_DESCHEDULE, t)); 
1529       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1530       /*
1531         ngoq Dogh!
1532       ASSERT(procStatus[CurrentProc]==Busy || 
1533               ((procStatus[CurrentProc]==Fetching) && 
1534               (t->block_info.closure!=(StgClosure*)NULL)));
1535       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1536           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1537             procStatus[CurrentProc]==Fetching)) 
1538         procStatus[CurrentProc] = Idle;
1539       */
1540 # elif defined(PAR)
1541 //++PAR++  blockThread() writes the event (change?)
1542 # endif
1543     break;
1544
1545   case ThreadFinished:
1546     break;
1547
1548   default:
1549     barf("parGlobalStats: unknown return code");
1550     break;
1551     }
1552 #endif
1553 }
1554
1555 /* -----------------------------------------------------------------------------
1556  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1557  * -------------------------------------------------------------------------- */
1558
1559 static rtsBool
1560 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1561 {
1562     // did the task ask for a large block?
1563     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1564         // if so, get one and push it on the front of the nursery.
1565         bdescr *bd;
1566         lnat blocks;
1567         
1568         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1569         
1570         IF_DEBUG(scheduler,
1571                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1572                             (long)t->id, whatNext_strs[t->what_next], blocks));
1573         
1574         // don't do this if the nursery is (nearly) full, we'll GC first.
1575         if (cap->r.rCurrentNursery->link != NULL ||
1576             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1577                                                // if the nursery has only one block.
1578             
1579             ACQUIRE_SM_LOCK
1580             bd = allocGroup( blocks );
1581             RELEASE_SM_LOCK
1582             cap->r.rNursery->n_blocks += blocks;
1583             
1584             // link the new group into the list
1585             bd->link = cap->r.rCurrentNursery;
1586             bd->u.back = cap->r.rCurrentNursery->u.back;
1587             if (cap->r.rCurrentNursery->u.back != NULL) {
1588                 cap->r.rCurrentNursery->u.back->link = bd;
1589             } else {
1590 #if !defined(THREADED_RTS)
1591                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1592                        g0s0 == cap->r.rNursery);
1593 #endif
1594                 cap->r.rNursery->blocks = bd;
1595             }             
1596             cap->r.rCurrentNursery->u.back = bd;
1597             
1598             // initialise it as a nursery block.  We initialise the
1599             // step, gen_no, and flags field of *every* sub-block in
1600             // this large block, because this is easier than making
1601             // sure that we always find the block head of a large
1602             // block whenever we call Bdescr() (eg. evacuate() and
1603             // isAlive() in the GC would both have to do this, at
1604             // least).
1605             { 
1606                 bdescr *x;
1607                 for (x = bd; x < bd + blocks; x++) {
1608                     x->step = cap->r.rNursery;
1609                     x->gen_no = 0;
1610                     x->flags = 0;
1611                 }
1612             }
1613             
1614             // This assert can be a killer if the app is doing lots
1615             // of large block allocations.
1616             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1617             
1618             // now update the nursery to point to the new block
1619             cap->r.rCurrentNursery = bd;
1620             
1621             // we might be unlucky and have another thread get on the
1622             // run queue before us and steal the large block, but in that
1623             // case the thread will just end up requesting another large
1624             // block.
1625             pushOnRunQueue(cap,t);
1626             return rtsFalse;  /* not actually GC'ing */
1627         }
1628     }
1629     
1630     IF_DEBUG(scheduler,
1631              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1632                         (long)t->id, whatNext_strs[t->what_next]));
1633 #if defined(GRAN)
1634     ASSERT(!is_on_queue(t,CurrentProc));
1635 #elif defined(PARALLEL_HASKELL)
1636     /* Currently we emit a DESCHEDULE event before GC in GUM.
1637        ToDo: either add separate event to distinguish SYSTEM time from rest
1638        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1639     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1640         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1641                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1642         emitSchedule = rtsTrue;
1643     }
1644 #endif
1645       
1646     pushOnRunQueue(cap,t);
1647     return rtsTrue;
1648     /* actual GC is done at the end of the while loop in schedule() */
1649 }
1650
1651 /* -----------------------------------------------------------------------------
1652  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1653  * -------------------------------------------------------------------------- */
1654
1655 static void
1656 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1657 {
1658     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1659                                   (long)t->id, whatNext_strs[t->what_next]));
1660     /* just adjust the stack for this thread, then pop it back
1661      * on the run queue.
1662      */
1663     { 
1664         /* enlarge the stack */
1665         StgTSO *new_t = threadStackOverflow(cap, t);
1666         
1667         /* The TSO attached to this Task may have moved, so update the
1668          * pointer to it.
1669          */
1670         if (task->tso == t) {
1671             task->tso = new_t;
1672         }
1673         pushOnRunQueue(cap,new_t);
1674     }
1675 }
1676
1677 /* -----------------------------------------------------------------------------
1678  * Handle a thread that returned to the scheduler with ThreadYielding
1679  * -------------------------------------------------------------------------- */
1680
1681 static rtsBool
1682 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1683 {
1684     // Reset the context switch flag.  We don't do this just before
1685     // running the thread, because that would mean we would lose ticks
1686     // during GC, which can lead to unfair scheduling (a thread hogs
1687     // the CPU because the tick always arrives during GC).  This way
1688     // penalises threads that do a lot of allocation, but that seems
1689     // better than the alternative.
1690     context_switch = 0;
1691     
1692     /* put the thread back on the run queue.  Then, if we're ready to
1693      * GC, check whether this is the last task to stop.  If so, wake
1694      * up the GC thread.  getThread will block during a GC until the
1695      * GC is finished.
1696      */
1697     IF_DEBUG(scheduler,
1698              if (t->what_next != prev_what_next) {
1699                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1700                             (long)t->id, whatNext_strs[t->what_next]);
1701              } else {
1702                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1703                             (long)t->id, whatNext_strs[t->what_next]);
1704              }
1705         );
1706     
1707     IF_DEBUG(sanity,
1708              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1709              checkTSO(t));
1710     ASSERT(t->link == END_TSO_QUEUE);
1711     
1712     // Shortcut if we're just switching evaluators: don't bother
1713     // doing stack squeezing (which can be expensive), just run the
1714     // thread.
1715     if (t->what_next != prev_what_next) {
1716         return rtsTrue;
1717     }
1718     
1719 #if defined(GRAN)
1720     ASSERT(!is_on_queue(t,CurrentProc));
1721       
1722     IF_DEBUG(sanity,
1723              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1724              checkThreadQsSanity(rtsTrue));
1725
1726 #endif
1727
1728     addToRunQueue(cap,t);
1729
1730 #if defined(GRAN)
1731     /* add a ContinueThread event to actually process the thread */
1732     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1733               ContinueThread,
1734               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1735     IF_GRAN_DEBUG(bq, 
1736                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1737                   G_EVENTQ(0);
1738                   G_CURR_THREADQ(0));
1739 #endif
1740     return rtsFalse;
1741 }
1742
1743 /* -----------------------------------------------------------------------------
1744  * Handle a thread that returned to the scheduler with ThreadBlocked
1745  * -------------------------------------------------------------------------- */
1746
1747 static void
1748 scheduleHandleThreadBlocked( StgTSO *t
1749 #if !defined(GRAN) && !defined(DEBUG)
1750     STG_UNUSED
1751 #endif
1752     )
1753 {
1754 #if defined(GRAN)
1755     IF_DEBUG(scheduler,
1756              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1757                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1758              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1759     
1760     // ??? needed; should emit block before
1761     IF_DEBUG(gran, 
1762              DumpGranEvent(GR_DESCHEDULE, t)); 
1763     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1764     /*
1765       ngoq Dogh!
1766       ASSERT(procStatus[CurrentProc]==Busy || 
1767       ((procStatus[CurrentProc]==Fetching) && 
1768       (t->block_info.closure!=(StgClosure*)NULL)));
1769       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1770       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1771       procStatus[CurrentProc]==Fetching)) 
1772       procStatus[CurrentProc] = Idle;
1773     */
1774 #elif defined(PAR)
1775     IF_DEBUG(scheduler,
1776              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1777                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1778     IF_PAR_DEBUG(bq,
1779                  
1780                  if (t->block_info.closure!=(StgClosure*)NULL) 
1781                  print_bq(t->block_info.closure));
1782     
1783     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1784     blockThread(t);
1785     
1786     /* whatever we schedule next, we must log that schedule */
1787     emitSchedule = rtsTrue;
1788     
1789 #else /* !GRAN */
1790
1791       // We don't need to do anything.  The thread is blocked, and it
1792       // has tidied up its stack and placed itself on whatever queue
1793       // it needs to be on.
1794
1795 #if !defined(THREADED_RTS)
1796     ASSERT(t->why_blocked != NotBlocked);
1797              // This might not be true under THREADED_RTS: we don't have
1798              // exclusive access to this TSO, so someone might have
1799              // woken it up by now.  This actually happens: try
1800              // conc023 +RTS -N2.
1801 #endif
1802
1803     IF_DEBUG(scheduler,
1804              debugBelch("--<< thread %d (%s) stopped: ", 
1805                         t->id, whatNext_strs[t->what_next]);
1806              printThreadBlockage(t);
1807              debugBelch("\n"));
1808     
1809     /* Only for dumping event to log file 
1810        ToDo: do I need this in GranSim, too?
1811        blockThread(t);
1812     */
1813 #endif
1814 }
1815
1816 /* -----------------------------------------------------------------------------
1817  * Handle a thread that returned to the scheduler with ThreadFinished
1818  * -------------------------------------------------------------------------- */
1819
1820 static rtsBool
1821 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1822 {
1823     /* Need to check whether this was a main thread, and if so,
1824      * return with the return value.
1825      *
1826      * We also end up here if the thread kills itself with an
1827      * uncaught exception, see Exception.cmm.
1828      */
1829     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1830                                   t->id, whatNext_strs[t->what_next]));
1831
1832 #if defined(GRAN)
1833       endThread(t, CurrentProc); // clean-up the thread
1834 #elif defined(PARALLEL_HASKELL)
1835       /* For now all are advisory -- HWL */
1836       //if(t->priority==AdvisoryPriority) ??
1837       advisory_thread_count--; // JB: Caution with this counter, buggy!
1838       
1839 # if defined(DIST)
1840       if(t->dist.priority==RevalPriority)
1841         FinishReval(t);
1842 # endif
1843     
1844 # if defined(EDENOLD)
1845       // the thread could still have an outport... (BUG)
1846       if (t->eden.outport != -1) {
1847       // delete the outport for the tso which has finished...
1848         IF_PAR_DEBUG(eden_ports,
1849                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1850                               t->eden.outport, t->id));
1851         deleteOPT(t);
1852       }
1853       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1854       if (t->eden.epid != -1) {
1855         IF_PAR_DEBUG(eden_ports,
1856                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1857                            t->id, t->eden.epid));
1858         removeTSOfromProcess(t);
1859       }
1860 # endif 
1861
1862 # if defined(PAR)
1863       if (RtsFlags.ParFlags.ParStats.Full &&
1864           !RtsFlags.ParFlags.ParStats.Suppressed) 
1865         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1866
1867       //  t->par only contains statistics: left out for now...
1868       IF_PAR_DEBUG(fish,
1869                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1870                               t->id,t,t->par.sparkname));
1871 # endif
1872 #endif // PARALLEL_HASKELL
1873
1874       //
1875       // Check whether the thread that just completed was a bound
1876       // thread, and if so return with the result.  
1877       //
1878       // There is an assumption here that all thread completion goes
1879       // through this point; we need to make sure that if a thread
1880       // ends up in the ThreadKilled state, that it stays on the run
1881       // queue so it can be dealt with here.
1882       //
1883
1884       if (t->bound) {
1885
1886           if (t->bound != task) {
1887 #if !defined(THREADED_RTS)
1888               // Must be a bound thread that is not the topmost one.  Leave
1889               // it on the run queue until the stack has unwound to the
1890               // point where we can deal with this.  Leaving it on the run
1891               // queue also ensures that the garbage collector knows about
1892               // this thread and its return value (it gets dropped from the
1893               // all_threads list so there's no other way to find it).
1894               appendToRunQueue(cap,t);
1895               return rtsFalse;
1896 #else
1897               // this cannot happen in the threaded RTS, because a
1898               // bound thread can only be run by the appropriate Task.
1899               barf("finished bound thread that isn't mine");
1900 #endif
1901           }
1902
1903           ASSERT(task->tso == t);
1904
1905           if (t->what_next == ThreadComplete) {
1906               if (task->ret) {
1907                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1908                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1909               }
1910               task->stat = Success;
1911           } else {
1912               if (task->ret) {
1913                   *(task->ret) = NULL;
1914               }
1915               if (sched_state >= SCHED_INTERRUPTING) {
1916                   task->stat = Interrupted;
1917               } else {
1918                   task->stat = Killed;
1919               }
1920           }
1921 #ifdef DEBUG
1922           removeThreadLabel((StgWord)task->tso->id);
1923 #endif
1924           return rtsTrue; // tells schedule() to return
1925       }
1926
1927       return rtsFalse;
1928 }
1929
1930 /* -----------------------------------------------------------------------------
1931  * Perform a heap census, if PROFILING
1932  * -------------------------------------------------------------------------- */
1933
1934 static rtsBool
1935 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1936 {
1937 #if defined(PROFILING)
1938     // When we have +RTS -i0 and we're heap profiling, do a census at
1939     // every GC.  This lets us get repeatable runs for debugging.
1940     if (performHeapProfile ||
1941         (RtsFlags.ProfFlags.profileInterval==0 &&
1942          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1943
1944         // checking black holes is necessary before GC, otherwise
1945         // there may be threads that are unreachable except by the
1946         // blackhole queue, which the GC will consider to be
1947         // deadlocked.
1948         scheduleCheckBlackHoles(&MainCapability);
1949
1950         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1951         GarbageCollect(GetRoots, rtsTrue);
1952
1953         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1954         heapCensus();
1955
1956         performHeapProfile = rtsFalse;
1957         return rtsTrue;  // true <=> we already GC'd
1958     }
1959 #endif
1960     return rtsFalse;
1961 }
1962
1963 /* -----------------------------------------------------------------------------
1964  * Perform a garbage collection if necessary
1965  * -------------------------------------------------------------------------- */
1966
1967 static Capability *
1968 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1969               rtsBool force_major, void (*get_roots)(evac_fn))
1970 {
1971     StgTSO *t;
1972 #ifdef THREADED_RTS
1973     static volatile StgWord waiting_for_gc;
1974     rtsBool was_waiting;
1975     nat i;
1976 #endif
1977
1978 #ifdef THREADED_RTS
1979     // In order to GC, there must be no threads running Haskell code.
1980     // Therefore, the GC thread needs to hold *all* the capabilities,
1981     // and release them after the GC has completed.  
1982     //
1983     // This seems to be the simplest way: previous attempts involved
1984     // making all the threads with capabilities give up their
1985     // capabilities and sleep except for the *last* one, which
1986     // actually did the GC.  But it's quite hard to arrange for all
1987     // the other tasks to sleep and stay asleep.
1988     //
1989         
1990     was_waiting = cas(&waiting_for_gc, 0, 1);
1991     if (was_waiting) {
1992         do {
1993             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1994             if (cap) yieldCapability(&cap,task);
1995         } while (waiting_for_gc);
1996         return cap;  // NOTE: task->cap might have changed here
1997     }
1998
1999     for (i=0; i < n_capabilities; i++) {
2000         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
2001         if (cap != &capabilities[i]) {
2002             Capability *pcap = &capabilities[i];
2003             // we better hope this task doesn't get migrated to
2004             // another Capability while we're waiting for this one.
2005             // It won't, because load balancing happens while we have
2006             // all the Capabilities, but even so it's a slightly
2007             // unsavoury invariant.
2008             task->cap = pcap;
2009             context_switch = 1;
2010             waitForReturnCapability(&pcap, task);
2011             if (pcap != &capabilities[i]) {
2012                 barf("scheduleDoGC: got the wrong capability");
2013             }
2014         }
2015     }
2016
2017     waiting_for_gc = rtsFalse;
2018 #endif
2019
2020     /* Kick any transactions which are invalid back to their
2021      * atomically frames.  When next scheduled they will try to
2022      * commit, this commit will fail and they will retry.
2023      */
2024     { 
2025         StgTSO *next;
2026
2027         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2028             if (t->what_next == ThreadRelocated) {
2029                 next = t->link;
2030             } else {
2031                 next = t->global_link;
2032                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2033                     if (!stmValidateNestOfTransactions (t -> trec)) {
2034                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2035                         
2036                         // strip the stack back to the
2037                         // ATOMICALLY_FRAME, aborting the (nested)
2038                         // transaction, and saving the stack of any
2039                         // partially-evaluated thunks on the heap.
2040                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2041                         
2042 #ifdef REG_R1
2043                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2044 #endif
2045                     }
2046                 }
2047             }
2048         }
2049     }
2050     
2051     // so this happens periodically:
2052     if (cap) scheduleCheckBlackHoles(cap);
2053     
2054     IF_DEBUG(scheduler, printAllThreads());
2055
2056     /*
2057      * We now have all the capabilities; if we're in an interrupting
2058      * state, then we should take the opportunity to delete all the
2059      * threads in the system.
2060      */
2061     if (sched_state >= SCHED_INTERRUPTING) {
2062         deleteAllThreads(&capabilities[0]);
2063         sched_state = SCHED_INTERRUPTED;
2064     }
2065
2066     /* everybody back, start the GC.
2067      * Could do it in this thread, or signal a condition var
2068      * to do it in another thread.  Either way, we need to
2069      * broadcast on gc_pending_cond afterward.
2070      */
2071 #if defined(THREADED_RTS)
2072     IF_DEBUG(scheduler,sched_belch("doing GC"));
2073 #endif
2074     GarbageCollect(get_roots, force_major);
2075     
2076 #if defined(THREADED_RTS)
2077     // release our stash of capabilities.
2078     for (i = 0; i < n_capabilities; i++) {
2079         if (cap != &capabilities[i]) {
2080             task->cap = &capabilities[i];
2081             releaseCapability(&capabilities[i]);
2082         }
2083     }
2084     if (cap) {
2085         task->cap = cap;
2086     } else {
2087         task->cap = NULL;
2088     }
2089 #endif
2090
2091 #if defined(GRAN)
2092     /* add a ContinueThread event to continue execution of current thread */
2093     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2094               ContinueThread,
2095               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2096     IF_GRAN_DEBUG(bq, 
2097                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2098                   G_EVENTQ(0);
2099                   G_CURR_THREADQ(0));
2100 #endif /* GRAN */
2101
2102     return cap;
2103 }
2104
2105 /* ---------------------------------------------------------------------------
2106  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2107  * used by Control.Concurrent for error checking.
2108  * ------------------------------------------------------------------------- */
2109  
2110 StgBool
2111 rtsSupportsBoundThreads(void)
2112 {
2113 #if defined(THREADED_RTS)
2114   return rtsTrue;
2115 #else
2116   return rtsFalse;
2117 #endif
2118 }
2119
2120 /* ---------------------------------------------------------------------------
2121  * isThreadBound(tso): check whether tso is bound to an OS thread.
2122  * ------------------------------------------------------------------------- */
2123  
2124 StgBool
2125 isThreadBound(StgTSO* tso USED_IF_THREADS)
2126 {
2127 #if defined(THREADED_RTS)
2128   return (tso->bound != NULL);
2129 #endif
2130   return rtsFalse;
2131 }
2132
2133 /* ---------------------------------------------------------------------------
2134  * Singleton fork(). Do not copy any running threads.
2135  * ------------------------------------------------------------------------- */
2136
2137 #if !defined(mingw32_HOST_OS)
2138 #define FORKPROCESS_PRIMOP_SUPPORTED
2139 #endif
2140
2141 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2142 static void 
2143 deleteThread_(Capability *cap, StgTSO *tso);
2144 #endif
2145 StgInt
2146 forkProcess(HsStablePtr *entry
2147 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2148             STG_UNUSED
2149 #endif
2150            )
2151 {
2152 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2153     Task *task;
2154     pid_t pid;
2155     StgTSO* t,*next;
2156     Capability *cap;
2157     
2158 #if defined(THREADED_RTS)
2159     if (RtsFlags.ParFlags.nNodes > 1) {
2160         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2161         stg_exit(EXIT_FAILURE);
2162     }
2163 #endif
2164
2165     IF_DEBUG(scheduler,sched_belch("forking!"));
2166     
2167     // ToDo: for SMP, we should probably acquire *all* the capabilities
2168     cap = rts_lock();
2169     
2170     pid = fork();
2171     
2172     if (pid) { // parent
2173         
2174         // just return the pid
2175         rts_unlock(cap);
2176         return pid;
2177         
2178     } else { // child
2179         
2180         // Now, all OS threads except the thread that forked are
2181         // stopped.  We need to stop all Haskell threads, including
2182         // those involved in foreign calls.  Also we need to delete
2183         // all Tasks, because they correspond to OS threads that are
2184         // now gone.
2185
2186         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2187             if (t->what_next == ThreadRelocated) {
2188                 next = t->link;
2189             } else {
2190                 next = t->global_link;
2191                 // don't allow threads to catch the ThreadKilled
2192                 // exception, but we do want to raiseAsync() because these
2193                 // threads may be evaluating thunks that we need later.
2194                 deleteThread_(cap,t);
2195             }
2196         }
2197         
2198         // Empty the run queue.  It seems tempting to let all the
2199         // killed threads stay on the run queue as zombies to be
2200         // cleaned up later, but some of them correspond to bound
2201         // threads for which the corresponding Task does not exist.
2202         cap->run_queue_hd = END_TSO_QUEUE;
2203         cap->run_queue_tl = END_TSO_QUEUE;
2204
2205         // Any suspended C-calling Tasks are no more, their OS threads
2206         // don't exist now:
2207         cap->suspended_ccalling_tasks = NULL;
2208
2209         // Empty the all_threads list.  Otherwise, the garbage
2210         // collector may attempt to resurrect some of these threads.
2211         all_threads = END_TSO_QUEUE;
2212
2213         // Wipe the task list, except the current Task.
2214         ACQUIRE_LOCK(&sched_mutex);
2215         for (task = all_tasks; task != NULL; task=task->all_link) {
2216             if (task != cap->running_task) {
2217                 discardTask(task);
2218             }
2219         }
2220         RELEASE_LOCK(&sched_mutex);
2221
2222 #if defined(THREADED_RTS)
2223         // Wipe our spare workers list, they no longer exist.  New
2224         // workers will be created if necessary.
2225         cap->spare_workers = NULL;
2226         cap->returning_tasks_hd = NULL;
2227         cap->returning_tasks_tl = NULL;
2228 #endif
2229
2230         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2231         rts_checkSchedStatus("forkProcess",cap);
2232         
2233         rts_unlock(cap);
2234         hs_exit();                      // clean up and exit
2235         stg_exit(EXIT_SUCCESS);
2236     }
2237 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2238     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2239     return -1;
2240 #endif
2241 }
2242
2243 /* ---------------------------------------------------------------------------
2244  * Delete all the threads in the system
2245  * ------------------------------------------------------------------------- */
2246    
2247 static void
2248 deleteAllThreads ( Capability *cap )
2249 {
2250   StgTSO* t, *next;
2251   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2252   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2253       if (t->what_next == ThreadRelocated) {
2254           next = t->link;
2255       } else {
2256           next = t->global_link;
2257           deleteThread(cap,t);
2258       }
2259   }      
2260
2261   // The run queue now contains a bunch of ThreadKilled threads.  We
2262   // must not throw these away: the main thread(s) will be in there
2263   // somewhere, and the main scheduler loop has to deal with it.
2264   // Also, the run queue is the only thing keeping these threads from
2265   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2266
2267 #if !defined(THREADED_RTS)
2268   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2269   ASSERT(sleeping_queue == END_TSO_QUEUE);
2270 #endif
2271 }
2272
2273 /* -----------------------------------------------------------------------------
2274    Managing the suspended_ccalling_tasks list.
2275    Locks required: sched_mutex
2276    -------------------------------------------------------------------------- */
2277
2278 STATIC_INLINE void
2279 suspendTask (Capability *cap, Task *task)
2280 {
2281     ASSERT(task->next == NULL && task->prev == NULL);
2282     task->next = cap->suspended_ccalling_tasks;
2283     task->prev = NULL;
2284     if (cap->suspended_ccalling_tasks) {
2285         cap->suspended_ccalling_tasks->prev = task;
2286     }
2287     cap->suspended_ccalling_tasks = task;
2288 }
2289
2290 STATIC_INLINE void
2291 recoverSuspendedTask (Capability *cap, Task *task)
2292 {
2293     if (task->prev) {
2294         task->prev->next = task->next;
2295     } else {
2296         ASSERT(cap->suspended_ccalling_tasks == task);
2297         cap->suspended_ccalling_tasks = task->next;
2298     }
2299     if (task->next) {
2300         task->next->prev = task->prev;
2301     }
2302     task->next = task->prev = NULL;
2303 }
2304
2305 /* ---------------------------------------------------------------------------
2306  * Suspending & resuming Haskell threads.
2307  * 
2308  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2309  * its capability before calling the C function.  This allows another
2310  * task to pick up the capability and carry on running Haskell
2311  * threads.  It also means that if the C call blocks, it won't lock
2312  * the whole system.
2313  *
2314  * The Haskell thread making the C call is put to sleep for the
2315  * duration of the call, on the susepended_ccalling_threads queue.  We
2316  * give out a token to the task, which it can use to resume the thread
2317  * on return from the C function.
2318  * ------------------------------------------------------------------------- */
2319    
2320 void *
2321 suspendThread (StgRegTable *reg)
2322 {
2323   Capability *cap;
2324   int saved_errno = errno;
2325   StgTSO *tso;
2326   Task *task;
2327
2328   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2329    */
2330   cap = regTableToCapability(reg);
2331
2332   task = cap->running_task;
2333   tso = cap->r.rCurrentTSO;
2334
2335   IF_DEBUG(scheduler,
2336            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2337
2338   // XXX this might not be necessary --SDM
2339   tso->what_next = ThreadRunGHC;
2340
2341   threadPaused(cap,tso);
2342
2343   if(tso->blocked_exceptions == NULL)  {
2344       tso->why_blocked = BlockedOnCCall;
2345       tso->blocked_exceptions = END_TSO_QUEUE;
2346   } else {
2347       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2348   }
2349
2350   // Hand back capability
2351   task->suspended_tso = tso;
2352
2353   ACQUIRE_LOCK(&cap->lock);
2354
2355   suspendTask(cap,task);
2356   cap->in_haskell = rtsFalse;
2357   releaseCapability_(cap);
2358   
2359   RELEASE_LOCK(&cap->lock);
2360
2361 #if defined(THREADED_RTS)
2362   /* Preparing to leave the RTS, so ensure there's a native thread/task
2363      waiting to take over.
2364   */
2365   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2366 #endif
2367
2368   errno = saved_errno;
2369   return task;
2370 }
2371
2372 StgRegTable *
2373 resumeThread (void *task_)
2374 {
2375     StgTSO *tso;
2376     Capability *cap;
2377     int saved_errno = errno;
2378     Task *task = task_;
2379
2380     cap = task->cap;
2381     // Wait for permission to re-enter the RTS with the result.
2382     waitForReturnCapability(&cap,task);
2383     // we might be on a different capability now... but if so, our
2384     // entry on the suspended_ccalling_tasks list will also have been
2385     // migrated.
2386
2387     // Remove the thread from the suspended list
2388     recoverSuspendedTask(cap,task);
2389
2390     tso = task->suspended_tso;
2391     task->suspended_tso = NULL;
2392     tso->link = END_TSO_QUEUE;
2393     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2394     
2395     if (tso->why_blocked == BlockedOnCCall) {
2396         awakenBlockedQueue(cap,tso->blocked_exceptions);
2397         tso->blocked_exceptions = NULL;
2398     }
2399     
2400     /* Reset blocking status */
2401     tso->why_blocked  = NotBlocked;
2402     
2403     cap->r.rCurrentTSO = tso;
2404     cap->in_haskell = rtsTrue;
2405     errno = saved_errno;
2406
2407     /* We might have GC'd, mark the TSO dirty again */
2408     dirtyTSO(tso);
2409
2410     IF_DEBUG(sanity, checkTSO(tso));
2411
2412     return &cap->r;
2413 }
2414
2415 /* ---------------------------------------------------------------------------
2416  * Comparing Thread ids.
2417  *
2418  * This is used from STG land in the implementation of the
2419  * instances of Eq/Ord for ThreadIds.
2420  * ------------------------------------------------------------------------ */
2421
2422 int
2423 cmp_thread(StgPtr tso1, StgPtr tso2) 
2424
2425   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2426   StgThreadID id2 = ((StgTSO *)tso2)->id;
2427  
2428   if (id1 < id2) return (-1);
2429   if (id1 > id2) return 1;
2430   return 0;
2431 }
2432
2433 /* ---------------------------------------------------------------------------
2434  * Fetching the ThreadID from an StgTSO.
2435  *
2436  * This is used in the implementation of Show for ThreadIds.
2437  * ------------------------------------------------------------------------ */
2438 int
2439 rts_getThreadId(StgPtr tso) 
2440 {
2441   return ((StgTSO *)tso)->id;
2442 }
2443
2444 #ifdef DEBUG
2445 void
2446 labelThread(StgPtr tso, char *label)
2447 {
2448   int len;
2449   void *buf;
2450
2451   /* Caveat: Once set, you can only set the thread name to "" */
2452   len = strlen(label)+1;
2453   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2454   strncpy(buf,label,len);
2455   /* Update will free the old memory for us */
2456   updateThreadLabel(((StgTSO *)tso)->id,buf);
2457 }
2458 #endif /* DEBUG */
2459
2460 /* ---------------------------------------------------------------------------
2461    Create a new thread.
2462
2463    The new thread starts with the given stack size.  Before the
2464    scheduler can run, however, this thread needs to have a closure
2465    (and possibly some arguments) pushed on its stack.  See
2466    pushClosure() in Schedule.h.
2467
2468    createGenThread() and createIOThread() (in SchedAPI.h) are
2469    convenient packaged versions of this function.
2470
2471    currently pri (priority) is only used in a GRAN setup -- HWL
2472    ------------------------------------------------------------------------ */
2473 #if defined(GRAN)
2474 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2475 StgTSO *
2476 createThread(nat size, StgInt pri)
2477 #else
2478 StgTSO *
2479 createThread(Capability *cap, nat size)
2480 #endif
2481 {
2482     StgTSO *tso;
2483     nat stack_size;
2484
2485     /* sched_mutex is *not* required */
2486
2487     /* First check whether we should create a thread at all */
2488 #if defined(PARALLEL_HASKELL)
2489     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2490     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2491         threadsIgnored++;
2492         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2493                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2494         return END_TSO_QUEUE;
2495     }
2496     threadsCreated++;
2497 #endif
2498
2499 #if defined(GRAN)
2500     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2501 #endif
2502
2503     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2504
2505     /* catch ridiculously small stack sizes */
2506     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2507         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2508     }
2509
2510     stack_size = size - TSO_STRUCT_SIZEW;
2511     
2512     tso = (StgTSO *)allocateLocal(cap, size);
2513     TICK_ALLOC_TSO(stack_size, 0);
2514
2515     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2516 #if defined(GRAN)
2517     SET_GRAN_HDR(tso, ThisPE);
2518 #endif
2519
2520     // Always start with the compiled code evaluator
2521     tso->what_next = ThreadRunGHC;
2522
2523     tso->why_blocked  = NotBlocked;
2524     tso->blocked_exceptions = NULL;
2525     tso->flags = TSO_DIRTY;
2526     
2527     tso->saved_errno = 0;
2528     tso->bound = NULL;
2529     tso->cap = cap;
2530     
2531     tso->stack_size     = stack_size;
2532     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2533                           - TSO_STRUCT_SIZEW;
2534     tso->sp             = (P_)&(tso->stack) + stack_size;
2535
2536     tso->trec = NO_TREC;
2537     
2538 #ifdef PROFILING
2539     tso->prof.CCCS = CCS_MAIN;
2540 #endif
2541     
2542   /* put a stop frame on the stack */
2543     tso->sp -= sizeofW(StgStopFrame);
2544     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2545     tso->link = END_TSO_QUEUE;
2546     
2547   // ToDo: check this
2548 #if defined(GRAN)
2549     /* uses more flexible routine in GranSim */
2550     insertThread(tso, CurrentProc);
2551 #else
2552     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2553      * from its creation
2554      */
2555 #endif
2556     
2557 #if defined(GRAN) 
2558     if (RtsFlags.GranFlags.GranSimStats.Full) 
2559         DumpGranEvent(GR_START,tso);
2560 #elif defined(PARALLEL_HASKELL)
2561     if (RtsFlags.ParFlags.ParStats.Full) 
2562         DumpGranEvent(GR_STARTQ,tso);
2563     /* HACk to avoid SCHEDULE 
2564        LastTSO = tso; */
2565 #endif
2566     
2567     /* Link the new thread on the global thread list.
2568      */
2569     ACQUIRE_LOCK(&sched_mutex);
2570     tso->id = next_thread_id++;  // while we have the mutex
2571     tso->global_link = all_threads;
2572     all_threads = tso;
2573     RELEASE_LOCK(&sched_mutex);
2574     
2575 #if defined(DIST)
2576     tso->dist.priority = MandatoryPriority; //by default that is...
2577 #endif
2578     
2579 #if defined(GRAN)
2580     tso->gran.pri = pri;
2581 # if defined(DEBUG)
2582     tso->gran.magic = TSO_MAGIC; // debugging only
2583 # endif
2584     tso->gran.sparkname   = 0;
2585     tso->gran.startedat   = CURRENT_TIME; 
2586     tso->gran.exported    = 0;
2587     tso->gran.basicblocks = 0;
2588     tso->gran.allocs      = 0;
2589     tso->gran.exectime    = 0;
2590     tso->gran.fetchtime   = 0;
2591     tso->gran.fetchcount  = 0;
2592     tso->gran.blocktime   = 0;
2593     tso->gran.blockcount  = 0;
2594     tso->gran.blockedat   = 0;
2595     tso->gran.globalsparks = 0;
2596     tso->gran.localsparks  = 0;
2597     if (RtsFlags.GranFlags.Light)
2598         tso->gran.clock  = Now; /* local clock */
2599     else
2600         tso->gran.clock  = 0;
2601     
2602     IF_DEBUG(gran,printTSO(tso));
2603 #elif defined(PARALLEL_HASKELL)
2604 # if defined(DEBUG)
2605     tso->par.magic = TSO_MAGIC; // debugging only
2606 # endif
2607     tso->par.sparkname   = 0;
2608     tso->par.startedat   = CURRENT_TIME; 
2609     tso->par.exported    = 0;
2610     tso->par.basicblocks = 0;
2611     tso->par.allocs      = 0;
2612     tso->par.exectime    = 0;
2613     tso->par.fetchtime   = 0;
2614     tso->par.fetchcount  = 0;
2615     tso->par.blocktime   = 0;
2616     tso->par.blockcount  = 0;
2617     tso->par.blockedat   = 0;
2618     tso->par.globalsparks = 0;
2619     tso->par.localsparks  = 0;
2620 #endif
2621     
2622 #if defined(GRAN)
2623     globalGranStats.tot_threads_created++;
2624     globalGranStats.threads_created_on_PE[CurrentProc]++;
2625     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2626     globalGranStats.tot_sq_probes++;
2627 #elif defined(PARALLEL_HASKELL)
2628     // collect parallel global statistics (currently done together with GC stats)
2629     if (RtsFlags.ParFlags.ParStats.Global &&
2630         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2631         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2632         globalParStats.tot_threads_created++;
2633     }
2634 #endif 
2635     
2636 #if defined(GRAN)
2637     IF_GRAN_DEBUG(pri,
2638                   sched_belch("==__ schedule: Created TSO %d (%p);",
2639                               CurrentProc, tso, tso->id));
2640 #elif defined(PARALLEL_HASKELL)
2641     IF_PAR_DEBUG(verbose,
2642                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2643                              (long)tso->id, tso, advisory_thread_count));
2644 #else
2645     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2646                                    (long)tso->id, (long)tso->stack_size));
2647 #endif    
2648     return tso;
2649 }
2650
2651 #if defined(PAR)
2652 /* RFP:
2653    all parallel thread creation calls should fall through the following routine.
2654 */
2655 StgTSO *
2656 createThreadFromSpark(rtsSpark spark) 
2657 { StgTSO *tso;
2658   ASSERT(spark != (rtsSpark)NULL);
2659 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2660   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2661   { threadsIgnored++;
2662     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2663           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2664     return END_TSO_QUEUE;
2665   }
2666   else
2667   { threadsCreated++;
2668     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2669     if (tso==END_TSO_QUEUE)     
2670       barf("createSparkThread: Cannot create TSO");
2671 #if defined(DIST)
2672     tso->priority = AdvisoryPriority;
2673 #endif
2674     pushClosure(tso,spark);
2675     addToRunQueue(tso);
2676     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2677   }
2678   return tso;
2679 }
2680 #endif
2681
2682 /*
2683   Turn a spark into a thread.
2684   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2685 */
2686 #if 0
2687 StgTSO *
2688 activateSpark (rtsSpark spark) 
2689 {
2690   StgTSO *tso;
2691
2692   tso = createSparkThread(spark);
2693   if (RtsFlags.ParFlags.ParStats.Full) {   
2694     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2695       IF_PAR_DEBUG(verbose,
2696                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2697                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2698   }
2699   // ToDo: fwd info on local/global spark to thread -- HWL
2700   // tso->gran.exported =  spark->exported;
2701   // tso->gran.locked =   !spark->global;
2702   // tso->gran.sparkname = spark->name;
2703
2704   return tso;
2705 }
2706 #endif
2707
2708 /* ---------------------------------------------------------------------------
2709  * scheduleThread()
2710  *
2711  * scheduleThread puts a thread on the end  of the runnable queue.
2712  * This will usually be done immediately after a thread is created.
2713  * The caller of scheduleThread must create the thread using e.g.
2714  * createThread and push an appropriate closure
2715  * on this thread's stack before the scheduler is invoked.
2716  * ------------------------------------------------------------------------ */
2717
2718 void
2719 scheduleThread(Capability *cap, StgTSO *tso)
2720 {
2721     // The thread goes at the *end* of the run-queue, to avoid possible
2722     // starvation of any threads already on the queue.
2723     appendToRunQueue(cap,tso);
2724 }
2725
2726 void
2727 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2728 {
2729 #if defined(THREADED_RTS)
2730     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2731                               // move this thread from now on.
2732     cpu %= RtsFlags.ParFlags.nNodes;
2733     if (cpu == cap->no) {
2734         appendToRunQueue(cap,tso);
2735     } else {
2736         Capability *target_cap = &capabilities[cpu];
2737         if (tso->bound) {
2738             tso->bound->cap = target_cap;
2739         }
2740         tso->cap = target_cap;
2741         wakeupThreadOnCapability(target_cap,tso);
2742     }
2743 #else
2744     appendToRunQueue(cap,tso);
2745 #endif
2746 }
2747
2748 Capability *
2749 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2750 {
2751     Task *task;
2752
2753     // We already created/initialised the Task
2754     task = cap->running_task;
2755
2756     // This TSO is now a bound thread; make the Task and TSO
2757     // point to each other.
2758     tso->bound = task;
2759     tso->cap = cap;
2760
2761     task->tso = tso;
2762     task->ret = ret;
2763     task->stat = NoStatus;
2764
2765     appendToRunQueue(cap,tso);
2766
2767     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2768
2769 #if defined(GRAN)
2770     /* GranSim specific init */
2771     CurrentTSO = m->tso;                // the TSO to run
2772     procStatus[MainProc] = Busy;        // status of main PE
2773     CurrentProc = MainProc;             // PE to run it on
2774 #endif
2775
2776     cap = schedule(cap,task);
2777
2778     ASSERT(task->stat != NoStatus);
2779     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2780
2781     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2782     return cap;
2783 }
2784
2785 /* ----------------------------------------------------------------------------
2786  * Starting Tasks
2787  * ------------------------------------------------------------------------- */
2788
2789 #if defined(THREADED_RTS)
2790 void
2791 workerStart(Task *task)
2792 {
2793     Capability *cap;
2794
2795     // See startWorkerTask().
2796     ACQUIRE_LOCK(&task->lock);
2797     cap = task->cap;
2798     RELEASE_LOCK(&task->lock);
2799
2800     // set the thread-local pointer to the Task:
2801     taskEnter(task);
2802
2803     // schedule() runs without a lock.
2804     cap = schedule(cap,task);
2805
2806     // On exit from schedule(), we have a Capability.
2807     releaseCapability(cap);
2808     taskStop(task);
2809 }
2810 #endif
2811
2812 /* ---------------------------------------------------------------------------
2813  * initScheduler()
2814  *
2815  * Initialise the scheduler.  This resets all the queues - if the
2816  * queues contained any threads, they'll be garbage collected at the
2817  * next pass.
2818  *
2819  * ------------------------------------------------------------------------ */
2820
2821 void 
2822 initScheduler(void)
2823 {
2824 #if defined(GRAN)
2825   nat i;
2826   for (i=0; i<=MAX_PROC; i++) {
2827     run_queue_hds[i]      = END_TSO_QUEUE;
2828     run_queue_tls[i]      = END_TSO_QUEUE;
2829     blocked_queue_hds[i]  = END_TSO_QUEUE;
2830     blocked_queue_tls[i]  = END_TSO_QUEUE;
2831     ccalling_threadss[i]  = END_TSO_QUEUE;
2832     blackhole_queue[i]    = END_TSO_QUEUE;
2833     sleeping_queue        = END_TSO_QUEUE;
2834   }
2835 #elif !defined(THREADED_RTS)
2836   blocked_queue_hd  = END_TSO_QUEUE;
2837   blocked_queue_tl  = END_TSO_QUEUE;
2838   sleeping_queue    = END_TSO_QUEUE;
2839 #endif
2840
2841   blackhole_queue   = END_TSO_QUEUE;
2842   all_threads       = END_TSO_QUEUE;
2843
2844   context_switch = 0;
2845   sched_state    = SCHED_RUNNING;
2846
2847   RtsFlags.ConcFlags.ctxtSwitchTicks =
2848       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2849       
2850 #if defined(THREADED_RTS)
2851   /* Initialise the mutex and condition variables used by
2852    * the scheduler. */
2853   initMutex(&sched_mutex);
2854 #endif
2855   
2856   ACQUIRE_LOCK(&sched_mutex);
2857
2858   /* A capability holds the state a native thread needs in
2859    * order to execute STG code. At least one capability is
2860    * floating around (only THREADED_RTS builds have more than one).
2861    */
2862   initCapabilities();
2863
2864   initTaskManager();
2865
2866 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2867   initSparkPools();
2868 #endif
2869
2870 #if defined(THREADED_RTS)
2871   /*
2872    * Eagerly start one worker to run each Capability, except for
2873    * Capability 0.  The idea is that we're probably going to start a
2874    * bound thread on Capability 0 pretty soon, so we don't want a
2875    * worker task hogging it.
2876    */
2877   { 
2878       nat i;
2879       Capability *cap;
2880       for (i = 1; i < n_capabilities; i++) {
2881           cap = &capabilities[i];
2882           ACQUIRE_LOCK(&cap->lock);
2883           startWorkerTask(cap, workerStart);
2884           RELEASE_LOCK(&cap->lock);
2885       }
2886   }
2887 #endif
2888
2889   RELEASE_LOCK(&sched_mutex);
2890 }
2891
2892 void
2893 exitScheduler( void )
2894 {
2895     Task *task = NULL;
2896
2897 #if defined(THREADED_RTS)
2898     ACQUIRE_LOCK(&sched_mutex);
2899     task = newBoundTask();
2900     RELEASE_LOCK(&sched_mutex);
2901 #endif
2902
2903     // If we haven't killed all the threads yet, do it now.
2904     if (sched_state < SCHED_INTERRUPTED) {
2905         sched_state = SCHED_INTERRUPTING;
2906         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2907     }
2908     sched_state = SCHED_SHUTTING_DOWN;
2909
2910 #if defined(THREADED_RTS)
2911     { 
2912         nat i;
2913         
2914         for (i = 0; i < n_capabilities; i++) {
2915             shutdownCapability(&capabilities[i], task);
2916         }
2917         boundTaskExiting(task);
2918         stopTaskManager();
2919     }
2920 #endif
2921 }
2922
2923 /* ---------------------------------------------------------------------------
2924    Where are the roots that we know about?
2925
2926         - all the threads on the runnable queue
2927         - all the threads on the blocked queue
2928         - all the threads on the sleeping queue
2929         - all the thread currently executing a _ccall_GC
2930         - all the "main threads"
2931      
2932    ------------------------------------------------------------------------ */
2933
2934 /* This has to be protected either by the scheduler monitor, or by the
2935         garbage collection monitor (probably the latter).
2936         KH @ 25/10/99
2937 */
2938
2939 void
2940 GetRoots( evac_fn evac )
2941 {
2942     nat i;
2943     Capability *cap;
2944     Task *task;
2945
2946 #if defined(GRAN)
2947     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2948         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2949             evac((StgClosure **)&run_queue_hds[i]);
2950         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2951             evac((StgClosure **)&run_queue_tls[i]);
2952         
2953         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2954             evac((StgClosure **)&blocked_queue_hds[i]);
2955         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2956             evac((StgClosure **)&blocked_queue_tls[i]);
2957         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2958             evac((StgClosure **)&ccalling_threads[i]);
2959     }
2960
2961     markEventQueue();
2962
2963 #else /* !GRAN */
2964
2965     for (i = 0; i < n_capabilities; i++) {
2966         cap = &capabilities[i];
2967         evac((StgClosure **)(void *)&cap->run_queue_hd);
2968         evac((StgClosure **)(void *)&cap->run_queue_tl);
2969 #if defined(THREADED_RTS)
2970         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2971         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2972 #endif
2973         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2974              task=task->next) {
2975             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2976             evac((StgClosure **)(void *)&task->suspended_tso);
2977         }
2978
2979     }
2980     
2981
2982 #if !defined(THREADED_RTS)
2983     evac((StgClosure **)(void *)&blocked_queue_hd);
2984     evac((StgClosure **)(void *)&blocked_queue_tl);
2985     evac((StgClosure **)(void *)&sleeping_queue);
2986 #endif 
2987 #endif
2988
2989     // evac((StgClosure **)&blackhole_queue);
2990
2991 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2992     markSparkQueue(evac);
2993 #endif
2994     
2995 #if defined(RTS_USER_SIGNALS)
2996     // mark the signal handlers (signals should be already blocked)
2997     markSignalHandlers(evac);
2998 #endif
2999 }
3000
3001 /* -----------------------------------------------------------------------------
3002    performGC
3003
3004    This is the interface to the garbage collector from Haskell land.
3005    We provide this so that external C code can allocate and garbage
3006    collect when called from Haskell via _ccall_GC.
3007
3008    It might be useful to provide an interface whereby the programmer
3009    can specify more roots (ToDo).
3010    
3011    This needs to be protected by the GC condition variable above.  KH.
3012    -------------------------------------------------------------------------- */
3013
3014 static void (*extra_roots)(evac_fn);
3015
3016 static void
3017 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
3018 {
3019     Task *task;
3020     // We must grab a new Task here, because the existing Task may be
3021     // associated with a particular Capability, and chained onto the 
3022     // suspended_ccalling_tasks queue.
3023     ACQUIRE_LOCK(&sched_mutex);
3024     task = newBoundTask();
3025     RELEASE_LOCK(&sched_mutex);
3026     scheduleDoGC(NULL,task,force_major, get_roots);
3027     boundTaskExiting(task);
3028 }
3029
3030 void
3031 performGC(void)
3032 {
3033     performGC_(rtsFalse, GetRoots);
3034 }
3035
3036 void
3037 performMajorGC(void)
3038 {
3039     performGC_(rtsTrue, GetRoots);
3040 }
3041
3042 static void
3043 AllRoots(evac_fn evac)
3044 {
3045     GetRoots(evac);             // the scheduler's roots
3046     extra_roots(evac);          // the user's roots
3047 }
3048
3049 void
3050 performGCWithRoots(void (*get_roots)(evac_fn))
3051 {
3052     extra_roots = get_roots;
3053     performGC_(rtsFalse, AllRoots);
3054 }
3055
3056 /* -----------------------------------------------------------------------------
3057    Stack overflow
3058
3059    If the thread has reached its maximum stack size, then raise the
3060    StackOverflow exception in the offending thread.  Otherwise
3061    relocate the TSO into a larger chunk of memory and adjust its stack
3062    size appropriately.
3063    -------------------------------------------------------------------------- */
3064
3065 static StgTSO *
3066 threadStackOverflow(Capability *cap, StgTSO *tso)
3067 {
3068   nat new_stack_size, stack_words;
3069   lnat new_tso_size;
3070   StgPtr new_sp;
3071   StgTSO *dest;
3072
3073   IF_DEBUG(sanity,checkTSO(tso));
3074   if (tso->stack_size >= tso->max_stack_size) {
3075
3076     IF_DEBUG(gc,
3077              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3078                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3079              /* If we're debugging, just print out the top of the stack */
3080              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3081                                               tso->sp+64)));
3082
3083     /* Send this thread the StackOverflow exception */
3084     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3085     return tso;
3086   }
3087
3088   /* Try to double the current stack size.  If that takes us over the
3089    * maximum stack size for this thread, then use the maximum instead.
3090    * Finally round up so the TSO ends up as a whole number of blocks.
3091    */
3092   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3093   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3094                                        TSO_STRUCT_SIZE)/sizeof(W_);
3095   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3096   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3097
3098   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3099
3100   dest = (StgTSO *)allocate(new_tso_size);
3101   TICK_ALLOC_TSO(new_stack_size,0);
3102
3103   /* copy the TSO block and the old stack into the new area */
3104   memcpy(dest,tso,TSO_STRUCT_SIZE);
3105   stack_words = tso->stack + tso->stack_size - tso->sp;
3106   new_sp = (P_)dest + new_tso_size - stack_words;
3107   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3108
3109   /* relocate the stack pointers... */
3110   dest->sp         = new_sp;
3111   dest->stack_size = new_stack_size;
3112         
3113   /* Mark the old TSO as relocated.  We have to check for relocated
3114    * TSOs in the garbage collector and any primops that deal with TSOs.
3115    *
3116    * It's important to set the sp value to just beyond the end
3117    * of the stack, so we don't attempt to scavenge any part of the
3118    * dead TSO's stack.
3119    */
3120   tso->what_next = ThreadRelocated;
3121   tso->link = dest;
3122   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3123   tso->why_blocked = NotBlocked;
3124
3125   IF_PAR_DEBUG(verbose,
3126                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3127                      tso->id, tso, tso->stack_size);
3128                /* If we're debugging, just print out the top of the stack */
3129                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3130                                                 tso->sp+64)));
3131   
3132   IF_DEBUG(sanity,checkTSO(tso));
3133 #if 0
3134   IF_DEBUG(scheduler,printTSO(dest));
3135 #endif
3136
3137   return dest;
3138 }
3139
3140 /* ---------------------------------------------------------------------------
3141    Wake up a queue that was blocked on some resource.
3142    ------------------------------------------------------------------------ */
3143
3144 #if defined(GRAN)
3145 STATIC_INLINE void
3146 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3147 {
3148 }
3149 #elif defined(PARALLEL_HASKELL)
3150 STATIC_INLINE void
3151 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3152 {
3153   /* write RESUME events to log file and
3154      update blocked and fetch time (depending on type of the orig closure) */
3155   if (RtsFlags.ParFlags.ParStats.Full) {
3156     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3157                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3158                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3159     if (emptyRunQueue())
3160       emitSchedule = rtsTrue;
3161
3162     switch (get_itbl(node)->type) {
3163         case FETCH_ME_BQ:
3164           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3165           break;
3166         case RBH:
3167         case FETCH_ME:
3168         case BLACKHOLE_BQ:
3169           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3170           break;
3171 #ifdef DIST
3172         case MVAR:
3173           break;
3174 #endif    
3175         default:
3176           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3177         }
3178       }
3179 }
3180 #endif
3181
3182 #if defined(GRAN)
3183 StgBlockingQueueElement *
3184 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3185 {
3186     StgTSO *tso;
3187     PEs node_loc, tso_loc;
3188
3189     node_loc = where_is(node); // should be lifted out of loop
3190     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3191     tso_loc = where_is((StgClosure *)tso);
3192     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3193       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3194       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3195       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3196       // insertThread(tso, node_loc);
3197       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3198                 ResumeThread,
3199                 tso, node, (rtsSpark*)NULL);
3200       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3201       // len_local++;
3202       // len++;
3203     } else { // TSO is remote (actually should be FMBQ)
3204       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3205                                   RtsFlags.GranFlags.Costs.gunblocktime +
3206                                   RtsFlags.GranFlags.Costs.latency;
3207       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3208                 UnblockThread,
3209                 tso, node, (rtsSpark*)NULL);
3210       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3211       // len++;
3212     }
3213     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3214     IF_GRAN_DEBUG(bq,
3215                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3216                           (node_loc==tso_loc ? "Local" : "Global"), 
3217                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3218     tso->block_info.closure = NULL;
3219     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3220                              tso->id, tso));
3221 }
3222 #elif defined(PARALLEL_HASKELL)
3223 StgBlockingQueueElement *
3224 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3225 {
3226     StgBlockingQueueElement *next;
3227
3228     switch (get_itbl(bqe)->type) {
3229     case TSO:
3230       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3231       /* if it's a TSO just push it onto the run_queue */
3232       next = bqe->link;
3233       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3234       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3235       threadRunnable();
3236       unblockCount(bqe, node);
3237       /* reset blocking status after dumping event */
3238       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3239       break;
3240
3241     case BLOCKED_FETCH:
3242       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3243       next = bqe->link;
3244       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3245       PendingFetches = (StgBlockedFetch *)bqe;
3246       break;
3247
3248 # if defined(DEBUG)
3249       /* can ignore this case in a non-debugging setup; 
3250          see comments on RBHSave closures above */
3251     case CONSTR:
3252       /* check that the closure is an RBHSave closure */
3253       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3254              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3255              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3256       break;
3257
3258     default:
3259       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3260            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3261            (StgClosure *)bqe);
3262 # endif
3263     }
3264   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3265   return next;
3266 }
3267 #endif
3268
3269 StgTSO *
3270 unblockOne(Capability *cap, StgTSO *tso)
3271 {
3272   StgTSO *next;
3273
3274   ASSERT(get_itbl(tso)->type == TSO);
3275   ASSERT(tso->why_blocked != NotBlocked);
3276
3277   tso->why_blocked = NotBlocked;
3278   next = tso->link;
3279   tso->link = END_TSO_QUEUE;
3280
3281 #if defined(THREADED_RTS)
3282   if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
3283       // We are waking up this thread on the current Capability, which
3284       // might involve migrating it from the Capability it was last on.
3285       if (tso->bound) {
3286           ASSERT(tso->bound->cap == tso->cap);
3287           tso->bound->cap = cap;
3288       }
3289       tso->cap = cap;
3290       appendToRunQueue(cap,tso);
3291       // we're holding a newly woken thread, make sure we context switch
3292       // quickly so we can migrate it if necessary.
3293       context_switch = 1;
3294   } else {
3295       // we'll try to wake it up on the Capability it was last on.
3296       wakeupThreadOnCapability(tso->cap, tso);
3297   }
3298 #else
3299   appendToRunQueue(cap,tso);
3300   context_switch = 1;
3301 #endif
3302
3303   IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
3304   return next;
3305 }
3306
3307
3308 #if defined(GRAN)
3309 void 
3310 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3311 {
3312   StgBlockingQueueElement *bqe;
3313   PEs node_loc;
3314   nat len = 0; 
3315
3316   IF_GRAN_DEBUG(bq, 
3317                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3318                       node, CurrentProc, CurrentTime[CurrentProc], 
3319                       CurrentTSO->id, CurrentTSO));
3320
3321   node_loc = where_is(node);
3322
3323   ASSERT(q == END_BQ_QUEUE ||
3324          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3325          get_itbl(q)->type == CONSTR); // closure (type constructor)
3326   ASSERT(is_unique(node));
3327
3328   /* FAKE FETCH: magically copy the node to the tso's proc;
3329      no Fetch necessary because in reality the node should not have been 
3330      moved to the other PE in the first place
3331   */
3332   if (CurrentProc!=node_loc) {
3333     IF_GRAN_DEBUG(bq, 
3334                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3335                         node, node_loc, CurrentProc, CurrentTSO->id, 
3336                         // CurrentTSO, where_is(CurrentTSO),
3337                         node->header.gran.procs));
3338     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3339     IF_GRAN_DEBUG(bq, 
3340                   debugBelch("## new bitmask of node %p is %#x\n",
3341                         node, node->header.gran.procs));
3342     if (RtsFlags.GranFlags.GranSimStats.Global) {
3343       globalGranStats.tot_fake_fetches++;
3344     }
3345   }
3346
3347   bqe = q;
3348   // ToDo: check: ASSERT(CurrentProc==node_loc);
3349   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3350     //next = bqe->link;
3351     /* 
3352        bqe points to the current element in the queue
3353        next points to the next element in the queue
3354     */
3355     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3356     //tso_loc = where_is(tso);
3357     len++;
3358     bqe = unblockOne(bqe, node);
3359   }
3360
3361   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3362      the closure to make room for the anchor of the BQ */
3363   if (bqe!=END_BQ_QUEUE) {
3364     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3365     /*
3366     ASSERT((info_ptr==&RBH_Save_0_info) ||
3367            (info_ptr==&RBH_Save_1_info) ||
3368            (info_ptr==&RBH_Save_2_info));
3369     */
3370     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3371     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3372     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3373
3374     IF_GRAN_DEBUG(bq,
3375                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3376                         node, info_type(node)));
3377   }
3378
3379   /* statistics gathering */
3380   if (RtsFlags.GranFlags.GranSimStats.Global) {
3381     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3382     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3383     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3384     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3385   }
3386   IF_GRAN_DEBUG(bq,
3387                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3388                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3389 }
3390 #elif defined(PARALLEL_HASKELL)
3391 void 
3392 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3393 {
3394   StgBlockingQueueElement *bqe;
3395
3396   IF_PAR_DEBUG(verbose, 
3397                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3398                      node, mytid));
3399 #ifdef DIST  
3400   //RFP
3401   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3402     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3403     return;
3404   }
3405 #endif
3406   
3407   ASSERT(q == END_BQ_QUEUE ||
3408          get_itbl(q)->type == TSO ||           
3409          get_itbl(q)->type == BLOCKED_FETCH || 
3410          get_itbl(q)->type == CONSTR); 
3411
3412   bqe = q;
3413   while (get_itbl(bqe)->type==TSO || 
3414          get_itbl(bqe)->type==BLOCKED_FETCH) {
3415     bqe = unblockOne(bqe, node);
3416   }
3417 }
3418
3419 #else   /* !GRAN && !PARALLEL_HASKELL */
3420
3421 void
3422 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3423 {
3424     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3425                              // Exception.cmm
3426     while (tso != END_TSO_QUEUE) {
3427         tso = unblockOne(cap,tso);
3428     }
3429 }
3430 #endif
3431
3432 /* ---------------------------------------------------------------------------
3433    Interrupt execution
3434    - usually called inside a signal handler so it mustn't do anything fancy.   
3435    ------------------------------------------------------------------------ */
3436
3437 void
3438 interruptStgRts(void)
3439 {
3440     sched_state = SCHED_INTERRUPTING;
3441     context_switch = 1;
3442 #if defined(THREADED_RTS)
3443     prodAllCapabilities();
3444 #endif
3445 }
3446
3447 /* -----------------------------------------------------------------------------
3448    Unblock a thread
3449
3450    This is for use when we raise an exception in another thread, which
3451    may be blocked.
3452    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3453    -------------------------------------------------------------------------- */
3454
3455 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3456 /*
3457   NB: only the type of the blocking queue is different in GranSim and GUM
3458       the operations on the queue-elements are the same
3459       long live polymorphism!
3460
3461   Locks: sched_mutex is held upon entry and exit.
3462
3463 */
3464 static void
3465 unblockThread(Capability *cap, StgTSO *tso)
3466 {
3467   StgBlockingQueueElement *t, **last;
3468
3469   switch (tso->why_blocked) {
3470
3471   case NotBlocked:
3472     return;  /* not blocked */
3473
3474   case BlockedOnSTM:
3475     // Be careful: nothing to do here!  We tell the scheduler that the thread
3476     // is runnable and we leave it to the stack-walking code to abort the 
3477     // transaction while unwinding the stack.  We should perhaps have a debugging
3478     // test to make sure that this really happens and that the 'zombie' transaction
3479     // does not get committed.
3480     goto done;
3481
3482   case BlockedOnMVar:
3483     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3484     {
3485       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3486       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3487
3488       last = (StgBlockingQueueElement **)&mvar->head;
3489       for (t = (StgBlockingQueueElement *)mvar->head; 
3490            t != END_BQ_QUEUE; 
3491            last = &t->link, last_tso = t, t = t->link) {
3492         if (t == (StgBlockingQueueElement *)tso) {
3493           *last = (StgBlockingQueueElement *)tso->link;
3494           if (mvar->tail == tso) {
3495             mvar->tail = (StgTSO *)last_tso;
3496           }
3497           goto done;
3498         }
3499       }
3500       barf("unblockThread (MVAR): TSO not found");
3501     }
3502
3503   case BlockedOnBlackHole:
3504     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3505     {
3506       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3507
3508       last = &bq->blocking_queue;
3509       for (t = bq->blocking_queue; 
3510            t != END_BQ_QUEUE; 
3511            last = &t->link, t = t->link) {
3512         if (t == (StgBlockingQueueElement *)tso) {
3513           *last = (StgBlockingQueueElement *)tso->link;
3514           goto done;
3515         }
3516       }
3517       barf("unblockThread (BLACKHOLE): TSO not found");
3518     }
3519
3520   case BlockedOnException:
3521     {
3522       StgTSO *target  = tso->block_info.tso;
3523
3524       ASSERT(get_itbl(target)->type == TSO);
3525
3526       if (target->what_next == ThreadRelocated) {
3527           target = target->link;
3528           ASSERT(get_itbl(target)->type == TSO);
3529       }
3530
3531       ASSERT(target->blocked_exceptions != NULL);
3532
3533       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3534       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3535            t != END_BQ_QUEUE; 
3536            last = &t->link, t = t->link) {
3537         ASSERT(get_itbl(t)->type == TSO);
3538         if (t == (StgBlockingQueueElement *)tso) {
3539           *last = (StgBlockingQueueElement *)tso->link;
3540           goto done;
3541         }
3542       }
3543       barf("unblockThread (Exception): TSO not found");
3544     }
3545
3546   case BlockedOnRead:
3547   case BlockedOnWrite:
3548 #if defined(mingw32_HOST_OS)
3549   case BlockedOnDoProc:
3550 #endif
3551     {
3552       /* take TSO off blocked_queue */
3553       StgBlockingQueueElement *prev = NULL;
3554       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3555            prev = t, t = t->link) {
3556         if (t == (StgBlockingQueueElement *)tso) {
3557           if (prev == NULL) {
3558             blocked_queue_hd = (StgTSO *)t->link;
3559             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3560               blocked_queue_tl = END_TSO_QUEUE;
3561             }
3562           } else {
3563             prev->link = t->link;
3564             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3565               blocked_queue_tl = (StgTSO *)prev;
3566             }
3567           }
3568 #if defined(mingw32_HOST_OS)
3569           /* (Cooperatively) signal that the worker thread should abort
3570            * the request.
3571            */
3572           abandonWorkRequest(tso->block_info.async_result->reqID);
3573 #endif
3574           goto done;
3575         }
3576       }
3577       barf("unblockThread (I/O): TSO not found");
3578     }
3579
3580   case BlockedOnDelay:
3581     {
3582       /* take TSO off sleeping_queue */
3583       StgBlockingQueueElement *prev = NULL;
3584       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3585            prev = t, t = t->link) {
3586         if (t == (StgBlockingQueueElement *)tso) {
3587           if (prev == NULL) {
3588             sleeping_queue = (StgTSO *)t->link;
3589           } else {
3590             prev->link = t->link;
3591           }
3592           goto done;
3593         }
3594       }
3595       barf("unblockThread (delay): TSO not found");
3596     }
3597
3598   default:
3599     barf("unblockThread");
3600   }
3601
3602  done:
3603   tso->link = END_TSO_QUEUE;
3604   tso->why_blocked = NotBlocked;
3605   tso->block_info.closure = NULL;
3606   pushOnRunQueue(cap,tso);
3607 }
3608 #else
3609 static void
3610 unblockThread(Capability *cap, StgTSO *tso)
3611 {
3612   StgTSO *t, **last;
3613   
3614   /* To avoid locking unnecessarily. */
3615   if (tso->why_blocked == NotBlocked) {
3616     return;
3617   }
3618
3619   switch (tso->why_blocked) {
3620
3621   case BlockedOnSTM:
3622     // Be careful: nothing to do here!  We tell the scheduler that the thread
3623     // is runnable and we leave it to the stack-walking code to abort the 
3624     // transaction while unwinding the stack.  We should perhaps have a debugging
3625     // test to make sure that this really happens and that the 'zombie' transaction
3626     // does not get committed.
3627     goto done;
3628
3629   case BlockedOnMVar:
3630     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3631     {
3632       StgTSO *last_tso = END_TSO_QUEUE;
3633       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3634
3635       last = &mvar->head;
3636       for (t = mvar->head; t != END_TSO_QUEUE; 
3637            last = &t->link, last_tso = t, t = t->link) {
3638         if (t == tso) {
3639           *last = tso->link;
3640           if (mvar->tail == tso) {
3641             mvar->tail = last_tso;
3642           }
3643           goto done;
3644         }
3645       }
3646       barf("unblockThread (MVAR): TSO not found");
3647     }
3648
3649   case BlockedOnBlackHole:
3650     {
3651       last = &blackhole_queue;
3652       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3653            last = &t->link, t = t->link) {
3654         if (t == tso) {
3655           *last = tso->link;
3656           goto done;
3657         }
3658       }
3659       barf("unblockThread (BLACKHOLE): TSO not found");
3660     }
3661
3662   case BlockedOnException:
3663     {
3664       StgTSO *target  = tso->block_info.tso;
3665
3666       ASSERT(get_itbl(target)->type == TSO);
3667
3668       while (target->what_next == ThreadRelocated) {
3669           target = target->link;
3670           ASSERT(get_itbl(target)->type == TSO);
3671       }
3672       
3673       ASSERT(target->blocked_exceptions != NULL);
3674
3675       last = &target->blocked_exceptions;
3676       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3677            last = &t->link, t = t->link) {
3678         ASSERT(get_itbl(t)->type == TSO);
3679         if (t == tso) {
3680           *last = tso->link;
3681           goto done;
3682         }
3683       }
3684       barf("unblockThread (Exception): TSO not found");
3685     }
3686
3687 #if !defined(THREADED_RTS)
3688   case BlockedOnRead:
3689   case BlockedOnWrite:
3690 #if defined(mingw32_HOST_OS)
3691   case BlockedOnDoProc:
3692 #endif
3693     {
3694       StgTSO *prev = NULL;
3695       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3696            prev = t, t = t->link) {
3697         if (t == tso) {
3698           if (prev == NULL) {
3699             blocked_queue_hd = t->link;
3700             if (blocked_queue_tl == t) {
3701               blocked_queue_tl = END_TSO_QUEUE;
3702             }
3703           } else {
3704             prev->link = t->link;
3705             if (blocked_queue_tl == t) {
3706               blocked_queue_tl = prev;
3707             }
3708           }
3709 #if defined(mingw32_HOST_OS)
3710           /* (Cooperatively) signal that the worker thread should abort
3711            * the request.
3712            */
3713           abandonWorkRequest(tso->block_info.async_result->reqID);
3714 #endif
3715           goto done;
3716         }
3717       }
3718       barf("unblockThread (I/O): TSO not found");
3719     }
3720
3721   case BlockedOnDelay:
3722     {
3723       StgTSO *prev = NULL;
3724       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3725            prev = t, t = t->link) {
3726         if (t == tso) {
3727           if (prev == NULL) {
3728             sleeping_queue = t->link;
3729           } else {
3730             prev->link = t->link;
3731           }
3732           goto done;
3733         }
3734       }
3735       barf("unblockThread (delay): TSO not found");
3736     }
3737 #endif
3738
3739   default:
3740     barf("unblockThread");
3741   }
3742
3743  done:
3744   tso->link = END_TSO_QUEUE;
3745   tso->why_blocked = NotBlocked;
3746   tso->block_info.closure = NULL;
3747   appendToRunQueue(cap,tso);
3748
3749   // We might have just migrated this TSO to our Capability:
3750   if (tso->bound) {
3751       tso->bound->cap = cap;
3752   }
3753   tso->cap = cap;
3754 }
3755 #endif
3756
3757 /* -----------------------------------------------------------------------------
3758  * checkBlackHoles()
3759  *
3760  * Check the blackhole_queue for threads that can be woken up.  We do
3761  * this periodically: before every GC, and whenever the run queue is
3762  * empty.
3763  *
3764  * An elegant solution might be to just wake up all the blocked
3765  * threads with awakenBlockedQueue occasionally: they'll go back to
3766  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3767  * doesn't give us a way to tell whether we've actually managed to
3768  * wake up any threads, so we would be busy-waiting.
3769  *
3770  * -------------------------------------------------------------------------- */
3771
3772 static rtsBool
3773 checkBlackHoles (Capability *cap)
3774 {
3775     StgTSO **prev, *t;
3776     rtsBool any_woke_up = rtsFalse;
3777     StgHalfWord type;
3778
3779     // blackhole_queue is global:
3780     ASSERT_LOCK_HELD(&sched_mutex);
3781
3782     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3783
3784     // ASSUMES: sched_mutex
3785     prev = &blackhole_queue;
3786     t = blackhole_queue;
3787     while (t != END_TSO_QUEUE) {
3788         ASSERT(t->why_blocked == BlockedOnBlackHole);
3789         type = get_itbl(t->block_info.closure)->type;
3790         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3791             IF_DEBUG(sanity,checkTSO(t));
3792             t = unblockOne(cap, t);
3793             // urk, the threads migrate to the current capability
3794             // here, but we'd like to keep them on the original one.
3795             *prev = t;
3796             any_woke_up = rtsTrue;
3797         } else {
3798             prev = &t->link;
3799             t = t->link;
3800         }
3801     }
3802
3803     return any_woke_up;
3804 }
3805
3806 /* -----------------------------------------------------------------------------
3807  * raiseAsync()
3808  *
3809  * The following function implements the magic for raising an
3810  * asynchronous exception in an existing thread.
3811  *
3812  * We first remove the thread from any queue on which it might be
3813  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3814  *
3815  * We strip the stack down to the innermost CATCH_FRAME, building
3816  * thunks in the heap for all the active computations, so they can 
3817  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3818  * an application of the handler to the exception, and push it on
3819  * the top of the stack.
3820  * 
3821  * How exactly do we save all the active computations?  We create an
3822  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3823  * AP_STACKs pushes everything from the corresponding update frame
3824  * upwards onto the stack.  (Actually, it pushes everything up to the
3825  * next update frame plus a pointer to the next AP_STACK object.
3826  * Entering the next AP_STACK object pushes more onto the stack until we
3827  * reach the last AP_STACK object - at which point the stack should look
3828  * exactly as it did when we killed the TSO and we can continue
3829  * execution by entering the closure on top of the stack.
3830  *
3831  * We can also kill a thread entirely - this happens if either (a) the 
3832  * exception passed to raiseAsync is NULL, or (b) there's no
3833  * CATCH_FRAME on the stack.  In either case, we strip the entire
3834  * stack and replace the thread with a zombie.
3835  *
3836  * ToDo: in THREADED_RTS mode, this function is only safe if either
3837  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3838  * one Capability), or (b) we own the Capability that the TSO is
3839  * currently blocked on or on the run queue of.
3840  *
3841  * -------------------------------------------------------------------------- */
3842  
3843 void
3844 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3845 {
3846     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3847 }
3848
3849 void
3850 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3851 {
3852     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3853 }
3854
3855 static void
3856 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3857             rtsBool stop_at_atomically, StgPtr stop_here)
3858 {
3859     StgRetInfoTable *info;
3860     StgPtr sp, frame;
3861     nat i;
3862   
3863     // Thread already dead?
3864     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3865         return;
3866     }
3867
3868     IF_DEBUG(scheduler, 
3869              sched_belch("raising exception in thread %ld.", (long)tso->id));
3870     
3871     // Remove it from any blocking queues
3872     unblockThread(cap,tso);
3873
3874     // mark it dirty; we're about to change its stack.
3875     dirtyTSO(tso);
3876
3877     sp = tso->sp;
3878     
3879     // The stack freezing code assumes there's a closure pointer on
3880     // the top of the stack, so we have to arrange that this is the case...
3881     //
3882     if (sp[0] == (W_)&stg_enter_info) {
3883         sp++;
3884     } else {
3885         sp--;
3886         sp[0] = (W_)&stg_dummy_ret_closure;
3887     }
3888
3889     frame = sp + 1;
3890     while (stop_here == NULL || frame < stop_here) {
3891
3892         // 1. Let the top of the stack be the "current closure"
3893         //
3894         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3895         // CATCH_FRAME.
3896         //
3897         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3898         // current closure applied to the chunk of stack up to (but not
3899         // including) the update frame.  This closure becomes the "current
3900         // closure".  Go back to step 2.
3901         //
3902         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3903         // top of the stack applied to the exception.
3904         // 
3905         // 5. If it's a STOP_FRAME, then kill the thread.
3906         // 
3907         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3908         // transaction
3909        
3910         info = get_ret_itbl((StgClosure *)frame);
3911
3912         switch (info->i.type) {
3913
3914         case UPDATE_FRAME:
3915         {
3916             StgAP_STACK * ap;
3917             nat words;
3918             
3919             // First build an AP_STACK consisting of the stack chunk above the
3920             // current update frame, with the top word on the stack as the
3921             // fun field.
3922             //
3923             words = frame - sp - 1;
3924             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3925             
3926             ap->size = words;
3927             ap->fun  = (StgClosure *)sp[0];
3928             sp++;
3929             for(i=0; i < (nat)words; ++i) {
3930                 ap->payload[i] = (StgClosure *)*sp++;
3931             }
3932             
3933             SET_HDR(ap,&stg_AP_STACK_info,
3934                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3935             TICK_ALLOC_UP_THK(words+1,0);
3936             
3937             IF_DEBUG(scheduler,
3938                      debugBelch("sched: Updating ");
3939                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3940                      debugBelch(" with ");
3941                      printObj((StgClosure *)ap);
3942                 );
3943
3944             // Replace the updatee with an indirection
3945             //
3946             // Warning: if we're in a loop, more than one update frame on
3947             // the stack may point to the same object.  Be careful not to
3948             // overwrite an IND_OLDGEN in this case, because we'll screw
3949             // up the mutable lists.  To be on the safe side, don't
3950             // overwrite any kind of indirection at all.  See also
3951             // threadSqueezeStack in GC.c, where we have to make a similar
3952             // check.
3953             //
3954             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3955                 // revert the black hole
3956                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3957                                (StgClosure *)ap);
3958             }
3959             sp += sizeofW(StgUpdateFrame) - 1;
3960             sp[0] = (W_)ap; // push onto stack
3961             frame = sp + 1;
3962             continue; //no need to bump frame
3963         }
3964
3965         case STOP_FRAME:
3966             // We've stripped the entire stack, the thread is now dead.
3967             tso->what_next = ThreadKilled;
3968             tso->sp = frame + sizeofW(StgStopFrame);
3969             return;
3970
3971         case CATCH_FRAME:
3972             // If we find a CATCH_FRAME, and we've got an exception to raise,
3973             // then build the THUNK raise(exception), and leave it on
3974             // top of the CATCH_FRAME ready to enter.
3975             //
3976         {
3977 #ifdef PROFILING
3978             StgCatchFrame *cf = (StgCatchFrame *)frame;
3979 #endif
3980             StgThunk *raise;
3981             
3982             if (exception == NULL) break;
3983
3984             // we've got an exception to raise, so let's pass it to the
3985             // handler in this frame.
3986             //
3987             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3988             TICK_ALLOC_SE_THK(1,0);
3989             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3990             raise->payload[0] = exception;
3991             
3992             // throw away the stack from Sp up to the CATCH_FRAME.
3993             //
3994             sp = frame - 1;
3995             
3996             /* Ensure that async excpetions are blocked now, so we don't get
3997              * a surprise exception before we get around to executing the
3998              * handler.
3999              */
4000             if (tso->blocked_exceptions == NULL) {
4001                 tso->blocked_exceptions = END_TSO_QUEUE;
4002             }
4003
4004             /* Put the newly-built THUNK on top of the stack, ready to execute
4005              * when the thread restarts.
4006              */
4007             sp[0] = (W_)raise;
4008             sp[-1] = (W_)&stg_enter_info;
4009             tso->sp = sp-1;
4010             tso->what_next = ThreadRunGHC;
4011             IF_DEBUG(sanity, checkTSO(tso));
4012             return;
4013         }
4014             
4015         case ATOMICALLY_FRAME:
4016             if (stop_at_atomically) {
4017                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
4018                 stmCondemnTransaction(cap, tso -> trec);
4019 #ifdef REG_R1
4020                 tso->sp = frame;
4021 #else
4022                 // R1 is not a register: the return convention for IO in
4023                 // this case puts the return value on the stack, so we
4024                 // need to set up the stack to return to the atomically
4025                 // frame properly...
4026                 tso->sp = frame - 2;
4027                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
4028                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
4029 #endif
4030                 tso->what_next = ThreadRunGHC;
4031                 return;
4032             }
4033             // Not stop_at_atomically... fall through and abort the
4034             // transaction.
4035             
4036         case CATCH_RETRY_FRAME:
4037             // IF we find an ATOMICALLY_FRAME then we abort the
4038             // current transaction and propagate the exception.  In
4039             // this case (unlike ordinary exceptions) we do not care
4040             // whether the transaction is valid or not because its
4041             // possible validity cannot have caused the exception
4042             // and will not be visible after the abort.
4043             IF_DEBUG(stm,
4044                      debugBelch("Found atomically block delivering async exception\n"));
4045             StgTRecHeader *trec = tso -> trec;
4046             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4047             stmAbortTransaction(cap, trec);
4048             tso -> trec = outer;
4049             break;
4050             
4051         default:
4052             break;
4053         }
4054
4055         // move on to the next stack frame
4056         frame += stack_frame_sizeW((StgClosure *)frame);
4057     }
4058
4059     // if we got here, then we stopped at stop_here
4060     ASSERT(stop_here != NULL);
4061 }
4062
4063 /* -----------------------------------------------------------------------------
4064    Deleting threads
4065
4066    This is used for interruption (^C) and forking, and corresponds to
4067    raising an exception but without letting the thread catch the
4068    exception.
4069    -------------------------------------------------------------------------- */
4070
4071 static void 
4072 deleteThread (Capability *cap, StgTSO *tso)
4073 {
4074   if (tso->why_blocked != BlockedOnCCall &&
4075       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4076       raiseAsync(cap,tso,NULL);
4077   }
4078 }
4079
4080 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4081 static void 
4082 deleteThread_(Capability *cap, StgTSO *tso)
4083 { // for forkProcess only:
4084   // like deleteThread(), but we delete threads in foreign calls, too.
4085
4086     if (tso->why_blocked == BlockedOnCCall ||
4087         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4088         unblockOne(cap,tso);
4089         tso->what_next = ThreadKilled;
4090     } else {
4091         deleteThread(cap,tso);
4092     }
4093 }
4094 #endif
4095
4096 /* -----------------------------------------------------------------------------
4097    raiseExceptionHelper
4098    
4099    This function is called by the raise# primitve, just so that we can
4100    move some of the tricky bits of raising an exception from C-- into
4101    C.  Who knows, it might be a useful re-useable thing here too.
4102    -------------------------------------------------------------------------- */
4103
4104 StgWord
4105 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4106 {
4107     Capability *cap = regTableToCapability(reg);
4108     StgThunk *raise_closure = NULL;
4109     StgPtr p, next;
4110     StgRetInfoTable *info;
4111     //
4112     // This closure represents the expression 'raise# E' where E
4113     // is the exception raise.  It is used to overwrite all the
4114     // thunks which are currently under evaluataion.
4115     //
4116
4117     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4118     // LDV profiling: stg_raise_info has THUNK as its closure
4119     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4120     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
4121     // 1 does not cause any problem unless profiling is performed.
4122     // However, when LDV profiling goes on, we need to linearly scan
4123     // small object pool, where raise_closure is stored, so we should
4124     // use MIN_UPD_SIZE.
4125     //
4126     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4127     //                                 sizeofW(StgClosure)+1);
4128     //
4129
4130     //
4131     // Walk up the stack, looking for the catch frame.  On the way,
4132     // we update any closures pointed to from update frames with the
4133     // raise closure that we just built.
4134     //
4135     p = tso->sp;
4136     while(1) {
4137         info = get_ret_itbl((StgClosure *)p);
4138         next = p + stack_frame_sizeW((StgClosure *)p);
4139         switch (info->i.type) {
4140             
4141         case UPDATE_FRAME:
4142             // Only create raise_closure if we need to.
4143             if (raise_closure == NULL) {
4144                 raise_closure = 
4145                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4146                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4147                 raise_closure->payload[0] = exception;
4148             }
4149             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4150             p = next;
4151             continue;
4152
4153         case ATOMICALLY_FRAME:
4154             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4155             tso->sp = p;
4156             return ATOMICALLY_FRAME;
4157             
4158         case CATCH_FRAME:
4159             tso->sp = p;
4160             return CATCH_FRAME;
4161
4162         case CATCH_STM_FRAME:
4163             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4164             tso->sp = p;
4165             return CATCH_STM_FRAME;
4166             
4167         case STOP_FRAME:
4168             tso->sp = p;
4169             return STOP_FRAME;
4170
4171         case CATCH_RETRY_FRAME:
4172         default:
4173             p = next; 
4174             continue;
4175         }
4176     }
4177 }
4178
4179
4180 /* -----------------------------------------------------------------------------
4181    findRetryFrameHelper
4182
4183    This function is called by the retry# primitive.  It traverses the stack
4184    leaving tso->sp referring to the frame which should handle the retry.  
4185
4186    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4187    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4188
4189    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4190    despite the similar implementation.
4191
4192    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4193    not be created within memory transactions.
4194    -------------------------------------------------------------------------- */
4195
4196 StgWord
4197 findRetryFrameHelper (StgTSO *tso)
4198 {
4199   StgPtr           p, next;
4200   StgRetInfoTable *info;
4201
4202   p = tso -> sp;
4203   while (1) {
4204     info = get_ret_itbl((StgClosure *)p);
4205     next = p + stack_frame_sizeW((StgClosure *)p);
4206     switch (info->i.type) {
4207       
4208     case ATOMICALLY_FRAME:
4209       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4210       tso->sp = p;
4211       return ATOMICALLY_FRAME;
4212       
4213     case CATCH_RETRY_FRAME:
4214       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4215       tso->sp = p;
4216       return CATCH_RETRY_FRAME;
4217       
4218     case CATCH_STM_FRAME:
4219     default:
4220       ASSERT(info->i.type != CATCH_FRAME);
4221       ASSERT(info->i.type != STOP_FRAME);
4222       p = next; 
4223       continue;
4224     }
4225   }
4226 }
4227
4228 /* -----------------------------------------------------------------------------
4229    resurrectThreads is called after garbage collection on the list of
4230    threads found to be garbage.  Each of these threads will be woken
4231    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4232    on an MVar, or NonTermination if the thread was blocked on a Black
4233    Hole.
4234
4235    Locks: assumes we hold *all* the capabilities.
4236    -------------------------------------------------------------------------- */
4237
4238 void
4239 resurrectThreads (StgTSO *threads)
4240 {
4241     StgTSO *tso, *next;
4242     Capability *cap;
4243
4244     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4245         next = tso->global_link;
4246         tso->global_link = all_threads;
4247         all_threads = tso;
4248         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4249         
4250         // Wake up the thread on the Capability it was last on
4251         cap = tso->cap;
4252         
4253         switch (tso->why_blocked) {
4254         case BlockedOnMVar:
4255         case BlockedOnException:
4256             /* Called by GC - sched_mutex lock is currently held. */
4257             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4258             break;
4259         case BlockedOnBlackHole:
4260             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4261             break;
4262         case BlockedOnSTM:
4263             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4264             break;
4265         case NotBlocked:
4266             /* This might happen if the thread was blocked on a black hole
4267              * belonging to a thread that we've just woken up (raiseAsync
4268              * can wake up threads, remember...).
4269              */
4270             continue;
4271         default:
4272             barf("resurrectThreads: thread blocked in a strange way");
4273         }
4274     }
4275 }
4276
4277 /* ----------------------------------------------------------------------------
4278  * Debugging: why is a thread blocked
4279  * [Also provides useful information when debugging threaded programs
4280  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4281    ------------------------------------------------------------------------- */
4282
4283 #if DEBUG
4284 static void
4285 printThreadBlockage(StgTSO *tso)
4286 {
4287   switch (tso->why_blocked) {
4288   case BlockedOnRead:
4289     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4290     break;
4291   case BlockedOnWrite:
4292     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4293     break;
4294 #if defined(mingw32_HOST_OS)
4295     case BlockedOnDoProc:
4296     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4297     break;
4298 #endif
4299   case BlockedOnDelay:
4300     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4301     break;
4302   case BlockedOnMVar:
4303     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4304     break;
4305   case BlockedOnException:
4306     debugBelch("is blocked on delivering an exception to thread %d",
4307             tso->block_info.tso->id);
4308     break;
4309   case BlockedOnBlackHole:
4310     debugBelch("is blocked on a black hole");
4311     break;
4312   case NotBlocked:
4313     debugBelch("is not blocked");
4314     break;
4315 #if defined(PARALLEL_HASKELL)
4316   case BlockedOnGA:
4317     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4318             tso->block_info.closure, info_type(tso->block_info.closure));
4319     break;
4320   case BlockedOnGA_NoSend:
4321     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4322             tso->block_info.closure, info_type(tso->block_info.closure));
4323     break;
4324 #endif
4325   case BlockedOnCCall:
4326     debugBelch("is blocked on an external call");
4327     break;
4328   case BlockedOnCCall_NoUnblockExc:
4329     debugBelch("is blocked on an external call (exceptions were already blocked)");
4330     break;
4331   case BlockedOnSTM:
4332     debugBelch("is blocked on an STM operation");
4333     break;
4334   default:
4335     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4336          tso->why_blocked, tso->id, tso);
4337   }
4338 }
4339
4340 void
4341 printThreadStatus(StgTSO *t)
4342 {
4343     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4344     {
4345       void *label = lookupThreadLabel(t->id);
4346       if (label) debugBelch("[\"%s\"] ",(char *)label);
4347     }
4348     if (t->what_next == ThreadRelocated) {
4349         debugBelch("has been relocated...\n");
4350     } else {
4351         switch (t->what_next) {
4352         case ThreadKilled:
4353             debugBelch("has been killed");
4354             break;
4355         case ThreadComplete:
4356             debugBelch("has completed");
4357             break;
4358         default:
4359             printThreadBlockage(t);
4360         }
4361         debugBelch("\n");
4362     }
4363 }
4364
4365 void
4366 printAllThreads(void)
4367 {
4368   StgTSO *t, *next;
4369   nat i;
4370   Capability *cap;
4371
4372 # if defined(GRAN)
4373   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4374   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4375                        time_string, rtsFalse/*no commas!*/);
4376
4377   debugBelch("all threads at [%s]:\n", time_string);
4378 # elif defined(PARALLEL_HASKELL)
4379   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4380   ullong_format_string(CURRENT_TIME,
4381                        time_string, rtsFalse/*no commas!*/);
4382
4383   debugBelch("all threads at [%s]:\n", time_string);
4384 # else
4385   debugBelch("all threads:\n");
4386 # endif
4387
4388   for (i = 0; i < n_capabilities; i++) {
4389       cap = &capabilities[i];
4390       debugBelch("threads on capability %d:\n", cap->no);
4391       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4392           printThreadStatus(t);
4393       }
4394   }
4395
4396   debugBelch("other threads:\n");
4397   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4398       if (t->why_blocked != NotBlocked) {
4399           printThreadStatus(t);
4400       }
4401       if (t->what_next == ThreadRelocated) {
4402           next = t->link;
4403       } else {
4404           next = t->global_link;
4405       }
4406   }
4407 }
4408
4409 // useful from gdb
4410 void 
4411 printThreadQueue(StgTSO *t)
4412 {
4413     nat i = 0;
4414     for (; t != END_TSO_QUEUE; t = t->link) {
4415         printThreadStatus(t);
4416         i++;
4417     }
4418     debugBelch("%d threads on queue\n", i);
4419 }
4420
4421 /* 
4422    Print a whole blocking queue attached to node (debugging only).
4423 */
4424 # if defined(PARALLEL_HASKELL)
4425 void 
4426 print_bq (StgClosure *node)
4427 {
4428   StgBlockingQueueElement *bqe;
4429   StgTSO *tso;
4430   rtsBool end;
4431
4432   debugBelch("## BQ of closure %p (%s): ",
4433           node, info_type(node));
4434
4435   /* should cover all closures that may have a blocking queue */
4436   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4437          get_itbl(node)->type == FETCH_ME_BQ ||
4438          get_itbl(node)->type == RBH ||
4439          get_itbl(node)->type == MVAR);
4440     
4441   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4442
4443   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4444 }
4445
4446 /* 
4447    Print a whole blocking queue starting with the element bqe.
4448 */
4449 void 
4450 print_bqe (StgBlockingQueueElement *bqe)
4451 {
4452   rtsBool end;
4453
4454   /* 
4455      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4456   */
4457   for (end = (bqe==END_BQ_QUEUE);
4458        !end; // iterate until bqe points to a CONSTR
4459        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4460        bqe = end ? END_BQ_QUEUE : bqe->link) {
4461     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4462     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4463     /* types of closures that may appear in a blocking queue */
4464     ASSERT(get_itbl(bqe)->type == TSO ||           
4465            get_itbl(bqe)->type == BLOCKED_FETCH || 
4466            get_itbl(bqe)->type == CONSTR); 
4467     /* only BQs of an RBH end with an RBH_Save closure */
4468     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4469
4470     switch (get_itbl(bqe)->type) {
4471     case TSO:
4472       debugBelch(" TSO %u (%x),",
4473               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4474       break;
4475     case BLOCKED_FETCH:
4476       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4477               ((StgBlockedFetch *)bqe)->node, 
4478               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4479               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4480               ((StgBlockedFetch *)bqe)->ga.weight);
4481       break;
4482     case CONSTR:
4483       debugBelch(" %s (IP %p),",
4484               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4485                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4486                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4487                "RBH_Save_?"), get_itbl(bqe));
4488       break;
4489     default:
4490       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4491            info_type((StgClosure *)bqe)); // , node, info_type(node));
4492       break;
4493     }
4494   } /* for */
4495   debugBelch("\n");
4496 }
4497 # elif defined(GRAN)
4498 void 
4499 print_bq (StgClosure *node)
4500 {
4501   StgBlockingQueueElement *bqe;
4502   PEs node_loc, tso_loc;
4503   rtsBool end;
4504
4505   /* should cover all closures that may have a blocking queue */
4506   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4507          get_itbl(node)->type == FETCH_ME_BQ ||
4508          get_itbl(node)->type == RBH);
4509     
4510   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4511   node_loc = where_is(node);
4512
4513   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4514           node, info_type(node), node_loc);
4515
4516   /* 
4517      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4518   */
4519   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4520        !end; // iterate until bqe points to a CONSTR
4521        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4522     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4523     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4524     /* types of closures that may appear in a blocking queue */
4525     ASSERT(get_itbl(bqe)->type == TSO ||           
4526            get_itbl(bqe)->type == CONSTR); 
4527     /* only BQs of an RBH end with an RBH_Save closure */
4528     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4529
4530     tso_loc = where_is((StgClosure *)bqe);
4531     switch (get_itbl(bqe)->type) {
4532     case TSO:
4533       debugBelch(" TSO %d (%p) on [PE %d],",
4534               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4535       break;
4536     case CONSTR:
4537       debugBelch(" %s (IP %p),",
4538               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4539                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4540                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4541                "RBH_Save_?"), get_itbl(bqe));
4542       break;
4543     default:
4544       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4545            info_type((StgClosure *)bqe), node, info_type(node));
4546       break;
4547     }
4548   } /* for */
4549   debugBelch("\n");
4550 }
4551 # endif
4552
4553 #if defined(PARALLEL_HASKELL)
4554 static nat
4555 run_queue_len(void)
4556 {
4557     nat i;
4558     StgTSO *tso;
4559     
4560     for (i=0, tso=run_queue_hd; 
4561          tso != END_TSO_QUEUE;
4562          i++, tso=tso->link) {
4563         /* nothing */
4564     }
4565         
4566     return i;
4567 }
4568 #endif
4569
4570 void
4571 sched_belch(char *s, ...)
4572 {
4573     va_list ap;
4574     va_start(ap,s);
4575 #ifdef THREADED_RTS
4576     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4577 #elif defined(PARALLEL_HASKELL)
4578     debugBelch("== ");
4579 #else
4580     debugBelch("sched: ");
4581 #endif
4582     vdebugBelch(s, ap);
4583     debugBelch("\n");
4584     va_end(ap);
4585 }
4586
4587 #endif /* DEBUG */