d49d4ed8e534bd9060e6fa53b779880c4a92a62b
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static Capability *scheduleDoGC(Capability *cap, Task *task,
231                                 rtsBool force_major, 
232                                 void (*get_roots)(evac_fn));
233
234 static void unblockThread(Capability *cap, StgTSO *tso);
235 static rtsBool checkBlackHoles(Capability *cap);
236 static void AllRoots(evac_fn evac);
237
238 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
239
240 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
241                         rtsBool stop_at_atomically, StgPtr stop_here);
242
243 static void deleteThread (Capability *cap, StgTSO *tso);
244 static void deleteAllThreads (Capability *cap);
245
246 #ifdef DEBUG
247 static void printThreadBlockage(StgTSO *tso);
248 static void printThreadStatus(StgTSO *tso);
249 void printThreadQueue(StgTSO *tso);
250 #endif
251
252 #if defined(PARALLEL_HASKELL)
253 StgTSO * createSparkThread(rtsSpark spark);
254 StgTSO * activateSpark (rtsSpark spark);  
255 #endif
256
257 #ifdef DEBUG
258 static char *whatNext_strs[] = {
259   "(unknown)",
260   "ThreadRunGHC",
261   "ThreadInterpret",
262   "ThreadKilled",
263   "ThreadRelocated",
264   "ThreadComplete"
265 };
266 #endif
267
268 /* -----------------------------------------------------------------------------
269  * Putting a thread on the run queue: different scheduling policies
270  * -------------------------------------------------------------------------- */
271
272 STATIC_INLINE void
273 addToRunQueue( Capability *cap, StgTSO *t )
274 {
275 #if defined(PARALLEL_HASKELL)
276     if (RtsFlags.ParFlags.doFairScheduling) { 
277         // this does round-robin scheduling; good for concurrency
278         appendToRunQueue(cap,t);
279     } else {
280         // this does unfair scheduling; good for parallelism
281         pushOnRunQueue(cap,t);
282     }
283 #else
284     // this does round-robin scheduling; good for concurrency
285     appendToRunQueue(cap,t);
286 #endif
287 }
288
289 /* ---------------------------------------------------------------------------
290    Main scheduling loop.
291
292    We use round-robin scheduling, each thread returning to the
293    scheduler loop when one of these conditions is detected:
294
295       * out of heap space
296       * timer expires (thread yields)
297       * thread blocks
298       * thread ends
299       * stack overflow
300
301    GRAN version:
302      In a GranSim setup this loop iterates over the global event queue.
303      This revolves around the global event queue, which determines what 
304      to do next. Therefore, it's more complicated than either the 
305      concurrent or the parallel (GUM) setup.
306
307    GUM version:
308      GUM iterates over incoming messages.
309      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
310      and sends out a fish whenever it has nothing to do; in-between
311      doing the actual reductions (shared code below) it processes the
312      incoming messages and deals with delayed operations 
313      (see PendingFetches).
314      This is not the ugliest code you could imagine, but it's bloody close.
315
316    ------------------------------------------------------------------------ */
317
318 static Capability *
319 schedule (Capability *initialCapability, Task *task)
320 {
321   StgTSO *t;
322   Capability *cap;
323   StgThreadReturnCode ret;
324 #if defined(GRAN)
325   rtsEvent *event;
326 #elif defined(PARALLEL_HASKELL)
327   StgTSO *tso;
328   GlobalTaskId pe;
329   rtsBool receivedFinish = rtsFalse;
330 # if defined(DEBUG)
331   nat tp_size, sp_size; // stats only
332 # endif
333 #endif
334   nat prev_what_next;
335   rtsBool ready_to_gc;
336 #if defined(THREADED_RTS)
337   rtsBool first = rtsTrue;
338 #endif
339   
340   cap = initialCapability;
341
342   // Pre-condition: this task owns initialCapability.
343   // The sched_mutex is *NOT* held
344   // NB. on return, we still hold a capability.
345
346   IF_DEBUG(scheduler,
347            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
348                        task, initialCapability);
349       );
350
351   schedulePreLoop();
352
353   // -----------------------------------------------------------
354   // Scheduler loop starts here:
355
356 #if defined(PARALLEL_HASKELL)
357 #define TERMINATION_CONDITION        (!receivedFinish)
358 #elif defined(GRAN)
359 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
360 #else
361 #define TERMINATION_CONDITION        rtsTrue
362 #endif
363
364   while (TERMINATION_CONDITION) {
365
366 #if defined(GRAN)
367       /* Choose the processor with the next event */
368       CurrentProc = event->proc;
369       CurrentTSO = event->tso;
370 #endif
371
372 #if defined(THREADED_RTS)
373       if (first) {
374           // don't yield the first time, we want a chance to run this
375           // thread for a bit, even if there are others banging at the
376           // door.
377           first = rtsFalse;
378           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
379       } else {
380           // Yield the capability to higher-priority tasks if necessary.
381           yieldCapability(&cap, task);
382       }
383 #endif
384       
385 #if defined(THREADED_RTS)
386       schedulePushWork(cap,task);
387 #endif
388
389     // Check whether we have re-entered the RTS from Haskell without
390     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
391     // call).
392     if (cap->in_haskell) {
393           errorBelch("schedule: re-entered unsafely.\n"
394                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
395           stg_exit(EXIT_FAILURE);
396     }
397
398     // The interruption / shutdown sequence.
399     // 
400     // In order to cleanly shut down the runtime, we want to:
401     //   * make sure that all main threads return to their callers
402     //     with the state 'Interrupted'.
403     //   * clean up all OS threads assocated with the runtime
404     //   * free all memory etc.
405     //
406     // So the sequence for ^C goes like this:
407     //
408     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
409     //     arranges for some Capability to wake up
410     //
411     //   * all threads in the system are halted, and the zombies are
412     //     placed on the run queue for cleaning up.  We acquire all
413     //     the capabilities in order to delete the threads, this is
414     //     done by scheduleDoGC() for convenience (because GC already
415     //     needs to acquire all the capabilities).  We can't kill
416     //     threads involved in foreign calls.
417     // 
418     //   * sched_state := SCHED_INTERRUPTED
419     //
420     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
421     //
422     //   * sched_state := SCHED_SHUTTING_DOWN
423     //
424     //   * all workers exit when the run queue on their capability
425     //     drains.  All main threads will also exit when their TSO
426     //     reaches the head of the run queue and they can return.
427     //
428     //   * eventually all Capabilities will shut down, and the RTS can
429     //     exit.
430     //
431     //   * We might be left with threads blocked in foreign calls, 
432     //     we should really attempt to kill these somehow (TODO);
433     
434     switch (sched_state) {
435     case SCHED_RUNNING:
436         break;
437     case SCHED_INTERRUPTING:
438         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
439 #if defined(THREADED_RTS)
440         discardSparksCap(cap);
441 #endif
442         /* scheduleDoGC() deletes all the threads */
443         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
444         break;
445     case SCHED_INTERRUPTED:
446         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
447         break;
448     case SCHED_SHUTTING_DOWN:
449         IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
450         // If we are a worker, just exit.  If we're a bound thread
451         // then we will exit below when we've removed our TSO from
452         // the run queue.
453         if (task->tso == NULL && emptyRunQueue(cap)) {
454             return cap;
455         }
456         break;
457     default:
458         barf("sched_state: %d", sched_state);
459     }
460
461 #if defined(THREADED_RTS)
462     // If the run queue is empty, take a spark and turn it into a thread.
463     {
464         if (emptyRunQueue(cap)) {
465             StgClosure *spark;
466             spark = findSpark(cap);
467             if (spark != NULL) {
468                 IF_DEBUG(scheduler,
469                          sched_belch("turning spark of closure %p into a thread",
470                                      (StgClosure *)spark));
471                 createSparkThread(cap,spark);     
472             }
473         }
474     }
475 #endif // THREADED_RTS
476
477     scheduleStartSignalHandlers(cap);
478
479     // Only check the black holes here if we've nothing else to do.
480     // During normal execution, the black hole list only gets checked
481     // at GC time, to avoid repeatedly traversing this possibly long
482     // list each time around the scheduler.
483     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
484
485     // Any threads that were woken up by other Capabilities get
486     // appended to our run queue.
487     if (!emptyWakeupQueue(cap)) {
488         ACQUIRE_LOCK(&cap->lock);
489         if (emptyRunQueue(cap)) {
490             cap->run_queue_hd = cap->wakeup_queue_hd;
491             cap->run_queue_tl = cap->wakeup_queue_tl;
492         } else {
493             cap->run_queue_tl->link = cap->wakeup_queue_hd;
494             cap->run_queue_tl = cap->wakeup_queue_tl;
495         }
496         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
497         RELEASE_LOCK(&cap->lock);
498     }
499
500     scheduleCheckBlockedThreads(cap);
501
502     scheduleDetectDeadlock(cap,task);
503 #if defined(THREADED_RTS)
504     cap = task->cap;    // reload cap, it might have changed
505 #endif
506
507     // Normally, the only way we can get here with no threads to
508     // run is if a keyboard interrupt received during 
509     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
510     // Additionally, it is not fatal for the
511     // threaded RTS to reach here with no threads to run.
512     //
513     // win32: might be here due to awaitEvent() being abandoned
514     // as a result of a console event having been delivered.
515     if ( emptyRunQueue(cap) ) {
516 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
517         ASSERT(sched_state >= SCHED_INTERRUPTING);
518 #endif
519         continue; // nothing to do
520     }
521
522 #if defined(PARALLEL_HASKELL)
523     scheduleSendPendingMessages();
524     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
525         continue;
526
527 #if defined(SPARKS)
528     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
529 #endif
530
531     /* If we still have no work we need to send a FISH to get a spark
532        from another PE */
533     if (emptyRunQueue(cap)) {
534         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
535         ASSERT(rtsFalse); // should not happen at the moment
536     }
537     // from here: non-empty run queue.
538     //  TODO: merge above case with this, only one call processMessages() !
539     if (PacketsWaiting()) {  /* process incoming messages, if
540                                 any pending...  only in else
541                                 because getRemoteWork waits for
542                                 messages as well */
543         receivedFinish = processMessages();
544     }
545 #endif
546
547 #if defined(GRAN)
548     scheduleProcessEvent(event);
549 #endif
550
551     // 
552     // Get a thread to run
553     //
554     t = popRunQueue(cap);
555
556 #if defined(GRAN) || defined(PAR)
557     scheduleGranParReport(); // some kind of debuging output
558 #else
559     // Sanity check the thread we're about to run.  This can be
560     // expensive if there is lots of thread switching going on...
561     IF_DEBUG(sanity,checkTSO(t));
562 #endif
563
564 #if defined(THREADED_RTS)
565     // Check whether we can run this thread in the current task.
566     // If not, we have to pass our capability to the right task.
567     {
568         Task *bound = t->bound;
569       
570         if (bound) {
571             if (bound == task) {
572                 IF_DEBUG(scheduler,
573                          sched_belch("### Running thread %d in bound thread",
574                                      t->id));
575                 // yes, the Haskell thread is bound to the current native thread
576             } else {
577                 IF_DEBUG(scheduler,
578                          sched_belch("### thread %d bound to another OS thread",
579                                      t->id));
580                 // no, bound to a different Haskell thread: pass to that thread
581                 pushOnRunQueue(cap,t);
582                 continue;
583             }
584         } else {
585             // The thread we want to run is unbound.
586             if (task->tso) { 
587                 IF_DEBUG(scheduler,
588                          sched_belch("### this OS thread cannot run thread %d", t->id));
589                 // no, the current native thread is bound to a different
590                 // Haskell thread, so pass it to any worker thread
591                 pushOnRunQueue(cap,t);
592                 continue; 
593             }
594         }
595     }
596 #endif
597
598     cap->r.rCurrentTSO = t;
599     
600     /* context switches are initiated by the timer signal, unless
601      * the user specified "context switch as often as possible", with
602      * +RTS -C0
603      */
604     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
605         && !emptyThreadQueues(cap)) {
606         context_switch = 1;
607     }
608          
609 run_thread:
610
611     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
612                               (long)t->id, whatNext_strs[t->what_next]));
613
614 #if defined(PROFILING)
615     startHeapProfTimer();
616 #endif
617
618     // ----------------------------------------------------------------------
619     // Run the current thread 
620
621     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
622     ASSERT(t->cap == cap);
623
624     prev_what_next = t->what_next;
625
626     errno = t->saved_errno;
627     cap->in_haskell = rtsTrue;
628
629     dirtyTSO(t);
630
631     recent_activity = ACTIVITY_YES;
632
633     switch (prev_what_next) {
634         
635     case ThreadKilled:
636     case ThreadComplete:
637         /* Thread already finished, return to scheduler. */
638         ret = ThreadFinished;
639         break;
640         
641     case ThreadRunGHC:
642     {
643         StgRegTable *r;
644         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
645         cap = regTableToCapability(r);
646         ret = r->rRet;
647         break;
648     }
649     
650     case ThreadInterpret:
651         cap = interpretBCO(cap);
652         ret = cap->r.rRet;
653         break;
654         
655     default:
656         barf("schedule: invalid what_next field");
657     }
658
659     cap->in_haskell = rtsFalse;
660
661     // The TSO might have moved, eg. if it re-entered the RTS and a GC
662     // happened.  So find the new location:
663     t = cap->r.rCurrentTSO;
664
665     // We have run some Haskell code: there might be blackhole-blocked
666     // threads to wake up now.
667     // Lock-free test here should be ok, we're just setting a flag.
668     if ( blackhole_queue != END_TSO_QUEUE ) {
669         blackholes_need_checking = rtsTrue;
670     }
671     
672     // And save the current errno in this thread.
673     // XXX: possibly bogus for SMP because this thread might already
674     // be running again, see code below.
675     t->saved_errno = errno;
676
677 #if defined(THREADED_RTS)
678     // If ret is ThreadBlocked, and this Task is bound to the TSO that
679     // blocked, we are in limbo - the TSO is now owned by whatever it
680     // is blocked on, and may in fact already have been woken up,
681     // perhaps even on a different Capability.  It may be the case
682     // that task->cap != cap.  We better yield this Capability
683     // immediately and return to normaility.
684     if (ret == ThreadBlocked) {
685         IF_DEBUG(scheduler,
686                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
687                              t->id, whatNext_strs[t->what_next]));
688         continue;
689     }
690 #endif
691
692     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
693     ASSERT(t->cap == cap);
694
695     // ----------------------------------------------------------------------
696     
697     // Costs for the scheduler are assigned to CCS_SYSTEM
698 #if defined(PROFILING)
699     stopHeapProfTimer();
700     CCCS = CCS_SYSTEM;
701 #endif
702     
703 #if defined(THREADED_RTS)
704     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
705 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
706     IF_DEBUG(scheduler,debugBelch("sched: "););
707 #endif
708     
709     schedulePostRunThread();
710
711     ready_to_gc = rtsFalse;
712
713     switch (ret) {
714     case HeapOverflow:
715         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
716         break;
717
718     case StackOverflow:
719         scheduleHandleStackOverflow(cap,task,t);
720         break;
721
722     case ThreadYielding:
723         if (scheduleHandleYield(cap, t, prev_what_next)) {
724             // shortcut for switching between compiler/interpreter:
725             goto run_thread; 
726         }
727         break;
728
729     case ThreadBlocked:
730         scheduleHandleThreadBlocked(t);
731         break;
732
733     case ThreadFinished:
734         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
735         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
736         break;
737
738     default:
739       barf("schedule: invalid thread return code %d", (int)ret);
740     }
741
742     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
743     if (ready_to_gc) {
744       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
745     }
746   } /* end of while() */
747
748   IF_PAR_DEBUG(verbose,
749                debugBelch("== Leaving schedule() after having received Finish\n"));
750 }
751
752 /* ----------------------------------------------------------------------------
753  * Setting up the scheduler loop
754  * ------------------------------------------------------------------------- */
755
756 static void
757 schedulePreLoop(void)
758 {
759 #if defined(GRAN) 
760     /* set up first event to get things going */
761     /* ToDo: assign costs for system setup and init MainTSO ! */
762     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
763               ContinueThread, 
764               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
765     
766     IF_DEBUG(gran,
767              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
768                         CurrentTSO);
769              G_TSO(CurrentTSO, 5));
770     
771     if (RtsFlags.GranFlags.Light) {
772         /* Save current time; GranSim Light only */
773         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
774     }      
775 #endif
776 }
777
778 /* -----------------------------------------------------------------------------
779  * schedulePushWork()
780  *
781  * Push work to other Capabilities if we have some.
782  * -------------------------------------------------------------------------- */
783
784 #if defined(THREADED_RTS)
785 static void
786 schedulePushWork(Capability *cap USED_IF_THREADS, 
787                  Task *task      USED_IF_THREADS)
788 {
789     Capability *free_caps[n_capabilities], *cap0;
790     nat i, n_free_caps;
791
792     // migration can be turned off with +RTS -qg
793     if (!RtsFlags.ParFlags.migrate) return;
794
795     // Check whether we have more threads on our run queue, or sparks
796     // in our pool, that we could hand to another Capability.
797     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
798         && sparkPoolSizeCap(cap) < 2) {
799         return;
800     }
801
802     // First grab as many free Capabilities as we can.
803     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
804         cap0 = &capabilities[i];
805         if (cap != cap0 && tryGrabCapability(cap0,task)) {
806             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
807                 // it already has some work, we just grabbed it at 
808                 // the wrong moment.  Or maybe it's deadlocked!
809                 releaseCapability(cap0);
810             } else {
811                 free_caps[n_free_caps++] = cap0;
812             }
813         }
814     }
815
816     // we now have n_free_caps free capabilities stashed in
817     // free_caps[].  Share our run queue equally with them.  This is
818     // probably the simplest thing we could do; improvements we might
819     // want to do include:
820     //
821     //   - giving high priority to moving relatively new threads, on 
822     //     the gournds that they haven't had time to build up a
823     //     working set in the cache on this CPU/Capability.
824     //
825     //   - giving low priority to moving long-lived threads
826
827     if (n_free_caps > 0) {
828         StgTSO *prev, *t, *next;
829         rtsBool pushed_to_all;
830
831         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
832
833         i = 0;
834         pushed_to_all = rtsFalse;
835
836         if (cap->run_queue_hd != END_TSO_QUEUE) {
837             prev = cap->run_queue_hd;
838             t = prev->link;
839             prev->link = END_TSO_QUEUE;
840             for (; t != END_TSO_QUEUE; t = next) {
841                 next = t->link;
842                 t->link = END_TSO_QUEUE;
843                 if (t->what_next == ThreadRelocated
844                     || t->bound == task) { // don't move my bound thread
845                     prev->link = t;
846                     prev = t;
847                 } else if (i == n_free_caps) {
848                     pushed_to_all = rtsTrue;
849                     i = 0;
850                     // keep one for us
851                     prev->link = t;
852                     prev = t;
853                 } else {
854                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
855                     appendToRunQueue(free_caps[i],t);
856                     if (t->bound) { t->bound->cap = free_caps[i]; }
857                     t->cap = free_caps[i];
858                     i++;
859                 }
860             }
861             cap->run_queue_tl = prev;
862         }
863
864         // If there are some free capabilities that we didn't push any
865         // threads to, then try to push a spark to each one.
866         if (!pushed_to_all) {
867             StgClosure *spark;
868             // i is the next free capability to push to
869             for (; i < n_free_caps; i++) {
870                 if (emptySparkPoolCap(free_caps[i])) {
871                     spark = findSpark(cap);
872                     if (spark != NULL) {
873                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
874                         newSpark(&(free_caps[i]->r), spark);
875                     }
876                 }
877             }
878         }
879
880         // release the capabilities
881         for (i = 0; i < n_free_caps; i++) {
882             task->cap = free_caps[i];
883             releaseCapability(free_caps[i]);
884         }
885     }
886     task->cap = cap; // reset to point to our Capability.
887 }
888 #endif
889
890 /* ----------------------------------------------------------------------------
891  * Start any pending signal handlers
892  * ------------------------------------------------------------------------- */
893
894 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
895 static void
896 scheduleStartSignalHandlers(Capability *cap)
897 {
898     if (signals_pending()) { // safe outside the lock
899         startSignalHandlers(cap);
900     }
901 }
902 #else
903 static void
904 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
905 {
906 }
907 #endif
908
909 /* ----------------------------------------------------------------------------
910  * Check for blocked threads that can be woken up.
911  * ------------------------------------------------------------------------- */
912
913 static void
914 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
915 {
916 #if !defined(THREADED_RTS)
917     //
918     // Check whether any waiting threads need to be woken up.  If the
919     // run queue is empty, and there are no other tasks running, we
920     // can wait indefinitely for something to happen.
921     //
922     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
923     {
924         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
925     }
926 #endif
927 }
928
929
930 /* ----------------------------------------------------------------------------
931  * Check for threads blocked on BLACKHOLEs that can be woken up
932  * ------------------------------------------------------------------------- */
933 static void
934 scheduleCheckBlackHoles (Capability *cap)
935 {
936     if ( blackholes_need_checking ) // check without the lock first
937     {
938         ACQUIRE_LOCK(&sched_mutex);
939         if ( blackholes_need_checking ) {
940             checkBlackHoles(cap);
941             blackholes_need_checking = rtsFalse;
942         }
943         RELEASE_LOCK(&sched_mutex);
944     }
945 }
946
947 /* ----------------------------------------------------------------------------
948  * Detect deadlock conditions and attempt to resolve them.
949  * ------------------------------------------------------------------------- */
950
951 static void
952 scheduleDetectDeadlock (Capability *cap, Task *task)
953 {
954
955 #if defined(PARALLEL_HASKELL)
956     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
957     return;
958 #endif
959
960     /* 
961      * Detect deadlock: when we have no threads to run, there are no
962      * threads blocked, waiting for I/O, or sleeping, and all the
963      * other tasks are waiting for work, we must have a deadlock of
964      * some description.
965      */
966     if ( emptyThreadQueues(cap) )
967     {
968 #if defined(THREADED_RTS)
969         /* 
970          * In the threaded RTS, we only check for deadlock if there
971          * has been no activity in a complete timeslice.  This means
972          * we won't eagerly start a full GC just because we don't have
973          * any threads to run currently.
974          */
975         if (recent_activity != ACTIVITY_INACTIVE) return;
976 #endif
977
978         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
979
980         // Garbage collection can release some new threads due to
981         // either (a) finalizers or (b) threads resurrected because
982         // they are unreachable and will therefore be sent an
983         // exception.  Any threads thus released will be immediately
984         // runnable.
985         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
986
987         recent_activity = ACTIVITY_DONE_GC;
988         
989         if ( !emptyRunQueue(cap) ) return;
990
991 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
992         /* If we have user-installed signal handlers, then wait
993          * for signals to arrive rather then bombing out with a
994          * deadlock.
995          */
996         if ( anyUserHandlers() ) {
997             IF_DEBUG(scheduler, 
998                      sched_belch("still deadlocked, waiting for signals..."));
999
1000             awaitUserSignals();
1001
1002             if (signals_pending()) {
1003                 startSignalHandlers(cap);
1004             }
1005
1006             // either we have threads to run, or we were interrupted:
1007             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1008         }
1009 #endif
1010
1011 #if !defined(THREADED_RTS)
1012         /* Probably a real deadlock.  Send the current main thread the
1013          * Deadlock exception.
1014          */
1015         if (task->tso) {
1016             switch (task->tso->why_blocked) {
1017             case BlockedOnSTM:
1018             case BlockedOnBlackHole:
1019             case BlockedOnException:
1020             case BlockedOnMVar:
1021                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1022                 return;
1023             default:
1024                 barf("deadlock: main thread blocked in a strange way");
1025             }
1026         }
1027         return;
1028 #endif
1029     }
1030 }
1031
1032 /* ----------------------------------------------------------------------------
1033  * Process an event (GRAN only)
1034  * ------------------------------------------------------------------------- */
1035
1036 #if defined(GRAN)
1037 static StgTSO *
1038 scheduleProcessEvent(rtsEvent *event)
1039 {
1040     StgTSO *t;
1041
1042     if (RtsFlags.GranFlags.Light)
1043       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1044
1045     /* adjust time based on time-stamp */
1046     if (event->time > CurrentTime[CurrentProc] &&
1047         event->evttype != ContinueThread)
1048       CurrentTime[CurrentProc] = event->time;
1049     
1050     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1051     if (!RtsFlags.GranFlags.Light)
1052       handleIdlePEs();
1053
1054     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1055
1056     /* main event dispatcher in GranSim */
1057     switch (event->evttype) {
1058       /* Should just be continuing execution */
1059     case ContinueThread:
1060       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1061       /* ToDo: check assertion
1062       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1063              run_queue_hd != END_TSO_QUEUE);
1064       */
1065       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1066       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1067           procStatus[CurrentProc]==Fetching) {
1068         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1069               CurrentTSO->id, CurrentTSO, CurrentProc);
1070         goto next_thread;
1071       } 
1072       /* Ignore ContinueThreads for completed threads */
1073       if (CurrentTSO->what_next == ThreadComplete) {
1074         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1075               CurrentTSO->id, CurrentTSO, CurrentProc);
1076         goto next_thread;
1077       } 
1078       /* Ignore ContinueThreads for threads that are being migrated */
1079       if (PROCS(CurrentTSO)==Nowhere) { 
1080         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1081               CurrentTSO->id, CurrentTSO, CurrentProc);
1082         goto next_thread;
1083       }
1084       /* The thread should be at the beginning of the run queue */
1085       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1086         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1087               CurrentTSO->id, CurrentTSO, CurrentProc);
1088         break; // run the thread anyway
1089       }
1090       /*
1091       new_event(proc, proc, CurrentTime[proc],
1092                 FindWork,
1093                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1094       goto next_thread; 
1095       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1096       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1097
1098     case FetchNode:
1099       do_the_fetchnode(event);
1100       goto next_thread;             /* handle next event in event queue  */
1101       
1102     case GlobalBlock:
1103       do_the_globalblock(event);
1104       goto next_thread;             /* handle next event in event queue  */
1105       
1106     case FetchReply:
1107       do_the_fetchreply(event);
1108       goto next_thread;             /* handle next event in event queue  */
1109       
1110     case UnblockThread:   /* Move from the blocked queue to the tail of */
1111       do_the_unblock(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case ResumeThread:  /* Move from the blocked queue to the tail of */
1115       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1116       event->tso->gran.blocktime += 
1117         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1118       do_the_startthread(event);
1119       goto next_thread;             /* handle next event in event queue  */
1120       
1121     case StartThread:
1122       do_the_startthread(event);
1123       goto next_thread;             /* handle next event in event queue  */
1124       
1125     case MoveThread:
1126       do_the_movethread(event);
1127       goto next_thread;             /* handle next event in event queue  */
1128       
1129     case MoveSpark:
1130       do_the_movespark(event);
1131       goto next_thread;             /* handle next event in event queue  */
1132       
1133     case FindWork:
1134       do_the_findwork(event);
1135       goto next_thread;             /* handle next event in event queue  */
1136       
1137     default:
1138       barf("Illegal event type %u\n", event->evttype);
1139     }  /* switch */
1140     
1141     /* This point was scheduler_loop in the old RTS */
1142
1143     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1144
1145     TimeOfLastEvent = CurrentTime[CurrentProc];
1146     TimeOfNextEvent = get_time_of_next_event();
1147     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1148     // CurrentTSO = ThreadQueueHd;
1149
1150     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1151                          TimeOfNextEvent));
1152
1153     if (RtsFlags.GranFlags.Light) 
1154       GranSimLight_leave_system(event, &ActiveTSO); 
1155
1156     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1157
1158     IF_DEBUG(gran, 
1159              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1160
1161     /* in a GranSim setup the TSO stays on the run queue */
1162     t = CurrentTSO;
1163     /* Take a thread from the run queue. */
1164     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1165
1166     IF_DEBUG(gran, 
1167              debugBelch("GRAN: About to run current thread, which is\n");
1168              G_TSO(t,5));
1169
1170     context_switch = 0; // turned on via GranYield, checking events and time slice
1171
1172     IF_DEBUG(gran, 
1173              DumpGranEvent(GR_SCHEDULE, t));
1174
1175     procStatus[CurrentProc] = Busy;
1176 }
1177 #endif // GRAN
1178
1179 /* ----------------------------------------------------------------------------
1180  * Send pending messages (PARALLEL_HASKELL only)
1181  * ------------------------------------------------------------------------- */
1182
1183 #if defined(PARALLEL_HASKELL)
1184 static StgTSO *
1185 scheduleSendPendingMessages(void)
1186 {
1187     StgSparkPool *pool;
1188     rtsSpark spark;
1189     StgTSO *t;
1190
1191 # if defined(PAR) // global Mem.Mgmt., omit for now
1192     if (PendingFetches != END_BF_QUEUE) {
1193         processFetches();
1194     }
1195 # endif
1196     
1197     if (RtsFlags.ParFlags.BufferTime) {
1198         // if we use message buffering, we must send away all message
1199         // packets which have become too old...
1200         sendOldBuffers(); 
1201     }
1202 }
1203 #endif
1204
1205 /* ----------------------------------------------------------------------------
1206  * Activate spark threads (PARALLEL_HASKELL only)
1207  * ------------------------------------------------------------------------- */
1208
1209 #if defined(PARALLEL_HASKELL)
1210 static void
1211 scheduleActivateSpark(void)
1212 {
1213 #if defined(SPARKS)
1214   ASSERT(emptyRunQueue());
1215 /* We get here if the run queue is empty and want some work.
1216    We try to turn a spark into a thread, and add it to the run queue,
1217    from where it will be picked up in the next iteration of the scheduler
1218    loop.
1219 */
1220
1221       /* :-[  no local threads => look out for local sparks */
1222       /* the spark pool for the current PE */
1223       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1224       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1225           pool->hd < pool->tl) {
1226         /* 
1227          * ToDo: add GC code check that we really have enough heap afterwards!!
1228          * Old comment:
1229          * If we're here (no runnable threads) and we have pending
1230          * sparks, we must have a space problem.  Get enough space
1231          * to turn one of those pending sparks into a
1232          * thread... 
1233          */
1234
1235         spark = findSpark(rtsFalse);            /* get a spark */
1236         if (spark != (rtsSpark) NULL) {
1237           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1238           IF_PAR_DEBUG(fish, // schedule,
1239                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1240                              tso->id, tso, advisory_thread_count));
1241
1242           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1243             IF_PAR_DEBUG(fish, // schedule,
1244                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1245                             spark));
1246             return rtsFalse; /* failed to generate a thread */
1247           }                  /* otherwise fall through & pick-up new tso */
1248         } else {
1249           IF_PAR_DEBUG(fish, // schedule,
1250                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1251                              spark_queue_len(pool)));
1252           return rtsFalse;  /* failed to generate a thread */
1253         }
1254         return rtsTrue;  /* success in generating a thread */
1255   } else { /* no more threads permitted or pool empty */
1256     return rtsFalse;  /* failed to generateThread */
1257   }
1258 #else
1259   tso = NULL; // avoid compiler warning only
1260   return rtsFalse;  /* dummy in non-PAR setup */
1261 #endif // SPARKS
1262 }
1263 #endif // PARALLEL_HASKELL
1264
1265 /* ----------------------------------------------------------------------------
1266  * Get work from a remote node (PARALLEL_HASKELL only)
1267  * ------------------------------------------------------------------------- */
1268     
1269 #if defined(PARALLEL_HASKELL)
1270 static rtsBool
1271 scheduleGetRemoteWork(rtsBool *receivedFinish)
1272 {
1273   ASSERT(emptyRunQueue());
1274
1275   if (RtsFlags.ParFlags.BufferTime) {
1276         IF_PAR_DEBUG(verbose, 
1277                 debugBelch("...send all pending data,"));
1278         {
1279           nat i;
1280           for (i=1; i<=nPEs; i++)
1281             sendImmediately(i); // send all messages away immediately
1282         }
1283   }
1284 # ifndef SPARKS
1285         //++EDEN++ idle() , i.e. send all buffers, wait for work
1286         // suppress fishing in EDEN... just look for incoming messages
1287         // (blocking receive)
1288   IF_PAR_DEBUG(verbose, 
1289                debugBelch("...wait for incoming messages...\n"));
1290   *receivedFinish = processMessages(); // blocking receive...
1291
1292         // and reenter scheduling loop after having received something
1293         // (return rtsFalse below)
1294
1295 # else /* activate SPARKS machinery */
1296 /* We get here, if we have no work, tried to activate a local spark, but still
1297    have no work. We try to get a remote spark, by sending a FISH message.
1298    Thread migration should be added here, and triggered when a sequence of 
1299    fishes returns without work. */
1300         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1301
1302       /* =8-[  no local sparks => look for work on other PEs */
1303         /*
1304          * We really have absolutely no work.  Send out a fish
1305          * (there may be some out there already), and wait for
1306          * something to arrive.  We clearly can't run any threads
1307          * until a SCHEDULE or RESUME arrives, and so that's what
1308          * we're hoping to see.  (Of course, we still have to
1309          * respond to other types of messages.)
1310          */
1311         rtsTime now = msTime() /*CURRENT_TIME*/;
1312         IF_PAR_DEBUG(verbose, 
1313                      debugBelch("--  now=%ld\n", now));
1314         IF_PAR_DEBUG(fish, // verbose,
1315              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1316                  (last_fish_arrived_at!=0 &&
1317                   last_fish_arrived_at+delay > now)) {
1318                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1319                      now, last_fish_arrived_at+delay, 
1320                      last_fish_arrived_at,
1321                      delay);
1322              });
1323   
1324         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1325             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1326           if (last_fish_arrived_at==0 ||
1327               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1328             /* outstandingFishes is set in sendFish, processFish;
1329                avoid flooding system with fishes via delay */
1330     next_fish_to_send_at = 0;  
1331   } else {
1332     /* ToDo: this should be done in the main scheduling loop to avoid the
1333              busy wait here; not so bad if fish delay is very small  */
1334     int iq = 0; // DEBUGGING -- HWL
1335     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1336     /* send a fish when ready, but process messages that arrive in the meantime */
1337     do {
1338       if (PacketsWaiting()) {
1339         iq++; // DEBUGGING
1340         *receivedFinish = processMessages();
1341       }
1342       now = msTime();
1343     } while (!*receivedFinish || now<next_fish_to_send_at);
1344     // JB: This means the fish could become obsolete, if we receive
1345     // work. Better check for work again? 
1346     // last line: while (!receivedFinish || !haveWork || now<...)
1347     // next line: if (receivedFinish || haveWork )
1348
1349     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1350       return rtsFalse;  // NB: this will leave scheduler loop
1351                         // immediately after return!
1352                           
1353     IF_PAR_DEBUG(fish, // verbose,
1354                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1355
1356   }
1357
1358     // JB: IMHO, this should all be hidden inside sendFish(...)
1359     /* pe = choosePE(); 
1360        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1361                 NEW_FISH_HUNGER);
1362
1363     // Global statistics: count no. of fishes
1364     if (RtsFlags.ParFlags.ParStats.Global &&
1365          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1366            globalParStats.tot_fish_mess++;
1367            }
1368     */ 
1369
1370   /* delayed fishes must have been sent by now! */
1371   next_fish_to_send_at = 0;  
1372   }
1373       
1374   *receivedFinish = processMessages();
1375 # endif /* SPARKS */
1376
1377  return rtsFalse;
1378  /* NB: this function always returns rtsFalse, meaning the scheduler
1379     loop continues with the next iteration; 
1380     rationale: 
1381       return code means success in finding work; we enter this function
1382       if there is no local work, thus have to send a fish which takes
1383       time until it arrives with work; in the meantime we should process
1384       messages in the main loop;
1385  */
1386 }
1387 #endif // PARALLEL_HASKELL
1388
1389 /* ----------------------------------------------------------------------------
1390  * PAR/GRAN: Report stats & debugging info(?)
1391  * ------------------------------------------------------------------------- */
1392
1393 #if defined(PAR) || defined(GRAN)
1394 static void
1395 scheduleGranParReport(void)
1396 {
1397   ASSERT(run_queue_hd != END_TSO_QUEUE);
1398
1399   /* Take a thread from the run queue, if we have work */
1400   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1401
1402     /* If this TSO has got its outport closed in the meantime, 
1403      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1404      * It has to be marked as TH_DEAD for this purpose.
1405      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1406
1407 JB: TODO: investigate wether state change field could be nuked
1408      entirely and replaced by the normal tso state (whatnext
1409      field). All we want to do is to kill tsos from outside.
1410      */
1411
1412     /* ToDo: write something to the log-file
1413     if (RTSflags.ParFlags.granSimStats && !sameThread)
1414         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1415
1416     CurrentTSO = t;
1417     */
1418     /* the spark pool for the current PE */
1419     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1420
1421     IF_DEBUG(scheduler, 
1422              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1423                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1424
1425     IF_PAR_DEBUG(fish,
1426              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1427                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1428
1429     if (RtsFlags.ParFlags.ParStats.Full && 
1430         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1431         (emitSchedule || // forced emit
1432          (t && LastTSO && t->id != LastTSO->id))) {
1433       /* 
1434          we are running a different TSO, so write a schedule event to log file
1435          NB: If we use fair scheduling we also have to write  a deschedule 
1436              event for LastTSO; with unfair scheduling we know that the
1437              previous tso has blocked whenever we switch to another tso, so
1438              we don't need it in GUM for now
1439       */
1440       IF_PAR_DEBUG(fish, // schedule,
1441                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1442
1443       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1444                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1445       emitSchedule = rtsFalse;
1446     }
1447 }     
1448 #endif
1449
1450 /* ----------------------------------------------------------------------------
1451  * After running a thread...
1452  * ------------------------------------------------------------------------- */
1453
1454 static void
1455 schedulePostRunThread(void)
1456 {
1457 #if defined(PAR)
1458     /* HACK 675: if the last thread didn't yield, make sure to print a 
1459        SCHEDULE event to the log file when StgRunning the next thread, even
1460        if it is the same one as before */
1461     LastTSO = t; 
1462     TimeOfLastYield = CURRENT_TIME;
1463 #endif
1464
1465   /* some statistics gathering in the parallel case */
1466
1467 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1468   switch (ret) {
1469     case HeapOverflow:
1470 # if defined(GRAN)
1471       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1472       globalGranStats.tot_heapover++;
1473 # elif defined(PAR)
1474       globalParStats.tot_heapover++;
1475 # endif
1476       break;
1477
1478      case StackOverflow:
1479 # if defined(GRAN)
1480       IF_DEBUG(gran, 
1481                DumpGranEvent(GR_DESCHEDULE, t));
1482       globalGranStats.tot_stackover++;
1483 # elif defined(PAR)
1484       // IF_DEBUG(par, 
1485       // DumpGranEvent(GR_DESCHEDULE, t);
1486       globalParStats.tot_stackover++;
1487 # endif
1488       break;
1489
1490     case ThreadYielding:
1491 # if defined(GRAN)
1492       IF_DEBUG(gran, 
1493                DumpGranEvent(GR_DESCHEDULE, t));
1494       globalGranStats.tot_yields++;
1495 # elif defined(PAR)
1496       // IF_DEBUG(par, 
1497       // DumpGranEvent(GR_DESCHEDULE, t);
1498       globalParStats.tot_yields++;
1499 # endif
1500       break; 
1501
1502     case ThreadBlocked:
1503 # if defined(GRAN)
1504       IF_DEBUG(scheduler,
1505                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1506                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1507                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1508                if (t->block_info.closure!=(StgClosure*)NULL)
1509                  print_bq(t->block_info.closure);
1510                debugBelch("\n"));
1511
1512       // ??? needed; should emit block before
1513       IF_DEBUG(gran, 
1514                DumpGranEvent(GR_DESCHEDULE, t)); 
1515       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1516       /*
1517         ngoq Dogh!
1518       ASSERT(procStatus[CurrentProc]==Busy || 
1519               ((procStatus[CurrentProc]==Fetching) && 
1520               (t->block_info.closure!=(StgClosure*)NULL)));
1521       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1522           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1523             procStatus[CurrentProc]==Fetching)) 
1524         procStatus[CurrentProc] = Idle;
1525       */
1526 # elif defined(PAR)
1527 //++PAR++  blockThread() writes the event (change?)
1528 # endif
1529     break;
1530
1531   case ThreadFinished:
1532     break;
1533
1534   default:
1535     barf("parGlobalStats: unknown return code");
1536     break;
1537     }
1538 #endif
1539 }
1540
1541 /* -----------------------------------------------------------------------------
1542  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1543  * -------------------------------------------------------------------------- */
1544
1545 static rtsBool
1546 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1547 {
1548     // did the task ask for a large block?
1549     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1550         // if so, get one and push it on the front of the nursery.
1551         bdescr *bd;
1552         lnat blocks;
1553         
1554         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1555         
1556         IF_DEBUG(scheduler,
1557                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1558                             (long)t->id, whatNext_strs[t->what_next], blocks));
1559         
1560         // don't do this if the nursery is (nearly) full, we'll GC first.
1561         if (cap->r.rCurrentNursery->link != NULL ||
1562             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1563                                                // if the nursery has only one block.
1564             
1565             ACQUIRE_SM_LOCK
1566             bd = allocGroup( blocks );
1567             RELEASE_SM_LOCK
1568             cap->r.rNursery->n_blocks += blocks;
1569             
1570             // link the new group into the list
1571             bd->link = cap->r.rCurrentNursery;
1572             bd->u.back = cap->r.rCurrentNursery->u.back;
1573             if (cap->r.rCurrentNursery->u.back != NULL) {
1574                 cap->r.rCurrentNursery->u.back->link = bd;
1575             } else {
1576 #if !defined(THREADED_RTS)
1577                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1578                        g0s0 == cap->r.rNursery);
1579 #endif
1580                 cap->r.rNursery->blocks = bd;
1581             }             
1582             cap->r.rCurrentNursery->u.back = bd;
1583             
1584             // initialise it as a nursery block.  We initialise the
1585             // step, gen_no, and flags field of *every* sub-block in
1586             // this large block, because this is easier than making
1587             // sure that we always find the block head of a large
1588             // block whenever we call Bdescr() (eg. evacuate() and
1589             // isAlive() in the GC would both have to do this, at
1590             // least).
1591             { 
1592                 bdescr *x;
1593                 for (x = bd; x < bd + blocks; x++) {
1594                     x->step = cap->r.rNursery;
1595                     x->gen_no = 0;
1596                     x->flags = 0;
1597                 }
1598             }
1599             
1600             // This assert can be a killer if the app is doing lots
1601             // of large block allocations.
1602             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1603             
1604             // now update the nursery to point to the new block
1605             cap->r.rCurrentNursery = bd;
1606             
1607             // we might be unlucky and have another thread get on the
1608             // run queue before us and steal the large block, but in that
1609             // case the thread will just end up requesting another large
1610             // block.
1611             pushOnRunQueue(cap,t);
1612             return rtsFalse;  /* not actually GC'ing */
1613         }
1614     }
1615     
1616     IF_DEBUG(scheduler,
1617              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1618                         (long)t->id, whatNext_strs[t->what_next]));
1619 #if defined(GRAN)
1620     ASSERT(!is_on_queue(t,CurrentProc));
1621 #elif defined(PARALLEL_HASKELL)
1622     /* Currently we emit a DESCHEDULE event before GC in GUM.
1623        ToDo: either add separate event to distinguish SYSTEM time from rest
1624        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1625     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1626         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1627                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1628         emitSchedule = rtsTrue;
1629     }
1630 #endif
1631       
1632     pushOnRunQueue(cap,t);
1633     return rtsTrue;
1634     /* actual GC is done at the end of the while loop in schedule() */
1635 }
1636
1637 /* -----------------------------------------------------------------------------
1638  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1639  * -------------------------------------------------------------------------- */
1640
1641 static void
1642 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1643 {
1644     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1645                                   (long)t->id, whatNext_strs[t->what_next]));
1646     /* just adjust the stack for this thread, then pop it back
1647      * on the run queue.
1648      */
1649     { 
1650         /* enlarge the stack */
1651         StgTSO *new_t = threadStackOverflow(cap, t);
1652         
1653         /* The TSO attached to this Task may have moved, so update the
1654          * pointer to it.
1655          */
1656         if (task->tso == t) {
1657             task->tso = new_t;
1658         }
1659         pushOnRunQueue(cap,new_t);
1660     }
1661 }
1662
1663 /* -----------------------------------------------------------------------------
1664  * Handle a thread that returned to the scheduler with ThreadYielding
1665  * -------------------------------------------------------------------------- */
1666
1667 static rtsBool
1668 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1669 {
1670     // Reset the context switch flag.  We don't do this just before
1671     // running the thread, because that would mean we would lose ticks
1672     // during GC, which can lead to unfair scheduling (a thread hogs
1673     // the CPU because the tick always arrives during GC).  This way
1674     // penalises threads that do a lot of allocation, but that seems
1675     // better than the alternative.
1676     context_switch = 0;
1677     
1678     /* put the thread back on the run queue.  Then, if we're ready to
1679      * GC, check whether this is the last task to stop.  If so, wake
1680      * up the GC thread.  getThread will block during a GC until the
1681      * GC is finished.
1682      */
1683     IF_DEBUG(scheduler,
1684              if (t->what_next != prev_what_next) {
1685                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1686                             (long)t->id, whatNext_strs[t->what_next]);
1687              } else {
1688                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1689                             (long)t->id, whatNext_strs[t->what_next]);
1690              }
1691         );
1692     
1693     IF_DEBUG(sanity,
1694              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1695              checkTSO(t));
1696     ASSERT(t->link == END_TSO_QUEUE);
1697     
1698     // Shortcut if we're just switching evaluators: don't bother
1699     // doing stack squeezing (which can be expensive), just run the
1700     // thread.
1701     if (t->what_next != prev_what_next) {
1702         return rtsTrue;
1703     }
1704     
1705 #if defined(GRAN)
1706     ASSERT(!is_on_queue(t,CurrentProc));
1707       
1708     IF_DEBUG(sanity,
1709              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1710              checkThreadQsSanity(rtsTrue));
1711
1712 #endif
1713
1714     addToRunQueue(cap,t);
1715
1716 #if defined(GRAN)
1717     /* add a ContinueThread event to actually process the thread */
1718     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1719               ContinueThread,
1720               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1721     IF_GRAN_DEBUG(bq, 
1722                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1723                   G_EVENTQ(0);
1724                   G_CURR_THREADQ(0));
1725 #endif
1726     return rtsFalse;
1727 }
1728
1729 /* -----------------------------------------------------------------------------
1730  * Handle a thread that returned to the scheduler with ThreadBlocked
1731  * -------------------------------------------------------------------------- */
1732
1733 static void
1734 scheduleHandleThreadBlocked( StgTSO *t
1735 #if !defined(GRAN) && !defined(DEBUG)
1736     STG_UNUSED
1737 #endif
1738     )
1739 {
1740 #if defined(GRAN)
1741     IF_DEBUG(scheduler,
1742              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1743                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1744              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1745     
1746     // ??? needed; should emit block before
1747     IF_DEBUG(gran, 
1748              DumpGranEvent(GR_DESCHEDULE, t)); 
1749     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1750     /*
1751       ngoq Dogh!
1752       ASSERT(procStatus[CurrentProc]==Busy || 
1753       ((procStatus[CurrentProc]==Fetching) && 
1754       (t->block_info.closure!=(StgClosure*)NULL)));
1755       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1756       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1757       procStatus[CurrentProc]==Fetching)) 
1758       procStatus[CurrentProc] = Idle;
1759     */
1760 #elif defined(PAR)
1761     IF_DEBUG(scheduler,
1762              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1763                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1764     IF_PAR_DEBUG(bq,
1765                  
1766                  if (t->block_info.closure!=(StgClosure*)NULL) 
1767                  print_bq(t->block_info.closure));
1768     
1769     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1770     blockThread(t);
1771     
1772     /* whatever we schedule next, we must log that schedule */
1773     emitSchedule = rtsTrue;
1774     
1775 #else /* !GRAN */
1776
1777       // We don't need to do anything.  The thread is blocked, and it
1778       // has tidied up its stack and placed itself on whatever queue
1779       // it needs to be on.
1780
1781 #if !defined(THREADED_RTS)
1782     ASSERT(t->why_blocked != NotBlocked);
1783              // This might not be true under THREADED_RTS: we don't have
1784              // exclusive access to this TSO, so someone might have
1785              // woken it up by now.  This actually happens: try
1786              // conc023 +RTS -N2.
1787 #endif
1788
1789     IF_DEBUG(scheduler,
1790              debugBelch("--<< thread %d (%s) stopped: ", 
1791                         t->id, whatNext_strs[t->what_next]);
1792              printThreadBlockage(t);
1793              debugBelch("\n"));
1794     
1795     /* Only for dumping event to log file 
1796        ToDo: do I need this in GranSim, too?
1797        blockThread(t);
1798     */
1799 #endif
1800 }
1801
1802 /* -----------------------------------------------------------------------------
1803  * Handle a thread that returned to the scheduler with ThreadFinished
1804  * -------------------------------------------------------------------------- */
1805
1806 static rtsBool
1807 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1808 {
1809     /* Need to check whether this was a main thread, and if so,
1810      * return with the return value.
1811      *
1812      * We also end up here if the thread kills itself with an
1813      * uncaught exception, see Exception.cmm.
1814      */
1815     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1816                                   t->id, whatNext_strs[t->what_next]));
1817
1818 #if defined(GRAN)
1819       endThread(t, CurrentProc); // clean-up the thread
1820 #elif defined(PARALLEL_HASKELL)
1821       /* For now all are advisory -- HWL */
1822       //if(t->priority==AdvisoryPriority) ??
1823       advisory_thread_count--; // JB: Caution with this counter, buggy!
1824       
1825 # if defined(DIST)
1826       if(t->dist.priority==RevalPriority)
1827         FinishReval(t);
1828 # endif
1829     
1830 # if defined(EDENOLD)
1831       // the thread could still have an outport... (BUG)
1832       if (t->eden.outport != -1) {
1833       // delete the outport for the tso which has finished...
1834         IF_PAR_DEBUG(eden_ports,
1835                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1836                               t->eden.outport, t->id));
1837         deleteOPT(t);
1838       }
1839       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1840       if (t->eden.epid != -1) {
1841         IF_PAR_DEBUG(eden_ports,
1842                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1843                            t->id, t->eden.epid));
1844         removeTSOfromProcess(t);
1845       }
1846 # endif 
1847
1848 # if defined(PAR)
1849       if (RtsFlags.ParFlags.ParStats.Full &&
1850           !RtsFlags.ParFlags.ParStats.Suppressed) 
1851         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1852
1853       //  t->par only contains statistics: left out for now...
1854       IF_PAR_DEBUG(fish,
1855                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1856                               t->id,t,t->par.sparkname));
1857 # endif
1858 #endif // PARALLEL_HASKELL
1859
1860       //
1861       // Check whether the thread that just completed was a bound
1862       // thread, and if so return with the result.  
1863       //
1864       // There is an assumption here that all thread completion goes
1865       // through this point; we need to make sure that if a thread
1866       // ends up in the ThreadKilled state, that it stays on the run
1867       // queue so it can be dealt with here.
1868       //
1869
1870       if (t->bound) {
1871
1872           if (t->bound != task) {
1873 #if !defined(THREADED_RTS)
1874               // Must be a bound thread that is not the topmost one.  Leave
1875               // it on the run queue until the stack has unwound to the
1876               // point where we can deal with this.  Leaving it on the run
1877               // queue also ensures that the garbage collector knows about
1878               // this thread and its return value (it gets dropped from the
1879               // all_threads list so there's no other way to find it).
1880               appendToRunQueue(cap,t);
1881               return rtsFalse;
1882 #else
1883               // this cannot happen in the threaded RTS, because a
1884               // bound thread can only be run by the appropriate Task.
1885               barf("finished bound thread that isn't mine");
1886 #endif
1887           }
1888
1889           ASSERT(task->tso == t);
1890
1891           if (t->what_next == ThreadComplete) {
1892               if (task->ret) {
1893                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1894                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1895               }
1896               task->stat = Success;
1897           } else {
1898               if (task->ret) {
1899                   *(task->ret) = NULL;
1900               }
1901               if (sched_state >= SCHED_INTERRUPTING) {
1902                   task->stat = Interrupted;
1903               } else {
1904                   task->stat = Killed;
1905               }
1906           }
1907 #ifdef DEBUG
1908           removeThreadLabel((StgWord)task->tso->id);
1909 #endif
1910           return rtsTrue; // tells schedule() to return
1911       }
1912
1913       return rtsFalse;
1914 }
1915
1916 /* -----------------------------------------------------------------------------
1917  * Perform a heap census, if PROFILING
1918  * -------------------------------------------------------------------------- */
1919
1920 static rtsBool
1921 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1922 {
1923 #if defined(PROFILING)
1924     // When we have +RTS -i0 and we're heap profiling, do a census at
1925     // every GC.  This lets us get repeatable runs for debugging.
1926     if (performHeapProfile ||
1927         (RtsFlags.ProfFlags.profileInterval==0 &&
1928          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1929
1930         // checking black holes is necessary before GC, otherwise
1931         // there may be threads that are unreachable except by the
1932         // blackhole queue, which the GC will consider to be
1933         // deadlocked.
1934         scheduleCheckBlackHoles(&MainCapability);
1935
1936         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1937         GarbageCollect(GetRoots, rtsTrue);
1938
1939         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1940         heapCensus();
1941
1942         performHeapProfile = rtsFalse;
1943         return rtsTrue;  // true <=> we already GC'd
1944     }
1945 #endif
1946     return rtsFalse;
1947 }
1948
1949 /* -----------------------------------------------------------------------------
1950  * Perform a garbage collection if necessary
1951  * -------------------------------------------------------------------------- */
1952
1953 static Capability *
1954 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1955               rtsBool force_major, void (*get_roots)(evac_fn))
1956 {
1957     StgTSO *t;
1958 #ifdef THREADED_RTS
1959     static volatile StgWord waiting_for_gc;
1960     rtsBool was_waiting;
1961     nat i;
1962 #endif
1963
1964 #ifdef THREADED_RTS
1965     // In order to GC, there must be no threads running Haskell code.
1966     // Therefore, the GC thread needs to hold *all* the capabilities,
1967     // and release them after the GC has completed.  
1968     //
1969     // This seems to be the simplest way: previous attempts involved
1970     // making all the threads with capabilities give up their
1971     // capabilities and sleep except for the *last* one, which
1972     // actually did the GC.  But it's quite hard to arrange for all
1973     // the other tasks to sleep and stay asleep.
1974     //
1975         
1976     was_waiting = cas(&waiting_for_gc, 0, 1);
1977     if (was_waiting) {
1978         do {
1979             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1980             if (cap) yieldCapability(&cap,task);
1981         } while (waiting_for_gc);
1982         return cap;  // NOTE: task->cap might have changed here
1983     }
1984
1985     for (i=0; i < n_capabilities; i++) {
1986         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1987         if (cap != &capabilities[i]) {
1988             Capability *pcap = &capabilities[i];
1989             // we better hope this task doesn't get migrated to
1990             // another Capability while we're waiting for this one.
1991             // It won't, because load balancing happens while we have
1992             // all the Capabilities, but even so it's a slightly
1993             // unsavoury invariant.
1994             task->cap = pcap;
1995             context_switch = 1;
1996             waitForReturnCapability(&pcap, task);
1997             if (pcap != &capabilities[i]) {
1998                 barf("scheduleDoGC: got the wrong capability");
1999             }
2000         }
2001     }
2002
2003     waiting_for_gc = rtsFalse;
2004 #endif
2005
2006     /* Kick any transactions which are invalid back to their
2007      * atomically frames.  When next scheduled they will try to
2008      * commit, this commit will fail and they will retry.
2009      */
2010     { 
2011         StgTSO *next;
2012
2013         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2014             if (t->what_next == ThreadRelocated) {
2015                 next = t->link;
2016             } else {
2017                 next = t->global_link;
2018                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2019                     if (!stmValidateNestOfTransactions (t -> trec)) {
2020                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2021                         
2022                         // strip the stack back to the
2023                         // ATOMICALLY_FRAME, aborting the (nested)
2024                         // transaction, and saving the stack of any
2025                         // partially-evaluated thunks on the heap.
2026                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2027                         
2028 #ifdef REG_R1
2029                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2030 #endif
2031                     }
2032                 }
2033             }
2034         }
2035     }
2036     
2037     // so this happens periodically:
2038     if (cap) scheduleCheckBlackHoles(cap);
2039     
2040     IF_DEBUG(scheduler, printAllThreads());
2041
2042     /*
2043      * We now have all the capabilities; if we're in an interrupting
2044      * state, then we should take the opportunity to delete all the
2045      * threads in the system.
2046      */
2047     if (sched_state >= SCHED_INTERRUPTING) {
2048         deleteAllThreads(&capabilities[0]);
2049         sched_state = SCHED_INTERRUPTED;
2050     }
2051
2052     /* everybody back, start the GC.
2053      * Could do it in this thread, or signal a condition var
2054      * to do it in another thread.  Either way, we need to
2055      * broadcast on gc_pending_cond afterward.
2056      */
2057 #if defined(THREADED_RTS)
2058     IF_DEBUG(scheduler,sched_belch("doing GC"));
2059 #endif
2060     GarbageCollect(get_roots, force_major);
2061     
2062 #if defined(THREADED_RTS)
2063     // release our stash of capabilities.
2064     for (i = 0; i < n_capabilities; i++) {
2065         if (cap != &capabilities[i]) {
2066             task->cap = &capabilities[i];
2067             releaseCapability(&capabilities[i]);
2068         }
2069     }
2070     if (cap) {
2071         task->cap = cap;
2072     } else {
2073         task->cap = NULL;
2074     }
2075 #endif
2076
2077 #if defined(GRAN)
2078     /* add a ContinueThread event to continue execution of current thread */
2079     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2080               ContinueThread,
2081               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2082     IF_GRAN_DEBUG(bq, 
2083                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2084                   G_EVENTQ(0);
2085                   G_CURR_THREADQ(0));
2086 #endif /* GRAN */
2087
2088     return cap;
2089 }
2090
2091 /* ---------------------------------------------------------------------------
2092  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2093  * used by Control.Concurrent for error checking.
2094  * ------------------------------------------------------------------------- */
2095  
2096 StgBool
2097 rtsSupportsBoundThreads(void)
2098 {
2099 #if defined(THREADED_RTS)
2100   return rtsTrue;
2101 #else
2102   return rtsFalse;
2103 #endif
2104 }
2105
2106 /* ---------------------------------------------------------------------------
2107  * isThreadBound(tso): check whether tso is bound to an OS thread.
2108  * ------------------------------------------------------------------------- */
2109  
2110 StgBool
2111 isThreadBound(StgTSO* tso USED_IF_THREADS)
2112 {
2113 #if defined(THREADED_RTS)
2114   return (tso->bound != NULL);
2115 #endif
2116   return rtsFalse;
2117 }
2118
2119 /* ---------------------------------------------------------------------------
2120  * Singleton fork(). Do not copy any running threads.
2121  * ------------------------------------------------------------------------- */
2122
2123 #if !defined(mingw32_HOST_OS)
2124 #define FORKPROCESS_PRIMOP_SUPPORTED
2125 #endif
2126
2127 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2128 static void 
2129 deleteThread_(Capability *cap, StgTSO *tso);
2130 #endif
2131 StgInt
2132 forkProcess(HsStablePtr *entry
2133 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2134             STG_UNUSED
2135 #endif
2136            )
2137 {
2138 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2139     Task *task;
2140     pid_t pid;
2141     StgTSO* t,*next;
2142     Capability *cap;
2143     
2144 #if defined(THREADED_RTS)
2145     if (RtsFlags.ParFlags.nNodes > 1) {
2146         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2147         stg_exit(EXIT_FAILURE);
2148     }
2149 #endif
2150
2151     IF_DEBUG(scheduler,sched_belch("forking!"));
2152     
2153     // ToDo: for SMP, we should probably acquire *all* the capabilities
2154     cap = rts_lock();
2155     
2156     pid = fork();
2157     
2158     if (pid) { // parent
2159         
2160         // just return the pid
2161         rts_unlock(cap);
2162         return pid;
2163         
2164     } else { // child
2165         
2166         // Now, all OS threads except the thread that forked are
2167         // stopped.  We need to stop all Haskell threads, including
2168         // those involved in foreign calls.  Also we need to delete
2169         // all Tasks, because they correspond to OS threads that are
2170         // now gone.
2171
2172         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2173             if (t->what_next == ThreadRelocated) {
2174                 next = t->link;
2175             } else {
2176                 next = t->global_link;
2177                 // don't allow threads to catch the ThreadKilled
2178                 // exception, but we do want to raiseAsync() because these
2179                 // threads may be evaluating thunks that we need later.
2180                 deleteThread_(cap,t);
2181             }
2182         }
2183         
2184         // Empty the run queue.  It seems tempting to let all the
2185         // killed threads stay on the run queue as zombies to be
2186         // cleaned up later, but some of them correspond to bound
2187         // threads for which the corresponding Task does not exist.
2188         cap->run_queue_hd = END_TSO_QUEUE;
2189         cap->run_queue_tl = END_TSO_QUEUE;
2190
2191         // Any suspended C-calling Tasks are no more, their OS threads
2192         // don't exist now:
2193         cap->suspended_ccalling_tasks = NULL;
2194
2195         // Empty the all_threads list.  Otherwise, the garbage
2196         // collector may attempt to resurrect some of these threads.
2197         all_threads = END_TSO_QUEUE;
2198
2199         // Wipe the task list, except the current Task.
2200         ACQUIRE_LOCK(&sched_mutex);
2201         for (task = all_tasks; task != NULL; task=task->all_link) {
2202             if (task != cap->running_task) {
2203                 discardTask(task);
2204             }
2205         }
2206         RELEASE_LOCK(&sched_mutex);
2207
2208 #if defined(THREADED_RTS)
2209         // Wipe our spare workers list, they no longer exist.  New
2210         // workers will be created if necessary.
2211         cap->spare_workers = NULL;
2212         cap->returning_tasks_hd = NULL;
2213         cap->returning_tasks_tl = NULL;
2214 #endif
2215
2216         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2217         rts_checkSchedStatus("forkProcess",cap);
2218         
2219         rts_unlock(cap);
2220         hs_exit();                      // clean up and exit
2221         stg_exit(EXIT_SUCCESS);
2222     }
2223 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2224     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2225     return -1;
2226 #endif
2227 }
2228
2229 /* ---------------------------------------------------------------------------
2230  * Delete all the threads in the system
2231  * ------------------------------------------------------------------------- */
2232    
2233 static void
2234 deleteAllThreads ( Capability *cap )
2235 {
2236   StgTSO* t, *next;
2237   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2238   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2239       if (t->what_next == ThreadRelocated) {
2240           next = t->link;
2241       } else {
2242           next = t->global_link;
2243           deleteThread(cap,t);
2244       }
2245   }      
2246
2247   // The run queue now contains a bunch of ThreadKilled threads.  We
2248   // must not throw these away: the main thread(s) will be in there
2249   // somewhere, and the main scheduler loop has to deal with it.
2250   // Also, the run queue is the only thing keeping these threads from
2251   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2252
2253 #if !defined(THREADED_RTS)
2254   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2255   ASSERT(sleeping_queue == END_TSO_QUEUE);
2256 #endif
2257 }
2258
2259 /* -----------------------------------------------------------------------------
2260    Managing the suspended_ccalling_tasks list.
2261    Locks required: sched_mutex
2262    -------------------------------------------------------------------------- */
2263
2264 STATIC_INLINE void
2265 suspendTask (Capability *cap, Task *task)
2266 {
2267     ASSERT(task->next == NULL && task->prev == NULL);
2268     task->next = cap->suspended_ccalling_tasks;
2269     task->prev = NULL;
2270     if (cap->suspended_ccalling_tasks) {
2271         cap->suspended_ccalling_tasks->prev = task;
2272     }
2273     cap->suspended_ccalling_tasks = task;
2274 }
2275
2276 STATIC_INLINE void
2277 recoverSuspendedTask (Capability *cap, Task *task)
2278 {
2279     if (task->prev) {
2280         task->prev->next = task->next;
2281     } else {
2282         ASSERT(cap->suspended_ccalling_tasks == task);
2283         cap->suspended_ccalling_tasks = task->next;
2284     }
2285     if (task->next) {
2286         task->next->prev = task->prev;
2287     }
2288     task->next = task->prev = NULL;
2289 }
2290
2291 /* ---------------------------------------------------------------------------
2292  * Suspending & resuming Haskell threads.
2293  * 
2294  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2295  * its capability before calling the C function.  This allows another
2296  * task to pick up the capability and carry on running Haskell
2297  * threads.  It also means that if the C call blocks, it won't lock
2298  * the whole system.
2299  *
2300  * The Haskell thread making the C call is put to sleep for the
2301  * duration of the call, on the susepended_ccalling_threads queue.  We
2302  * give out a token to the task, which it can use to resume the thread
2303  * on return from the C function.
2304  * ------------------------------------------------------------------------- */
2305    
2306 void *
2307 suspendThread (StgRegTable *reg)
2308 {
2309   Capability *cap;
2310   int saved_errno = errno;
2311   StgTSO *tso;
2312   Task *task;
2313
2314   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2315    */
2316   cap = regTableToCapability(reg);
2317
2318   task = cap->running_task;
2319   tso = cap->r.rCurrentTSO;
2320
2321   IF_DEBUG(scheduler,
2322            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2323
2324   // XXX this might not be necessary --SDM
2325   tso->what_next = ThreadRunGHC;
2326
2327   threadPaused(cap,tso);
2328
2329   if(tso->blocked_exceptions == NULL)  {
2330       tso->why_blocked = BlockedOnCCall;
2331       tso->blocked_exceptions = END_TSO_QUEUE;
2332   } else {
2333       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2334   }
2335
2336   // Hand back capability
2337   task->suspended_tso = tso;
2338
2339   ACQUIRE_LOCK(&cap->lock);
2340
2341   suspendTask(cap,task);
2342   cap->in_haskell = rtsFalse;
2343   releaseCapability_(cap);
2344   
2345   RELEASE_LOCK(&cap->lock);
2346
2347 #if defined(THREADED_RTS)
2348   /* Preparing to leave the RTS, so ensure there's a native thread/task
2349      waiting to take over.
2350   */
2351   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2352 #endif
2353
2354   errno = saved_errno;
2355   return task;
2356 }
2357
2358 StgRegTable *
2359 resumeThread (void *task_)
2360 {
2361     StgTSO *tso;
2362     Capability *cap;
2363     int saved_errno = errno;
2364     Task *task = task_;
2365
2366     cap = task->cap;
2367     // Wait for permission to re-enter the RTS with the result.
2368     waitForReturnCapability(&cap,task);
2369     // we might be on a different capability now... but if so, our
2370     // entry on the suspended_ccalling_tasks list will also have been
2371     // migrated.
2372
2373     // Remove the thread from the suspended list
2374     recoverSuspendedTask(cap,task);
2375
2376     tso = task->suspended_tso;
2377     task->suspended_tso = NULL;
2378     tso->link = END_TSO_QUEUE;
2379     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2380     
2381     if (tso->why_blocked == BlockedOnCCall) {
2382         awakenBlockedQueue(cap,tso->blocked_exceptions);
2383         tso->blocked_exceptions = NULL;
2384     }
2385     
2386     /* Reset blocking status */
2387     tso->why_blocked  = NotBlocked;
2388     
2389     cap->r.rCurrentTSO = tso;
2390     cap->in_haskell = rtsTrue;
2391     errno = saved_errno;
2392
2393     /* We might have GC'd, mark the TSO dirty again */
2394     dirtyTSO(tso);
2395
2396     IF_DEBUG(sanity, checkTSO(tso));
2397
2398     return &cap->r;
2399 }
2400
2401 /* ---------------------------------------------------------------------------
2402  * Comparing Thread ids.
2403  *
2404  * This is used from STG land in the implementation of the
2405  * instances of Eq/Ord for ThreadIds.
2406  * ------------------------------------------------------------------------ */
2407
2408 int
2409 cmp_thread(StgPtr tso1, StgPtr tso2) 
2410
2411   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2412   StgThreadID id2 = ((StgTSO *)tso2)->id;
2413  
2414   if (id1 < id2) return (-1);
2415   if (id1 > id2) return 1;
2416   return 0;
2417 }
2418
2419 /* ---------------------------------------------------------------------------
2420  * Fetching the ThreadID from an StgTSO.
2421  *
2422  * This is used in the implementation of Show for ThreadIds.
2423  * ------------------------------------------------------------------------ */
2424 int
2425 rts_getThreadId(StgPtr tso) 
2426 {
2427   return ((StgTSO *)tso)->id;
2428 }
2429
2430 #ifdef DEBUG
2431 void
2432 labelThread(StgPtr tso, char *label)
2433 {
2434   int len;
2435   void *buf;
2436
2437   /* Caveat: Once set, you can only set the thread name to "" */
2438   len = strlen(label)+1;
2439   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2440   strncpy(buf,label,len);
2441   /* Update will free the old memory for us */
2442   updateThreadLabel(((StgTSO *)tso)->id,buf);
2443 }
2444 #endif /* DEBUG */
2445
2446 /* ---------------------------------------------------------------------------
2447    Create a new thread.
2448
2449    The new thread starts with the given stack size.  Before the
2450    scheduler can run, however, this thread needs to have a closure
2451    (and possibly some arguments) pushed on its stack.  See
2452    pushClosure() in Schedule.h.
2453
2454    createGenThread() and createIOThread() (in SchedAPI.h) are
2455    convenient packaged versions of this function.
2456
2457    currently pri (priority) is only used in a GRAN setup -- HWL
2458    ------------------------------------------------------------------------ */
2459 #if defined(GRAN)
2460 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2461 StgTSO *
2462 createThread(nat size, StgInt pri)
2463 #else
2464 StgTSO *
2465 createThread(Capability *cap, nat size)
2466 #endif
2467 {
2468     StgTSO *tso;
2469     nat stack_size;
2470
2471     /* sched_mutex is *not* required */
2472
2473     /* First check whether we should create a thread at all */
2474 #if defined(PARALLEL_HASKELL)
2475     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2476     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2477         threadsIgnored++;
2478         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2479                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2480         return END_TSO_QUEUE;
2481     }
2482     threadsCreated++;
2483 #endif
2484
2485 #if defined(GRAN)
2486     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2487 #endif
2488
2489     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2490
2491     /* catch ridiculously small stack sizes */
2492     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2493         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2494     }
2495
2496     stack_size = size - TSO_STRUCT_SIZEW;
2497     
2498     tso = (StgTSO *)allocateLocal(cap, size);
2499     TICK_ALLOC_TSO(stack_size, 0);
2500
2501     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2502 #if defined(GRAN)
2503     SET_GRAN_HDR(tso, ThisPE);
2504 #endif
2505
2506     // Always start with the compiled code evaluator
2507     tso->what_next = ThreadRunGHC;
2508
2509     tso->why_blocked  = NotBlocked;
2510     tso->blocked_exceptions = NULL;
2511     tso->flags = TSO_DIRTY;
2512     
2513     tso->saved_errno = 0;
2514     tso->bound = NULL;
2515     tso->cap = cap;
2516     
2517     tso->stack_size     = stack_size;
2518     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2519                           - TSO_STRUCT_SIZEW;
2520     tso->sp             = (P_)&(tso->stack) + stack_size;
2521
2522     tso->trec = NO_TREC;
2523     
2524 #ifdef PROFILING
2525     tso->prof.CCCS = CCS_MAIN;
2526 #endif
2527     
2528   /* put a stop frame on the stack */
2529     tso->sp -= sizeofW(StgStopFrame);
2530     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2531     tso->link = END_TSO_QUEUE;
2532     
2533   // ToDo: check this
2534 #if defined(GRAN)
2535     /* uses more flexible routine in GranSim */
2536     insertThread(tso, CurrentProc);
2537 #else
2538     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2539      * from its creation
2540      */
2541 #endif
2542     
2543 #if defined(GRAN) 
2544     if (RtsFlags.GranFlags.GranSimStats.Full) 
2545         DumpGranEvent(GR_START,tso);
2546 #elif defined(PARALLEL_HASKELL)
2547     if (RtsFlags.ParFlags.ParStats.Full) 
2548         DumpGranEvent(GR_STARTQ,tso);
2549     /* HACk to avoid SCHEDULE 
2550        LastTSO = tso; */
2551 #endif
2552     
2553     /* Link the new thread on the global thread list.
2554      */
2555     ACQUIRE_LOCK(&sched_mutex);
2556     tso->id = next_thread_id++;  // while we have the mutex
2557     tso->global_link = all_threads;
2558     all_threads = tso;
2559     RELEASE_LOCK(&sched_mutex);
2560     
2561 #if defined(DIST)
2562     tso->dist.priority = MandatoryPriority; //by default that is...
2563 #endif
2564     
2565 #if defined(GRAN)
2566     tso->gran.pri = pri;
2567 # if defined(DEBUG)
2568     tso->gran.magic = TSO_MAGIC; // debugging only
2569 # endif
2570     tso->gran.sparkname   = 0;
2571     tso->gran.startedat   = CURRENT_TIME; 
2572     tso->gran.exported    = 0;
2573     tso->gran.basicblocks = 0;
2574     tso->gran.allocs      = 0;
2575     tso->gran.exectime    = 0;
2576     tso->gran.fetchtime   = 0;
2577     tso->gran.fetchcount  = 0;
2578     tso->gran.blocktime   = 0;
2579     tso->gran.blockcount  = 0;
2580     tso->gran.blockedat   = 0;
2581     tso->gran.globalsparks = 0;
2582     tso->gran.localsparks  = 0;
2583     if (RtsFlags.GranFlags.Light)
2584         tso->gran.clock  = Now; /* local clock */
2585     else
2586         tso->gran.clock  = 0;
2587     
2588     IF_DEBUG(gran,printTSO(tso));
2589 #elif defined(PARALLEL_HASKELL)
2590 # if defined(DEBUG)
2591     tso->par.magic = TSO_MAGIC; // debugging only
2592 # endif
2593     tso->par.sparkname   = 0;
2594     tso->par.startedat   = CURRENT_TIME; 
2595     tso->par.exported    = 0;
2596     tso->par.basicblocks = 0;
2597     tso->par.allocs      = 0;
2598     tso->par.exectime    = 0;
2599     tso->par.fetchtime   = 0;
2600     tso->par.fetchcount  = 0;
2601     tso->par.blocktime   = 0;
2602     tso->par.blockcount  = 0;
2603     tso->par.blockedat   = 0;
2604     tso->par.globalsparks = 0;
2605     tso->par.localsparks  = 0;
2606 #endif
2607     
2608 #if defined(GRAN)
2609     globalGranStats.tot_threads_created++;
2610     globalGranStats.threads_created_on_PE[CurrentProc]++;
2611     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2612     globalGranStats.tot_sq_probes++;
2613 #elif defined(PARALLEL_HASKELL)
2614     // collect parallel global statistics (currently done together with GC stats)
2615     if (RtsFlags.ParFlags.ParStats.Global &&
2616         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2617         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2618         globalParStats.tot_threads_created++;
2619     }
2620 #endif 
2621     
2622 #if defined(GRAN)
2623     IF_GRAN_DEBUG(pri,
2624                   sched_belch("==__ schedule: Created TSO %d (%p);",
2625                               CurrentProc, tso, tso->id));
2626 #elif defined(PARALLEL_HASKELL)
2627     IF_PAR_DEBUG(verbose,
2628                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2629                              (long)tso->id, tso, advisory_thread_count));
2630 #else
2631     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2632                                    (long)tso->id, (long)tso->stack_size));
2633 #endif    
2634     return tso;
2635 }
2636
2637 #if defined(PAR)
2638 /* RFP:
2639    all parallel thread creation calls should fall through the following routine.
2640 */
2641 StgTSO *
2642 createThreadFromSpark(rtsSpark spark) 
2643 { StgTSO *tso;
2644   ASSERT(spark != (rtsSpark)NULL);
2645 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2646   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2647   { threadsIgnored++;
2648     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2649           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2650     return END_TSO_QUEUE;
2651   }
2652   else
2653   { threadsCreated++;
2654     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2655     if (tso==END_TSO_QUEUE)     
2656       barf("createSparkThread: Cannot create TSO");
2657 #if defined(DIST)
2658     tso->priority = AdvisoryPriority;
2659 #endif
2660     pushClosure(tso,spark);
2661     addToRunQueue(tso);
2662     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2663   }
2664   return tso;
2665 }
2666 #endif
2667
2668 /*
2669   Turn a spark into a thread.
2670   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2671 */
2672 #if 0
2673 StgTSO *
2674 activateSpark (rtsSpark spark) 
2675 {
2676   StgTSO *tso;
2677
2678   tso = createSparkThread(spark);
2679   if (RtsFlags.ParFlags.ParStats.Full) {   
2680     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2681       IF_PAR_DEBUG(verbose,
2682                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2683                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2684   }
2685   // ToDo: fwd info on local/global spark to thread -- HWL
2686   // tso->gran.exported =  spark->exported;
2687   // tso->gran.locked =   !spark->global;
2688   // tso->gran.sparkname = spark->name;
2689
2690   return tso;
2691 }
2692 #endif
2693
2694 /* ---------------------------------------------------------------------------
2695  * scheduleThread()
2696  *
2697  * scheduleThread puts a thread on the end  of the runnable queue.
2698  * This will usually be done immediately after a thread is created.
2699  * The caller of scheduleThread must create the thread using e.g.
2700  * createThread and push an appropriate closure
2701  * on this thread's stack before the scheduler is invoked.
2702  * ------------------------------------------------------------------------ */
2703
2704 void
2705 scheduleThread(Capability *cap, StgTSO *tso)
2706 {
2707     // The thread goes at the *end* of the run-queue, to avoid possible
2708     // starvation of any threads already on the queue.
2709     appendToRunQueue(cap,tso);
2710 }
2711
2712 Capability *
2713 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2714 {
2715     Task *task;
2716
2717     // We already created/initialised the Task
2718     task = cap->running_task;
2719
2720     // This TSO is now a bound thread; make the Task and TSO
2721     // point to each other.
2722     tso->bound = task;
2723     tso->cap = cap;
2724
2725     task->tso = tso;
2726     task->ret = ret;
2727     task->stat = NoStatus;
2728
2729     appendToRunQueue(cap,tso);
2730
2731     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2732
2733 #if defined(GRAN)
2734     /* GranSim specific init */
2735     CurrentTSO = m->tso;                // the TSO to run
2736     procStatus[MainProc] = Busy;        // status of main PE
2737     CurrentProc = MainProc;             // PE to run it on
2738 #endif
2739
2740     cap = schedule(cap,task);
2741
2742     ASSERT(task->stat != NoStatus);
2743     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2744
2745     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2746     return cap;
2747 }
2748
2749 /* ----------------------------------------------------------------------------
2750  * Starting Tasks
2751  * ------------------------------------------------------------------------- */
2752
2753 #if defined(THREADED_RTS)
2754 void
2755 workerStart(Task *task)
2756 {
2757     Capability *cap;
2758
2759     // See startWorkerTask().
2760     ACQUIRE_LOCK(&task->lock);
2761     cap = task->cap;
2762     RELEASE_LOCK(&task->lock);
2763
2764     // set the thread-local pointer to the Task:
2765     taskEnter(task);
2766
2767     // schedule() runs without a lock.
2768     cap = schedule(cap,task);
2769
2770     // On exit from schedule(), we have a Capability.
2771     releaseCapability(cap);
2772     taskStop(task);
2773 }
2774 #endif
2775
2776 /* ---------------------------------------------------------------------------
2777  * initScheduler()
2778  *
2779  * Initialise the scheduler.  This resets all the queues - if the
2780  * queues contained any threads, they'll be garbage collected at the
2781  * next pass.
2782  *
2783  * ------------------------------------------------------------------------ */
2784
2785 void 
2786 initScheduler(void)
2787 {
2788 #if defined(GRAN)
2789   nat i;
2790   for (i=0; i<=MAX_PROC; i++) {
2791     run_queue_hds[i]      = END_TSO_QUEUE;
2792     run_queue_tls[i]      = END_TSO_QUEUE;
2793     blocked_queue_hds[i]  = END_TSO_QUEUE;
2794     blocked_queue_tls[i]  = END_TSO_QUEUE;
2795     ccalling_threadss[i]  = END_TSO_QUEUE;
2796     blackhole_queue[i]    = END_TSO_QUEUE;
2797     sleeping_queue        = END_TSO_QUEUE;
2798   }
2799 #elif !defined(THREADED_RTS)
2800   blocked_queue_hd  = END_TSO_QUEUE;
2801   blocked_queue_tl  = END_TSO_QUEUE;
2802   sleeping_queue    = END_TSO_QUEUE;
2803 #endif
2804
2805   blackhole_queue   = END_TSO_QUEUE;
2806   all_threads       = END_TSO_QUEUE;
2807
2808   context_switch = 0;
2809   sched_state    = SCHED_RUNNING;
2810
2811   RtsFlags.ConcFlags.ctxtSwitchTicks =
2812       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2813       
2814 #if defined(THREADED_RTS)
2815   /* Initialise the mutex and condition variables used by
2816    * the scheduler. */
2817   initMutex(&sched_mutex);
2818 #endif
2819   
2820   ACQUIRE_LOCK(&sched_mutex);
2821
2822   /* A capability holds the state a native thread needs in
2823    * order to execute STG code. At least one capability is
2824    * floating around (only THREADED_RTS builds have more than one).
2825    */
2826   initCapabilities();
2827
2828   initTaskManager();
2829
2830 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2831   initSparkPools();
2832 #endif
2833
2834 #if defined(THREADED_RTS)
2835   /*
2836    * Eagerly start one worker to run each Capability, except for
2837    * Capability 0.  The idea is that we're probably going to start a
2838    * bound thread on Capability 0 pretty soon, so we don't want a
2839    * worker task hogging it.
2840    */
2841   { 
2842       nat i;
2843       Capability *cap;
2844       for (i = 1; i < n_capabilities; i++) {
2845           cap = &capabilities[i];
2846           ACQUIRE_LOCK(&cap->lock);
2847           startWorkerTask(cap, workerStart);
2848           RELEASE_LOCK(&cap->lock);
2849       }
2850   }
2851 #endif
2852
2853   RELEASE_LOCK(&sched_mutex);
2854 }
2855
2856 void
2857 exitScheduler( void )
2858 {
2859     Task *task = NULL;
2860
2861 #if defined(THREADED_RTS)
2862     ACQUIRE_LOCK(&sched_mutex);
2863     task = newBoundTask();
2864     RELEASE_LOCK(&sched_mutex);
2865 #endif
2866
2867     // If we haven't killed all the threads yet, do it now.
2868     if (sched_state < SCHED_INTERRUPTED) {
2869         sched_state = SCHED_INTERRUPTING;
2870         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2871     }
2872     sched_state = SCHED_SHUTTING_DOWN;
2873
2874 #if defined(THREADED_RTS)
2875     { 
2876         nat i;
2877         
2878         for (i = 0; i < n_capabilities; i++) {
2879             shutdownCapability(&capabilities[i], task);
2880         }
2881         boundTaskExiting(task);
2882         stopTaskManager();
2883     }
2884 #endif
2885 }
2886
2887 /* ---------------------------------------------------------------------------
2888    Where are the roots that we know about?
2889
2890         - all the threads on the runnable queue
2891         - all the threads on the blocked queue
2892         - all the threads on the sleeping queue
2893         - all the thread currently executing a _ccall_GC
2894         - all the "main threads"
2895      
2896    ------------------------------------------------------------------------ */
2897
2898 /* This has to be protected either by the scheduler monitor, or by the
2899         garbage collection monitor (probably the latter).
2900         KH @ 25/10/99
2901 */
2902
2903 void
2904 GetRoots( evac_fn evac )
2905 {
2906     nat i;
2907     Capability *cap;
2908     Task *task;
2909
2910 #if defined(GRAN)
2911     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2912         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2913             evac((StgClosure **)&run_queue_hds[i]);
2914         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2915             evac((StgClosure **)&run_queue_tls[i]);
2916         
2917         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2918             evac((StgClosure **)&blocked_queue_hds[i]);
2919         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2920             evac((StgClosure **)&blocked_queue_tls[i]);
2921         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2922             evac((StgClosure **)&ccalling_threads[i]);
2923     }
2924
2925     markEventQueue();
2926
2927 #else /* !GRAN */
2928
2929     for (i = 0; i < n_capabilities; i++) {
2930         cap = &capabilities[i];
2931         evac((StgClosure **)(void *)&cap->run_queue_hd);
2932         evac((StgClosure **)(void *)&cap->run_queue_tl);
2933 #if defined(THREADED_RTS)
2934         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2935         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2936 #endif
2937         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2938              task=task->next) {
2939             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2940             evac((StgClosure **)(void *)&task->suspended_tso);
2941         }
2942
2943     }
2944     
2945
2946 #if !defined(THREADED_RTS)
2947     evac((StgClosure **)(void *)&blocked_queue_hd);
2948     evac((StgClosure **)(void *)&blocked_queue_tl);
2949     evac((StgClosure **)(void *)&sleeping_queue);
2950 #endif 
2951 #endif
2952
2953     // evac((StgClosure **)&blackhole_queue);
2954
2955 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2956     markSparkQueue(evac);
2957 #endif
2958     
2959 #if defined(RTS_USER_SIGNALS)
2960     // mark the signal handlers (signals should be already blocked)
2961     markSignalHandlers(evac);
2962 #endif
2963 }
2964
2965 /* -----------------------------------------------------------------------------
2966    performGC
2967
2968    This is the interface to the garbage collector from Haskell land.
2969    We provide this so that external C code can allocate and garbage
2970    collect when called from Haskell via _ccall_GC.
2971
2972    It might be useful to provide an interface whereby the programmer
2973    can specify more roots (ToDo).
2974    
2975    This needs to be protected by the GC condition variable above.  KH.
2976    -------------------------------------------------------------------------- */
2977
2978 static void (*extra_roots)(evac_fn);
2979
2980 static void
2981 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2982 {
2983     Task *task = myTask();
2984
2985     if (task == NULL) {
2986         ACQUIRE_LOCK(&sched_mutex);
2987         task = newBoundTask();
2988         RELEASE_LOCK(&sched_mutex);
2989         scheduleDoGC(NULL,task,force_major, get_roots);
2990         boundTaskExiting(task);
2991     } else {
2992         scheduleDoGC(NULL,task,force_major, get_roots);
2993     }
2994 }
2995
2996 void
2997 performGC(void)
2998 {
2999     performGC_(rtsFalse, GetRoots);
3000 }
3001
3002 void
3003 performMajorGC(void)
3004 {
3005     performGC_(rtsTrue, GetRoots);
3006 }
3007
3008 static void
3009 AllRoots(evac_fn evac)
3010 {
3011     GetRoots(evac);             // the scheduler's roots
3012     extra_roots(evac);          // the user's roots
3013 }
3014
3015 void
3016 performGCWithRoots(void (*get_roots)(evac_fn))
3017 {
3018     extra_roots = get_roots;
3019     performGC_(rtsFalse, AllRoots);
3020 }
3021
3022 /* -----------------------------------------------------------------------------
3023    Stack overflow
3024
3025    If the thread has reached its maximum stack size, then raise the
3026    StackOverflow exception in the offending thread.  Otherwise
3027    relocate the TSO into a larger chunk of memory and adjust its stack
3028    size appropriately.
3029    -------------------------------------------------------------------------- */
3030
3031 static StgTSO *
3032 threadStackOverflow(Capability *cap, StgTSO *tso)
3033 {
3034   nat new_stack_size, stack_words;
3035   lnat new_tso_size;
3036   StgPtr new_sp;
3037   StgTSO *dest;
3038
3039   IF_DEBUG(sanity,checkTSO(tso));
3040   if (tso->stack_size >= tso->max_stack_size) {
3041
3042     IF_DEBUG(gc,
3043              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3044                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3045              /* If we're debugging, just print out the top of the stack */
3046              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3047                                               tso->sp+64)));
3048
3049     /* Send this thread the StackOverflow exception */
3050     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3051     return tso;
3052   }
3053
3054   /* Try to double the current stack size.  If that takes us over the
3055    * maximum stack size for this thread, then use the maximum instead.
3056    * Finally round up so the TSO ends up as a whole number of blocks.
3057    */
3058   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3059   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3060                                        TSO_STRUCT_SIZE)/sizeof(W_);
3061   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3062   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3063
3064   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3065
3066   dest = (StgTSO *)allocate(new_tso_size);
3067   TICK_ALLOC_TSO(new_stack_size,0);
3068
3069   /* copy the TSO block and the old stack into the new area */
3070   memcpy(dest,tso,TSO_STRUCT_SIZE);
3071   stack_words = tso->stack + tso->stack_size - tso->sp;
3072   new_sp = (P_)dest + new_tso_size - stack_words;
3073   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3074
3075   /* relocate the stack pointers... */
3076   dest->sp         = new_sp;
3077   dest->stack_size = new_stack_size;
3078         
3079   /* Mark the old TSO as relocated.  We have to check for relocated
3080    * TSOs in the garbage collector and any primops that deal with TSOs.
3081    *
3082    * It's important to set the sp value to just beyond the end
3083    * of the stack, so we don't attempt to scavenge any part of the
3084    * dead TSO's stack.
3085    */
3086   tso->what_next = ThreadRelocated;
3087   tso->link = dest;
3088   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3089   tso->why_blocked = NotBlocked;
3090
3091   IF_PAR_DEBUG(verbose,
3092                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3093                      tso->id, tso, tso->stack_size);
3094                /* If we're debugging, just print out the top of the stack */
3095                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3096                                                 tso->sp+64)));
3097   
3098   IF_DEBUG(sanity,checkTSO(tso));
3099 #if 0
3100   IF_DEBUG(scheduler,printTSO(dest));
3101 #endif
3102
3103   return dest;
3104 }
3105
3106 /* ---------------------------------------------------------------------------
3107    Wake up a queue that was blocked on some resource.
3108    ------------------------------------------------------------------------ */
3109
3110 #if defined(GRAN)
3111 STATIC_INLINE void
3112 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3113 {
3114 }
3115 #elif defined(PARALLEL_HASKELL)
3116 STATIC_INLINE void
3117 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3118 {
3119   /* write RESUME events to log file and
3120      update blocked and fetch time (depending on type of the orig closure) */
3121   if (RtsFlags.ParFlags.ParStats.Full) {
3122     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3123                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3124                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3125     if (emptyRunQueue())
3126       emitSchedule = rtsTrue;
3127
3128     switch (get_itbl(node)->type) {
3129         case FETCH_ME_BQ:
3130           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3131           break;
3132         case RBH:
3133         case FETCH_ME:
3134         case BLACKHOLE_BQ:
3135           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3136           break;
3137 #ifdef DIST
3138         case MVAR:
3139           break;
3140 #endif    
3141         default:
3142           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3143         }
3144       }
3145 }
3146 #endif
3147
3148 #if defined(GRAN)
3149 StgBlockingQueueElement *
3150 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3151 {
3152     StgTSO *tso;
3153     PEs node_loc, tso_loc;
3154
3155     node_loc = where_is(node); // should be lifted out of loop
3156     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3157     tso_loc = where_is((StgClosure *)tso);
3158     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3159       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3160       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3161       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3162       // insertThread(tso, node_loc);
3163       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3164                 ResumeThread,
3165                 tso, node, (rtsSpark*)NULL);
3166       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3167       // len_local++;
3168       // len++;
3169     } else { // TSO is remote (actually should be FMBQ)
3170       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3171                                   RtsFlags.GranFlags.Costs.gunblocktime +
3172                                   RtsFlags.GranFlags.Costs.latency;
3173       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3174                 UnblockThread,
3175                 tso, node, (rtsSpark*)NULL);
3176       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3177       // len++;
3178     }
3179     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3180     IF_GRAN_DEBUG(bq,
3181                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3182                           (node_loc==tso_loc ? "Local" : "Global"), 
3183                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3184     tso->block_info.closure = NULL;
3185     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3186                              tso->id, tso));
3187 }
3188 #elif defined(PARALLEL_HASKELL)
3189 StgBlockingQueueElement *
3190 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3191 {
3192     StgBlockingQueueElement *next;
3193
3194     switch (get_itbl(bqe)->type) {
3195     case TSO:
3196       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3197       /* if it's a TSO just push it onto the run_queue */
3198       next = bqe->link;
3199       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3200       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3201       threadRunnable();
3202       unblockCount(bqe, node);
3203       /* reset blocking status after dumping event */
3204       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3205       break;
3206
3207     case BLOCKED_FETCH:
3208       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3209       next = bqe->link;
3210       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3211       PendingFetches = (StgBlockedFetch *)bqe;
3212       break;
3213
3214 # if defined(DEBUG)
3215       /* can ignore this case in a non-debugging setup; 
3216          see comments on RBHSave closures above */
3217     case CONSTR:
3218       /* check that the closure is an RBHSave closure */
3219       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3220              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3221              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3222       break;
3223
3224     default:
3225       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3226            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3227            (StgClosure *)bqe);
3228 # endif
3229     }
3230   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3231   return next;
3232 }
3233 #endif
3234
3235 StgTSO *
3236 unblockOne(Capability *cap, StgTSO *tso)
3237 {
3238   StgTSO *next;
3239
3240   ASSERT(get_itbl(tso)->type == TSO);
3241   ASSERT(tso->why_blocked != NotBlocked);
3242
3243   tso->why_blocked = NotBlocked;
3244   next = tso->link;
3245   tso->link = END_TSO_QUEUE;
3246
3247   if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
3248       // We are waking up this thread on the current Capability, which
3249       // might involve migrating it from the Capability it was last on.
3250       if (tso->bound) {
3251           ASSERT(tso->bound->cap == tso->cap);
3252           tso->bound->cap = cap;
3253       }
3254       tso->cap = cap;
3255       appendToRunQueue(cap,tso);
3256       // we're holding a newly woken thread, make sure we context switch
3257       // quickly so we can migrate it if necessary.
3258       context_switch = 1;
3259   } else {
3260       // we'll try to wake it up on the Capability it was last on.
3261       wakeupThreadOnCapability(tso->cap, tso);
3262   }
3263
3264   IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
3265   return next;
3266 }
3267
3268
3269 #if defined(GRAN)
3270 void 
3271 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3272 {
3273   StgBlockingQueueElement *bqe;
3274   PEs node_loc;
3275   nat len = 0; 
3276
3277   IF_GRAN_DEBUG(bq, 
3278                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3279                       node, CurrentProc, CurrentTime[CurrentProc], 
3280                       CurrentTSO->id, CurrentTSO));
3281
3282   node_loc = where_is(node);
3283
3284   ASSERT(q == END_BQ_QUEUE ||
3285          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3286          get_itbl(q)->type == CONSTR); // closure (type constructor)
3287   ASSERT(is_unique(node));
3288
3289   /* FAKE FETCH: magically copy the node to the tso's proc;
3290      no Fetch necessary because in reality the node should not have been 
3291      moved to the other PE in the first place
3292   */
3293   if (CurrentProc!=node_loc) {
3294     IF_GRAN_DEBUG(bq, 
3295                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3296                         node, node_loc, CurrentProc, CurrentTSO->id, 
3297                         // CurrentTSO, where_is(CurrentTSO),
3298                         node->header.gran.procs));
3299     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3300     IF_GRAN_DEBUG(bq, 
3301                   debugBelch("## new bitmask of node %p is %#x\n",
3302                         node, node->header.gran.procs));
3303     if (RtsFlags.GranFlags.GranSimStats.Global) {
3304       globalGranStats.tot_fake_fetches++;
3305     }
3306   }
3307
3308   bqe = q;
3309   // ToDo: check: ASSERT(CurrentProc==node_loc);
3310   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3311     //next = bqe->link;
3312     /* 
3313        bqe points to the current element in the queue
3314        next points to the next element in the queue
3315     */
3316     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3317     //tso_loc = where_is(tso);
3318     len++;
3319     bqe = unblockOne(bqe, node);
3320   }
3321
3322   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3323      the closure to make room for the anchor of the BQ */
3324   if (bqe!=END_BQ_QUEUE) {
3325     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3326     /*
3327     ASSERT((info_ptr==&RBH_Save_0_info) ||
3328            (info_ptr==&RBH_Save_1_info) ||
3329            (info_ptr==&RBH_Save_2_info));
3330     */
3331     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3332     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3333     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3334
3335     IF_GRAN_DEBUG(bq,
3336                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3337                         node, info_type(node)));
3338   }
3339
3340   /* statistics gathering */
3341   if (RtsFlags.GranFlags.GranSimStats.Global) {
3342     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3343     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3344     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3345     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3346   }
3347   IF_GRAN_DEBUG(bq,
3348                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3349                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3350 }
3351 #elif defined(PARALLEL_HASKELL)
3352 void 
3353 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3354 {
3355   StgBlockingQueueElement *bqe;
3356
3357   IF_PAR_DEBUG(verbose, 
3358                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3359                      node, mytid));
3360 #ifdef DIST  
3361   //RFP
3362   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3363     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3364     return;
3365   }
3366 #endif
3367   
3368   ASSERT(q == END_BQ_QUEUE ||
3369          get_itbl(q)->type == TSO ||           
3370          get_itbl(q)->type == BLOCKED_FETCH || 
3371          get_itbl(q)->type == CONSTR); 
3372
3373   bqe = q;
3374   while (get_itbl(bqe)->type==TSO || 
3375          get_itbl(bqe)->type==BLOCKED_FETCH) {
3376     bqe = unblockOne(bqe, node);
3377   }
3378 }
3379
3380 #else   /* !GRAN && !PARALLEL_HASKELL */
3381
3382 void
3383 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3384 {
3385     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3386                              // Exception.cmm
3387     while (tso != END_TSO_QUEUE) {
3388         tso = unblockOne(cap,tso);
3389     }
3390 }
3391 #endif
3392
3393 /* ---------------------------------------------------------------------------
3394    Interrupt execution
3395    - usually called inside a signal handler so it mustn't do anything fancy.   
3396    ------------------------------------------------------------------------ */
3397
3398 void
3399 interruptStgRts(void)
3400 {
3401     sched_state = SCHED_INTERRUPTING;
3402     context_switch = 1;
3403 #if defined(THREADED_RTS)
3404     prodAllCapabilities();
3405 #endif
3406 }
3407
3408 /* -----------------------------------------------------------------------------
3409    Unblock a thread
3410
3411    This is for use when we raise an exception in another thread, which
3412    may be blocked.
3413    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3414    -------------------------------------------------------------------------- */
3415
3416 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3417 /*
3418   NB: only the type of the blocking queue is different in GranSim and GUM
3419       the operations on the queue-elements are the same
3420       long live polymorphism!
3421
3422   Locks: sched_mutex is held upon entry and exit.
3423
3424 */
3425 static void
3426 unblockThread(Capability *cap, StgTSO *tso)
3427 {
3428   StgBlockingQueueElement *t, **last;
3429
3430   switch (tso->why_blocked) {
3431
3432   case NotBlocked:
3433     return;  /* not blocked */
3434
3435   case BlockedOnSTM:
3436     // Be careful: nothing to do here!  We tell the scheduler that the thread
3437     // is runnable and we leave it to the stack-walking code to abort the 
3438     // transaction while unwinding the stack.  We should perhaps have a debugging
3439     // test to make sure that this really happens and that the 'zombie' transaction
3440     // does not get committed.
3441     goto done;
3442
3443   case BlockedOnMVar:
3444     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3445     {
3446       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3447       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3448
3449       last = (StgBlockingQueueElement **)&mvar->head;
3450       for (t = (StgBlockingQueueElement *)mvar->head; 
3451            t != END_BQ_QUEUE; 
3452            last = &t->link, last_tso = t, t = t->link) {
3453         if (t == (StgBlockingQueueElement *)tso) {
3454           *last = (StgBlockingQueueElement *)tso->link;
3455           if (mvar->tail == tso) {
3456             mvar->tail = (StgTSO *)last_tso;
3457           }
3458           goto done;
3459         }
3460       }
3461       barf("unblockThread (MVAR): TSO not found");
3462     }
3463
3464   case BlockedOnBlackHole:
3465     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3466     {
3467       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3468
3469       last = &bq->blocking_queue;
3470       for (t = bq->blocking_queue; 
3471            t != END_BQ_QUEUE; 
3472            last = &t->link, t = t->link) {
3473         if (t == (StgBlockingQueueElement *)tso) {
3474           *last = (StgBlockingQueueElement *)tso->link;
3475           goto done;
3476         }
3477       }
3478       barf("unblockThread (BLACKHOLE): TSO not found");
3479     }
3480
3481   case BlockedOnException:
3482     {
3483       StgTSO *target  = tso->block_info.tso;
3484
3485       ASSERT(get_itbl(target)->type == TSO);
3486
3487       if (target->what_next == ThreadRelocated) {
3488           target = target->link;
3489           ASSERT(get_itbl(target)->type == TSO);
3490       }
3491
3492       ASSERT(target->blocked_exceptions != NULL);
3493
3494       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3495       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3496            t != END_BQ_QUEUE; 
3497            last = &t->link, t = t->link) {
3498         ASSERT(get_itbl(t)->type == TSO);
3499         if (t == (StgBlockingQueueElement *)tso) {
3500           *last = (StgBlockingQueueElement *)tso->link;
3501           goto done;
3502         }
3503       }
3504       barf("unblockThread (Exception): TSO not found");
3505     }
3506
3507   case BlockedOnRead:
3508   case BlockedOnWrite:
3509 #if defined(mingw32_HOST_OS)
3510   case BlockedOnDoProc:
3511 #endif
3512     {
3513       /* take TSO off blocked_queue */
3514       StgBlockingQueueElement *prev = NULL;
3515       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3516            prev = t, t = t->link) {
3517         if (t == (StgBlockingQueueElement *)tso) {
3518           if (prev == NULL) {
3519             blocked_queue_hd = (StgTSO *)t->link;
3520             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3521               blocked_queue_tl = END_TSO_QUEUE;
3522             }
3523           } else {
3524             prev->link = t->link;
3525             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3526               blocked_queue_tl = (StgTSO *)prev;
3527             }
3528           }
3529 #if defined(mingw32_HOST_OS)
3530           /* (Cooperatively) signal that the worker thread should abort
3531            * the request.
3532            */
3533           abandonWorkRequest(tso->block_info.async_result->reqID);
3534 #endif
3535           goto done;
3536         }
3537       }
3538       barf("unblockThread (I/O): TSO not found");
3539     }
3540
3541   case BlockedOnDelay:
3542     {
3543       /* take TSO off sleeping_queue */
3544       StgBlockingQueueElement *prev = NULL;
3545       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3546            prev = t, t = t->link) {
3547         if (t == (StgBlockingQueueElement *)tso) {
3548           if (prev == NULL) {
3549             sleeping_queue = (StgTSO *)t->link;
3550           } else {
3551             prev->link = t->link;
3552           }
3553           goto done;
3554         }
3555       }
3556       barf("unblockThread (delay): TSO not found");
3557     }
3558
3559   default:
3560     barf("unblockThread");
3561   }
3562
3563  done:
3564   tso->link = END_TSO_QUEUE;
3565   tso->why_blocked = NotBlocked;
3566   tso->block_info.closure = NULL;
3567   pushOnRunQueue(cap,tso);
3568 }
3569 #else
3570 static void
3571 unblockThread(Capability *cap, StgTSO *tso)
3572 {
3573   StgTSO *t, **last;
3574   
3575   /* To avoid locking unnecessarily. */
3576   if (tso->why_blocked == NotBlocked) {
3577     return;
3578   }
3579
3580   switch (tso->why_blocked) {
3581
3582   case BlockedOnSTM:
3583     // Be careful: nothing to do here!  We tell the scheduler that the thread
3584     // is runnable and we leave it to the stack-walking code to abort the 
3585     // transaction while unwinding the stack.  We should perhaps have a debugging
3586     // test to make sure that this really happens and that the 'zombie' transaction
3587     // does not get committed.
3588     goto done;
3589
3590   case BlockedOnMVar:
3591     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3592     {
3593       StgTSO *last_tso = END_TSO_QUEUE;
3594       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3595
3596       last = &mvar->head;
3597       for (t = mvar->head; t != END_TSO_QUEUE; 
3598            last = &t->link, last_tso = t, t = t->link) {
3599         if (t == tso) {
3600           *last = tso->link;
3601           if (mvar->tail == tso) {
3602             mvar->tail = last_tso;
3603           }
3604           goto done;
3605         }
3606       }
3607       barf("unblockThread (MVAR): TSO not found");
3608     }
3609
3610   case BlockedOnBlackHole:
3611     {
3612       last = &blackhole_queue;
3613       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3614            last = &t->link, t = t->link) {
3615         if (t == tso) {
3616           *last = tso->link;
3617           goto done;
3618         }
3619       }
3620       barf("unblockThread (BLACKHOLE): TSO not found");
3621     }
3622
3623   case BlockedOnException:
3624     {
3625       StgTSO *target  = tso->block_info.tso;
3626
3627       ASSERT(get_itbl(target)->type == TSO);
3628
3629       while (target->what_next == ThreadRelocated) {
3630           target = target->link;
3631           ASSERT(get_itbl(target)->type == TSO);
3632       }
3633       
3634       ASSERT(target->blocked_exceptions != NULL);
3635
3636       last = &target->blocked_exceptions;
3637       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3638            last = &t->link, t = t->link) {
3639         ASSERT(get_itbl(t)->type == TSO);
3640         if (t == tso) {
3641           *last = tso->link;
3642           goto done;
3643         }
3644       }
3645       barf("unblockThread (Exception): TSO not found");
3646     }
3647
3648 #if !defined(THREADED_RTS)
3649   case BlockedOnRead:
3650   case BlockedOnWrite:
3651 #if defined(mingw32_HOST_OS)
3652   case BlockedOnDoProc:
3653 #endif
3654     {
3655       StgTSO *prev = NULL;
3656       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3657            prev = t, t = t->link) {
3658         if (t == tso) {
3659           if (prev == NULL) {
3660             blocked_queue_hd = t->link;
3661             if (blocked_queue_tl == t) {
3662               blocked_queue_tl = END_TSO_QUEUE;
3663             }
3664           } else {
3665             prev->link = t->link;
3666             if (blocked_queue_tl == t) {
3667               blocked_queue_tl = prev;
3668             }
3669           }
3670 #if defined(mingw32_HOST_OS)
3671           /* (Cooperatively) signal that the worker thread should abort
3672            * the request.
3673            */
3674           abandonWorkRequest(tso->block_info.async_result->reqID);
3675 #endif
3676           goto done;
3677         }
3678       }
3679       barf("unblockThread (I/O): TSO not found");
3680     }
3681
3682   case BlockedOnDelay:
3683     {
3684       StgTSO *prev = NULL;
3685       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3686            prev = t, t = t->link) {
3687         if (t == tso) {
3688           if (prev == NULL) {
3689             sleeping_queue = t->link;
3690           } else {
3691             prev->link = t->link;
3692           }
3693           goto done;
3694         }
3695       }
3696       barf("unblockThread (delay): TSO not found");
3697     }
3698 #endif
3699
3700   default:
3701     barf("unblockThread");
3702   }
3703
3704  done:
3705   tso->link = END_TSO_QUEUE;
3706   tso->why_blocked = NotBlocked;
3707   tso->block_info.closure = NULL;
3708   appendToRunQueue(cap,tso);
3709
3710   // We might have just migrated this TSO to our Capability:
3711   if (tso->bound) {
3712       tso->bound->cap = cap;
3713   }
3714   tso->cap = cap;
3715 }
3716 #endif
3717
3718 /* -----------------------------------------------------------------------------
3719  * checkBlackHoles()
3720  *
3721  * Check the blackhole_queue for threads that can be woken up.  We do
3722  * this periodically: before every GC, and whenever the run queue is
3723  * empty.
3724  *
3725  * An elegant solution might be to just wake up all the blocked
3726  * threads with awakenBlockedQueue occasionally: they'll go back to
3727  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3728  * doesn't give us a way to tell whether we've actually managed to
3729  * wake up any threads, so we would be busy-waiting.
3730  *
3731  * -------------------------------------------------------------------------- */
3732
3733 static rtsBool
3734 checkBlackHoles (Capability *cap)
3735 {
3736     StgTSO **prev, *t;
3737     rtsBool any_woke_up = rtsFalse;
3738     StgHalfWord type;
3739
3740     // blackhole_queue is global:
3741     ASSERT_LOCK_HELD(&sched_mutex);
3742
3743     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3744
3745     // ASSUMES: sched_mutex
3746     prev = &blackhole_queue;
3747     t = blackhole_queue;
3748     while (t != END_TSO_QUEUE) {
3749         ASSERT(t->why_blocked == BlockedOnBlackHole);
3750         type = get_itbl(t->block_info.closure)->type;
3751         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3752             IF_DEBUG(sanity,checkTSO(t));
3753             t = unblockOne(cap, t);
3754             // urk, the threads migrate to the current capability
3755             // here, but we'd like to keep them on the original one.
3756             *prev = t;
3757             any_woke_up = rtsTrue;
3758         } else {
3759             prev = &t->link;
3760             t = t->link;
3761         }
3762     }
3763
3764     return any_woke_up;
3765 }
3766
3767 /* -----------------------------------------------------------------------------
3768  * raiseAsync()
3769  *
3770  * The following function implements the magic for raising an
3771  * asynchronous exception in an existing thread.
3772  *
3773  * We first remove the thread from any queue on which it might be
3774  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3775  *
3776  * We strip the stack down to the innermost CATCH_FRAME, building
3777  * thunks in the heap for all the active computations, so they can 
3778  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3779  * an application of the handler to the exception, and push it on
3780  * the top of the stack.
3781  * 
3782  * How exactly do we save all the active computations?  We create an
3783  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3784  * AP_STACKs pushes everything from the corresponding update frame
3785  * upwards onto the stack.  (Actually, it pushes everything up to the
3786  * next update frame plus a pointer to the next AP_STACK object.
3787  * Entering the next AP_STACK object pushes more onto the stack until we
3788  * reach the last AP_STACK object - at which point the stack should look
3789  * exactly as it did when we killed the TSO and we can continue
3790  * execution by entering the closure on top of the stack.
3791  *
3792  * We can also kill a thread entirely - this happens if either (a) the 
3793  * exception passed to raiseAsync is NULL, or (b) there's no
3794  * CATCH_FRAME on the stack.  In either case, we strip the entire
3795  * stack and replace the thread with a zombie.
3796  *
3797  * ToDo: in THREADED_RTS mode, this function is only safe if either
3798  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3799  * one Capability), or (b) we own the Capability that the TSO is
3800  * currently blocked on or on the run queue of.
3801  *
3802  * -------------------------------------------------------------------------- */
3803  
3804 void
3805 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3806 {
3807     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3808 }
3809
3810 void
3811 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3812 {
3813     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3814 }
3815
3816 static void
3817 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3818             rtsBool stop_at_atomically, StgPtr stop_here)
3819 {
3820     StgRetInfoTable *info;
3821     StgPtr sp, frame;
3822     nat i;
3823   
3824     // Thread already dead?
3825     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3826         return;
3827     }
3828
3829     IF_DEBUG(scheduler, 
3830              sched_belch("raising exception in thread %ld.", (long)tso->id));
3831     
3832     // Remove it from any blocking queues
3833     unblockThread(cap,tso);
3834
3835     // mark it dirty; we're about to change its stack.
3836     dirtyTSO(tso);
3837
3838     sp = tso->sp;
3839     
3840     // The stack freezing code assumes there's a closure pointer on
3841     // the top of the stack, so we have to arrange that this is the case...
3842     //
3843     if (sp[0] == (W_)&stg_enter_info) {
3844         sp++;
3845     } else {
3846         sp--;
3847         sp[0] = (W_)&stg_dummy_ret_closure;
3848     }
3849
3850     frame = sp + 1;
3851     while (stop_here == NULL || frame < stop_here) {
3852
3853         // 1. Let the top of the stack be the "current closure"
3854         //
3855         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3856         // CATCH_FRAME.
3857         //
3858         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3859         // current closure applied to the chunk of stack up to (but not
3860         // including) the update frame.  This closure becomes the "current
3861         // closure".  Go back to step 2.
3862         //
3863         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3864         // top of the stack applied to the exception.
3865         // 
3866         // 5. If it's a STOP_FRAME, then kill the thread.
3867         // 
3868         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3869         // transaction
3870        
3871         info = get_ret_itbl((StgClosure *)frame);
3872
3873         switch (info->i.type) {
3874
3875         case UPDATE_FRAME:
3876         {
3877             StgAP_STACK * ap;
3878             nat words;
3879             
3880             // First build an AP_STACK consisting of the stack chunk above the
3881             // current update frame, with the top word on the stack as the
3882             // fun field.
3883             //
3884             words = frame - sp - 1;
3885             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3886             
3887             ap->size = words;
3888             ap->fun  = (StgClosure *)sp[0];
3889             sp++;
3890             for(i=0; i < (nat)words; ++i) {
3891                 ap->payload[i] = (StgClosure *)*sp++;
3892             }
3893             
3894             SET_HDR(ap,&stg_AP_STACK_info,
3895                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3896             TICK_ALLOC_UP_THK(words+1,0);
3897             
3898             IF_DEBUG(scheduler,
3899                      debugBelch("sched: Updating ");
3900                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3901                      debugBelch(" with ");
3902                      printObj((StgClosure *)ap);
3903                 );
3904
3905             // Replace the updatee with an indirection
3906             //
3907             // Warning: if we're in a loop, more than one update frame on
3908             // the stack may point to the same object.  Be careful not to
3909             // overwrite an IND_OLDGEN in this case, because we'll screw
3910             // up the mutable lists.  To be on the safe side, don't
3911             // overwrite any kind of indirection at all.  See also
3912             // threadSqueezeStack in GC.c, where we have to make a similar
3913             // check.
3914             //
3915             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3916                 // revert the black hole
3917                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3918                                (StgClosure *)ap);
3919             }
3920             sp += sizeofW(StgUpdateFrame) - 1;
3921             sp[0] = (W_)ap; // push onto stack
3922             frame = sp + 1;
3923             continue; //no need to bump frame
3924         }
3925
3926         case STOP_FRAME:
3927             // We've stripped the entire stack, the thread is now dead.
3928             tso->what_next = ThreadKilled;
3929             tso->sp = frame + sizeofW(StgStopFrame);
3930             return;
3931
3932         case CATCH_FRAME:
3933             // If we find a CATCH_FRAME, and we've got an exception to raise,
3934             // then build the THUNK raise(exception), and leave it on
3935             // top of the CATCH_FRAME ready to enter.
3936             //
3937         {
3938 #ifdef PROFILING
3939             StgCatchFrame *cf = (StgCatchFrame *)frame;
3940 #endif
3941             StgThunk *raise;
3942             
3943             if (exception == NULL) break;
3944
3945             // we've got an exception to raise, so let's pass it to the
3946             // handler in this frame.
3947             //
3948             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3949             TICK_ALLOC_SE_THK(1,0);
3950             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3951             raise->payload[0] = exception;
3952             
3953             // throw away the stack from Sp up to the CATCH_FRAME.
3954             //
3955             sp = frame - 1;
3956             
3957             /* Ensure that async excpetions are blocked now, so we don't get
3958              * a surprise exception before we get around to executing the
3959              * handler.
3960              */
3961             if (tso->blocked_exceptions == NULL) {
3962                 tso->blocked_exceptions = END_TSO_QUEUE;
3963             }
3964
3965             /* Put the newly-built THUNK on top of the stack, ready to execute
3966              * when the thread restarts.
3967              */
3968             sp[0] = (W_)raise;
3969             sp[-1] = (W_)&stg_enter_info;
3970             tso->sp = sp-1;
3971             tso->what_next = ThreadRunGHC;
3972             IF_DEBUG(sanity, checkTSO(tso));
3973             return;
3974         }
3975             
3976         case ATOMICALLY_FRAME:
3977             if (stop_at_atomically) {
3978                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3979                 stmCondemnTransaction(cap, tso -> trec);
3980 #ifdef REG_R1
3981                 tso->sp = frame;
3982 #else
3983                 // R1 is not a register: the return convention for IO in
3984                 // this case puts the return value on the stack, so we
3985                 // need to set up the stack to return to the atomically
3986                 // frame properly...
3987                 tso->sp = frame - 2;
3988                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3989                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3990 #endif
3991                 tso->what_next = ThreadRunGHC;
3992                 return;
3993             }
3994             // Not stop_at_atomically... fall through and abort the
3995             // transaction.
3996             
3997         case CATCH_RETRY_FRAME:
3998             // IF we find an ATOMICALLY_FRAME then we abort the
3999             // current transaction and propagate the exception.  In
4000             // this case (unlike ordinary exceptions) we do not care
4001             // whether the transaction is valid or not because its
4002             // possible validity cannot have caused the exception
4003             // and will not be visible after the abort.
4004             IF_DEBUG(stm,
4005                      debugBelch("Found atomically block delivering async exception\n"));
4006             StgTRecHeader *trec = tso -> trec;
4007             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4008             stmAbortTransaction(cap, trec);
4009             tso -> trec = outer;
4010             break;
4011             
4012         default:
4013             break;
4014         }
4015
4016         // move on to the next stack frame
4017         frame += stack_frame_sizeW((StgClosure *)frame);
4018     }
4019
4020     // if we got here, then we stopped at stop_here
4021     ASSERT(stop_here != NULL);
4022 }
4023
4024 /* -----------------------------------------------------------------------------
4025    Deleting threads
4026
4027    This is used for interruption (^C) and forking, and corresponds to
4028    raising an exception but without letting the thread catch the
4029    exception.
4030    -------------------------------------------------------------------------- */
4031
4032 static void 
4033 deleteThread (Capability *cap, StgTSO *tso)
4034 {
4035   if (tso->why_blocked != BlockedOnCCall &&
4036       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4037       raiseAsync(cap,tso,NULL);
4038   }
4039 }
4040
4041 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4042 static void 
4043 deleteThread_(Capability *cap, StgTSO *tso)
4044 { // for forkProcess only:
4045   // like deleteThread(), but we delete threads in foreign calls, too.
4046
4047     if (tso->why_blocked == BlockedOnCCall ||
4048         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4049         unblockOne(cap,tso);
4050         tso->what_next = ThreadKilled;
4051     } else {
4052         deleteThread(cap,tso);
4053     }
4054 }
4055 #endif
4056
4057 /* -----------------------------------------------------------------------------
4058    raiseExceptionHelper
4059    
4060    This function is called by the raise# primitve, just so that we can
4061    move some of the tricky bits of raising an exception from C-- into
4062    C.  Who knows, it might be a useful re-useable thing here too.
4063    -------------------------------------------------------------------------- */
4064
4065 StgWord
4066 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4067 {
4068     Capability *cap = regTableToCapability(reg);
4069     StgThunk *raise_closure = NULL;
4070     StgPtr p, next;
4071     StgRetInfoTable *info;
4072     //
4073     // This closure represents the expression 'raise# E' where E
4074     // is the exception raise.  It is used to overwrite all the
4075     // thunks which are currently under evaluataion.
4076     //
4077
4078     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4079     // LDV profiling: stg_raise_info has THUNK as its closure
4080     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4081     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
4082     // 1 does not cause any problem unless profiling is performed.
4083     // However, when LDV profiling goes on, we need to linearly scan
4084     // small object pool, where raise_closure is stored, so we should
4085     // use MIN_UPD_SIZE.
4086     //
4087     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4088     //                                 sizeofW(StgClosure)+1);
4089     //
4090
4091     //
4092     // Walk up the stack, looking for the catch frame.  On the way,
4093     // we update any closures pointed to from update frames with the
4094     // raise closure that we just built.
4095     //
4096     p = tso->sp;
4097     while(1) {
4098         info = get_ret_itbl((StgClosure *)p);
4099         next = p + stack_frame_sizeW((StgClosure *)p);
4100         switch (info->i.type) {
4101             
4102         case UPDATE_FRAME:
4103             // Only create raise_closure if we need to.
4104             if (raise_closure == NULL) {
4105                 raise_closure = 
4106                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4107                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4108                 raise_closure->payload[0] = exception;
4109             }
4110             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4111             p = next;
4112             continue;
4113
4114         case ATOMICALLY_FRAME:
4115             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4116             tso->sp = p;
4117             return ATOMICALLY_FRAME;
4118             
4119         case CATCH_FRAME:
4120             tso->sp = p;
4121             return CATCH_FRAME;
4122
4123         case CATCH_STM_FRAME:
4124             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4125             tso->sp = p;
4126             return CATCH_STM_FRAME;
4127             
4128         case STOP_FRAME:
4129             tso->sp = p;
4130             return STOP_FRAME;
4131
4132         case CATCH_RETRY_FRAME:
4133         default:
4134             p = next; 
4135             continue;
4136         }
4137     }
4138 }
4139
4140
4141 /* -----------------------------------------------------------------------------
4142    findRetryFrameHelper
4143
4144    This function is called by the retry# primitive.  It traverses the stack
4145    leaving tso->sp referring to the frame which should handle the retry.  
4146
4147    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4148    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4149
4150    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4151    despite the similar implementation.
4152
4153    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4154    not be created within memory transactions.
4155    -------------------------------------------------------------------------- */
4156
4157 StgWord
4158 findRetryFrameHelper (StgTSO *tso)
4159 {
4160   StgPtr           p, next;
4161   StgRetInfoTable *info;
4162
4163   p = tso -> sp;
4164   while (1) {
4165     info = get_ret_itbl((StgClosure *)p);
4166     next = p + stack_frame_sizeW((StgClosure *)p);
4167     switch (info->i.type) {
4168       
4169     case ATOMICALLY_FRAME:
4170       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4171       tso->sp = p;
4172       return ATOMICALLY_FRAME;
4173       
4174     case CATCH_RETRY_FRAME:
4175       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4176       tso->sp = p;
4177       return CATCH_RETRY_FRAME;
4178       
4179     case CATCH_STM_FRAME:
4180     default:
4181       ASSERT(info->i.type != CATCH_FRAME);
4182       ASSERT(info->i.type != STOP_FRAME);
4183       p = next; 
4184       continue;
4185     }
4186   }
4187 }
4188
4189 /* -----------------------------------------------------------------------------
4190    resurrectThreads is called after garbage collection on the list of
4191    threads found to be garbage.  Each of these threads will be woken
4192    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4193    on an MVar, or NonTermination if the thread was blocked on a Black
4194    Hole.
4195
4196    Locks: assumes we hold *all* the capabilities.
4197    -------------------------------------------------------------------------- */
4198
4199 void
4200 resurrectThreads (StgTSO *threads)
4201 {
4202     StgTSO *tso, *next;
4203     Capability *cap;
4204
4205     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4206         next = tso->global_link;
4207         tso->global_link = all_threads;
4208         all_threads = tso;
4209         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4210         
4211         // Wake up the thread on the Capability it was last on
4212         cap = tso->cap;
4213         
4214         switch (tso->why_blocked) {
4215         case BlockedOnMVar:
4216         case BlockedOnException:
4217             /* Called by GC - sched_mutex lock is currently held. */
4218             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4219             break;
4220         case BlockedOnBlackHole:
4221             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4222             break;
4223         case BlockedOnSTM:
4224             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4225             break;
4226         case NotBlocked:
4227             /* This might happen if the thread was blocked on a black hole
4228              * belonging to a thread that we've just woken up (raiseAsync
4229              * can wake up threads, remember...).
4230              */
4231             continue;
4232         default:
4233             barf("resurrectThreads: thread blocked in a strange way");
4234         }
4235     }
4236 }
4237
4238 /* ----------------------------------------------------------------------------
4239  * Debugging: why is a thread blocked
4240  * [Also provides useful information when debugging threaded programs
4241  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4242    ------------------------------------------------------------------------- */
4243
4244 #if DEBUG
4245 static void
4246 printThreadBlockage(StgTSO *tso)
4247 {
4248   switch (tso->why_blocked) {
4249   case BlockedOnRead:
4250     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4251     break;
4252   case BlockedOnWrite:
4253     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4254     break;
4255 #if defined(mingw32_HOST_OS)
4256     case BlockedOnDoProc:
4257     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4258     break;
4259 #endif
4260   case BlockedOnDelay:
4261     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4262     break;
4263   case BlockedOnMVar:
4264     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4265     break;
4266   case BlockedOnException:
4267     debugBelch("is blocked on delivering an exception to thread %d",
4268             tso->block_info.tso->id);
4269     break;
4270   case BlockedOnBlackHole:
4271     debugBelch("is blocked on a black hole");
4272     break;
4273   case NotBlocked:
4274     debugBelch("is not blocked");
4275     break;
4276 #if defined(PARALLEL_HASKELL)
4277   case BlockedOnGA:
4278     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4279             tso->block_info.closure, info_type(tso->block_info.closure));
4280     break;
4281   case BlockedOnGA_NoSend:
4282     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4283             tso->block_info.closure, info_type(tso->block_info.closure));
4284     break;
4285 #endif
4286   case BlockedOnCCall:
4287     debugBelch("is blocked on an external call");
4288     break;
4289   case BlockedOnCCall_NoUnblockExc:
4290     debugBelch("is blocked on an external call (exceptions were already blocked)");
4291     break;
4292   case BlockedOnSTM:
4293     debugBelch("is blocked on an STM operation");
4294     break;
4295   default:
4296     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4297          tso->why_blocked, tso->id, tso);
4298   }
4299 }
4300
4301 void
4302 printThreadStatus(StgTSO *t)
4303 {
4304     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4305     {
4306       void *label = lookupThreadLabel(t->id);
4307       if (label) debugBelch("[\"%s\"] ",(char *)label);
4308     }
4309     if (t->what_next == ThreadRelocated) {
4310         debugBelch("has been relocated...\n");
4311     } else {
4312         switch (t->what_next) {
4313         case ThreadKilled:
4314             debugBelch("has been killed");
4315             break;
4316         case ThreadComplete:
4317             debugBelch("has completed");
4318             break;
4319         default:
4320             printThreadBlockage(t);
4321         }
4322         debugBelch("\n");
4323     }
4324 }
4325
4326 void
4327 printAllThreads(void)
4328 {
4329   StgTSO *t, *next;
4330   nat i;
4331   Capability *cap;
4332
4333 # if defined(GRAN)
4334   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4335   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4336                        time_string, rtsFalse/*no commas!*/);
4337
4338   debugBelch("all threads at [%s]:\n", time_string);
4339 # elif defined(PARALLEL_HASKELL)
4340   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4341   ullong_format_string(CURRENT_TIME,
4342                        time_string, rtsFalse/*no commas!*/);
4343
4344   debugBelch("all threads at [%s]:\n", time_string);
4345 # else
4346   debugBelch("all threads:\n");
4347 # endif
4348
4349   for (i = 0; i < n_capabilities; i++) {
4350       cap = &capabilities[i];
4351       debugBelch("threads on capability %d:\n", cap->no);
4352       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4353           printThreadStatus(t);
4354       }
4355   }
4356
4357   debugBelch("other threads:\n");
4358   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4359       if (t->why_blocked != NotBlocked) {
4360           printThreadStatus(t);
4361       }
4362       if (t->what_next == ThreadRelocated) {
4363           next = t->link;
4364       } else {
4365           next = t->global_link;
4366       }
4367   }
4368 }
4369
4370 // useful from gdb
4371 void 
4372 printThreadQueue(StgTSO *t)
4373 {
4374     nat i = 0;
4375     for (; t != END_TSO_QUEUE; t = t->link) {
4376         printThreadStatus(t);
4377         i++;
4378     }
4379     debugBelch("%d threads on queue\n", i);
4380 }
4381
4382 /* 
4383    Print a whole blocking queue attached to node (debugging only).
4384 */
4385 # if defined(PARALLEL_HASKELL)
4386 void 
4387 print_bq (StgClosure *node)
4388 {
4389   StgBlockingQueueElement *bqe;
4390   StgTSO *tso;
4391   rtsBool end;
4392
4393   debugBelch("## BQ of closure %p (%s): ",
4394           node, info_type(node));
4395
4396   /* should cover all closures that may have a blocking queue */
4397   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4398          get_itbl(node)->type == FETCH_ME_BQ ||
4399          get_itbl(node)->type == RBH ||
4400          get_itbl(node)->type == MVAR);
4401     
4402   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4403
4404   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4405 }
4406
4407 /* 
4408    Print a whole blocking queue starting with the element bqe.
4409 */
4410 void 
4411 print_bqe (StgBlockingQueueElement *bqe)
4412 {
4413   rtsBool end;
4414
4415   /* 
4416      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4417   */
4418   for (end = (bqe==END_BQ_QUEUE);
4419        !end; // iterate until bqe points to a CONSTR
4420        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4421        bqe = end ? END_BQ_QUEUE : bqe->link) {
4422     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4423     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4424     /* types of closures that may appear in a blocking queue */
4425     ASSERT(get_itbl(bqe)->type == TSO ||           
4426            get_itbl(bqe)->type == BLOCKED_FETCH || 
4427            get_itbl(bqe)->type == CONSTR); 
4428     /* only BQs of an RBH end with an RBH_Save closure */
4429     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4430
4431     switch (get_itbl(bqe)->type) {
4432     case TSO:
4433       debugBelch(" TSO %u (%x),",
4434               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4435       break;
4436     case BLOCKED_FETCH:
4437       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4438               ((StgBlockedFetch *)bqe)->node, 
4439               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4440               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4441               ((StgBlockedFetch *)bqe)->ga.weight);
4442       break;
4443     case CONSTR:
4444       debugBelch(" %s (IP %p),",
4445               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4446                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4447                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4448                "RBH_Save_?"), get_itbl(bqe));
4449       break;
4450     default:
4451       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4452            info_type((StgClosure *)bqe)); // , node, info_type(node));
4453       break;
4454     }
4455   } /* for */
4456   debugBelch("\n");
4457 }
4458 # elif defined(GRAN)
4459 void 
4460 print_bq (StgClosure *node)
4461 {
4462   StgBlockingQueueElement *bqe;
4463   PEs node_loc, tso_loc;
4464   rtsBool end;
4465
4466   /* should cover all closures that may have a blocking queue */
4467   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4468          get_itbl(node)->type == FETCH_ME_BQ ||
4469          get_itbl(node)->type == RBH);
4470     
4471   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4472   node_loc = where_is(node);
4473
4474   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4475           node, info_type(node), node_loc);
4476
4477   /* 
4478      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4479   */
4480   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4481        !end; // iterate until bqe points to a CONSTR
4482        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4483     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4484     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4485     /* types of closures that may appear in a blocking queue */
4486     ASSERT(get_itbl(bqe)->type == TSO ||           
4487            get_itbl(bqe)->type == CONSTR); 
4488     /* only BQs of an RBH end with an RBH_Save closure */
4489     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4490
4491     tso_loc = where_is((StgClosure *)bqe);
4492     switch (get_itbl(bqe)->type) {
4493     case TSO:
4494       debugBelch(" TSO %d (%p) on [PE %d],",
4495               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4496       break;
4497     case CONSTR:
4498       debugBelch(" %s (IP %p),",
4499               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4500                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4501                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4502                "RBH_Save_?"), get_itbl(bqe));
4503       break;
4504     default:
4505       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4506            info_type((StgClosure *)bqe), node, info_type(node));
4507       break;
4508     }
4509   } /* for */
4510   debugBelch("\n");
4511 }
4512 # endif
4513
4514 #if defined(PARALLEL_HASKELL)
4515 static nat
4516 run_queue_len(void)
4517 {
4518     nat i;
4519     StgTSO *tso;
4520     
4521     for (i=0, tso=run_queue_hd; 
4522          tso != END_TSO_QUEUE;
4523          i++, tso=tso->link) {
4524         /* nothing */
4525     }
4526         
4527     return i;
4528 }
4529 #endif
4530
4531 void
4532 sched_belch(char *s, ...)
4533 {
4534     va_list ap;
4535     va_start(ap,s);
4536 #ifdef THREADED_RTS
4537     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4538 #elif defined(PARALLEL_HASKELL)
4539     debugBelch("== ");
4540 #else
4541     debugBelch("sched: ");
4542 #endif
4543     vdebugBelch(s, ap);
4544     debugBelch("\n");
4545     va_end(ap);
4546 }
4547
4548 #endif /* DEBUG */