remove empty dir
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
208 static void scheduleCheckBlackHoles (Capability *cap);
209 static void scheduleDetectDeadlock (Capability *cap, Task *task);
210 #if defined(GRAN)
211 static StgTSO *scheduleProcessEvent(rtsEvent *event);
212 #endif
213 #if defined(PARALLEL_HASKELL)
214 static StgTSO *scheduleSendPendingMessages(void);
215 static void scheduleActivateSpark(void);
216 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
217 #endif
218 #if defined(PAR) || defined(GRAN)
219 static void scheduleGranParReport(void);
220 #endif
221 static void schedulePostRunThread(void);
222 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
223 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
224                                          StgTSO *t);
225 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
226                                     nat prev_what_next );
227 static void scheduleHandleThreadBlocked( StgTSO *t );
228 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
229                                              StgTSO *t );
230 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
231 static Capability *scheduleDoGC(Capability *cap, Task *task,
232                                 rtsBool force_major, 
233                                 void (*get_roots)(evac_fn));
234
235 static void unblockThread(Capability *cap, StgTSO *tso);
236 static rtsBool checkBlackHoles(Capability *cap);
237 static void AllRoots(evac_fn evac);
238
239 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
240
241 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
242                         rtsBool stop_at_atomically, StgPtr stop_here);
243
244 static void deleteThread (Capability *cap, StgTSO *tso);
245 static void deleteAllThreads (Capability *cap);
246
247 #ifdef DEBUG
248 static void printThreadBlockage(StgTSO *tso);
249 static void printThreadStatus(StgTSO *tso);
250 void printThreadQueue(StgTSO *tso);
251 #endif
252
253 #if defined(PARALLEL_HASKELL)
254 StgTSO * createSparkThread(rtsSpark spark);
255 StgTSO * activateSpark (rtsSpark spark);  
256 #endif
257
258 #ifdef DEBUG
259 static char *whatNext_strs[] = {
260   "(unknown)",
261   "ThreadRunGHC",
262   "ThreadInterpret",
263   "ThreadKilled",
264   "ThreadRelocated",
265   "ThreadComplete"
266 };
267 #endif
268
269 /* -----------------------------------------------------------------------------
270  * Putting a thread on the run queue: different scheduling policies
271  * -------------------------------------------------------------------------- */
272
273 STATIC_INLINE void
274 addToRunQueue( Capability *cap, StgTSO *t )
275 {
276 #if defined(PARALLEL_HASKELL)
277     if (RtsFlags.ParFlags.doFairScheduling) { 
278         // this does round-robin scheduling; good for concurrency
279         appendToRunQueue(cap,t);
280     } else {
281         // this does unfair scheduling; good for parallelism
282         pushOnRunQueue(cap,t);
283     }
284 #else
285     // this does round-robin scheduling; good for concurrency
286     appendToRunQueue(cap,t);
287 #endif
288 }
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    GRAN version:
303      In a GranSim setup this loop iterates over the global event queue.
304      This revolves around the global event queue, which determines what 
305      to do next. Therefore, it's more complicated than either the 
306      concurrent or the parallel (GUM) setup.
307
308    GUM version:
309      GUM iterates over incoming messages.
310      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
311      and sends out a fish whenever it has nothing to do; in-between
312      doing the actual reductions (shared code below) it processes the
313      incoming messages and deals with delayed operations 
314      (see PendingFetches).
315      This is not the ugliest code you could imagine, but it's bloody close.
316
317    ------------------------------------------------------------------------ */
318
319 static Capability *
320 schedule (Capability *initialCapability, Task *task)
321 {
322   StgTSO *t;
323   Capability *cap;
324   StgThreadReturnCode ret;
325 #if defined(GRAN)
326   rtsEvent *event;
327 #elif defined(PARALLEL_HASKELL)
328   StgTSO *tso;
329   GlobalTaskId pe;
330   rtsBool receivedFinish = rtsFalse;
331 # if defined(DEBUG)
332   nat tp_size, sp_size; // stats only
333 # endif
334 #endif
335   nat prev_what_next;
336   rtsBool ready_to_gc;
337 #if defined(THREADED_RTS)
338   rtsBool first = rtsTrue;
339 #endif
340   
341   cap = initialCapability;
342
343   // Pre-condition: this task owns initialCapability.
344   // The sched_mutex is *NOT* held
345   // NB. on return, we still hold a capability.
346
347   IF_DEBUG(scheduler,
348            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
349                        task, initialCapability);
350       );
351
352   schedulePreLoop();
353
354   // -----------------------------------------------------------
355   // Scheduler loop starts here:
356
357 #if defined(PARALLEL_HASKELL)
358 #define TERMINATION_CONDITION        (!receivedFinish)
359 #elif defined(GRAN)
360 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
361 #else
362 #define TERMINATION_CONDITION        rtsTrue
363 #endif
364
365   while (TERMINATION_CONDITION) {
366
367 #if defined(GRAN)
368       /* Choose the processor with the next event */
369       CurrentProc = event->proc;
370       CurrentTSO = event->tso;
371 #endif
372
373 #if defined(THREADED_RTS)
374       if (first) {
375           // don't yield the first time, we want a chance to run this
376           // thread for a bit, even if there are others banging at the
377           // door.
378           first = rtsFalse;
379           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
380       } else {
381           // Yield the capability to higher-priority tasks if necessary.
382           yieldCapability(&cap, task);
383       }
384 #endif
385       
386 #if defined(THREADED_RTS)
387       schedulePushWork(cap,task);
388 #endif
389
390     // Check whether we have re-entered the RTS from Haskell without
391     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
392     // call).
393     if (cap->in_haskell) {
394           errorBelch("schedule: re-entered unsafely.\n"
395                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
396           stg_exit(EXIT_FAILURE);
397     }
398
399     // The interruption / shutdown sequence.
400     // 
401     // In order to cleanly shut down the runtime, we want to:
402     //   * make sure that all main threads return to their callers
403     //     with the state 'Interrupted'.
404     //   * clean up all OS threads assocated with the runtime
405     //   * free all memory etc.
406     //
407     // So the sequence for ^C goes like this:
408     //
409     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
410     //     arranges for some Capability to wake up
411     //
412     //   * all threads in the system are halted, and the zombies are
413     //     placed on the run queue for cleaning up.  We acquire all
414     //     the capabilities in order to delete the threads, this is
415     //     done by scheduleDoGC() for convenience (because GC already
416     //     needs to acquire all the capabilities).  We can't kill
417     //     threads involved in foreign calls.
418     // 
419     //   * sched_state := SCHED_INTERRUPTED
420     //
421     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
422     //
423     //   * sched_state := SCHED_SHUTTING_DOWN
424     //
425     //   * all workers exit when the run queue on their capability
426     //     drains.  All main threads will also exit when their TSO
427     //     reaches the head of the run queue and they can return.
428     //
429     //   * eventually all Capabilities will shut down, and the RTS can
430     //     exit.
431     //
432     //   * We might be left with threads blocked in foreign calls, 
433     //     we should really attempt to kill these somehow (TODO);
434     
435     switch (sched_state) {
436     case SCHED_RUNNING:
437         break;
438     case SCHED_INTERRUPTING:
439         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
440 #if defined(THREADED_RTS)
441         discardSparksCap(cap);
442 #endif
443         /* scheduleDoGC() deletes all the threads */
444         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
445         break;
446     case SCHED_INTERRUPTED:
447         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
448         break;
449     case SCHED_SHUTTING_DOWN:
450         IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
451         // If we are a worker, just exit.  If we're a bound thread
452         // then we will exit below when we've removed our TSO from
453         // the run queue.
454         if (task->tso == NULL && emptyRunQueue(cap)) {
455             return cap;
456         }
457         break;
458     default:
459         barf("sched_state: %d", sched_state);
460     }
461
462 #if defined(THREADED_RTS)
463     // If the run queue is empty, take a spark and turn it into a thread.
464     {
465         if (emptyRunQueue(cap)) {
466             StgClosure *spark;
467             spark = findSpark(cap);
468             if (spark != NULL) {
469                 IF_DEBUG(scheduler,
470                          sched_belch("turning spark of closure %p into a thread",
471                                      (StgClosure *)spark));
472                 createSparkThread(cap,spark);     
473             }
474         }
475     }
476 #endif // THREADED_RTS
477
478     scheduleStartSignalHandlers(cap);
479
480     // Only check the black holes here if we've nothing else to do.
481     // During normal execution, the black hole list only gets checked
482     // at GC time, to avoid repeatedly traversing this possibly long
483     // list each time around the scheduler.
484     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
485
486     scheduleCheckWakeupThreads(cap);
487
488     scheduleCheckBlockedThreads(cap);
489
490     scheduleDetectDeadlock(cap,task);
491 #if defined(THREADED_RTS)
492     cap = task->cap;    // reload cap, it might have changed
493 #endif
494
495     // Normally, the only way we can get here with no threads to
496     // run is if a keyboard interrupt received during 
497     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
498     // Additionally, it is not fatal for the
499     // threaded RTS to reach here with no threads to run.
500     //
501     // win32: might be here due to awaitEvent() being abandoned
502     // as a result of a console event having been delivered.
503     if ( emptyRunQueue(cap) ) {
504 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
505         ASSERT(sched_state >= SCHED_INTERRUPTING);
506 #endif
507         continue; // nothing to do
508     }
509
510 #if defined(PARALLEL_HASKELL)
511     scheduleSendPendingMessages();
512     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
513         continue;
514
515 #if defined(SPARKS)
516     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
517 #endif
518
519     /* If we still have no work we need to send a FISH to get a spark
520        from another PE */
521     if (emptyRunQueue(cap)) {
522         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
523         ASSERT(rtsFalse); // should not happen at the moment
524     }
525     // from here: non-empty run queue.
526     //  TODO: merge above case with this, only one call processMessages() !
527     if (PacketsWaiting()) {  /* process incoming messages, if
528                                 any pending...  only in else
529                                 because getRemoteWork waits for
530                                 messages as well */
531         receivedFinish = processMessages();
532     }
533 #endif
534
535 #if defined(GRAN)
536     scheduleProcessEvent(event);
537 #endif
538
539     // 
540     // Get a thread to run
541     //
542     t = popRunQueue(cap);
543
544 #if defined(GRAN) || defined(PAR)
545     scheduleGranParReport(); // some kind of debuging output
546 #else
547     // Sanity check the thread we're about to run.  This can be
548     // expensive if there is lots of thread switching going on...
549     IF_DEBUG(sanity,checkTSO(t));
550 #endif
551
552 #if defined(THREADED_RTS)
553     // Check whether we can run this thread in the current task.
554     // If not, we have to pass our capability to the right task.
555     {
556         Task *bound = t->bound;
557       
558         if (bound) {
559             if (bound == task) {
560                 IF_DEBUG(scheduler,
561                          sched_belch("### Running thread %d in bound thread",
562                                      t->id));
563                 // yes, the Haskell thread is bound to the current native thread
564             } else {
565                 IF_DEBUG(scheduler,
566                          sched_belch("### thread %d bound to another OS thread",
567                                      t->id));
568                 // no, bound to a different Haskell thread: pass to that thread
569                 pushOnRunQueue(cap,t);
570                 continue;
571             }
572         } else {
573             // The thread we want to run is unbound.
574             if (task->tso) { 
575                 IF_DEBUG(scheduler,
576                          sched_belch("### this OS thread cannot run thread %d", t->id));
577                 // no, the current native thread is bound to a different
578                 // Haskell thread, so pass it to any worker thread
579                 pushOnRunQueue(cap,t);
580                 continue; 
581             }
582         }
583     }
584 #endif
585
586     cap->r.rCurrentTSO = t;
587     
588     /* context switches are initiated by the timer signal, unless
589      * the user specified "context switch as often as possible", with
590      * +RTS -C0
591      */
592     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
593         && !emptyThreadQueues(cap)) {
594         context_switch = 1;
595     }
596          
597 run_thread:
598
599     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
600                               (long)t->id, whatNext_strs[t->what_next]));
601
602 #if defined(PROFILING)
603     startHeapProfTimer();
604 #endif
605
606     // ----------------------------------------------------------------------
607     // Run the current thread 
608
609     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
610     ASSERT(t->cap == cap);
611
612     prev_what_next = t->what_next;
613
614     errno = t->saved_errno;
615     cap->in_haskell = rtsTrue;
616
617     dirtyTSO(t);
618
619     recent_activity = ACTIVITY_YES;
620
621     switch (prev_what_next) {
622         
623     case ThreadKilled:
624     case ThreadComplete:
625         /* Thread already finished, return to scheduler. */
626         ret = ThreadFinished;
627         break;
628         
629     case ThreadRunGHC:
630     {
631         StgRegTable *r;
632         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
633         cap = regTableToCapability(r);
634         ret = r->rRet;
635         break;
636     }
637     
638     case ThreadInterpret:
639         cap = interpretBCO(cap);
640         ret = cap->r.rRet;
641         break;
642         
643     default:
644         barf("schedule: invalid what_next field");
645     }
646
647     cap->in_haskell = rtsFalse;
648
649     // The TSO might have moved, eg. if it re-entered the RTS and a GC
650     // happened.  So find the new location:
651     t = cap->r.rCurrentTSO;
652
653     // We have run some Haskell code: there might be blackhole-blocked
654     // threads to wake up now.
655     // Lock-free test here should be ok, we're just setting a flag.
656     if ( blackhole_queue != END_TSO_QUEUE ) {
657         blackholes_need_checking = rtsTrue;
658     }
659     
660     // And save the current errno in this thread.
661     // XXX: possibly bogus for SMP because this thread might already
662     // be running again, see code below.
663     t->saved_errno = errno;
664
665 #if defined(THREADED_RTS)
666     // If ret is ThreadBlocked, and this Task is bound to the TSO that
667     // blocked, we are in limbo - the TSO is now owned by whatever it
668     // is blocked on, and may in fact already have been woken up,
669     // perhaps even on a different Capability.  It may be the case
670     // that task->cap != cap.  We better yield this Capability
671     // immediately and return to normaility.
672     if (ret == ThreadBlocked) {
673         IF_DEBUG(scheduler,
674                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
675                              t->id, whatNext_strs[t->what_next]));
676         continue;
677     }
678 #endif
679
680     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
681     ASSERT(t->cap == cap);
682
683     // ----------------------------------------------------------------------
684     
685     // Costs for the scheduler are assigned to CCS_SYSTEM
686 #if defined(PROFILING)
687     stopHeapProfTimer();
688     CCCS = CCS_SYSTEM;
689 #endif
690     
691 #if defined(THREADED_RTS)
692     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
693 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
694     IF_DEBUG(scheduler,debugBelch("sched: "););
695 #endif
696     
697     schedulePostRunThread();
698
699     ready_to_gc = rtsFalse;
700
701     switch (ret) {
702     case HeapOverflow:
703         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
704         break;
705
706     case StackOverflow:
707         scheduleHandleStackOverflow(cap,task,t);
708         break;
709
710     case ThreadYielding:
711         if (scheduleHandleYield(cap, t, prev_what_next)) {
712             // shortcut for switching between compiler/interpreter:
713             goto run_thread; 
714         }
715         break;
716
717     case ThreadBlocked:
718         scheduleHandleThreadBlocked(t);
719         break;
720
721     case ThreadFinished:
722         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
723         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
724         break;
725
726     default:
727       barf("schedule: invalid thread return code %d", (int)ret);
728     }
729
730     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
731     if (ready_to_gc) {
732       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
733     }
734   } /* end of while() */
735
736   IF_PAR_DEBUG(verbose,
737                debugBelch("== Leaving schedule() after having received Finish\n"));
738 }
739
740 /* ----------------------------------------------------------------------------
741  * Setting up the scheduler loop
742  * ------------------------------------------------------------------------- */
743
744 static void
745 schedulePreLoop(void)
746 {
747 #if defined(GRAN) 
748     /* set up first event to get things going */
749     /* ToDo: assign costs for system setup and init MainTSO ! */
750     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
751               ContinueThread, 
752               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
753     
754     IF_DEBUG(gran,
755              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
756                         CurrentTSO);
757              G_TSO(CurrentTSO, 5));
758     
759     if (RtsFlags.GranFlags.Light) {
760         /* Save current time; GranSim Light only */
761         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
762     }      
763 #endif
764 }
765
766 /* -----------------------------------------------------------------------------
767  * schedulePushWork()
768  *
769  * Push work to other Capabilities if we have some.
770  * -------------------------------------------------------------------------- */
771
772 #if defined(THREADED_RTS)
773 static void
774 schedulePushWork(Capability *cap USED_IF_THREADS, 
775                  Task *task      USED_IF_THREADS)
776 {
777     Capability *free_caps[n_capabilities], *cap0;
778     nat i, n_free_caps;
779
780     // migration can be turned off with +RTS -qg
781     if (!RtsFlags.ParFlags.migrate) return;
782
783     // Check whether we have more threads on our run queue, or sparks
784     // in our pool, that we could hand to another Capability.
785     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
786         && sparkPoolSizeCap(cap) < 2) {
787         return;
788     }
789
790     // First grab as many free Capabilities as we can.
791     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
792         cap0 = &capabilities[i];
793         if (cap != cap0 && tryGrabCapability(cap0,task)) {
794             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
795                 // it already has some work, we just grabbed it at 
796                 // the wrong moment.  Or maybe it's deadlocked!
797                 releaseCapability(cap0);
798             } else {
799                 free_caps[n_free_caps++] = cap0;
800             }
801         }
802     }
803
804     // we now have n_free_caps free capabilities stashed in
805     // free_caps[].  Share our run queue equally with them.  This is
806     // probably the simplest thing we could do; improvements we might
807     // want to do include:
808     //
809     //   - giving high priority to moving relatively new threads, on 
810     //     the gournds that they haven't had time to build up a
811     //     working set in the cache on this CPU/Capability.
812     //
813     //   - giving low priority to moving long-lived threads
814
815     if (n_free_caps > 0) {
816         StgTSO *prev, *t, *next;
817         rtsBool pushed_to_all;
818
819         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
820
821         i = 0;
822         pushed_to_all = rtsFalse;
823
824         if (cap->run_queue_hd != END_TSO_QUEUE) {
825             prev = cap->run_queue_hd;
826             t = prev->link;
827             prev->link = END_TSO_QUEUE;
828             for (; t != END_TSO_QUEUE; t = next) {
829                 next = t->link;
830                 t->link = END_TSO_QUEUE;
831                 if (t->what_next == ThreadRelocated
832                     || t->bound == task // don't move my bound thread
833                     || tsoLocked(t)) {  // don't move a locked thread
834                     prev->link = t;
835                     prev = t;
836                 } else if (i == n_free_caps) {
837                     pushed_to_all = rtsTrue;
838                     i = 0;
839                     // keep one for us
840                     prev->link = t;
841                     prev = t;
842                 } else {
843                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
844                     appendToRunQueue(free_caps[i],t);
845                     if (t->bound) { t->bound->cap = free_caps[i]; }
846                     t->cap = free_caps[i];
847                     i++;
848                 }
849             }
850             cap->run_queue_tl = prev;
851         }
852
853         // If there are some free capabilities that we didn't push any
854         // threads to, then try to push a spark to each one.
855         if (!pushed_to_all) {
856             StgClosure *spark;
857             // i is the next free capability to push to
858             for (; i < n_free_caps; i++) {
859                 if (emptySparkPoolCap(free_caps[i])) {
860                     spark = findSpark(cap);
861                     if (spark != NULL) {
862                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
863                         newSpark(&(free_caps[i]->r), spark);
864                     }
865                 }
866             }
867         }
868
869         // release the capabilities
870         for (i = 0; i < n_free_caps; i++) {
871             task->cap = free_caps[i];
872             releaseCapability(free_caps[i]);
873         }
874     }
875     task->cap = cap; // reset to point to our Capability.
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880  * Start any pending signal handlers
881  * ------------------------------------------------------------------------- */
882
883 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
884 static void
885 scheduleStartSignalHandlers(Capability *cap)
886 {
887     if (signals_pending()) { // safe outside the lock
888         startSignalHandlers(cap);
889     }
890 }
891 #else
892 static void
893 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
894 {
895 }
896 #endif
897
898 /* ----------------------------------------------------------------------------
899  * Check for blocked threads that can be woken up.
900  * ------------------------------------------------------------------------- */
901
902 static void
903 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
904 {
905 #if !defined(THREADED_RTS)
906     //
907     // Check whether any waiting threads need to be woken up.  If the
908     // run queue is empty, and there are no other tasks running, we
909     // can wait indefinitely for something to happen.
910     //
911     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
912     {
913         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
914     }
915 #endif
916 }
917
918
919 /* ----------------------------------------------------------------------------
920  * Check for threads woken up by other Capabilities
921  * ------------------------------------------------------------------------- */
922
923 static void
924 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
925 {
926 #if defined(THREADED_RTS)
927     // Any threads that were woken up by other Capabilities get
928     // appended to our run queue.
929     if (!emptyWakeupQueue(cap)) {
930         ACQUIRE_LOCK(&cap->lock);
931         if (emptyRunQueue(cap)) {
932             cap->run_queue_hd = cap->wakeup_queue_hd;
933             cap->run_queue_tl = cap->wakeup_queue_tl;
934         } else {
935             cap->run_queue_tl->link = cap->wakeup_queue_hd;
936             cap->run_queue_tl = cap->wakeup_queue_tl;
937         }
938         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
939         RELEASE_LOCK(&cap->lock);
940     }
941 #endif
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Check for threads blocked on BLACKHOLEs that can be woken up
946  * ------------------------------------------------------------------------- */
947 static void
948 scheduleCheckBlackHoles (Capability *cap)
949 {
950     if ( blackholes_need_checking ) // check without the lock first
951     {
952         ACQUIRE_LOCK(&sched_mutex);
953         if ( blackholes_need_checking ) {
954             checkBlackHoles(cap);
955             blackholes_need_checking = rtsFalse;
956         }
957         RELEASE_LOCK(&sched_mutex);
958     }
959 }
960
961 /* ----------------------------------------------------------------------------
962  * Detect deadlock conditions and attempt to resolve them.
963  * ------------------------------------------------------------------------- */
964
965 static void
966 scheduleDetectDeadlock (Capability *cap, Task *task)
967 {
968
969 #if defined(PARALLEL_HASKELL)
970     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
971     return;
972 #endif
973
974     /* 
975      * Detect deadlock: when we have no threads to run, there are no
976      * threads blocked, waiting for I/O, or sleeping, and all the
977      * other tasks are waiting for work, we must have a deadlock of
978      * some description.
979      */
980     if ( emptyThreadQueues(cap) )
981     {
982 #if defined(THREADED_RTS)
983         /* 
984          * In the threaded RTS, we only check for deadlock if there
985          * has been no activity in a complete timeslice.  This means
986          * we won't eagerly start a full GC just because we don't have
987          * any threads to run currently.
988          */
989         if (recent_activity != ACTIVITY_INACTIVE) return;
990 #endif
991
992         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
993
994         // Garbage collection can release some new threads due to
995         // either (a) finalizers or (b) threads resurrected because
996         // they are unreachable and will therefore be sent an
997         // exception.  Any threads thus released will be immediately
998         // runnable.
999         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
1000
1001         recent_activity = ACTIVITY_DONE_GC;
1002         
1003         if ( !emptyRunQueue(cap) ) return;
1004
1005 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
1006         /* If we have user-installed signal handlers, then wait
1007          * for signals to arrive rather then bombing out with a
1008          * deadlock.
1009          */
1010         if ( anyUserHandlers() ) {
1011             IF_DEBUG(scheduler, 
1012                      sched_belch("still deadlocked, waiting for signals..."));
1013
1014             awaitUserSignals();
1015
1016             if (signals_pending()) {
1017                 startSignalHandlers(cap);
1018             }
1019
1020             // either we have threads to run, or we were interrupted:
1021             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1022         }
1023 #endif
1024
1025 #if !defined(THREADED_RTS)
1026         /* Probably a real deadlock.  Send the current main thread the
1027          * Deadlock exception.
1028          */
1029         if (task->tso) {
1030             switch (task->tso->why_blocked) {
1031             case BlockedOnSTM:
1032             case BlockedOnBlackHole:
1033             case BlockedOnException:
1034             case BlockedOnMVar:
1035                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1036                 return;
1037             default:
1038                 barf("deadlock: main thread blocked in a strange way");
1039             }
1040         }
1041         return;
1042 #endif
1043     }
1044 }
1045
1046 /* ----------------------------------------------------------------------------
1047  * Process an event (GRAN only)
1048  * ------------------------------------------------------------------------- */
1049
1050 #if defined(GRAN)
1051 static StgTSO *
1052 scheduleProcessEvent(rtsEvent *event)
1053 {
1054     StgTSO *t;
1055
1056     if (RtsFlags.GranFlags.Light)
1057       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1058
1059     /* adjust time based on time-stamp */
1060     if (event->time > CurrentTime[CurrentProc] &&
1061         event->evttype != ContinueThread)
1062       CurrentTime[CurrentProc] = event->time;
1063     
1064     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1065     if (!RtsFlags.GranFlags.Light)
1066       handleIdlePEs();
1067
1068     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1069
1070     /* main event dispatcher in GranSim */
1071     switch (event->evttype) {
1072       /* Should just be continuing execution */
1073     case ContinueThread:
1074       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1075       /* ToDo: check assertion
1076       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1077              run_queue_hd != END_TSO_QUEUE);
1078       */
1079       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1080       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1081           procStatus[CurrentProc]==Fetching) {
1082         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1083               CurrentTSO->id, CurrentTSO, CurrentProc);
1084         goto next_thread;
1085       } 
1086       /* Ignore ContinueThreads for completed threads */
1087       if (CurrentTSO->what_next == ThreadComplete) {
1088         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1089               CurrentTSO->id, CurrentTSO, CurrentProc);
1090         goto next_thread;
1091       } 
1092       /* Ignore ContinueThreads for threads that are being migrated */
1093       if (PROCS(CurrentTSO)==Nowhere) { 
1094         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1095               CurrentTSO->id, CurrentTSO, CurrentProc);
1096         goto next_thread;
1097       }
1098       /* The thread should be at the beginning of the run queue */
1099       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1100         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1101               CurrentTSO->id, CurrentTSO, CurrentProc);
1102         break; // run the thread anyway
1103       }
1104       /*
1105       new_event(proc, proc, CurrentTime[proc],
1106                 FindWork,
1107                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1108       goto next_thread; 
1109       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1110       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1111
1112     case FetchNode:
1113       do_the_fetchnode(event);
1114       goto next_thread;             /* handle next event in event queue  */
1115       
1116     case GlobalBlock:
1117       do_the_globalblock(event);
1118       goto next_thread;             /* handle next event in event queue  */
1119       
1120     case FetchReply:
1121       do_the_fetchreply(event);
1122       goto next_thread;             /* handle next event in event queue  */
1123       
1124     case UnblockThread:   /* Move from the blocked queue to the tail of */
1125       do_the_unblock(event);
1126       goto next_thread;             /* handle next event in event queue  */
1127       
1128     case ResumeThread:  /* Move from the blocked queue to the tail of */
1129       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1130       event->tso->gran.blocktime += 
1131         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1132       do_the_startthread(event);
1133       goto next_thread;             /* handle next event in event queue  */
1134       
1135     case StartThread:
1136       do_the_startthread(event);
1137       goto next_thread;             /* handle next event in event queue  */
1138       
1139     case MoveThread:
1140       do_the_movethread(event);
1141       goto next_thread;             /* handle next event in event queue  */
1142       
1143     case MoveSpark:
1144       do_the_movespark(event);
1145       goto next_thread;             /* handle next event in event queue  */
1146       
1147     case FindWork:
1148       do_the_findwork(event);
1149       goto next_thread;             /* handle next event in event queue  */
1150       
1151     default:
1152       barf("Illegal event type %u\n", event->evttype);
1153     }  /* switch */
1154     
1155     /* This point was scheduler_loop in the old RTS */
1156
1157     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1158
1159     TimeOfLastEvent = CurrentTime[CurrentProc];
1160     TimeOfNextEvent = get_time_of_next_event();
1161     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1162     // CurrentTSO = ThreadQueueHd;
1163
1164     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1165                          TimeOfNextEvent));
1166
1167     if (RtsFlags.GranFlags.Light) 
1168       GranSimLight_leave_system(event, &ActiveTSO); 
1169
1170     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1171
1172     IF_DEBUG(gran, 
1173              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1174
1175     /* in a GranSim setup the TSO stays on the run queue */
1176     t = CurrentTSO;
1177     /* Take a thread from the run queue. */
1178     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1179
1180     IF_DEBUG(gran, 
1181              debugBelch("GRAN: About to run current thread, which is\n");
1182              G_TSO(t,5));
1183
1184     context_switch = 0; // turned on via GranYield, checking events and time slice
1185
1186     IF_DEBUG(gran, 
1187              DumpGranEvent(GR_SCHEDULE, t));
1188
1189     procStatus[CurrentProc] = Busy;
1190 }
1191 #endif // GRAN
1192
1193 /* ----------------------------------------------------------------------------
1194  * Send pending messages (PARALLEL_HASKELL only)
1195  * ------------------------------------------------------------------------- */
1196
1197 #if defined(PARALLEL_HASKELL)
1198 static StgTSO *
1199 scheduleSendPendingMessages(void)
1200 {
1201     StgSparkPool *pool;
1202     rtsSpark spark;
1203     StgTSO *t;
1204
1205 # if defined(PAR) // global Mem.Mgmt., omit for now
1206     if (PendingFetches != END_BF_QUEUE) {
1207         processFetches();
1208     }
1209 # endif
1210     
1211     if (RtsFlags.ParFlags.BufferTime) {
1212         // if we use message buffering, we must send away all message
1213         // packets which have become too old...
1214         sendOldBuffers(); 
1215     }
1216 }
1217 #endif
1218
1219 /* ----------------------------------------------------------------------------
1220  * Activate spark threads (PARALLEL_HASKELL only)
1221  * ------------------------------------------------------------------------- */
1222
1223 #if defined(PARALLEL_HASKELL)
1224 static void
1225 scheduleActivateSpark(void)
1226 {
1227 #if defined(SPARKS)
1228   ASSERT(emptyRunQueue());
1229 /* We get here if the run queue is empty and want some work.
1230    We try to turn a spark into a thread, and add it to the run queue,
1231    from where it will be picked up in the next iteration of the scheduler
1232    loop.
1233 */
1234
1235       /* :-[  no local threads => look out for local sparks */
1236       /* the spark pool for the current PE */
1237       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1238       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1239           pool->hd < pool->tl) {
1240         /* 
1241          * ToDo: add GC code check that we really have enough heap afterwards!!
1242          * Old comment:
1243          * If we're here (no runnable threads) and we have pending
1244          * sparks, we must have a space problem.  Get enough space
1245          * to turn one of those pending sparks into a
1246          * thread... 
1247          */
1248
1249         spark = findSpark(rtsFalse);            /* get a spark */
1250         if (spark != (rtsSpark) NULL) {
1251           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1252           IF_PAR_DEBUG(fish, // schedule,
1253                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1254                              tso->id, tso, advisory_thread_count));
1255
1256           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1257             IF_PAR_DEBUG(fish, // schedule,
1258                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1259                             spark));
1260             return rtsFalse; /* failed to generate a thread */
1261           }                  /* otherwise fall through & pick-up new tso */
1262         } else {
1263           IF_PAR_DEBUG(fish, // schedule,
1264                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1265                              spark_queue_len(pool)));
1266           return rtsFalse;  /* failed to generate a thread */
1267         }
1268         return rtsTrue;  /* success in generating a thread */
1269   } else { /* no more threads permitted or pool empty */
1270     return rtsFalse;  /* failed to generateThread */
1271   }
1272 #else
1273   tso = NULL; // avoid compiler warning only
1274   return rtsFalse;  /* dummy in non-PAR setup */
1275 #endif // SPARKS
1276 }
1277 #endif // PARALLEL_HASKELL
1278
1279 /* ----------------------------------------------------------------------------
1280  * Get work from a remote node (PARALLEL_HASKELL only)
1281  * ------------------------------------------------------------------------- */
1282     
1283 #if defined(PARALLEL_HASKELL)
1284 static rtsBool
1285 scheduleGetRemoteWork(rtsBool *receivedFinish)
1286 {
1287   ASSERT(emptyRunQueue());
1288
1289   if (RtsFlags.ParFlags.BufferTime) {
1290         IF_PAR_DEBUG(verbose, 
1291                 debugBelch("...send all pending data,"));
1292         {
1293           nat i;
1294           for (i=1; i<=nPEs; i++)
1295             sendImmediately(i); // send all messages away immediately
1296         }
1297   }
1298 # ifndef SPARKS
1299         //++EDEN++ idle() , i.e. send all buffers, wait for work
1300         // suppress fishing in EDEN... just look for incoming messages
1301         // (blocking receive)
1302   IF_PAR_DEBUG(verbose, 
1303                debugBelch("...wait for incoming messages...\n"));
1304   *receivedFinish = processMessages(); // blocking receive...
1305
1306         // and reenter scheduling loop after having received something
1307         // (return rtsFalse below)
1308
1309 # else /* activate SPARKS machinery */
1310 /* We get here, if we have no work, tried to activate a local spark, but still
1311    have no work. We try to get a remote spark, by sending a FISH message.
1312    Thread migration should be added here, and triggered when a sequence of 
1313    fishes returns without work. */
1314         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1315
1316       /* =8-[  no local sparks => look for work on other PEs */
1317         /*
1318          * We really have absolutely no work.  Send out a fish
1319          * (there may be some out there already), and wait for
1320          * something to arrive.  We clearly can't run any threads
1321          * until a SCHEDULE or RESUME arrives, and so that's what
1322          * we're hoping to see.  (Of course, we still have to
1323          * respond to other types of messages.)
1324          */
1325         rtsTime now = msTime() /*CURRENT_TIME*/;
1326         IF_PAR_DEBUG(verbose, 
1327                      debugBelch("--  now=%ld\n", now));
1328         IF_PAR_DEBUG(fish, // verbose,
1329              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1330                  (last_fish_arrived_at!=0 &&
1331                   last_fish_arrived_at+delay > now)) {
1332                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1333                      now, last_fish_arrived_at+delay, 
1334                      last_fish_arrived_at,
1335                      delay);
1336              });
1337   
1338         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1339             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1340           if (last_fish_arrived_at==0 ||
1341               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1342             /* outstandingFishes is set in sendFish, processFish;
1343                avoid flooding system with fishes via delay */
1344     next_fish_to_send_at = 0;  
1345   } else {
1346     /* ToDo: this should be done in the main scheduling loop to avoid the
1347              busy wait here; not so bad if fish delay is very small  */
1348     int iq = 0; // DEBUGGING -- HWL
1349     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1350     /* send a fish when ready, but process messages that arrive in the meantime */
1351     do {
1352       if (PacketsWaiting()) {
1353         iq++; // DEBUGGING
1354         *receivedFinish = processMessages();
1355       }
1356       now = msTime();
1357     } while (!*receivedFinish || now<next_fish_to_send_at);
1358     // JB: This means the fish could become obsolete, if we receive
1359     // work. Better check for work again? 
1360     // last line: while (!receivedFinish || !haveWork || now<...)
1361     // next line: if (receivedFinish || haveWork )
1362
1363     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1364       return rtsFalse;  // NB: this will leave scheduler loop
1365                         // immediately after return!
1366                           
1367     IF_PAR_DEBUG(fish, // verbose,
1368                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1369
1370   }
1371
1372     // JB: IMHO, this should all be hidden inside sendFish(...)
1373     /* pe = choosePE(); 
1374        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1375                 NEW_FISH_HUNGER);
1376
1377     // Global statistics: count no. of fishes
1378     if (RtsFlags.ParFlags.ParStats.Global &&
1379          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1380            globalParStats.tot_fish_mess++;
1381            }
1382     */ 
1383
1384   /* delayed fishes must have been sent by now! */
1385   next_fish_to_send_at = 0;  
1386   }
1387       
1388   *receivedFinish = processMessages();
1389 # endif /* SPARKS */
1390
1391  return rtsFalse;
1392  /* NB: this function always returns rtsFalse, meaning the scheduler
1393     loop continues with the next iteration; 
1394     rationale: 
1395       return code means success in finding work; we enter this function
1396       if there is no local work, thus have to send a fish which takes
1397       time until it arrives with work; in the meantime we should process
1398       messages in the main loop;
1399  */
1400 }
1401 #endif // PARALLEL_HASKELL
1402
1403 /* ----------------------------------------------------------------------------
1404  * PAR/GRAN: Report stats & debugging info(?)
1405  * ------------------------------------------------------------------------- */
1406
1407 #if defined(PAR) || defined(GRAN)
1408 static void
1409 scheduleGranParReport(void)
1410 {
1411   ASSERT(run_queue_hd != END_TSO_QUEUE);
1412
1413   /* Take a thread from the run queue, if we have work */
1414   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1415
1416     /* If this TSO has got its outport closed in the meantime, 
1417      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1418      * It has to be marked as TH_DEAD for this purpose.
1419      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1420
1421 JB: TODO: investigate wether state change field could be nuked
1422      entirely and replaced by the normal tso state (whatnext
1423      field). All we want to do is to kill tsos from outside.
1424      */
1425
1426     /* ToDo: write something to the log-file
1427     if (RTSflags.ParFlags.granSimStats && !sameThread)
1428         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1429
1430     CurrentTSO = t;
1431     */
1432     /* the spark pool for the current PE */
1433     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1434
1435     IF_DEBUG(scheduler, 
1436              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1437                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1438
1439     IF_PAR_DEBUG(fish,
1440              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1441                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1442
1443     if (RtsFlags.ParFlags.ParStats.Full && 
1444         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1445         (emitSchedule || // forced emit
1446          (t && LastTSO && t->id != LastTSO->id))) {
1447       /* 
1448          we are running a different TSO, so write a schedule event to log file
1449          NB: If we use fair scheduling we also have to write  a deschedule 
1450              event for LastTSO; with unfair scheduling we know that the
1451              previous tso has blocked whenever we switch to another tso, so
1452              we don't need it in GUM for now
1453       */
1454       IF_PAR_DEBUG(fish, // schedule,
1455                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1456
1457       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1458                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1459       emitSchedule = rtsFalse;
1460     }
1461 }     
1462 #endif
1463
1464 /* ----------------------------------------------------------------------------
1465  * After running a thread...
1466  * ------------------------------------------------------------------------- */
1467
1468 static void
1469 schedulePostRunThread(void)
1470 {
1471 #if defined(PAR)
1472     /* HACK 675: if the last thread didn't yield, make sure to print a 
1473        SCHEDULE event to the log file when StgRunning the next thread, even
1474        if it is the same one as before */
1475     LastTSO = t; 
1476     TimeOfLastYield = CURRENT_TIME;
1477 #endif
1478
1479   /* some statistics gathering in the parallel case */
1480
1481 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1482   switch (ret) {
1483     case HeapOverflow:
1484 # if defined(GRAN)
1485       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1486       globalGranStats.tot_heapover++;
1487 # elif defined(PAR)
1488       globalParStats.tot_heapover++;
1489 # endif
1490       break;
1491
1492      case StackOverflow:
1493 # if defined(GRAN)
1494       IF_DEBUG(gran, 
1495                DumpGranEvent(GR_DESCHEDULE, t));
1496       globalGranStats.tot_stackover++;
1497 # elif defined(PAR)
1498       // IF_DEBUG(par, 
1499       // DumpGranEvent(GR_DESCHEDULE, t);
1500       globalParStats.tot_stackover++;
1501 # endif
1502       break;
1503
1504     case ThreadYielding:
1505 # if defined(GRAN)
1506       IF_DEBUG(gran, 
1507                DumpGranEvent(GR_DESCHEDULE, t));
1508       globalGranStats.tot_yields++;
1509 # elif defined(PAR)
1510       // IF_DEBUG(par, 
1511       // DumpGranEvent(GR_DESCHEDULE, t);
1512       globalParStats.tot_yields++;
1513 # endif
1514       break; 
1515
1516     case ThreadBlocked:
1517 # if defined(GRAN)
1518       IF_DEBUG(scheduler,
1519                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1520                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1521                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1522                if (t->block_info.closure!=(StgClosure*)NULL)
1523                  print_bq(t->block_info.closure);
1524                debugBelch("\n"));
1525
1526       // ??? needed; should emit block before
1527       IF_DEBUG(gran, 
1528                DumpGranEvent(GR_DESCHEDULE, t)); 
1529       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1530       /*
1531         ngoq Dogh!
1532       ASSERT(procStatus[CurrentProc]==Busy || 
1533               ((procStatus[CurrentProc]==Fetching) && 
1534               (t->block_info.closure!=(StgClosure*)NULL)));
1535       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1536           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1537             procStatus[CurrentProc]==Fetching)) 
1538         procStatus[CurrentProc] = Idle;
1539       */
1540 # elif defined(PAR)
1541 //++PAR++  blockThread() writes the event (change?)
1542 # endif
1543     break;
1544
1545   case ThreadFinished:
1546     break;
1547
1548   default:
1549     barf("parGlobalStats: unknown return code");
1550     break;
1551     }
1552 #endif
1553 }
1554
1555 /* -----------------------------------------------------------------------------
1556  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1557  * -------------------------------------------------------------------------- */
1558
1559 static rtsBool
1560 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1561 {
1562     // did the task ask for a large block?
1563     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1564         // if so, get one and push it on the front of the nursery.
1565         bdescr *bd;
1566         lnat blocks;
1567         
1568         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1569         
1570         IF_DEBUG(scheduler,
1571                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1572                             (long)t->id, whatNext_strs[t->what_next], blocks));
1573         
1574         // don't do this if the nursery is (nearly) full, we'll GC first.
1575         if (cap->r.rCurrentNursery->link != NULL ||
1576             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1577                                                // if the nursery has only one block.
1578             
1579             ACQUIRE_SM_LOCK
1580             bd = allocGroup( blocks );
1581             RELEASE_SM_LOCK
1582             cap->r.rNursery->n_blocks += blocks;
1583             
1584             // link the new group into the list
1585             bd->link = cap->r.rCurrentNursery;
1586             bd->u.back = cap->r.rCurrentNursery->u.back;
1587             if (cap->r.rCurrentNursery->u.back != NULL) {
1588                 cap->r.rCurrentNursery->u.back->link = bd;
1589             } else {
1590 #if !defined(THREADED_RTS)
1591                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1592                        g0s0 == cap->r.rNursery);
1593 #endif
1594                 cap->r.rNursery->blocks = bd;
1595             }             
1596             cap->r.rCurrentNursery->u.back = bd;
1597             
1598             // initialise it as a nursery block.  We initialise the
1599             // step, gen_no, and flags field of *every* sub-block in
1600             // this large block, because this is easier than making
1601             // sure that we always find the block head of a large
1602             // block whenever we call Bdescr() (eg. evacuate() and
1603             // isAlive() in the GC would both have to do this, at
1604             // least).
1605             { 
1606                 bdescr *x;
1607                 for (x = bd; x < bd + blocks; x++) {
1608                     x->step = cap->r.rNursery;
1609                     x->gen_no = 0;
1610                     x->flags = 0;
1611                 }
1612             }
1613             
1614             // This assert can be a killer if the app is doing lots
1615             // of large block allocations.
1616             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1617             
1618             // now update the nursery to point to the new block
1619             cap->r.rCurrentNursery = bd;
1620             
1621             // we might be unlucky and have another thread get on the
1622             // run queue before us and steal the large block, but in that
1623             // case the thread will just end up requesting another large
1624             // block.
1625             pushOnRunQueue(cap,t);
1626             return rtsFalse;  /* not actually GC'ing */
1627         }
1628     }
1629     
1630     IF_DEBUG(scheduler,
1631              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1632                         (long)t->id, whatNext_strs[t->what_next]));
1633 #if defined(GRAN)
1634     ASSERT(!is_on_queue(t,CurrentProc));
1635 #elif defined(PARALLEL_HASKELL)
1636     /* Currently we emit a DESCHEDULE event before GC in GUM.
1637        ToDo: either add separate event to distinguish SYSTEM time from rest
1638        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1639     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1640         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1641                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1642         emitSchedule = rtsTrue;
1643     }
1644 #endif
1645       
1646     pushOnRunQueue(cap,t);
1647     return rtsTrue;
1648     /* actual GC is done at the end of the while loop in schedule() */
1649 }
1650
1651 /* -----------------------------------------------------------------------------
1652  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1653  * -------------------------------------------------------------------------- */
1654
1655 static void
1656 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1657 {
1658     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1659                                   (long)t->id, whatNext_strs[t->what_next]));
1660     /* just adjust the stack for this thread, then pop it back
1661      * on the run queue.
1662      */
1663     { 
1664         /* enlarge the stack */
1665         StgTSO *new_t = threadStackOverflow(cap, t);
1666         
1667         /* The TSO attached to this Task may have moved, so update the
1668          * pointer to it.
1669          */
1670         if (task->tso == t) {
1671             task->tso = new_t;
1672         }
1673         pushOnRunQueue(cap,new_t);
1674     }
1675 }
1676
1677 /* -----------------------------------------------------------------------------
1678  * Handle a thread that returned to the scheduler with ThreadYielding
1679  * -------------------------------------------------------------------------- */
1680
1681 static rtsBool
1682 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1683 {
1684     // Reset the context switch flag.  We don't do this just before
1685     // running the thread, because that would mean we would lose ticks
1686     // during GC, which can lead to unfair scheduling (a thread hogs
1687     // the CPU because the tick always arrives during GC).  This way
1688     // penalises threads that do a lot of allocation, but that seems
1689     // better than the alternative.
1690     context_switch = 0;
1691     
1692     /* put the thread back on the run queue.  Then, if we're ready to
1693      * GC, check whether this is the last task to stop.  If so, wake
1694      * up the GC thread.  getThread will block during a GC until the
1695      * GC is finished.
1696      */
1697     IF_DEBUG(scheduler,
1698              if (t->what_next != prev_what_next) {
1699                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1700                             (long)t->id, whatNext_strs[t->what_next]);
1701              } else {
1702                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1703                             (long)t->id, whatNext_strs[t->what_next]);
1704              }
1705         );
1706     
1707     IF_DEBUG(sanity,
1708              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1709              checkTSO(t));
1710     ASSERT(t->link == END_TSO_QUEUE);
1711     
1712     // Shortcut if we're just switching evaluators: don't bother
1713     // doing stack squeezing (which can be expensive), just run the
1714     // thread.
1715     if (t->what_next != prev_what_next) {
1716         return rtsTrue;
1717     }
1718     
1719 #if defined(GRAN)
1720     ASSERT(!is_on_queue(t,CurrentProc));
1721       
1722     IF_DEBUG(sanity,
1723              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1724              checkThreadQsSanity(rtsTrue));
1725
1726 #endif
1727
1728     addToRunQueue(cap,t);
1729
1730 #if defined(GRAN)
1731     /* add a ContinueThread event to actually process the thread */
1732     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1733               ContinueThread,
1734               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1735     IF_GRAN_DEBUG(bq, 
1736                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1737                   G_EVENTQ(0);
1738                   G_CURR_THREADQ(0));
1739 #endif
1740     return rtsFalse;
1741 }
1742
1743 /* -----------------------------------------------------------------------------
1744  * Handle a thread that returned to the scheduler with ThreadBlocked
1745  * -------------------------------------------------------------------------- */
1746
1747 static void
1748 scheduleHandleThreadBlocked( StgTSO *t
1749 #if !defined(GRAN) && !defined(DEBUG)
1750     STG_UNUSED
1751 #endif
1752     )
1753 {
1754 #if defined(GRAN)
1755     IF_DEBUG(scheduler,
1756              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1757                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1758              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1759     
1760     // ??? needed; should emit block before
1761     IF_DEBUG(gran, 
1762              DumpGranEvent(GR_DESCHEDULE, t)); 
1763     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1764     /*
1765       ngoq Dogh!
1766       ASSERT(procStatus[CurrentProc]==Busy || 
1767       ((procStatus[CurrentProc]==Fetching) && 
1768       (t->block_info.closure!=(StgClosure*)NULL)));
1769       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1770       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1771       procStatus[CurrentProc]==Fetching)) 
1772       procStatus[CurrentProc] = Idle;
1773     */
1774 #elif defined(PAR)
1775     IF_DEBUG(scheduler,
1776              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1777                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1778     IF_PAR_DEBUG(bq,
1779                  
1780                  if (t->block_info.closure!=(StgClosure*)NULL) 
1781                  print_bq(t->block_info.closure));
1782     
1783     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1784     blockThread(t);
1785     
1786     /* whatever we schedule next, we must log that schedule */
1787     emitSchedule = rtsTrue;
1788     
1789 #else /* !GRAN */
1790
1791       // We don't need to do anything.  The thread is blocked, and it
1792       // has tidied up its stack and placed itself on whatever queue
1793       // it needs to be on.
1794
1795 #if !defined(THREADED_RTS)
1796     ASSERT(t->why_blocked != NotBlocked);
1797              // This might not be true under THREADED_RTS: we don't have
1798              // exclusive access to this TSO, so someone might have
1799              // woken it up by now.  This actually happens: try
1800              // conc023 +RTS -N2.
1801 #endif
1802
1803     IF_DEBUG(scheduler,
1804              debugBelch("--<< thread %d (%s) stopped: ", 
1805                         t->id, whatNext_strs[t->what_next]);
1806              printThreadBlockage(t);
1807              debugBelch("\n"));
1808     
1809     /* Only for dumping event to log file 
1810        ToDo: do I need this in GranSim, too?
1811        blockThread(t);
1812     */
1813 #endif
1814 }
1815
1816 /* -----------------------------------------------------------------------------
1817  * Handle a thread that returned to the scheduler with ThreadFinished
1818  * -------------------------------------------------------------------------- */
1819
1820 static rtsBool
1821 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1822 {
1823     /* Need to check whether this was a main thread, and if so,
1824      * return with the return value.
1825      *
1826      * We also end up here if the thread kills itself with an
1827      * uncaught exception, see Exception.cmm.
1828      */
1829     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1830                                   t->id, whatNext_strs[t->what_next]));
1831
1832 #if defined(GRAN)
1833       endThread(t, CurrentProc); // clean-up the thread
1834 #elif defined(PARALLEL_HASKELL)
1835       /* For now all are advisory -- HWL */
1836       //if(t->priority==AdvisoryPriority) ??
1837       advisory_thread_count--; // JB: Caution with this counter, buggy!
1838       
1839 # if defined(DIST)
1840       if(t->dist.priority==RevalPriority)
1841         FinishReval(t);
1842 # endif
1843     
1844 # if defined(EDENOLD)
1845       // the thread could still have an outport... (BUG)
1846       if (t->eden.outport != -1) {
1847       // delete the outport for the tso which has finished...
1848         IF_PAR_DEBUG(eden_ports,
1849                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1850                               t->eden.outport, t->id));
1851         deleteOPT(t);
1852       }
1853       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1854       if (t->eden.epid != -1) {
1855         IF_PAR_DEBUG(eden_ports,
1856                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1857                            t->id, t->eden.epid));
1858         removeTSOfromProcess(t);
1859       }
1860 # endif 
1861
1862 # if defined(PAR)
1863       if (RtsFlags.ParFlags.ParStats.Full &&
1864           !RtsFlags.ParFlags.ParStats.Suppressed) 
1865         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1866
1867       //  t->par only contains statistics: left out for now...
1868       IF_PAR_DEBUG(fish,
1869                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1870                               t->id,t,t->par.sparkname));
1871 # endif
1872 #endif // PARALLEL_HASKELL
1873
1874       //
1875       // Check whether the thread that just completed was a bound
1876       // thread, and if so return with the result.  
1877       //
1878       // There is an assumption here that all thread completion goes
1879       // through this point; we need to make sure that if a thread
1880       // ends up in the ThreadKilled state, that it stays on the run
1881       // queue so it can be dealt with here.
1882       //
1883
1884       if (t->bound) {
1885
1886           if (t->bound != task) {
1887 #if !defined(THREADED_RTS)
1888               // Must be a bound thread that is not the topmost one.  Leave
1889               // it on the run queue until the stack has unwound to the
1890               // point where we can deal with this.  Leaving it on the run
1891               // queue also ensures that the garbage collector knows about
1892               // this thread and its return value (it gets dropped from the
1893               // all_threads list so there's no other way to find it).
1894               appendToRunQueue(cap,t);
1895               return rtsFalse;
1896 #else
1897               // this cannot happen in the threaded RTS, because a
1898               // bound thread can only be run by the appropriate Task.
1899               barf("finished bound thread that isn't mine");
1900 #endif
1901           }
1902
1903           ASSERT(task->tso == t);
1904
1905           if (t->what_next == ThreadComplete) {
1906               if (task->ret) {
1907                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1908                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1909               }
1910               task->stat = Success;
1911           } else {
1912               if (task->ret) {
1913                   *(task->ret) = NULL;
1914               }
1915               if (sched_state >= SCHED_INTERRUPTING) {
1916                   task->stat = Interrupted;
1917               } else {
1918                   task->stat = Killed;
1919               }
1920           }
1921 #ifdef DEBUG
1922           removeThreadLabel((StgWord)task->tso->id);
1923 #endif
1924           return rtsTrue; // tells schedule() to return
1925       }
1926
1927       return rtsFalse;
1928 }
1929
1930 /* -----------------------------------------------------------------------------
1931  * Perform a heap census, if PROFILING
1932  * -------------------------------------------------------------------------- */
1933
1934 static rtsBool
1935 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1936 {
1937 #if defined(PROFILING)
1938     // When we have +RTS -i0 and we're heap profiling, do a census at
1939     // every GC.  This lets us get repeatable runs for debugging.
1940     if (performHeapProfile ||
1941         (RtsFlags.ProfFlags.profileInterval==0 &&
1942          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1943
1944         // checking black holes is necessary before GC, otherwise
1945         // there may be threads that are unreachable except by the
1946         // blackhole queue, which the GC will consider to be
1947         // deadlocked.
1948         scheduleCheckBlackHoles(&MainCapability);
1949
1950         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1951         GarbageCollect(GetRoots, rtsTrue);
1952
1953         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1954         heapCensus();
1955
1956         performHeapProfile = rtsFalse;
1957         return rtsTrue;  // true <=> we already GC'd
1958     }
1959 #endif
1960     return rtsFalse;
1961 }
1962
1963 /* -----------------------------------------------------------------------------
1964  * Perform a garbage collection if necessary
1965  * -------------------------------------------------------------------------- */
1966
1967 static Capability *
1968 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1969               rtsBool force_major, void (*get_roots)(evac_fn))
1970 {
1971     StgTSO *t;
1972 #ifdef THREADED_RTS
1973     static volatile StgWord waiting_for_gc;
1974     rtsBool was_waiting;
1975     nat i;
1976 #endif
1977
1978 #ifdef THREADED_RTS
1979     // In order to GC, there must be no threads running Haskell code.
1980     // Therefore, the GC thread needs to hold *all* the capabilities,
1981     // and release them after the GC has completed.  
1982     //
1983     // This seems to be the simplest way: previous attempts involved
1984     // making all the threads with capabilities give up their
1985     // capabilities and sleep except for the *last* one, which
1986     // actually did the GC.  But it's quite hard to arrange for all
1987     // the other tasks to sleep and stay asleep.
1988     //
1989         
1990     was_waiting = cas(&waiting_for_gc, 0, 1);
1991     if (was_waiting) {
1992         do {
1993             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1994             if (cap) yieldCapability(&cap,task);
1995         } while (waiting_for_gc);
1996         return cap;  // NOTE: task->cap might have changed here
1997     }
1998
1999     for (i=0; i < n_capabilities; i++) {
2000         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
2001         if (cap != &capabilities[i]) {
2002             Capability *pcap = &capabilities[i];
2003             // we better hope this task doesn't get migrated to
2004             // another Capability while we're waiting for this one.
2005             // It won't, because load balancing happens while we have
2006             // all the Capabilities, but even so it's a slightly
2007             // unsavoury invariant.
2008             task->cap = pcap;
2009             context_switch = 1;
2010             waitForReturnCapability(&pcap, task);
2011             if (pcap != &capabilities[i]) {
2012                 barf("scheduleDoGC: got the wrong capability");
2013             }
2014         }
2015     }
2016
2017     waiting_for_gc = rtsFalse;
2018 #endif
2019
2020     /* Kick any transactions which are invalid back to their
2021      * atomically frames.  When next scheduled they will try to
2022      * commit, this commit will fail and they will retry.
2023      */
2024     { 
2025         StgTSO *next;
2026
2027         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2028             if (t->what_next == ThreadRelocated) {
2029                 next = t->link;
2030             } else {
2031                 next = t->global_link;
2032                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2033                     if (!stmValidateNestOfTransactions (t -> trec)) {
2034                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2035                         
2036                         // strip the stack back to the
2037                         // ATOMICALLY_FRAME, aborting the (nested)
2038                         // transaction, and saving the stack of any
2039                         // partially-evaluated thunks on the heap.
2040                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2041                         
2042 #ifdef REG_R1
2043                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2044 #endif
2045                     }
2046                 }
2047             }
2048         }
2049     }
2050     
2051     // so this happens periodically:
2052     if (cap) scheduleCheckBlackHoles(cap);
2053     
2054     IF_DEBUG(scheduler, printAllThreads());
2055
2056     /*
2057      * We now have all the capabilities; if we're in an interrupting
2058      * state, then we should take the opportunity to delete all the
2059      * threads in the system.
2060      */
2061     if (sched_state >= SCHED_INTERRUPTING) {
2062         deleteAllThreads(&capabilities[0]);
2063         sched_state = SCHED_INTERRUPTED;
2064     }
2065
2066     /* everybody back, start the GC.
2067      * Could do it in this thread, or signal a condition var
2068      * to do it in another thread.  Either way, we need to
2069      * broadcast on gc_pending_cond afterward.
2070      */
2071 #if defined(THREADED_RTS)
2072     IF_DEBUG(scheduler,sched_belch("doing GC"));
2073 #endif
2074     GarbageCollect(get_roots, force_major);
2075     
2076 #if defined(THREADED_RTS)
2077     // release our stash of capabilities.
2078     for (i = 0; i < n_capabilities; i++) {
2079         if (cap != &capabilities[i]) {
2080             task->cap = &capabilities[i];
2081             releaseCapability(&capabilities[i]);
2082         }
2083     }
2084     if (cap) {
2085         task->cap = cap;
2086     } else {
2087         task->cap = NULL;
2088     }
2089 #endif
2090
2091 #if defined(GRAN)
2092     /* add a ContinueThread event to continue execution of current thread */
2093     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2094               ContinueThread,
2095               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2096     IF_GRAN_DEBUG(bq, 
2097                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2098                   G_EVENTQ(0);
2099                   G_CURR_THREADQ(0));
2100 #endif /* GRAN */
2101
2102     return cap;
2103 }
2104
2105 /* ---------------------------------------------------------------------------
2106  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2107  * used by Control.Concurrent for error checking.
2108  * ------------------------------------------------------------------------- */
2109  
2110 StgBool
2111 rtsSupportsBoundThreads(void)
2112 {
2113 #if defined(THREADED_RTS)
2114   return rtsTrue;
2115 #else
2116   return rtsFalse;
2117 #endif
2118 }
2119
2120 /* ---------------------------------------------------------------------------
2121  * isThreadBound(tso): check whether tso is bound to an OS thread.
2122  * ------------------------------------------------------------------------- */
2123  
2124 StgBool
2125 isThreadBound(StgTSO* tso USED_IF_THREADS)
2126 {
2127 #if defined(THREADED_RTS)
2128   return (tso->bound != NULL);
2129 #endif
2130   return rtsFalse;
2131 }
2132
2133 /* ---------------------------------------------------------------------------
2134  * Singleton fork(). Do not copy any running threads.
2135  * ------------------------------------------------------------------------- */
2136
2137 #if !defined(mingw32_HOST_OS)
2138 #define FORKPROCESS_PRIMOP_SUPPORTED
2139 #endif
2140
2141 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2142 static void 
2143 deleteThread_(Capability *cap, StgTSO *tso);
2144 #endif
2145 StgInt
2146 forkProcess(HsStablePtr *entry
2147 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2148             STG_UNUSED
2149 #endif
2150            )
2151 {
2152 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2153     Task *task;
2154     pid_t pid;
2155     StgTSO* t,*next;
2156     Capability *cap;
2157     
2158 #if defined(THREADED_RTS)
2159     if (RtsFlags.ParFlags.nNodes > 1) {
2160         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2161         stg_exit(EXIT_FAILURE);
2162     }
2163 #endif
2164
2165     IF_DEBUG(scheduler,sched_belch("forking!"));
2166     
2167     // ToDo: for SMP, we should probably acquire *all* the capabilities
2168     cap = rts_lock();
2169     
2170     pid = fork();
2171     
2172     if (pid) { // parent
2173         
2174         // just return the pid
2175         rts_unlock(cap);
2176         return pid;
2177         
2178     } else { // child
2179         
2180         // Now, all OS threads except the thread that forked are
2181         // stopped.  We need to stop all Haskell threads, including
2182         // those involved in foreign calls.  Also we need to delete
2183         // all Tasks, because they correspond to OS threads that are
2184         // now gone.
2185
2186         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2187             if (t->what_next == ThreadRelocated) {
2188                 next = t->link;
2189             } else {
2190                 next = t->global_link;
2191                 // don't allow threads to catch the ThreadKilled
2192                 // exception, but we do want to raiseAsync() because these
2193                 // threads may be evaluating thunks that we need later.
2194                 deleteThread_(cap,t);
2195             }
2196         }
2197         
2198         // Empty the run queue.  It seems tempting to let all the
2199         // killed threads stay on the run queue as zombies to be
2200         // cleaned up later, but some of them correspond to bound
2201         // threads for which the corresponding Task does not exist.
2202         cap->run_queue_hd = END_TSO_QUEUE;
2203         cap->run_queue_tl = END_TSO_QUEUE;
2204
2205         // Any suspended C-calling Tasks are no more, their OS threads
2206         // don't exist now:
2207         cap->suspended_ccalling_tasks = NULL;
2208
2209         // Empty the all_threads list.  Otherwise, the garbage
2210         // collector may attempt to resurrect some of these threads.
2211         all_threads = END_TSO_QUEUE;
2212
2213         // Wipe the task list, except the current Task.
2214         ACQUIRE_LOCK(&sched_mutex);
2215         for (task = all_tasks; task != NULL; task=task->all_link) {
2216             if (task != cap->running_task) {
2217                 discardTask(task);
2218             }
2219         }
2220         RELEASE_LOCK(&sched_mutex);
2221
2222 #if defined(THREADED_RTS)
2223         // Wipe our spare workers list, they no longer exist.  New
2224         // workers will be created if necessary.
2225         cap->spare_workers = NULL;
2226         cap->returning_tasks_hd = NULL;
2227         cap->returning_tasks_tl = NULL;
2228 #endif
2229
2230         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2231         rts_checkSchedStatus("forkProcess",cap);
2232         
2233         rts_unlock(cap);
2234         hs_exit();                      // clean up and exit
2235         stg_exit(EXIT_SUCCESS);
2236     }
2237 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2238     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2239     return -1;
2240 #endif
2241 }
2242
2243 /* ---------------------------------------------------------------------------
2244  * Delete all the threads in the system
2245  * ------------------------------------------------------------------------- */
2246    
2247 static void
2248 deleteAllThreads ( Capability *cap )
2249 {
2250   StgTSO* t, *next;
2251   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2252   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2253       if (t->what_next == ThreadRelocated) {
2254           next = t->link;
2255       } else {
2256           next = t->global_link;
2257           deleteThread(cap,t);
2258       }
2259   }      
2260
2261   // The run queue now contains a bunch of ThreadKilled threads.  We
2262   // must not throw these away: the main thread(s) will be in there
2263   // somewhere, and the main scheduler loop has to deal with it.
2264   // Also, the run queue is the only thing keeping these threads from
2265   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2266
2267 #if !defined(THREADED_RTS)
2268   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2269   ASSERT(sleeping_queue == END_TSO_QUEUE);
2270 #endif
2271 }
2272
2273 /* -----------------------------------------------------------------------------
2274    Managing the suspended_ccalling_tasks list.
2275    Locks required: sched_mutex
2276    -------------------------------------------------------------------------- */
2277
2278 STATIC_INLINE void
2279 suspendTask (Capability *cap, Task *task)
2280 {
2281     ASSERT(task->next == NULL && task->prev == NULL);
2282     task->next = cap->suspended_ccalling_tasks;
2283     task->prev = NULL;
2284     if (cap->suspended_ccalling_tasks) {
2285         cap->suspended_ccalling_tasks->prev = task;
2286     }
2287     cap->suspended_ccalling_tasks = task;
2288 }
2289
2290 STATIC_INLINE void
2291 recoverSuspendedTask (Capability *cap, Task *task)
2292 {
2293     if (task->prev) {
2294         task->prev->next = task->next;
2295     } else {
2296         ASSERT(cap->suspended_ccalling_tasks == task);
2297         cap->suspended_ccalling_tasks = task->next;
2298     }
2299     if (task->next) {
2300         task->next->prev = task->prev;
2301     }
2302     task->next = task->prev = NULL;
2303 }
2304
2305 /* ---------------------------------------------------------------------------
2306  * Suspending & resuming Haskell threads.
2307  * 
2308  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2309  * its capability before calling the C function.  This allows another
2310  * task to pick up the capability and carry on running Haskell
2311  * threads.  It also means that if the C call blocks, it won't lock
2312  * the whole system.
2313  *
2314  * The Haskell thread making the C call is put to sleep for the
2315  * duration of the call, on the susepended_ccalling_threads queue.  We
2316  * give out a token to the task, which it can use to resume the thread
2317  * on return from the C function.
2318  * ------------------------------------------------------------------------- */
2319    
2320 void *
2321 suspendThread (StgRegTable *reg)
2322 {
2323   Capability *cap;
2324   int saved_errno = errno;
2325   StgTSO *tso;
2326   Task *task;
2327
2328   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2329    */
2330   cap = regTableToCapability(reg);
2331
2332   task = cap->running_task;
2333   tso = cap->r.rCurrentTSO;
2334
2335   IF_DEBUG(scheduler,
2336            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2337
2338   // XXX this might not be necessary --SDM
2339   tso->what_next = ThreadRunGHC;
2340
2341   threadPaused(cap,tso);
2342
2343   if(tso->blocked_exceptions == NULL)  {
2344       tso->why_blocked = BlockedOnCCall;
2345       tso->blocked_exceptions = END_TSO_QUEUE;
2346   } else {
2347       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2348   }
2349
2350   // Hand back capability
2351   task->suspended_tso = tso;
2352
2353   ACQUIRE_LOCK(&cap->lock);
2354
2355   suspendTask(cap,task);
2356   cap->in_haskell = rtsFalse;
2357   releaseCapability_(cap);
2358   
2359   RELEASE_LOCK(&cap->lock);
2360
2361 #if defined(THREADED_RTS)
2362   /* Preparing to leave the RTS, so ensure there's a native thread/task
2363      waiting to take over.
2364   */
2365   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2366 #endif
2367
2368   errno = saved_errno;
2369   return task;
2370 }
2371
2372 StgRegTable *
2373 resumeThread (void *task_)
2374 {
2375     StgTSO *tso;
2376     Capability *cap;
2377     int saved_errno = errno;
2378     Task *task = task_;
2379
2380     cap = task->cap;
2381     // Wait for permission to re-enter the RTS with the result.
2382     waitForReturnCapability(&cap,task);
2383     // we might be on a different capability now... but if so, our
2384     // entry on the suspended_ccalling_tasks list will also have been
2385     // migrated.
2386
2387     // Remove the thread from the suspended list
2388     recoverSuspendedTask(cap,task);
2389
2390     tso = task->suspended_tso;
2391     task->suspended_tso = NULL;
2392     tso->link = END_TSO_QUEUE;
2393     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2394     
2395     if (tso->why_blocked == BlockedOnCCall) {
2396         awakenBlockedQueue(cap,tso->blocked_exceptions);
2397         tso->blocked_exceptions = NULL;
2398     }
2399     
2400     /* Reset blocking status */
2401     tso->why_blocked  = NotBlocked;
2402     
2403     cap->r.rCurrentTSO = tso;
2404     cap->in_haskell = rtsTrue;
2405     errno = saved_errno;
2406
2407     /* We might have GC'd, mark the TSO dirty again */
2408     dirtyTSO(tso);
2409
2410     IF_DEBUG(sanity, checkTSO(tso));
2411
2412     return &cap->r;
2413 }
2414
2415 /* ---------------------------------------------------------------------------
2416  * Comparing Thread ids.
2417  *
2418  * This is used from STG land in the implementation of the
2419  * instances of Eq/Ord for ThreadIds.
2420  * ------------------------------------------------------------------------ */
2421
2422 int
2423 cmp_thread(StgPtr tso1, StgPtr tso2) 
2424
2425   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2426   StgThreadID id2 = ((StgTSO *)tso2)->id;
2427  
2428   if (id1 < id2) return (-1);
2429   if (id1 > id2) return 1;
2430   return 0;
2431 }
2432
2433 /* ---------------------------------------------------------------------------
2434  * Fetching the ThreadID from an StgTSO.
2435  *
2436  * This is used in the implementation of Show for ThreadIds.
2437  * ------------------------------------------------------------------------ */
2438 int
2439 rts_getThreadId(StgPtr tso) 
2440 {
2441   return ((StgTSO *)tso)->id;
2442 }
2443
2444 #ifdef DEBUG
2445 void
2446 labelThread(StgPtr tso, char *label)
2447 {
2448   int len;
2449   void *buf;
2450
2451   /* Caveat: Once set, you can only set the thread name to "" */
2452   len = strlen(label)+1;
2453   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2454   strncpy(buf,label,len);
2455   /* Update will free the old memory for us */
2456   updateThreadLabel(((StgTSO *)tso)->id,buf);
2457 }
2458 #endif /* DEBUG */
2459
2460 /* ---------------------------------------------------------------------------
2461    Create a new thread.
2462
2463    The new thread starts with the given stack size.  Before the
2464    scheduler can run, however, this thread needs to have a closure
2465    (and possibly some arguments) pushed on its stack.  See
2466    pushClosure() in Schedule.h.
2467
2468    createGenThread() and createIOThread() (in SchedAPI.h) are
2469    convenient packaged versions of this function.
2470
2471    currently pri (priority) is only used in a GRAN setup -- HWL
2472    ------------------------------------------------------------------------ */
2473 #if defined(GRAN)
2474 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2475 StgTSO *
2476 createThread(nat size, StgInt pri)
2477 #else
2478 StgTSO *
2479 createThread(Capability *cap, nat size)
2480 #endif
2481 {
2482     StgTSO *tso;
2483     nat stack_size;
2484
2485     /* sched_mutex is *not* required */
2486
2487     /* First check whether we should create a thread at all */
2488 #if defined(PARALLEL_HASKELL)
2489     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2490     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2491         threadsIgnored++;
2492         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2493                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2494         return END_TSO_QUEUE;
2495     }
2496     threadsCreated++;
2497 #endif
2498
2499 #if defined(GRAN)
2500     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2501 #endif
2502
2503     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2504
2505     /* catch ridiculously small stack sizes */
2506     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2507         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2508     }
2509
2510     stack_size = size - TSO_STRUCT_SIZEW;
2511     
2512     tso = (StgTSO *)allocateLocal(cap, size);
2513     TICK_ALLOC_TSO(stack_size, 0);
2514
2515     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2516 #if defined(GRAN)
2517     SET_GRAN_HDR(tso, ThisPE);
2518 #endif
2519
2520     // Always start with the compiled code evaluator
2521     tso->what_next = ThreadRunGHC;
2522
2523     tso->why_blocked  = NotBlocked;
2524     tso->blocked_exceptions = NULL;
2525     tso->flags = TSO_DIRTY;
2526     
2527     tso->saved_errno = 0;
2528     tso->bound = NULL;
2529     tso->cap = cap;
2530     
2531     tso->stack_size     = stack_size;
2532     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2533                           - TSO_STRUCT_SIZEW;
2534     tso->sp             = (P_)&(tso->stack) + stack_size;
2535
2536     tso->trec = NO_TREC;
2537     
2538 #ifdef PROFILING
2539     tso->prof.CCCS = CCS_MAIN;
2540 #endif
2541     
2542   /* put a stop frame on the stack */
2543     tso->sp -= sizeofW(StgStopFrame);
2544     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2545     tso->link = END_TSO_QUEUE;
2546     
2547   // ToDo: check this
2548 #if defined(GRAN)
2549     /* uses more flexible routine in GranSim */
2550     insertThread(tso, CurrentProc);
2551 #else
2552     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2553      * from its creation
2554      */
2555 #endif
2556     
2557 #if defined(GRAN) 
2558     if (RtsFlags.GranFlags.GranSimStats.Full) 
2559         DumpGranEvent(GR_START,tso);
2560 #elif defined(PARALLEL_HASKELL)
2561     if (RtsFlags.ParFlags.ParStats.Full) 
2562         DumpGranEvent(GR_STARTQ,tso);
2563     /* HACk to avoid SCHEDULE 
2564        LastTSO = tso; */
2565 #endif
2566     
2567     /* Link the new thread on the global thread list.
2568      */
2569     ACQUIRE_LOCK(&sched_mutex);
2570     tso->id = next_thread_id++;  // while we have the mutex
2571     tso->global_link = all_threads;
2572     all_threads = tso;
2573     RELEASE_LOCK(&sched_mutex);
2574     
2575 #if defined(DIST)
2576     tso->dist.priority = MandatoryPriority; //by default that is...
2577 #endif
2578     
2579 #if defined(GRAN)
2580     tso->gran.pri = pri;
2581 # if defined(DEBUG)
2582     tso->gran.magic = TSO_MAGIC; // debugging only
2583 # endif
2584     tso->gran.sparkname   = 0;
2585     tso->gran.startedat   = CURRENT_TIME; 
2586     tso->gran.exported    = 0;
2587     tso->gran.basicblocks = 0;
2588     tso->gran.allocs      = 0;
2589     tso->gran.exectime    = 0;
2590     tso->gran.fetchtime   = 0;
2591     tso->gran.fetchcount  = 0;
2592     tso->gran.blocktime   = 0;
2593     tso->gran.blockcount  = 0;
2594     tso->gran.blockedat   = 0;
2595     tso->gran.globalsparks = 0;
2596     tso->gran.localsparks  = 0;
2597     if (RtsFlags.GranFlags.Light)
2598         tso->gran.clock  = Now; /* local clock */
2599     else
2600         tso->gran.clock  = 0;
2601     
2602     IF_DEBUG(gran,printTSO(tso));
2603 #elif defined(PARALLEL_HASKELL)
2604 # if defined(DEBUG)
2605     tso->par.magic = TSO_MAGIC; // debugging only
2606 # endif
2607     tso->par.sparkname   = 0;
2608     tso->par.startedat   = CURRENT_TIME; 
2609     tso->par.exported    = 0;
2610     tso->par.basicblocks = 0;
2611     tso->par.allocs      = 0;
2612     tso->par.exectime    = 0;
2613     tso->par.fetchtime   = 0;
2614     tso->par.fetchcount  = 0;
2615     tso->par.blocktime   = 0;
2616     tso->par.blockcount  = 0;
2617     tso->par.blockedat   = 0;
2618     tso->par.globalsparks = 0;
2619     tso->par.localsparks  = 0;
2620 #endif
2621     
2622 #if defined(GRAN)
2623     globalGranStats.tot_threads_created++;
2624     globalGranStats.threads_created_on_PE[CurrentProc]++;
2625     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2626     globalGranStats.tot_sq_probes++;
2627 #elif defined(PARALLEL_HASKELL)
2628     // collect parallel global statistics (currently done together with GC stats)
2629     if (RtsFlags.ParFlags.ParStats.Global &&
2630         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2631         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2632         globalParStats.tot_threads_created++;
2633     }
2634 #endif 
2635     
2636 #if defined(GRAN)
2637     IF_GRAN_DEBUG(pri,
2638                   sched_belch("==__ schedule: Created TSO %d (%p);",
2639                               CurrentProc, tso, tso->id));
2640 #elif defined(PARALLEL_HASKELL)
2641     IF_PAR_DEBUG(verbose,
2642                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2643                              (long)tso->id, tso, advisory_thread_count));
2644 #else
2645     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2646                                    (long)tso->id, (long)tso->stack_size));
2647 #endif    
2648     return tso;
2649 }
2650
2651 #if defined(PAR)
2652 /* RFP:
2653    all parallel thread creation calls should fall through the following routine.
2654 */
2655 StgTSO *
2656 createThreadFromSpark(rtsSpark spark) 
2657 { StgTSO *tso;
2658   ASSERT(spark != (rtsSpark)NULL);
2659 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2660   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2661   { threadsIgnored++;
2662     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2663           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2664     return END_TSO_QUEUE;
2665   }
2666   else
2667   { threadsCreated++;
2668     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2669     if (tso==END_TSO_QUEUE)     
2670       barf("createSparkThread: Cannot create TSO");
2671 #if defined(DIST)
2672     tso->priority = AdvisoryPriority;
2673 #endif
2674     pushClosure(tso,spark);
2675     addToRunQueue(tso);
2676     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2677   }
2678   return tso;
2679 }
2680 #endif
2681
2682 /*
2683   Turn a spark into a thread.
2684   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2685 */
2686 #if 0
2687 StgTSO *
2688 activateSpark (rtsSpark spark) 
2689 {
2690   StgTSO *tso;
2691
2692   tso = createSparkThread(spark);
2693   if (RtsFlags.ParFlags.ParStats.Full) {   
2694     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2695       IF_PAR_DEBUG(verbose,
2696                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2697                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2698   }
2699   // ToDo: fwd info on local/global spark to thread -- HWL
2700   // tso->gran.exported =  spark->exported;
2701   // tso->gran.locked =   !spark->global;
2702   // tso->gran.sparkname = spark->name;
2703
2704   return tso;
2705 }
2706 #endif
2707
2708 /* ---------------------------------------------------------------------------
2709  * scheduleThread()
2710  *
2711  * scheduleThread puts a thread on the end  of the runnable queue.
2712  * This will usually be done immediately after a thread is created.
2713  * The caller of scheduleThread must create the thread using e.g.
2714  * createThread and push an appropriate closure
2715  * on this thread's stack before the scheduler is invoked.
2716  * ------------------------------------------------------------------------ */
2717
2718 void
2719 scheduleThread(Capability *cap, StgTSO *tso)
2720 {
2721     // The thread goes at the *end* of the run-queue, to avoid possible
2722     // starvation of any threads already on the queue.
2723     appendToRunQueue(cap,tso);
2724 }
2725
2726 void
2727 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2728 {
2729 #if defined(THREADED_RTS)
2730     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2731                               // move this thread from now on.
2732     cpu %= RtsFlags.ParFlags.nNodes;
2733     if (cpu == cap->no) {
2734         appendToRunQueue(cap,tso);
2735     } else {
2736         Capability *target_cap = &capabilities[cpu];
2737         if (tso->bound) {
2738             tso->bound->cap = target_cap;
2739         }
2740         tso->cap = target_cap;
2741         wakeupThreadOnCapability(target_cap,tso);
2742     }
2743 #else
2744     appendToRunQueue(cap,tso);
2745 #endif
2746 }
2747
2748 Capability *
2749 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2750 {
2751     Task *task;
2752
2753     // We already created/initialised the Task
2754     task = cap->running_task;
2755
2756     // This TSO is now a bound thread; make the Task and TSO
2757     // point to each other.
2758     tso->bound = task;
2759     tso->cap = cap;
2760
2761     task->tso = tso;
2762     task->ret = ret;
2763     task->stat = NoStatus;
2764
2765     appendToRunQueue(cap,tso);
2766
2767     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2768
2769 #if defined(GRAN)
2770     /* GranSim specific init */
2771     CurrentTSO = m->tso;                // the TSO to run
2772     procStatus[MainProc] = Busy;        // status of main PE
2773     CurrentProc = MainProc;             // PE to run it on
2774 #endif
2775
2776     cap = schedule(cap,task);
2777
2778     ASSERT(task->stat != NoStatus);
2779     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2780
2781     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2782     return cap;
2783 }
2784
2785 /* ----------------------------------------------------------------------------
2786  * Starting Tasks
2787  * ------------------------------------------------------------------------- */
2788
2789 #if defined(THREADED_RTS)
2790 void
2791 workerStart(Task *task)
2792 {
2793     Capability *cap;
2794
2795     // See startWorkerTask().
2796     ACQUIRE_LOCK(&task->lock);
2797     cap = task->cap;
2798     RELEASE_LOCK(&task->lock);
2799
2800     // set the thread-local pointer to the Task:
2801     taskEnter(task);
2802
2803     // schedule() runs without a lock.
2804     cap = schedule(cap,task);
2805
2806     // On exit from schedule(), we have a Capability.
2807     releaseCapability(cap);
2808     taskStop(task);
2809 }
2810 #endif
2811
2812 /* ---------------------------------------------------------------------------
2813  * initScheduler()
2814  *
2815  * Initialise the scheduler.  This resets all the queues - if the
2816  * queues contained any threads, they'll be garbage collected at the
2817  * next pass.
2818  *
2819  * ------------------------------------------------------------------------ */
2820
2821 void 
2822 initScheduler(void)
2823 {
2824 #if defined(GRAN)
2825   nat i;
2826   for (i=0; i<=MAX_PROC; i++) {
2827     run_queue_hds[i]      = END_TSO_QUEUE;
2828     run_queue_tls[i]      = END_TSO_QUEUE;
2829     blocked_queue_hds[i]  = END_TSO_QUEUE;
2830     blocked_queue_tls[i]  = END_TSO_QUEUE;
2831     ccalling_threadss[i]  = END_TSO_QUEUE;
2832     blackhole_queue[i]    = END_TSO_QUEUE;
2833     sleeping_queue        = END_TSO_QUEUE;
2834   }
2835 #elif !defined(THREADED_RTS)
2836   blocked_queue_hd  = END_TSO_QUEUE;
2837   blocked_queue_tl  = END_TSO_QUEUE;
2838   sleeping_queue    = END_TSO_QUEUE;
2839 #endif
2840
2841   blackhole_queue   = END_TSO_QUEUE;
2842   all_threads       = END_TSO_QUEUE;
2843
2844   context_switch = 0;
2845   sched_state    = SCHED_RUNNING;
2846
2847   RtsFlags.ConcFlags.ctxtSwitchTicks =
2848       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2849       
2850 #if defined(THREADED_RTS)
2851   /* Initialise the mutex and condition variables used by
2852    * the scheduler. */
2853   initMutex(&sched_mutex);
2854 #endif
2855   
2856   ACQUIRE_LOCK(&sched_mutex);
2857
2858   /* A capability holds the state a native thread needs in
2859    * order to execute STG code. At least one capability is
2860    * floating around (only THREADED_RTS builds have more than one).
2861    */
2862   initCapabilities();
2863
2864   initTaskManager();
2865
2866 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2867   initSparkPools();
2868 #endif
2869
2870 #if defined(THREADED_RTS)
2871   /*
2872    * Eagerly start one worker to run each Capability, except for
2873    * Capability 0.  The idea is that we're probably going to start a
2874    * bound thread on Capability 0 pretty soon, so we don't want a
2875    * worker task hogging it.
2876    */
2877   { 
2878       nat i;
2879       Capability *cap;
2880       for (i = 1; i < n_capabilities; i++) {
2881           cap = &capabilities[i];
2882           ACQUIRE_LOCK(&cap->lock);
2883           startWorkerTask(cap, workerStart);
2884           RELEASE_LOCK(&cap->lock);
2885       }
2886   }
2887 #endif
2888
2889   RELEASE_LOCK(&sched_mutex);
2890 }
2891
2892 void
2893 exitScheduler( void )
2894 {
2895     Task *task = NULL;
2896
2897 #if defined(THREADED_RTS)
2898     ACQUIRE_LOCK(&sched_mutex);
2899     task = newBoundTask();
2900     RELEASE_LOCK(&sched_mutex);
2901 #endif
2902
2903     // If we haven't killed all the threads yet, do it now.
2904     if (sched_state < SCHED_INTERRUPTED) {
2905         sched_state = SCHED_INTERRUPTING;
2906         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2907     }
2908     sched_state = SCHED_SHUTTING_DOWN;
2909
2910 #if defined(THREADED_RTS)
2911     { 
2912         nat i;
2913         
2914         for (i = 0; i < n_capabilities; i++) {
2915             shutdownCapability(&capabilities[i], task);
2916         }
2917         boundTaskExiting(task);
2918         stopTaskManager();
2919     }
2920 #endif
2921 }
2922
2923 /* ---------------------------------------------------------------------------
2924    Where are the roots that we know about?
2925
2926         - all the threads on the runnable queue
2927         - all the threads on the blocked queue
2928         - all the threads on the sleeping queue
2929         - all the thread currently executing a _ccall_GC
2930         - all the "main threads"
2931      
2932    ------------------------------------------------------------------------ */
2933
2934 /* This has to be protected either by the scheduler monitor, or by the
2935         garbage collection monitor (probably the latter).
2936         KH @ 25/10/99
2937 */
2938
2939 void
2940 GetRoots( evac_fn evac )
2941 {
2942     nat i;
2943     Capability *cap;
2944     Task *task;
2945
2946 #if defined(GRAN)
2947     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2948         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2949             evac((StgClosure **)&run_queue_hds[i]);
2950         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2951             evac((StgClosure **)&run_queue_tls[i]);
2952         
2953         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2954             evac((StgClosure **)&blocked_queue_hds[i]);
2955         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2956             evac((StgClosure **)&blocked_queue_tls[i]);
2957         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2958             evac((StgClosure **)&ccalling_threads[i]);
2959     }
2960
2961     markEventQueue();
2962
2963 #else /* !GRAN */
2964
2965     for (i = 0; i < n_capabilities; i++) {
2966         cap = &capabilities[i];
2967         evac((StgClosure **)(void *)&cap->run_queue_hd);
2968         evac((StgClosure **)(void *)&cap->run_queue_tl);
2969 #if defined(THREADED_RTS)
2970         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2971         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2972 #endif
2973         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2974              task=task->next) {
2975             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2976             evac((StgClosure **)(void *)&task->suspended_tso);
2977         }
2978
2979     }
2980     
2981
2982 #if !defined(THREADED_RTS)
2983     evac((StgClosure **)(void *)&blocked_queue_hd);
2984     evac((StgClosure **)(void *)&blocked_queue_tl);
2985     evac((StgClosure **)(void *)&sleeping_queue);
2986 #endif 
2987 #endif
2988
2989     // evac((StgClosure **)&blackhole_queue);
2990
2991 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2992     markSparkQueue(evac);
2993 #endif
2994     
2995 #if defined(RTS_USER_SIGNALS)
2996     // mark the signal handlers (signals should be already blocked)
2997     markSignalHandlers(evac);
2998 #endif
2999 }
3000
3001 /* -----------------------------------------------------------------------------
3002    performGC
3003
3004    This is the interface to the garbage collector from Haskell land.
3005    We provide this so that external C code can allocate and garbage
3006    collect when called from Haskell via _ccall_GC.
3007
3008    It might be useful to provide an interface whereby the programmer
3009    can specify more roots (ToDo).
3010    
3011    This needs to be protected by the GC condition variable above.  KH.
3012    -------------------------------------------------------------------------- */
3013
3014 static void (*extra_roots)(evac_fn);
3015
3016 static void
3017 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
3018 {
3019     Task *task = myTask();
3020
3021     if (task == NULL) {
3022         ACQUIRE_LOCK(&sched_mutex);
3023         task = newBoundTask();
3024         RELEASE_LOCK(&sched_mutex);
3025         scheduleDoGC(NULL,task,force_major, get_roots);
3026         boundTaskExiting(task);
3027     } else {
3028         scheduleDoGC(NULL,task,force_major, get_roots);
3029     }
3030 }
3031
3032 void
3033 performGC(void)
3034 {
3035     performGC_(rtsFalse, GetRoots);
3036 }
3037
3038 void
3039 performMajorGC(void)
3040 {
3041     performGC_(rtsTrue, GetRoots);
3042 }
3043
3044 static void
3045 AllRoots(evac_fn evac)
3046 {
3047     GetRoots(evac);             // the scheduler's roots
3048     extra_roots(evac);          // the user's roots
3049 }
3050
3051 void
3052 performGCWithRoots(void (*get_roots)(evac_fn))
3053 {
3054     extra_roots = get_roots;
3055     performGC_(rtsFalse, AllRoots);
3056 }
3057
3058 /* -----------------------------------------------------------------------------
3059    Stack overflow
3060
3061    If the thread has reached its maximum stack size, then raise the
3062    StackOverflow exception in the offending thread.  Otherwise
3063    relocate the TSO into a larger chunk of memory and adjust its stack
3064    size appropriately.
3065    -------------------------------------------------------------------------- */
3066
3067 static StgTSO *
3068 threadStackOverflow(Capability *cap, StgTSO *tso)
3069 {
3070   nat new_stack_size, stack_words;
3071   lnat new_tso_size;
3072   StgPtr new_sp;
3073   StgTSO *dest;
3074
3075   IF_DEBUG(sanity,checkTSO(tso));
3076   if (tso->stack_size >= tso->max_stack_size) {
3077
3078     IF_DEBUG(gc,
3079              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3080                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3081              /* If we're debugging, just print out the top of the stack */
3082              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3083                                               tso->sp+64)));
3084
3085     /* Send this thread the StackOverflow exception */
3086     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3087     return tso;
3088   }
3089
3090   /* Try to double the current stack size.  If that takes us over the
3091    * maximum stack size for this thread, then use the maximum instead.
3092    * Finally round up so the TSO ends up as a whole number of blocks.
3093    */
3094   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3095   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3096                                        TSO_STRUCT_SIZE)/sizeof(W_);
3097   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3098   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3099
3100   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3101
3102   dest = (StgTSO *)allocate(new_tso_size);
3103   TICK_ALLOC_TSO(new_stack_size,0);
3104
3105   /* copy the TSO block and the old stack into the new area */
3106   memcpy(dest,tso,TSO_STRUCT_SIZE);
3107   stack_words = tso->stack + tso->stack_size - tso->sp;
3108   new_sp = (P_)dest + new_tso_size - stack_words;
3109   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3110
3111   /* relocate the stack pointers... */
3112   dest->sp         = new_sp;
3113   dest->stack_size = new_stack_size;
3114         
3115   /* Mark the old TSO as relocated.  We have to check for relocated
3116    * TSOs in the garbage collector and any primops that deal with TSOs.
3117    *
3118    * It's important to set the sp value to just beyond the end
3119    * of the stack, so we don't attempt to scavenge any part of the
3120    * dead TSO's stack.
3121    */
3122   tso->what_next = ThreadRelocated;
3123   tso->link = dest;
3124   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3125   tso->why_blocked = NotBlocked;
3126
3127   IF_PAR_DEBUG(verbose,
3128                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3129                      tso->id, tso, tso->stack_size);
3130                /* If we're debugging, just print out the top of the stack */
3131                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3132                                                 tso->sp+64)));
3133   
3134   IF_DEBUG(sanity,checkTSO(tso));
3135 #if 0
3136   IF_DEBUG(scheduler,printTSO(dest));
3137 #endif
3138
3139   return dest;
3140 }
3141
3142 /* ---------------------------------------------------------------------------
3143    Wake up a queue that was blocked on some resource.
3144    ------------------------------------------------------------------------ */
3145
3146 #if defined(GRAN)
3147 STATIC_INLINE void
3148 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3149 {
3150 }
3151 #elif defined(PARALLEL_HASKELL)
3152 STATIC_INLINE void
3153 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3154 {
3155   /* write RESUME events to log file and
3156      update blocked and fetch time (depending on type of the orig closure) */
3157   if (RtsFlags.ParFlags.ParStats.Full) {
3158     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3159                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3160                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3161     if (emptyRunQueue())
3162       emitSchedule = rtsTrue;
3163
3164     switch (get_itbl(node)->type) {
3165         case FETCH_ME_BQ:
3166           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3167           break;
3168         case RBH:
3169         case FETCH_ME:
3170         case BLACKHOLE_BQ:
3171           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3172           break;
3173 #ifdef DIST
3174         case MVAR:
3175           break;
3176 #endif    
3177         default:
3178           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3179         }
3180       }
3181 }
3182 #endif
3183
3184 #if defined(GRAN)
3185 StgBlockingQueueElement *
3186 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3187 {
3188     StgTSO *tso;
3189     PEs node_loc, tso_loc;
3190
3191     node_loc = where_is(node); // should be lifted out of loop
3192     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3193     tso_loc = where_is((StgClosure *)tso);
3194     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3195       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3196       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3197       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3198       // insertThread(tso, node_loc);
3199       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3200                 ResumeThread,
3201                 tso, node, (rtsSpark*)NULL);
3202       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3203       // len_local++;
3204       // len++;
3205     } else { // TSO is remote (actually should be FMBQ)
3206       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3207                                   RtsFlags.GranFlags.Costs.gunblocktime +
3208                                   RtsFlags.GranFlags.Costs.latency;
3209       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3210                 UnblockThread,
3211                 tso, node, (rtsSpark*)NULL);
3212       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3213       // len++;
3214     }
3215     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3216     IF_GRAN_DEBUG(bq,
3217                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3218                           (node_loc==tso_loc ? "Local" : "Global"), 
3219                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3220     tso->block_info.closure = NULL;
3221     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3222                              tso->id, tso));
3223 }
3224 #elif defined(PARALLEL_HASKELL)
3225 StgBlockingQueueElement *
3226 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3227 {
3228     StgBlockingQueueElement *next;
3229
3230     switch (get_itbl(bqe)->type) {
3231     case TSO:
3232       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3233       /* if it's a TSO just push it onto the run_queue */
3234       next = bqe->link;
3235       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3236       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3237       threadRunnable();
3238       unblockCount(bqe, node);
3239       /* reset blocking status after dumping event */
3240       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3241       break;
3242
3243     case BLOCKED_FETCH:
3244       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3245       next = bqe->link;
3246       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3247       PendingFetches = (StgBlockedFetch *)bqe;
3248       break;
3249
3250 # if defined(DEBUG)
3251       /* can ignore this case in a non-debugging setup; 
3252          see comments on RBHSave closures above */
3253     case CONSTR:
3254       /* check that the closure is an RBHSave closure */
3255       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3256              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3257              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3258       break;
3259
3260     default:
3261       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3262            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3263            (StgClosure *)bqe);
3264 # endif
3265     }
3266   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3267   return next;
3268 }
3269 #endif
3270
3271 StgTSO *
3272 unblockOne(Capability *cap, StgTSO *tso)
3273 {
3274   StgTSO *next;
3275
3276   ASSERT(get_itbl(tso)->type == TSO);
3277   ASSERT(tso->why_blocked != NotBlocked);
3278
3279   tso->why_blocked = NotBlocked;
3280   next = tso->link;
3281   tso->link = END_TSO_QUEUE;
3282
3283 #if defined(THREADED_RTS)
3284   if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
3285       // We are waking up this thread on the current Capability, which
3286       // might involve migrating it from the Capability it was last on.
3287       if (tso->bound) {
3288           ASSERT(tso->bound->cap == tso->cap);
3289           tso->bound->cap = cap;
3290       }
3291       tso->cap = cap;
3292       appendToRunQueue(cap,tso);
3293       // we're holding a newly woken thread, make sure we context switch
3294       // quickly so we can migrate it if necessary.
3295       context_switch = 1;
3296   } else {
3297       // we'll try to wake it up on the Capability it was last on.
3298       wakeupThreadOnCapability(tso->cap, tso);
3299   }
3300 #else
3301   appendToRunQueue(cap,tso);
3302   context_switch = 1;
3303 #endif
3304
3305   IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
3306   return next;
3307 }
3308
3309
3310 #if defined(GRAN)
3311 void 
3312 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3313 {
3314   StgBlockingQueueElement *bqe;
3315   PEs node_loc;
3316   nat len = 0; 
3317
3318   IF_GRAN_DEBUG(bq, 
3319                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3320                       node, CurrentProc, CurrentTime[CurrentProc], 
3321                       CurrentTSO->id, CurrentTSO));
3322
3323   node_loc = where_is(node);
3324
3325   ASSERT(q == END_BQ_QUEUE ||
3326          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3327          get_itbl(q)->type == CONSTR); // closure (type constructor)
3328   ASSERT(is_unique(node));
3329
3330   /* FAKE FETCH: magically copy the node to the tso's proc;
3331      no Fetch necessary because in reality the node should not have been 
3332      moved to the other PE in the first place
3333   */
3334   if (CurrentProc!=node_loc) {
3335     IF_GRAN_DEBUG(bq, 
3336                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3337                         node, node_loc, CurrentProc, CurrentTSO->id, 
3338                         // CurrentTSO, where_is(CurrentTSO),
3339                         node->header.gran.procs));
3340     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3341     IF_GRAN_DEBUG(bq, 
3342                   debugBelch("## new bitmask of node %p is %#x\n",
3343                         node, node->header.gran.procs));
3344     if (RtsFlags.GranFlags.GranSimStats.Global) {
3345       globalGranStats.tot_fake_fetches++;
3346     }
3347   }
3348
3349   bqe = q;
3350   // ToDo: check: ASSERT(CurrentProc==node_loc);
3351   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3352     //next = bqe->link;
3353     /* 
3354        bqe points to the current element in the queue
3355        next points to the next element in the queue
3356     */
3357     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3358     //tso_loc = where_is(tso);
3359     len++;
3360     bqe = unblockOne(bqe, node);
3361   }
3362
3363   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3364      the closure to make room for the anchor of the BQ */
3365   if (bqe!=END_BQ_QUEUE) {
3366     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3367     /*
3368     ASSERT((info_ptr==&RBH_Save_0_info) ||
3369            (info_ptr==&RBH_Save_1_info) ||
3370            (info_ptr==&RBH_Save_2_info));
3371     */
3372     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3373     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3374     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3375
3376     IF_GRAN_DEBUG(bq,
3377                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3378                         node, info_type(node)));
3379   }
3380
3381   /* statistics gathering */
3382   if (RtsFlags.GranFlags.GranSimStats.Global) {
3383     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3384     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3385     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3386     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3387   }
3388   IF_GRAN_DEBUG(bq,
3389                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3390                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3391 }
3392 #elif defined(PARALLEL_HASKELL)
3393 void 
3394 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3395 {
3396   StgBlockingQueueElement *bqe;
3397
3398   IF_PAR_DEBUG(verbose, 
3399                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3400                      node, mytid));
3401 #ifdef DIST  
3402   //RFP
3403   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3404     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3405     return;
3406   }
3407 #endif
3408   
3409   ASSERT(q == END_BQ_QUEUE ||
3410          get_itbl(q)->type == TSO ||           
3411          get_itbl(q)->type == BLOCKED_FETCH || 
3412          get_itbl(q)->type == CONSTR); 
3413
3414   bqe = q;
3415   while (get_itbl(bqe)->type==TSO || 
3416          get_itbl(bqe)->type==BLOCKED_FETCH) {
3417     bqe = unblockOne(bqe, node);
3418   }
3419 }
3420
3421 #else   /* !GRAN && !PARALLEL_HASKELL */
3422
3423 void
3424 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3425 {
3426     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3427                              // Exception.cmm
3428     while (tso != END_TSO_QUEUE) {
3429         tso = unblockOne(cap,tso);
3430     }
3431 }
3432 #endif
3433
3434 /* ---------------------------------------------------------------------------
3435    Interrupt execution
3436    - usually called inside a signal handler so it mustn't do anything fancy.   
3437    ------------------------------------------------------------------------ */
3438
3439 void
3440 interruptStgRts(void)
3441 {
3442     sched_state = SCHED_INTERRUPTING;
3443     context_switch = 1;
3444 #if defined(THREADED_RTS)
3445     prodAllCapabilities();
3446 #endif
3447 }
3448
3449 /* -----------------------------------------------------------------------------
3450    Unblock a thread
3451
3452    This is for use when we raise an exception in another thread, which
3453    may be blocked.
3454    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3455    -------------------------------------------------------------------------- */
3456
3457 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3458 /*
3459   NB: only the type of the blocking queue is different in GranSim and GUM
3460       the operations on the queue-elements are the same
3461       long live polymorphism!
3462
3463   Locks: sched_mutex is held upon entry and exit.
3464
3465 */
3466 static void
3467 unblockThread(Capability *cap, StgTSO *tso)
3468 {
3469   StgBlockingQueueElement *t, **last;
3470
3471   switch (tso->why_blocked) {
3472
3473   case NotBlocked:
3474     return;  /* not blocked */
3475
3476   case BlockedOnSTM:
3477     // Be careful: nothing to do here!  We tell the scheduler that the thread
3478     // is runnable and we leave it to the stack-walking code to abort the 
3479     // transaction while unwinding the stack.  We should perhaps have a debugging
3480     // test to make sure that this really happens and that the 'zombie' transaction
3481     // does not get committed.
3482     goto done;
3483
3484   case BlockedOnMVar:
3485     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3486     {
3487       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3488       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3489
3490       last = (StgBlockingQueueElement **)&mvar->head;
3491       for (t = (StgBlockingQueueElement *)mvar->head; 
3492            t != END_BQ_QUEUE; 
3493            last = &t->link, last_tso = t, t = t->link) {
3494         if (t == (StgBlockingQueueElement *)tso) {
3495           *last = (StgBlockingQueueElement *)tso->link;
3496           if (mvar->tail == tso) {
3497             mvar->tail = (StgTSO *)last_tso;
3498           }
3499           goto done;
3500         }
3501       }
3502       barf("unblockThread (MVAR): TSO not found");
3503     }
3504
3505   case BlockedOnBlackHole:
3506     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3507     {
3508       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3509
3510       last = &bq->blocking_queue;
3511       for (t = bq->blocking_queue; 
3512            t != END_BQ_QUEUE; 
3513            last = &t->link, t = t->link) {
3514         if (t == (StgBlockingQueueElement *)tso) {
3515           *last = (StgBlockingQueueElement *)tso->link;
3516           goto done;
3517         }
3518       }
3519       barf("unblockThread (BLACKHOLE): TSO not found");
3520     }
3521
3522   case BlockedOnException:
3523     {
3524       StgTSO *target  = tso->block_info.tso;
3525
3526       ASSERT(get_itbl(target)->type == TSO);
3527
3528       if (target->what_next == ThreadRelocated) {
3529           target = target->link;
3530           ASSERT(get_itbl(target)->type == TSO);
3531       }
3532
3533       ASSERT(target->blocked_exceptions != NULL);
3534
3535       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3536       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3537            t != END_BQ_QUEUE; 
3538            last = &t->link, t = t->link) {
3539         ASSERT(get_itbl(t)->type == TSO);
3540         if (t == (StgBlockingQueueElement *)tso) {
3541           *last = (StgBlockingQueueElement *)tso->link;
3542           goto done;
3543         }
3544       }
3545       barf("unblockThread (Exception): TSO not found");
3546     }
3547
3548   case BlockedOnRead:
3549   case BlockedOnWrite:
3550 #if defined(mingw32_HOST_OS)
3551   case BlockedOnDoProc:
3552 #endif
3553     {
3554       /* take TSO off blocked_queue */
3555       StgBlockingQueueElement *prev = NULL;
3556       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3557            prev = t, t = t->link) {
3558         if (t == (StgBlockingQueueElement *)tso) {
3559           if (prev == NULL) {
3560             blocked_queue_hd = (StgTSO *)t->link;
3561             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3562               blocked_queue_tl = END_TSO_QUEUE;
3563             }
3564           } else {
3565             prev->link = t->link;
3566             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3567               blocked_queue_tl = (StgTSO *)prev;
3568             }
3569           }
3570 #if defined(mingw32_HOST_OS)
3571           /* (Cooperatively) signal that the worker thread should abort
3572            * the request.
3573            */
3574           abandonWorkRequest(tso->block_info.async_result->reqID);
3575 #endif
3576           goto done;
3577         }
3578       }
3579       barf("unblockThread (I/O): TSO not found");
3580     }
3581
3582   case BlockedOnDelay:
3583     {
3584       /* take TSO off sleeping_queue */
3585       StgBlockingQueueElement *prev = NULL;
3586       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3587            prev = t, t = t->link) {
3588         if (t == (StgBlockingQueueElement *)tso) {
3589           if (prev == NULL) {
3590             sleeping_queue = (StgTSO *)t->link;
3591           } else {
3592             prev->link = t->link;
3593           }
3594           goto done;
3595         }
3596       }
3597       barf("unblockThread (delay): TSO not found");
3598     }
3599
3600   default:
3601     barf("unblockThread");
3602   }
3603
3604  done:
3605   tso->link = END_TSO_QUEUE;
3606   tso->why_blocked = NotBlocked;
3607   tso->block_info.closure = NULL;
3608   pushOnRunQueue(cap,tso);
3609 }
3610 #else
3611 static void
3612 unblockThread(Capability *cap, StgTSO *tso)
3613 {
3614   StgTSO *t, **last;
3615   
3616   /* To avoid locking unnecessarily. */
3617   if (tso->why_blocked == NotBlocked) {
3618     return;
3619   }
3620
3621   switch (tso->why_blocked) {
3622
3623   case BlockedOnSTM:
3624     // Be careful: nothing to do here!  We tell the scheduler that the thread
3625     // is runnable and we leave it to the stack-walking code to abort the 
3626     // transaction while unwinding the stack.  We should perhaps have a debugging
3627     // test to make sure that this really happens and that the 'zombie' transaction
3628     // does not get committed.
3629     goto done;
3630
3631   case BlockedOnMVar:
3632     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3633     {
3634       StgTSO *last_tso = END_TSO_QUEUE;
3635       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3636
3637       last = &mvar->head;
3638       for (t = mvar->head; t != END_TSO_QUEUE; 
3639            last = &t->link, last_tso = t, t = t->link) {
3640         if (t == tso) {
3641           *last = tso->link;
3642           if (mvar->tail == tso) {
3643             mvar->tail = last_tso;
3644           }
3645           goto done;
3646         }
3647       }
3648       barf("unblockThread (MVAR): TSO not found");
3649     }
3650
3651   case BlockedOnBlackHole:
3652     {
3653       last = &blackhole_queue;
3654       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3655            last = &t->link, t = t->link) {
3656         if (t == tso) {
3657           *last = tso->link;
3658           goto done;
3659         }
3660       }
3661       barf("unblockThread (BLACKHOLE): TSO not found");
3662     }
3663
3664   case BlockedOnException:
3665     {
3666       StgTSO *target  = tso->block_info.tso;
3667
3668       ASSERT(get_itbl(target)->type == TSO);
3669
3670       while (target->what_next == ThreadRelocated) {
3671           target = target->link;
3672           ASSERT(get_itbl(target)->type == TSO);
3673       }
3674       
3675       ASSERT(target->blocked_exceptions != NULL);
3676
3677       last = &target->blocked_exceptions;
3678       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3679            last = &t->link, t = t->link) {
3680         ASSERT(get_itbl(t)->type == TSO);
3681         if (t == tso) {
3682           *last = tso->link;
3683           goto done;
3684         }
3685       }
3686       barf("unblockThread (Exception): TSO not found");
3687     }
3688
3689 #if !defined(THREADED_RTS)
3690   case BlockedOnRead:
3691   case BlockedOnWrite:
3692 #if defined(mingw32_HOST_OS)
3693   case BlockedOnDoProc:
3694 #endif
3695     {
3696       StgTSO *prev = NULL;
3697       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3698            prev = t, t = t->link) {
3699         if (t == tso) {
3700           if (prev == NULL) {
3701             blocked_queue_hd = t->link;
3702             if (blocked_queue_tl == t) {
3703               blocked_queue_tl = END_TSO_QUEUE;
3704             }
3705           } else {
3706             prev->link = t->link;
3707             if (blocked_queue_tl == t) {
3708               blocked_queue_tl = prev;
3709             }
3710           }
3711 #if defined(mingw32_HOST_OS)
3712           /* (Cooperatively) signal that the worker thread should abort
3713            * the request.
3714            */
3715           abandonWorkRequest(tso->block_info.async_result->reqID);
3716 #endif
3717           goto done;
3718         }
3719       }
3720       barf("unblockThread (I/O): TSO not found");
3721     }
3722
3723   case BlockedOnDelay:
3724     {
3725       StgTSO *prev = NULL;
3726       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3727            prev = t, t = t->link) {
3728         if (t == tso) {
3729           if (prev == NULL) {
3730             sleeping_queue = t->link;
3731           } else {
3732             prev->link = t->link;
3733           }
3734           goto done;
3735         }
3736       }
3737       barf("unblockThread (delay): TSO not found");
3738     }
3739 #endif
3740
3741   default:
3742     barf("unblockThread");
3743   }
3744
3745  done:
3746   tso->link = END_TSO_QUEUE;
3747   tso->why_blocked = NotBlocked;
3748   tso->block_info.closure = NULL;
3749   appendToRunQueue(cap,tso);
3750
3751   // We might have just migrated this TSO to our Capability:
3752   if (tso->bound) {
3753       tso->bound->cap = cap;
3754   }
3755   tso->cap = cap;
3756 }
3757 #endif
3758
3759 /* -----------------------------------------------------------------------------
3760  * checkBlackHoles()
3761  *
3762  * Check the blackhole_queue for threads that can be woken up.  We do
3763  * this periodically: before every GC, and whenever the run queue is
3764  * empty.
3765  *
3766  * An elegant solution might be to just wake up all the blocked
3767  * threads with awakenBlockedQueue occasionally: they'll go back to
3768  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3769  * doesn't give us a way to tell whether we've actually managed to
3770  * wake up any threads, so we would be busy-waiting.
3771  *
3772  * -------------------------------------------------------------------------- */
3773
3774 static rtsBool
3775 checkBlackHoles (Capability *cap)
3776 {
3777     StgTSO **prev, *t;
3778     rtsBool any_woke_up = rtsFalse;
3779     StgHalfWord type;
3780
3781     // blackhole_queue is global:
3782     ASSERT_LOCK_HELD(&sched_mutex);
3783
3784     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3785
3786     // ASSUMES: sched_mutex
3787     prev = &blackhole_queue;
3788     t = blackhole_queue;
3789     while (t != END_TSO_QUEUE) {
3790         ASSERT(t->why_blocked == BlockedOnBlackHole);
3791         type = get_itbl(t->block_info.closure)->type;
3792         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3793             IF_DEBUG(sanity,checkTSO(t));
3794             t = unblockOne(cap, t);
3795             // urk, the threads migrate to the current capability
3796             // here, but we'd like to keep them on the original one.
3797             *prev = t;
3798             any_woke_up = rtsTrue;
3799         } else {
3800             prev = &t->link;
3801             t = t->link;
3802         }
3803     }
3804
3805     return any_woke_up;
3806 }
3807
3808 /* -----------------------------------------------------------------------------
3809  * raiseAsync()
3810  *
3811  * The following function implements the magic for raising an
3812  * asynchronous exception in an existing thread.
3813  *
3814  * We first remove the thread from any queue on which it might be
3815  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3816  *
3817  * We strip the stack down to the innermost CATCH_FRAME, building
3818  * thunks in the heap for all the active computations, so they can 
3819  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3820  * an application of the handler to the exception, and push it on
3821  * the top of the stack.
3822  * 
3823  * How exactly do we save all the active computations?  We create an
3824  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3825  * AP_STACKs pushes everything from the corresponding update frame
3826  * upwards onto the stack.  (Actually, it pushes everything up to the
3827  * next update frame plus a pointer to the next AP_STACK object.
3828  * Entering the next AP_STACK object pushes more onto the stack until we
3829  * reach the last AP_STACK object - at which point the stack should look
3830  * exactly as it did when we killed the TSO and we can continue
3831  * execution by entering the closure on top of the stack.
3832  *
3833  * We can also kill a thread entirely - this happens if either (a) the 
3834  * exception passed to raiseAsync is NULL, or (b) there's no
3835  * CATCH_FRAME on the stack.  In either case, we strip the entire
3836  * stack and replace the thread with a zombie.
3837  *
3838  * ToDo: in THREADED_RTS mode, this function is only safe if either
3839  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3840  * one Capability), or (b) we own the Capability that the TSO is
3841  * currently blocked on or on the run queue of.
3842  *
3843  * -------------------------------------------------------------------------- */
3844  
3845 void
3846 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3847 {
3848     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3849 }
3850
3851 void
3852 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3853 {
3854     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3855 }
3856
3857 static void
3858 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3859             rtsBool stop_at_atomically, StgPtr stop_here)
3860 {
3861     StgRetInfoTable *info;
3862     StgPtr sp, frame;
3863     nat i;
3864   
3865     // Thread already dead?
3866     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3867         return;
3868     }
3869
3870     IF_DEBUG(scheduler, 
3871              sched_belch("raising exception in thread %ld.", (long)tso->id));
3872     
3873     // Remove it from any blocking queues
3874     unblockThread(cap,tso);
3875
3876     // mark it dirty; we're about to change its stack.
3877     dirtyTSO(tso);
3878
3879     sp = tso->sp;
3880     
3881     // The stack freezing code assumes there's a closure pointer on
3882     // the top of the stack, so we have to arrange that this is the case...
3883     //
3884     if (sp[0] == (W_)&stg_enter_info) {
3885         sp++;
3886     } else {
3887         sp--;
3888         sp[0] = (W_)&stg_dummy_ret_closure;
3889     }
3890
3891     frame = sp + 1;
3892     while (stop_here == NULL || frame < stop_here) {
3893
3894         // 1. Let the top of the stack be the "current closure"
3895         //
3896         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3897         // CATCH_FRAME.
3898         //
3899         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3900         // current closure applied to the chunk of stack up to (but not
3901         // including) the update frame.  This closure becomes the "current
3902         // closure".  Go back to step 2.
3903         //
3904         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3905         // top of the stack applied to the exception.
3906         // 
3907         // 5. If it's a STOP_FRAME, then kill the thread.
3908         // 
3909         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3910         // transaction
3911        
3912         info = get_ret_itbl((StgClosure *)frame);
3913
3914         switch (info->i.type) {
3915
3916         case UPDATE_FRAME:
3917         {
3918             StgAP_STACK * ap;
3919             nat words;
3920             
3921             // First build an AP_STACK consisting of the stack chunk above the
3922             // current update frame, with the top word on the stack as the
3923             // fun field.
3924             //
3925             words = frame - sp - 1;
3926             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3927             
3928             ap->size = words;
3929             ap->fun  = (StgClosure *)sp[0];
3930             sp++;
3931             for(i=0; i < (nat)words; ++i) {
3932                 ap->payload[i] = (StgClosure *)*sp++;
3933             }
3934             
3935             SET_HDR(ap,&stg_AP_STACK_info,
3936                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3937             TICK_ALLOC_UP_THK(words+1,0);
3938             
3939             IF_DEBUG(scheduler,
3940                      debugBelch("sched: Updating ");
3941                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3942                      debugBelch(" with ");
3943                      printObj((StgClosure *)ap);
3944                 );
3945
3946             // Replace the updatee with an indirection
3947             //
3948             // Warning: if we're in a loop, more than one update frame on
3949             // the stack may point to the same object.  Be careful not to
3950             // overwrite an IND_OLDGEN in this case, because we'll screw
3951             // up the mutable lists.  To be on the safe side, don't
3952             // overwrite any kind of indirection at all.  See also
3953             // threadSqueezeStack in GC.c, where we have to make a similar
3954             // check.
3955             //
3956             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3957                 // revert the black hole
3958                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3959                                (StgClosure *)ap);
3960             }
3961             sp += sizeofW(StgUpdateFrame) - 1;
3962             sp[0] = (W_)ap; // push onto stack
3963             frame = sp + 1;
3964             continue; //no need to bump frame
3965         }
3966
3967         case STOP_FRAME:
3968             // We've stripped the entire stack, the thread is now dead.
3969             tso->what_next = ThreadKilled;
3970             tso->sp = frame + sizeofW(StgStopFrame);
3971             return;
3972
3973         case CATCH_FRAME:
3974             // If we find a CATCH_FRAME, and we've got an exception to raise,
3975             // then build the THUNK raise(exception), and leave it on
3976             // top of the CATCH_FRAME ready to enter.
3977             //
3978         {
3979 #ifdef PROFILING
3980             StgCatchFrame *cf = (StgCatchFrame *)frame;
3981 #endif
3982             StgThunk *raise;
3983             
3984             if (exception == NULL) break;
3985
3986             // we've got an exception to raise, so let's pass it to the
3987             // handler in this frame.
3988             //
3989             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3990             TICK_ALLOC_SE_THK(1,0);
3991             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3992             raise->payload[0] = exception;
3993             
3994             // throw away the stack from Sp up to the CATCH_FRAME.
3995             //
3996             sp = frame - 1;
3997             
3998             /* Ensure that async excpetions are blocked now, so we don't get
3999              * a surprise exception before we get around to executing the
4000              * handler.
4001              */
4002             if (tso->blocked_exceptions == NULL) {
4003                 tso->blocked_exceptions = END_TSO_QUEUE;
4004             }
4005
4006             /* Put the newly-built THUNK on top of the stack, ready to execute
4007              * when the thread restarts.
4008              */
4009             sp[0] = (W_)raise;
4010             sp[-1] = (W_)&stg_enter_info;
4011             tso->sp = sp-1;
4012             tso->what_next = ThreadRunGHC;
4013             IF_DEBUG(sanity, checkTSO(tso));
4014             return;
4015         }
4016             
4017         case ATOMICALLY_FRAME:
4018             if (stop_at_atomically) {
4019                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
4020                 stmCondemnTransaction(cap, tso -> trec);
4021 #ifdef REG_R1
4022                 tso->sp = frame;
4023 #else
4024                 // R1 is not a register: the return convention for IO in
4025                 // this case puts the return value on the stack, so we
4026                 // need to set up the stack to return to the atomically
4027                 // frame properly...
4028                 tso->sp = frame - 2;
4029                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
4030                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
4031 #endif
4032                 tso->what_next = ThreadRunGHC;
4033                 return;
4034             }
4035             // Not stop_at_atomically... fall through and abort the
4036             // transaction.
4037             
4038         case CATCH_RETRY_FRAME:
4039             // IF we find an ATOMICALLY_FRAME then we abort the
4040             // current transaction and propagate the exception.  In
4041             // this case (unlike ordinary exceptions) we do not care
4042             // whether the transaction is valid or not because its
4043             // possible validity cannot have caused the exception
4044             // and will not be visible after the abort.
4045             IF_DEBUG(stm,
4046                      debugBelch("Found atomically block delivering async exception\n"));
4047             StgTRecHeader *trec = tso -> trec;
4048             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4049             stmAbortTransaction(cap, trec);
4050             tso -> trec = outer;
4051             break;
4052             
4053         default:
4054             break;
4055         }
4056
4057         // move on to the next stack frame
4058         frame += stack_frame_sizeW((StgClosure *)frame);
4059     }
4060
4061     // if we got here, then we stopped at stop_here
4062     ASSERT(stop_here != NULL);
4063 }
4064
4065 /* -----------------------------------------------------------------------------
4066    Deleting threads
4067
4068    This is used for interruption (^C) and forking, and corresponds to
4069    raising an exception but without letting the thread catch the
4070    exception.
4071    -------------------------------------------------------------------------- */
4072
4073 static void 
4074 deleteThread (Capability *cap, StgTSO *tso)
4075 {
4076   if (tso->why_blocked != BlockedOnCCall &&
4077       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4078       raiseAsync(cap,tso,NULL);
4079   }
4080 }
4081
4082 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4083 static void 
4084 deleteThread_(Capability *cap, StgTSO *tso)
4085 { // for forkProcess only:
4086   // like deleteThread(), but we delete threads in foreign calls, too.
4087
4088     if (tso->why_blocked == BlockedOnCCall ||
4089         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4090         unblockOne(cap,tso);
4091         tso->what_next = ThreadKilled;
4092     } else {
4093         deleteThread(cap,tso);
4094     }
4095 }
4096 #endif
4097
4098 /* -----------------------------------------------------------------------------
4099    raiseExceptionHelper
4100    
4101    This function is called by the raise# primitve, just so that we can
4102    move some of the tricky bits of raising an exception from C-- into
4103    C.  Who knows, it might be a useful re-useable thing here too.
4104    -------------------------------------------------------------------------- */
4105
4106 StgWord
4107 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4108 {
4109     Capability *cap = regTableToCapability(reg);
4110     StgThunk *raise_closure = NULL;
4111     StgPtr p, next;
4112     StgRetInfoTable *info;
4113     //
4114     // This closure represents the expression 'raise# E' where E
4115     // is the exception raise.  It is used to overwrite all the
4116     // thunks which are currently under evaluataion.
4117     //
4118
4119     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4120     // LDV profiling: stg_raise_info has THUNK as its closure
4121     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4122     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
4123     // 1 does not cause any problem unless profiling is performed.
4124     // However, when LDV profiling goes on, we need to linearly scan
4125     // small object pool, where raise_closure is stored, so we should
4126     // use MIN_UPD_SIZE.
4127     //
4128     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4129     //                                 sizeofW(StgClosure)+1);
4130     //
4131
4132     //
4133     // Walk up the stack, looking for the catch frame.  On the way,
4134     // we update any closures pointed to from update frames with the
4135     // raise closure that we just built.
4136     //
4137     p = tso->sp;
4138     while(1) {
4139         info = get_ret_itbl((StgClosure *)p);
4140         next = p + stack_frame_sizeW((StgClosure *)p);
4141         switch (info->i.type) {
4142             
4143         case UPDATE_FRAME:
4144             // Only create raise_closure if we need to.
4145             if (raise_closure == NULL) {
4146                 raise_closure = 
4147                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4148                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4149                 raise_closure->payload[0] = exception;
4150             }
4151             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4152             p = next;
4153             continue;
4154
4155         case ATOMICALLY_FRAME:
4156             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4157             tso->sp = p;
4158             return ATOMICALLY_FRAME;
4159             
4160         case CATCH_FRAME:
4161             tso->sp = p;
4162             return CATCH_FRAME;
4163
4164         case CATCH_STM_FRAME:
4165             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4166             tso->sp = p;
4167             return CATCH_STM_FRAME;
4168             
4169         case STOP_FRAME:
4170             tso->sp = p;
4171             return STOP_FRAME;
4172
4173         case CATCH_RETRY_FRAME:
4174         default:
4175             p = next; 
4176             continue;
4177         }
4178     }
4179 }
4180
4181
4182 /* -----------------------------------------------------------------------------
4183    findRetryFrameHelper
4184
4185    This function is called by the retry# primitive.  It traverses the stack
4186    leaving tso->sp referring to the frame which should handle the retry.  
4187
4188    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4189    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4190
4191    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4192    despite the similar implementation.
4193
4194    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4195    not be created within memory transactions.
4196    -------------------------------------------------------------------------- */
4197
4198 StgWord
4199 findRetryFrameHelper (StgTSO *tso)
4200 {
4201   StgPtr           p, next;
4202   StgRetInfoTable *info;
4203
4204   p = tso -> sp;
4205   while (1) {
4206     info = get_ret_itbl((StgClosure *)p);
4207     next = p + stack_frame_sizeW((StgClosure *)p);
4208     switch (info->i.type) {
4209       
4210     case ATOMICALLY_FRAME:
4211       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4212       tso->sp = p;
4213       return ATOMICALLY_FRAME;
4214       
4215     case CATCH_RETRY_FRAME:
4216       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4217       tso->sp = p;
4218       return CATCH_RETRY_FRAME;
4219       
4220     case CATCH_STM_FRAME:
4221     default:
4222       ASSERT(info->i.type != CATCH_FRAME);
4223       ASSERT(info->i.type != STOP_FRAME);
4224       p = next; 
4225       continue;
4226     }
4227   }
4228 }
4229
4230 /* -----------------------------------------------------------------------------
4231    resurrectThreads is called after garbage collection on the list of
4232    threads found to be garbage.  Each of these threads will be woken
4233    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4234    on an MVar, or NonTermination if the thread was blocked on a Black
4235    Hole.
4236
4237    Locks: assumes we hold *all* the capabilities.
4238    -------------------------------------------------------------------------- */
4239
4240 void
4241 resurrectThreads (StgTSO *threads)
4242 {
4243     StgTSO *tso, *next;
4244     Capability *cap;
4245
4246     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4247         next = tso->global_link;
4248         tso->global_link = all_threads;
4249         all_threads = tso;
4250         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4251         
4252         // Wake up the thread on the Capability it was last on
4253         cap = tso->cap;
4254         
4255         switch (tso->why_blocked) {
4256         case BlockedOnMVar:
4257         case BlockedOnException:
4258             /* Called by GC - sched_mutex lock is currently held. */
4259             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4260             break;
4261         case BlockedOnBlackHole:
4262             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4263             break;
4264         case BlockedOnSTM:
4265             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4266             break;
4267         case NotBlocked:
4268             /* This might happen if the thread was blocked on a black hole
4269              * belonging to a thread that we've just woken up (raiseAsync
4270              * can wake up threads, remember...).
4271              */
4272             continue;
4273         default:
4274             barf("resurrectThreads: thread blocked in a strange way");
4275         }
4276     }
4277 }
4278
4279 /* ----------------------------------------------------------------------------
4280  * Debugging: why is a thread blocked
4281  * [Also provides useful information when debugging threaded programs
4282  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4283    ------------------------------------------------------------------------- */
4284
4285 #if DEBUG
4286 static void
4287 printThreadBlockage(StgTSO *tso)
4288 {
4289   switch (tso->why_blocked) {
4290   case BlockedOnRead:
4291     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4292     break;
4293   case BlockedOnWrite:
4294     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4295     break;
4296 #if defined(mingw32_HOST_OS)
4297     case BlockedOnDoProc:
4298     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4299     break;
4300 #endif
4301   case BlockedOnDelay:
4302     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4303     break;
4304   case BlockedOnMVar:
4305     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4306     break;
4307   case BlockedOnException:
4308     debugBelch("is blocked on delivering an exception to thread %d",
4309             tso->block_info.tso->id);
4310     break;
4311   case BlockedOnBlackHole:
4312     debugBelch("is blocked on a black hole");
4313     break;
4314   case NotBlocked:
4315     debugBelch("is not blocked");
4316     break;
4317 #if defined(PARALLEL_HASKELL)
4318   case BlockedOnGA:
4319     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4320             tso->block_info.closure, info_type(tso->block_info.closure));
4321     break;
4322   case BlockedOnGA_NoSend:
4323     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4324             tso->block_info.closure, info_type(tso->block_info.closure));
4325     break;
4326 #endif
4327   case BlockedOnCCall:
4328     debugBelch("is blocked on an external call");
4329     break;
4330   case BlockedOnCCall_NoUnblockExc:
4331     debugBelch("is blocked on an external call (exceptions were already blocked)");
4332     break;
4333   case BlockedOnSTM:
4334     debugBelch("is blocked on an STM operation");
4335     break;
4336   default:
4337     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4338          tso->why_blocked, tso->id, tso);
4339   }
4340 }
4341
4342 void
4343 printThreadStatus(StgTSO *t)
4344 {
4345     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4346     {
4347       void *label = lookupThreadLabel(t->id);
4348       if (label) debugBelch("[\"%s\"] ",(char *)label);
4349     }
4350     if (t->what_next == ThreadRelocated) {
4351         debugBelch("has been relocated...\n");
4352     } else {
4353         switch (t->what_next) {
4354         case ThreadKilled:
4355             debugBelch("has been killed");
4356             break;
4357         case ThreadComplete:
4358             debugBelch("has completed");
4359             break;
4360         default:
4361             printThreadBlockage(t);
4362         }
4363         debugBelch("\n");
4364     }
4365 }
4366
4367 void
4368 printAllThreads(void)
4369 {
4370   StgTSO *t, *next;
4371   nat i;
4372   Capability *cap;
4373
4374 # if defined(GRAN)
4375   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4376   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4377                        time_string, rtsFalse/*no commas!*/);
4378
4379   debugBelch("all threads at [%s]:\n", time_string);
4380 # elif defined(PARALLEL_HASKELL)
4381   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4382   ullong_format_string(CURRENT_TIME,
4383                        time_string, rtsFalse/*no commas!*/);
4384
4385   debugBelch("all threads at [%s]:\n", time_string);
4386 # else
4387   debugBelch("all threads:\n");
4388 # endif
4389
4390   for (i = 0; i < n_capabilities; i++) {
4391       cap = &capabilities[i];
4392       debugBelch("threads on capability %d:\n", cap->no);
4393       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4394           printThreadStatus(t);
4395       }
4396   }
4397
4398   debugBelch("other threads:\n");
4399   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4400       if (t->why_blocked != NotBlocked) {
4401           printThreadStatus(t);
4402       }
4403       if (t->what_next == ThreadRelocated) {
4404           next = t->link;
4405       } else {
4406           next = t->global_link;
4407       }
4408   }
4409 }
4410
4411 // useful from gdb
4412 void 
4413 printThreadQueue(StgTSO *t)
4414 {
4415     nat i = 0;
4416     for (; t != END_TSO_QUEUE; t = t->link) {
4417         printThreadStatus(t);
4418         i++;
4419     }
4420     debugBelch("%d threads on queue\n", i);
4421 }
4422
4423 /* 
4424    Print a whole blocking queue attached to node (debugging only).
4425 */
4426 # if defined(PARALLEL_HASKELL)
4427 void 
4428 print_bq (StgClosure *node)
4429 {
4430   StgBlockingQueueElement *bqe;
4431   StgTSO *tso;
4432   rtsBool end;
4433
4434   debugBelch("## BQ of closure %p (%s): ",
4435           node, info_type(node));
4436
4437   /* should cover all closures that may have a blocking queue */
4438   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4439          get_itbl(node)->type == FETCH_ME_BQ ||
4440          get_itbl(node)->type == RBH ||
4441          get_itbl(node)->type == MVAR);
4442     
4443   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4444
4445   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4446 }
4447
4448 /* 
4449    Print a whole blocking queue starting with the element bqe.
4450 */
4451 void 
4452 print_bqe (StgBlockingQueueElement *bqe)
4453 {
4454   rtsBool end;
4455
4456   /* 
4457      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4458   */
4459   for (end = (bqe==END_BQ_QUEUE);
4460        !end; // iterate until bqe points to a CONSTR
4461        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4462        bqe = end ? END_BQ_QUEUE : bqe->link) {
4463     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4464     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4465     /* types of closures that may appear in a blocking queue */
4466     ASSERT(get_itbl(bqe)->type == TSO ||           
4467            get_itbl(bqe)->type == BLOCKED_FETCH || 
4468            get_itbl(bqe)->type == CONSTR); 
4469     /* only BQs of an RBH end with an RBH_Save closure */
4470     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4471
4472     switch (get_itbl(bqe)->type) {
4473     case TSO:
4474       debugBelch(" TSO %u (%x),",
4475               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4476       break;
4477     case BLOCKED_FETCH:
4478       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4479               ((StgBlockedFetch *)bqe)->node, 
4480               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4481               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4482               ((StgBlockedFetch *)bqe)->ga.weight);
4483       break;
4484     case CONSTR:
4485       debugBelch(" %s (IP %p),",
4486               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4487                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4488                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4489                "RBH_Save_?"), get_itbl(bqe));
4490       break;
4491     default:
4492       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4493            info_type((StgClosure *)bqe)); // , node, info_type(node));
4494       break;
4495     }
4496   } /* for */
4497   debugBelch("\n");
4498 }
4499 # elif defined(GRAN)
4500 void 
4501 print_bq (StgClosure *node)
4502 {
4503   StgBlockingQueueElement *bqe;
4504   PEs node_loc, tso_loc;
4505   rtsBool end;
4506
4507   /* should cover all closures that may have a blocking queue */
4508   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4509          get_itbl(node)->type == FETCH_ME_BQ ||
4510          get_itbl(node)->type == RBH);
4511     
4512   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4513   node_loc = where_is(node);
4514
4515   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4516           node, info_type(node), node_loc);
4517
4518   /* 
4519      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4520   */
4521   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4522        !end; // iterate until bqe points to a CONSTR
4523        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4524     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4525     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4526     /* types of closures that may appear in a blocking queue */
4527     ASSERT(get_itbl(bqe)->type == TSO ||           
4528            get_itbl(bqe)->type == CONSTR); 
4529     /* only BQs of an RBH end with an RBH_Save closure */
4530     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4531
4532     tso_loc = where_is((StgClosure *)bqe);
4533     switch (get_itbl(bqe)->type) {
4534     case TSO:
4535       debugBelch(" TSO %d (%p) on [PE %d],",
4536               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4537       break;
4538     case CONSTR:
4539       debugBelch(" %s (IP %p),",
4540               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4541                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4542                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4543                "RBH_Save_?"), get_itbl(bqe));
4544       break;
4545     default:
4546       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4547            info_type((StgClosure *)bqe), node, info_type(node));
4548       break;
4549     }
4550   } /* for */
4551   debugBelch("\n");
4552 }
4553 # endif
4554
4555 #if defined(PARALLEL_HASKELL)
4556 static nat
4557 run_queue_len(void)
4558 {
4559     nat i;
4560     StgTSO *tso;
4561     
4562     for (i=0, tso=run_queue_hd; 
4563          tso != END_TSO_QUEUE;
4564          i++, tso=tso->link) {
4565         /* nothing */
4566     }
4567         
4568     return i;
4569 }
4570 #endif
4571
4572 void
4573 sched_belch(char *s, ...)
4574 {
4575     va_list ap;
4576     va_start(ap,s);
4577 #ifdef THREADED_RTS
4578     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4579 #elif defined(PARALLEL_HASKELL)
4580     debugBelch("== ");
4581 #else
4582     debugBelch("sched: ");
4583 #endif
4584     vdebugBelch(s, ap);
4585     debugBelch("\n");
4586     va_end(ap);
4587 }
4588
4589 #endif /* DEBUG */