Remove unnecessary SCHED_INTERRUPTED scheduler state
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
208 static void scheduleCheckBlackHoles (Capability *cap);
209 static void scheduleDetectDeadlock (Capability *cap, Task *task);
210 #if defined(GRAN)
211 static StgTSO *scheduleProcessEvent(rtsEvent *event);
212 #endif
213 #if defined(PARALLEL_HASKELL)
214 static StgTSO *scheduleSendPendingMessages(void);
215 static void scheduleActivateSpark(void);
216 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
217 #endif
218 #if defined(PAR) || defined(GRAN)
219 static void scheduleGranParReport(void);
220 #endif
221 static void schedulePostRunThread(void);
222 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
223 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
224                                          StgTSO *t);
225 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
226                                     nat prev_what_next );
227 static void scheduleHandleThreadBlocked( StgTSO *t );
228 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
229                                              StgTSO *t );
230 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
231 static Capability *scheduleDoGC(Capability *cap, Task *task,
232                                 rtsBool force_major, 
233                                 void (*get_roots)(evac_fn));
234
235 static void unblockThread(Capability *cap, StgTSO *tso);
236 static rtsBool checkBlackHoles(Capability *cap);
237 static void AllRoots(evac_fn evac);
238
239 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
240
241 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
242                         rtsBool stop_at_atomically, StgPtr stop_here);
243
244 static void deleteThread (Capability *cap, StgTSO *tso);
245 static void deleteAllThreads (Capability *cap);
246
247 #ifdef DEBUG
248 static void printThreadBlockage(StgTSO *tso);
249 static void printThreadStatus(StgTSO *tso);
250 void printThreadQueue(StgTSO *tso);
251 #endif
252
253 #if defined(PARALLEL_HASKELL)
254 StgTSO * createSparkThread(rtsSpark spark);
255 StgTSO * activateSpark (rtsSpark spark);  
256 #endif
257
258 #ifdef DEBUG
259 static char *whatNext_strs[] = {
260   "(unknown)",
261   "ThreadRunGHC",
262   "ThreadInterpret",
263   "ThreadKilled",
264   "ThreadRelocated",
265   "ThreadComplete"
266 };
267 #endif
268
269 /* -----------------------------------------------------------------------------
270  * Putting a thread on the run queue: different scheduling policies
271  * -------------------------------------------------------------------------- */
272
273 STATIC_INLINE void
274 addToRunQueue( Capability *cap, StgTSO *t )
275 {
276 #if defined(PARALLEL_HASKELL)
277     if (RtsFlags.ParFlags.doFairScheduling) { 
278         // this does round-robin scheduling; good for concurrency
279         appendToRunQueue(cap,t);
280     } else {
281         // this does unfair scheduling; good for parallelism
282         pushOnRunQueue(cap,t);
283     }
284 #else
285     // this does round-robin scheduling; good for concurrency
286     appendToRunQueue(cap,t);
287 #endif
288 }
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    GRAN version:
303      In a GranSim setup this loop iterates over the global event queue.
304      This revolves around the global event queue, which determines what 
305      to do next. Therefore, it's more complicated than either the 
306      concurrent or the parallel (GUM) setup.
307
308    GUM version:
309      GUM iterates over incoming messages.
310      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
311      and sends out a fish whenever it has nothing to do; in-between
312      doing the actual reductions (shared code below) it processes the
313      incoming messages and deals with delayed operations 
314      (see PendingFetches).
315      This is not the ugliest code you could imagine, but it's bloody close.
316
317    ------------------------------------------------------------------------ */
318
319 static Capability *
320 schedule (Capability *initialCapability, Task *task)
321 {
322   StgTSO *t;
323   Capability *cap;
324   StgThreadReturnCode ret;
325 #if defined(GRAN)
326   rtsEvent *event;
327 #elif defined(PARALLEL_HASKELL)
328   StgTSO *tso;
329   GlobalTaskId pe;
330   rtsBool receivedFinish = rtsFalse;
331 # if defined(DEBUG)
332   nat tp_size, sp_size; // stats only
333 # endif
334 #endif
335   nat prev_what_next;
336   rtsBool ready_to_gc;
337 #if defined(THREADED_RTS)
338   rtsBool first = rtsTrue;
339 #endif
340   
341   cap = initialCapability;
342
343   // Pre-condition: this task owns initialCapability.
344   // The sched_mutex is *NOT* held
345   // NB. on return, we still hold a capability.
346
347   IF_DEBUG(scheduler,
348            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
349                        task, initialCapability);
350       );
351
352   schedulePreLoop();
353
354   // -----------------------------------------------------------
355   // Scheduler loop starts here:
356
357 #if defined(PARALLEL_HASKELL)
358 #define TERMINATION_CONDITION        (!receivedFinish)
359 #elif defined(GRAN)
360 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
361 #else
362 #define TERMINATION_CONDITION        rtsTrue
363 #endif
364
365   while (TERMINATION_CONDITION) {
366
367 #if defined(GRAN)
368       /* Choose the processor with the next event */
369       CurrentProc = event->proc;
370       CurrentTSO = event->tso;
371 #endif
372
373 #if defined(THREADED_RTS)
374       if (first) {
375           // don't yield the first time, we want a chance to run this
376           // thread for a bit, even if there are others banging at the
377           // door.
378           first = rtsFalse;
379           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
380       } else {
381           // Yield the capability to higher-priority tasks if necessary.
382           yieldCapability(&cap, task);
383       }
384 #endif
385       
386 #if defined(THREADED_RTS)
387       schedulePushWork(cap,task);
388 #endif
389
390     // Check whether we have re-entered the RTS from Haskell without
391     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
392     // call).
393     if (cap->in_haskell) {
394           errorBelch("schedule: re-entered unsafely.\n"
395                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
396           stg_exit(EXIT_FAILURE);
397     }
398
399     // The interruption / shutdown sequence.
400     // 
401     // In order to cleanly shut down the runtime, we want to:
402     //   * make sure that all main threads return to their callers
403     //     with the state 'Interrupted'.
404     //   * clean up all OS threads assocated with the runtime
405     //   * free all memory etc.
406     //
407     // So the sequence for ^C goes like this:
408     //
409     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
410     //     arranges for some Capability to wake up
411     //
412     //   * all threads in the system are halted, and the zombies are
413     //     placed on the run queue for cleaning up.  We acquire all
414     //     the capabilities in order to delete the threads, this is
415     //     done by scheduleDoGC() for convenience (because GC already
416     //     needs to acquire all the capabilities).  We can't kill
417     //     threads involved in foreign calls.
418     // 
419     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
420     //
421     //   * sched_state := SCHED_SHUTTING_DOWN
422     //
423     //   * all workers exit when the run queue on their capability
424     //     drains.  All main threads will also exit when their TSO
425     //     reaches the head of the run queue and they can return.
426     //
427     //   * eventually all Capabilities will shut down, and the RTS can
428     //     exit.
429     //
430     //   * We might be left with threads blocked in foreign calls, 
431     //     we should really attempt to kill these somehow (TODO);
432     
433     switch (sched_state) {
434     case SCHED_RUNNING:
435         break;
436     case SCHED_INTERRUPTING:
437         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
438 #if defined(THREADED_RTS)
439         discardSparksCap(cap);
440 #endif
441         /* scheduleDoGC() deletes all the threads */
442         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
443         break;
444     case SCHED_SHUTTING_DOWN:
445         IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
446         // If we are a worker, just exit.  If we're a bound thread
447         // then we will exit below when we've removed our TSO from
448         // the run queue.
449         if (task->tso == NULL && emptyRunQueue(cap)) {
450             return cap;
451         }
452         break;
453     default:
454         barf("sched_state: %d", sched_state);
455     }
456
457 #if defined(THREADED_RTS)
458     // If the run queue is empty, take a spark and turn it into a thread.
459     {
460         if (emptyRunQueue(cap)) {
461             StgClosure *spark;
462             spark = findSpark(cap);
463             if (spark != NULL) {
464                 IF_DEBUG(scheduler,
465                          sched_belch("turning spark of closure %p into a thread",
466                                      (StgClosure *)spark));
467                 createSparkThread(cap,spark);     
468             }
469         }
470     }
471 #endif // THREADED_RTS
472
473     scheduleStartSignalHandlers(cap);
474
475     // Only check the black holes here if we've nothing else to do.
476     // During normal execution, the black hole list only gets checked
477     // at GC time, to avoid repeatedly traversing this possibly long
478     // list each time around the scheduler.
479     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
480
481     scheduleCheckWakeupThreads(cap);
482
483     scheduleCheckBlockedThreads(cap);
484
485     scheduleDetectDeadlock(cap,task);
486 #if defined(THREADED_RTS)
487     cap = task->cap;    // reload cap, it might have changed
488 #endif
489
490     // Normally, the only way we can get here with no threads to
491     // run is if a keyboard interrupt received during 
492     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
493     // Additionally, it is not fatal for the
494     // threaded RTS to reach here with no threads to run.
495     //
496     // win32: might be here due to awaitEvent() being abandoned
497     // as a result of a console event having been delivered.
498     if ( emptyRunQueue(cap) ) {
499 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
500         ASSERT(sched_state >= SCHED_INTERRUPTING);
501 #endif
502         continue; // nothing to do
503     }
504
505 #if defined(PARALLEL_HASKELL)
506     scheduleSendPendingMessages();
507     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
508         continue;
509
510 #if defined(SPARKS)
511     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
512 #endif
513
514     /* If we still have no work we need to send a FISH to get a spark
515        from another PE */
516     if (emptyRunQueue(cap)) {
517         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
518         ASSERT(rtsFalse); // should not happen at the moment
519     }
520     // from here: non-empty run queue.
521     //  TODO: merge above case with this, only one call processMessages() !
522     if (PacketsWaiting()) {  /* process incoming messages, if
523                                 any pending...  only in else
524                                 because getRemoteWork waits for
525                                 messages as well */
526         receivedFinish = processMessages();
527     }
528 #endif
529
530 #if defined(GRAN)
531     scheduleProcessEvent(event);
532 #endif
533
534     // 
535     // Get a thread to run
536     //
537     t = popRunQueue(cap);
538
539 #if defined(GRAN) || defined(PAR)
540     scheduleGranParReport(); // some kind of debuging output
541 #else
542     // Sanity check the thread we're about to run.  This can be
543     // expensive if there is lots of thread switching going on...
544     IF_DEBUG(sanity,checkTSO(t));
545 #endif
546
547 #if defined(THREADED_RTS)
548     // Check whether we can run this thread in the current task.
549     // If not, we have to pass our capability to the right task.
550     {
551         Task *bound = t->bound;
552       
553         if (bound) {
554             if (bound == task) {
555                 IF_DEBUG(scheduler,
556                          sched_belch("### Running thread %d in bound thread",
557                                      t->id));
558                 // yes, the Haskell thread is bound to the current native thread
559             } else {
560                 IF_DEBUG(scheduler,
561                          sched_belch("### thread %d bound to another OS thread",
562                                      t->id));
563                 // no, bound to a different Haskell thread: pass to that thread
564                 pushOnRunQueue(cap,t);
565                 continue;
566             }
567         } else {
568             // The thread we want to run is unbound.
569             if (task->tso) { 
570                 IF_DEBUG(scheduler,
571                          sched_belch("### this OS thread cannot run thread %d", t->id));
572                 // no, the current native thread is bound to a different
573                 // Haskell thread, so pass it to any worker thread
574                 pushOnRunQueue(cap,t);
575                 continue; 
576             }
577         }
578     }
579 #endif
580
581     cap->r.rCurrentTSO = t;
582     
583     /* context switches are initiated by the timer signal, unless
584      * the user specified "context switch as often as possible", with
585      * +RTS -C0
586      */
587     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
588         && !emptyThreadQueues(cap)) {
589         context_switch = 1;
590     }
591          
592 run_thread:
593
594     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
595                               (long)t->id, whatNext_strs[t->what_next]));
596
597 #if defined(PROFILING)
598     startHeapProfTimer();
599 #endif
600
601     // ----------------------------------------------------------------------
602     // Run the current thread 
603
604     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
605     ASSERT(t->cap == cap);
606
607     prev_what_next = t->what_next;
608
609     errno = t->saved_errno;
610     cap->in_haskell = rtsTrue;
611
612     dirtyTSO(t);
613
614     recent_activity = ACTIVITY_YES;
615
616     switch (prev_what_next) {
617         
618     case ThreadKilled:
619     case ThreadComplete:
620         /* Thread already finished, return to scheduler. */
621         ret = ThreadFinished;
622         break;
623         
624     case ThreadRunGHC:
625     {
626         StgRegTable *r;
627         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
628         cap = regTableToCapability(r);
629         ret = r->rRet;
630         break;
631     }
632     
633     case ThreadInterpret:
634         cap = interpretBCO(cap);
635         ret = cap->r.rRet;
636         break;
637         
638     default:
639         barf("schedule: invalid what_next field");
640     }
641
642     cap->in_haskell = rtsFalse;
643
644     // The TSO might have moved, eg. if it re-entered the RTS and a GC
645     // happened.  So find the new location:
646     t = cap->r.rCurrentTSO;
647
648     // We have run some Haskell code: there might be blackhole-blocked
649     // threads to wake up now.
650     // Lock-free test here should be ok, we're just setting a flag.
651     if ( blackhole_queue != END_TSO_QUEUE ) {
652         blackholes_need_checking = rtsTrue;
653     }
654     
655     // And save the current errno in this thread.
656     // XXX: possibly bogus for SMP because this thread might already
657     // be running again, see code below.
658     t->saved_errno = errno;
659
660 #if defined(THREADED_RTS)
661     // If ret is ThreadBlocked, and this Task is bound to the TSO that
662     // blocked, we are in limbo - the TSO is now owned by whatever it
663     // is blocked on, and may in fact already have been woken up,
664     // perhaps even on a different Capability.  It may be the case
665     // that task->cap != cap.  We better yield this Capability
666     // immediately and return to normaility.
667     if (ret == ThreadBlocked) {
668         IF_DEBUG(scheduler,
669                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
670                              t->id, whatNext_strs[t->what_next]));
671         continue;
672     }
673 #endif
674
675     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
676     ASSERT(t->cap == cap);
677
678     // ----------------------------------------------------------------------
679     
680     // Costs for the scheduler are assigned to CCS_SYSTEM
681 #if defined(PROFILING)
682     stopHeapProfTimer();
683     CCCS = CCS_SYSTEM;
684 #endif
685     
686 #if defined(THREADED_RTS)
687     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
688 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
689     IF_DEBUG(scheduler,debugBelch("sched: "););
690 #endif
691     
692     schedulePostRunThread();
693
694     ready_to_gc = rtsFalse;
695
696     switch (ret) {
697     case HeapOverflow:
698         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
699         break;
700
701     case StackOverflow:
702         scheduleHandleStackOverflow(cap,task,t);
703         break;
704
705     case ThreadYielding:
706         if (scheduleHandleYield(cap, t, prev_what_next)) {
707             // shortcut for switching between compiler/interpreter:
708             goto run_thread; 
709         }
710         break;
711
712     case ThreadBlocked:
713         scheduleHandleThreadBlocked(t);
714         break;
715
716     case ThreadFinished:
717         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
718         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
719         break;
720
721     default:
722       barf("schedule: invalid thread return code %d", (int)ret);
723     }
724
725     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
726     if (ready_to_gc) {
727       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
728     }
729   } /* end of while() */
730
731   IF_PAR_DEBUG(verbose,
732                debugBelch("== Leaving schedule() after having received Finish\n"));
733 }
734
735 /* ----------------------------------------------------------------------------
736  * Setting up the scheduler loop
737  * ------------------------------------------------------------------------- */
738
739 static void
740 schedulePreLoop(void)
741 {
742 #if defined(GRAN) 
743     /* set up first event to get things going */
744     /* ToDo: assign costs for system setup and init MainTSO ! */
745     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
746               ContinueThread, 
747               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
748     
749     IF_DEBUG(gran,
750              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
751                         CurrentTSO);
752              G_TSO(CurrentTSO, 5));
753     
754     if (RtsFlags.GranFlags.Light) {
755         /* Save current time; GranSim Light only */
756         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
757     }      
758 #endif
759 }
760
761 /* -----------------------------------------------------------------------------
762  * schedulePushWork()
763  *
764  * Push work to other Capabilities if we have some.
765  * -------------------------------------------------------------------------- */
766
767 #if defined(THREADED_RTS)
768 static void
769 schedulePushWork(Capability *cap USED_IF_THREADS, 
770                  Task *task      USED_IF_THREADS)
771 {
772     Capability *free_caps[n_capabilities], *cap0;
773     nat i, n_free_caps;
774
775     // migration can be turned off with +RTS -qg
776     if (!RtsFlags.ParFlags.migrate) return;
777
778     // Check whether we have more threads on our run queue, or sparks
779     // in our pool, that we could hand to another Capability.
780     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
781         && sparkPoolSizeCap(cap) < 2) {
782         return;
783     }
784
785     // First grab as many free Capabilities as we can.
786     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
787         cap0 = &capabilities[i];
788         if (cap != cap0 && tryGrabCapability(cap0,task)) {
789             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
790                 // it already has some work, we just grabbed it at 
791                 // the wrong moment.  Or maybe it's deadlocked!
792                 releaseCapability(cap0);
793             } else {
794                 free_caps[n_free_caps++] = cap0;
795             }
796         }
797     }
798
799     // we now have n_free_caps free capabilities stashed in
800     // free_caps[].  Share our run queue equally with them.  This is
801     // probably the simplest thing we could do; improvements we might
802     // want to do include:
803     //
804     //   - giving high priority to moving relatively new threads, on 
805     //     the gournds that they haven't had time to build up a
806     //     working set in the cache on this CPU/Capability.
807     //
808     //   - giving low priority to moving long-lived threads
809
810     if (n_free_caps > 0) {
811         StgTSO *prev, *t, *next;
812         rtsBool pushed_to_all;
813
814         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
815
816         i = 0;
817         pushed_to_all = rtsFalse;
818
819         if (cap->run_queue_hd != END_TSO_QUEUE) {
820             prev = cap->run_queue_hd;
821             t = prev->link;
822             prev->link = END_TSO_QUEUE;
823             for (; t != END_TSO_QUEUE; t = next) {
824                 next = t->link;
825                 t->link = END_TSO_QUEUE;
826                 if (t->what_next == ThreadRelocated
827                     || t->bound == task // don't move my bound thread
828                     || tsoLocked(t)) {  // don't move a locked thread
829                     prev->link = t;
830                     prev = t;
831                 } else if (i == n_free_caps) {
832                     pushed_to_all = rtsTrue;
833                     i = 0;
834                     // keep one for us
835                     prev->link = t;
836                     prev = t;
837                 } else {
838                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
839                     appendToRunQueue(free_caps[i],t);
840                     if (t->bound) { t->bound->cap = free_caps[i]; }
841                     t->cap = free_caps[i];
842                     i++;
843                 }
844             }
845             cap->run_queue_tl = prev;
846         }
847
848         // If there are some free capabilities that we didn't push any
849         // threads to, then try to push a spark to each one.
850         if (!pushed_to_all) {
851             StgClosure *spark;
852             // i is the next free capability to push to
853             for (; i < n_free_caps; i++) {
854                 if (emptySparkPoolCap(free_caps[i])) {
855                     spark = findSpark(cap);
856                     if (spark != NULL) {
857                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
858                         newSpark(&(free_caps[i]->r), spark);
859                     }
860                 }
861             }
862         }
863
864         // release the capabilities
865         for (i = 0; i < n_free_caps; i++) {
866             task->cap = free_caps[i];
867             releaseCapability(free_caps[i]);
868         }
869     }
870     task->cap = cap; // reset to point to our Capability.
871 }
872 #endif
873
874 /* ----------------------------------------------------------------------------
875  * Start any pending signal handlers
876  * ------------------------------------------------------------------------- */
877
878 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
879 static void
880 scheduleStartSignalHandlers(Capability *cap)
881 {
882     if (signals_pending()) { // safe outside the lock
883         startSignalHandlers(cap);
884     }
885 }
886 #else
887 static void
888 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
889 {
890 }
891 #endif
892
893 /* ----------------------------------------------------------------------------
894  * Check for blocked threads that can be woken up.
895  * ------------------------------------------------------------------------- */
896
897 static void
898 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
899 {
900 #if !defined(THREADED_RTS)
901     //
902     // Check whether any waiting threads need to be woken up.  If the
903     // run queue is empty, and there are no other tasks running, we
904     // can wait indefinitely for something to happen.
905     //
906     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
907     {
908         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
909     }
910 #endif
911 }
912
913
914 /* ----------------------------------------------------------------------------
915  * Check for threads woken up by other Capabilities
916  * ------------------------------------------------------------------------- */
917
918 static void
919 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
920 {
921 #if defined(THREADED_RTS)
922     // Any threads that were woken up by other Capabilities get
923     // appended to our run queue.
924     if (!emptyWakeupQueue(cap)) {
925         ACQUIRE_LOCK(&cap->lock);
926         if (emptyRunQueue(cap)) {
927             cap->run_queue_hd = cap->wakeup_queue_hd;
928             cap->run_queue_tl = cap->wakeup_queue_tl;
929         } else {
930             cap->run_queue_tl->link = cap->wakeup_queue_hd;
931             cap->run_queue_tl = cap->wakeup_queue_tl;
932         }
933         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
934         RELEASE_LOCK(&cap->lock);
935     }
936 #endif
937 }
938
939 /* ----------------------------------------------------------------------------
940  * Check for threads blocked on BLACKHOLEs that can be woken up
941  * ------------------------------------------------------------------------- */
942 static void
943 scheduleCheckBlackHoles (Capability *cap)
944 {
945     if ( blackholes_need_checking ) // check without the lock first
946     {
947         ACQUIRE_LOCK(&sched_mutex);
948         if ( blackholes_need_checking ) {
949             checkBlackHoles(cap);
950             blackholes_need_checking = rtsFalse;
951         }
952         RELEASE_LOCK(&sched_mutex);
953     }
954 }
955
956 /* ----------------------------------------------------------------------------
957  * Detect deadlock conditions and attempt to resolve them.
958  * ------------------------------------------------------------------------- */
959
960 static void
961 scheduleDetectDeadlock (Capability *cap, Task *task)
962 {
963
964 #if defined(PARALLEL_HASKELL)
965     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
966     return;
967 #endif
968
969     /* 
970      * Detect deadlock: when we have no threads to run, there are no
971      * threads blocked, waiting for I/O, or sleeping, and all the
972      * other tasks are waiting for work, we must have a deadlock of
973      * some description.
974      */
975     if ( emptyThreadQueues(cap) )
976     {
977 #if defined(THREADED_RTS)
978         /* 
979          * In the threaded RTS, we only check for deadlock if there
980          * has been no activity in a complete timeslice.  This means
981          * we won't eagerly start a full GC just because we don't have
982          * any threads to run currently.
983          */
984         if (recent_activity != ACTIVITY_INACTIVE) return;
985 #endif
986
987         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
988
989         // Garbage collection can release some new threads due to
990         // either (a) finalizers or (b) threads resurrected because
991         // they are unreachable and will therefore be sent an
992         // exception.  Any threads thus released will be immediately
993         // runnable.
994         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
995
996         recent_activity = ACTIVITY_DONE_GC;
997         
998         if ( !emptyRunQueue(cap) ) return;
999
1000 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
1001         /* If we have user-installed signal handlers, then wait
1002          * for signals to arrive rather then bombing out with a
1003          * deadlock.
1004          */
1005         if ( anyUserHandlers() ) {
1006             IF_DEBUG(scheduler, 
1007                      sched_belch("still deadlocked, waiting for signals..."));
1008
1009             awaitUserSignals();
1010
1011             if (signals_pending()) {
1012                 startSignalHandlers(cap);
1013             }
1014
1015             // either we have threads to run, or we were interrupted:
1016             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1017         }
1018 #endif
1019
1020 #if !defined(THREADED_RTS)
1021         /* Probably a real deadlock.  Send the current main thread the
1022          * Deadlock exception.
1023          */
1024         if (task->tso) {
1025             switch (task->tso->why_blocked) {
1026             case BlockedOnSTM:
1027             case BlockedOnBlackHole:
1028             case BlockedOnException:
1029             case BlockedOnMVar:
1030                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1031                 return;
1032             default:
1033                 barf("deadlock: main thread blocked in a strange way");
1034             }
1035         }
1036         return;
1037 #endif
1038     }
1039 }
1040
1041 /* ----------------------------------------------------------------------------
1042  * Process an event (GRAN only)
1043  * ------------------------------------------------------------------------- */
1044
1045 #if defined(GRAN)
1046 static StgTSO *
1047 scheduleProcessEvent(rtsEvent *event)
1048 {
1049     StgTSO *t;
1050
1051     if (RtsFlags.GranFlags.Light)
1052       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1053
1054     /* adjust time based on time-stamp */
1055     if (event->time > CurrentTime[CurrentProc] &&
1056         event->evttype != ContinueThread)
1057       CurrentTime[CurrentProc] = event->time;
1058     
1059     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1060     if (!RtsFlags.GranFlags.Light)
1061       handleIdlePEs();
1062
1063     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1064
1065     /* main event dispatcher in GranSim */
1066     switch (event->evttype) {
1067       /* Should just be continuing execution */
1068     case ContinueThread:
1069       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1070       /* ToDo: check assertion
1071       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1072              run_queue_hd != END_TSO_QUEUE);
1073       */
1074       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1075       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1076           procStatus[CurrentProc]==Fetching) {
1077         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1078               CurrentTSO->id, CurrentTSO, CurrentProc);
1079         goto next_thread;
1080       } 
1081       /* Ignore ContinueThreads for completed threads */
1082       if (CurrentTSO->what_next == ThreadComplete) {
1083         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1084               CurrentTSO->id, CurrentTSO, CurrentProc);
1085         goto next_thread;
1086       } 
1087       /* Ignore ContinueThreads for threads that are being migrated */
1088       if (PROCS(CurrentTSO)==Nowhere) { 
1089         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1090               CurrentTSO->id, CurrentTSO, CurrentProc);
1091         goto next_thread;
1092       }
1093       /* The thread should be at the beginning of the run queue */
1094       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1095         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1096               CurrentTSO->id, CurrentTSO, CurrentProc);
1097         break; // run the thread anyway
1098       }
1099       /*
1100       new_event(proc, proc, CurrentTime[proc],
1101                 FindWork,
1102                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1103       goto next_thread; 
1104       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1105       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1106
1107     case FetchNode:
1108       do_the_fetchnode(event);
1109       goto next_thread;             /* handle next event in event queue  */
1110       
1111     case GlobalBlock:
1112       do_the_globalblock(event);
1113       goto next_thread;             /* handle next event in event queue  */
1114       
1115     case FetchReply:
1116       do_the_fetchreply(event);
1117       goto next_thread;             /* handle next event in event queue  */
1118       
1119     case UnblockThread:   /* Move from the blocked queue to the tail of */
1120       do_the_unblock(event);
1121       goto next_thread;             /* handle next event in event queue  */
1122       
1123     case ResumeThread:  /* Move from the blocked queue to the tail of */
1124       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1125       event->tso->gran.blocktime += 
1126         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1127       do_the_startthread(event);
1128       goto next_thread;             /* handle next event in event queue  */
1129       
1130     case StartThread:
1131       do_the_startthread(event);
1132       goto next_thread;             /* handle next event in event queue  */
1133       
1134     case MoveThread:
1135       do_the_movethread(event);
1136       goto next_thread;             /* handle next event in event queue  */
1137       
1138     case MoveSpark:
1139       do_the_movespark(event);
1140       goto next_thread;             /* handle next event in event queue  */
1141       
1142     case FindWork:
1143       do_the_findwork(event);
1144       goto next_thread;             /* handle next event in event queue  */
1145       
1146     default:
1147       barf("Illegal event type %u\n", event->evttype);
1148     }  /* switch */
1149     
1150     /* This point was scheduler_loop in the old RTS */
1151
1152     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1153
1154     TimeOfLastEvent = CurrentTime[CurrentProc];
1155     TimeOfNextEvent = get_time_of_next_event();
1156     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1157     // CurrentTSO = ThreadQueueHd;
1158
1159     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1160                          TimeOfNextEvent));
1161
1162     if (RtsFlags.GranFlags.Light) 
1163       GranSimLight_leave_system(event, &ActiveTSO); 
1164
1165     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1166
1167     IF_DEBUG(gran, 
1168              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1169
1170     /* in a GranSim setup the TSO stays on the run queue */
1171     t = CurrentTSO;
1172     /* Take a thread from the run queue. */
1173     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1174
1175     IF_DEBUG(gran, 
1176              debugBelch("GRAN: About to run current thread, which is\n");
1177              G_TSO(t,5));
1178
1179     context_switch = 0; // turned on via GranYield, checking events and time slice
1180
1181     IF_DEBUG(gran, 
1182              DumpGranEvent(GR_SCHEDULE, t));
1183
1184     procStatus[CurrentProc] = Busy;
1185 }
1186 #endif // GRAN
1187
1188 /* ----------------------------------------------------------------------------
1189  * Send pending messages (PARALLEL_HASKELL only)
1190  * ------------------------------------------------------------------------- */
1191
1192 #if defined(PARALLEL_HASKELL)
1193 static StgTSO *
1194 scheduleSendPendingMessages(void)
1195 {
1196     StgSparkPool *pool;
1197     rtsSpark spark;
1198     StgTSO *t;
1199
1200 # if defined(PAR) // global Mem.Mgmt., omit for now
1201     if (PendingFetches != END_BF_QUEUE) {
1202         processFetches();
1203     }
1204 # endif
1205     
1206     if (RtsFlags.ParFlags.BufferTime) {
1207         // if we use message buffering, we must send away all message
1208         // packets which have become too old...
1209         sendOldBuffers(); 
1210     }
1211 }
1212 #endif
1213
1214 /* ----------------------------------------------------------------------------
1215  * Activate spark threads (PARALLEL_HASKELL only)
1216  * ------------------------------------------------------------------------- */
1217
1218 #if defined(PARALLEL_HASKELL)
1219 static void
1220 scheduleActivateSpark(void)
1221 {
1222 #if defined(SPARKS)
1223   ASSERT(emptyRunQueue());
1224 /* We get here if the run queue is empty and want some work.
1225    We try to turn a spark into a thread, and add it to the run queue,
1226    from where it will be picked up in the next iteration of the scheduler
1227    loop.
1228 */
1229
1230       /* :-[  no local threads => look out for local sparks */
1231       /* the spark pool for the current PE */
1232       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1233       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1234           pool->hd < pool->tl) {
1235         /* 
1236          * ToDo: add GC code check that we really have enough heap afterwards!!
1237          * Old comment:
1238          * If we're here (no runnable threads) and we have pending
1239          * sparks, we must have a space problem.  Get enough space
1240          * to turn one of those pending sparks into a
1241          * thread... 
1242          */
1243
1244         spark = findSpark(rtsFalse);            /* get a spark */
1245         if (spark != (rtsSpark) NULL) {
1246           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1247           IF_PAR_DEBUG(fish, // schedule,
1248                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1249                              tso->id, tso, advisory_thread_count));
1250
1251           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1252             IF_PAR_DEBUG(fish, // schedule,
1253                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1254                             spark));
1255             return rtsFalse; /* failed to generate a thread */
1256           }                  /* otherwise fall through & pick-up new tso */
1257         } else {
1258           IF_PAR_DEBUG(fish, // schedule,
1259                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1260                              spark_queue_len(pool)));
1261           return rtsFalse;  /* failed to generate a thread */
1262         }
1263         return rtsTrue;  /* success in generating a thread */
1264   } else { /* no more threads permitted or pool empty */
1265     return rtsFalse;  /* failed to generateThread */
1266   }
1267 #else
1268   tso = NULL; // avoid compiler warning only
1269   return rtsFalse;  /* dummy in non-PAR setup */
1270 #endif // SPARKS
1271 }
1272 #endif // PARALLEL_HASKELL
1273
1274 /* ----------------------------------------------------------------------------
1275  * Get work from a remote node (PARALLEL_HASKELL only)
1276  * ------------------------------------------------------------------------- */
1277     
1278 #if defined(PARALLEL_HASKELL)
1279 static rtsBool
1280 scheduleGetRemoteWork(rtsBool *receivedFinish)
1281 {
1282   ASSERT(emptyRunQueue());
1283
1284   if (RtsFlags.ParFlags.BufferTime) {
1285         IF_PAR_DEBUG(verbose, 
1286                 debugBelch("...send all pending data,"));
1287         {
1288           nat i;
1289           for (i=1; i<=nPEs; i++)
1290             sendImmediately(i); // send all messages away immediately
1291         }
1292   }
1293 # ifndef SPARKS
1294         //++EDEN++ idle() , i.e. send all buffers, wait for work
1295         // suppress fishing in EDEN... just look for incoming messages
1296         // (blocking receive)
1297   IF_PAR_DEBUG(verbose, 
1298                debugBelch("...wait for incoming messages...\n"));
1299   *receivedFinish = processMessages(); // blocking receive...
1300
1301         // and reenter scheduling loop after having received something
1302         // (return rtsFalse below)
1303
1304 # else /* activate SPARKS machinery */
1305 /* We get here, if we have no work, tried to activate a local spark, but still
1306    have no work. We try to get a remote spark, by sending a FISH message.
1307    Thread migration should be added here, and triggered when a sequence of 
1308    fishes returns without work. */
1309         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1310
1311       /* =8-[  no local sparks => look for work on other PEs */
1312         /*
1313          * We really have absolutely no work.  Send out a fish
1314          * (there may be some out there already), and wait for
1315          * something to arrive.  We clearly can't run any threads
1316          * until a SCHEDULE or RESUME arrives, and so that's what
1317          * we're hoping to see.  (Of course, we still have to
1318          * respond to other types of messages.)
1319          */
1320         rtsTime now = msTime() /*CURRENT_TIME*/;
1321         IF_PAR_DEBUG(verbose, 
1322                      debugBelch("--  now=%ld\n", now));
1323         IF_PAR_DEBUG(fish, // verbose,
1324              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1325                  (last_fish_arrived_at!=0 &&
1326                   last_fish_arrived_at+delay > now)) {
1327                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1328                      now, last_fish_arrived_at+delay, 
1329                      last_fish_arrived_at,
1330                      delay);
1331              });
1332   
1333         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1334             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1335           if (last_fish_arrived_at==0 ||
1336               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1337             /* outstandingFishes is set in sendFish, processFish;
1338                avoid flooding system with fishes via delay */
1339     next_fish_to_send_at = 0;  
1340   } else {
1341     /* ToDo: this should be done in the main scheduling loop to avoid the
1342              busy wait here; not so bad if fish delay is very small  */
1343     int iq = 0; // DEBUGGING -- HWL
1344     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1345     /* send a fish when ready, but process messages that arrive in the meantime */
1346     do {
1347       if (PacketsWaiting()) {
1348         iq++; // DEBUGGING
1349         *receivedFinish = processMessages();
1350       }
1351       now = msTime();
1352     } while (!*receivedFinish || now<next_fish_to_send_at);
1353     // JB: This means the fish could become obsolete, if we receive
1354     // work. Better check for work again? 
1355     // last line: while (!receivedFinish || !haveWork || now<...)
1356     // next line: if (receivedFinish || haveWork )
1357
1358     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1359       return rtsFalse;  // NB: this will leave scheduler loop
1360                         // immediately after return!
1361                           
1362     IF_PAR_DEBUG(fish, // verbose,
1363                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1364
1365   }
1366
1367     // JB: IMHO, this should all be hidden inside sendFish(...)
1368     /* pe = choosePE(); 
1369        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1370                 NEW_FISH_HUNGER);
1371
1372     // Global statistics: count no. of fishes
1373     if (RtsFlags.ParFlags.ParStats.Global &&
1374          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1375            globalParStats.tot_fish_mess++;
1376            }
1377     */ 
1378
1379   /* delayed fishes must have been sent by now! */
1380   next_fish_to_send_at = 0;  
1381   }
1382       
1383   *receivedFinish = processMessages();
1384 # endif /* SPARKS */
1385
1386  return rtsFalse;
1387  /* NB: this function always returns rtsFalse, meaning the scheduler
1388     loop continues with the next iteration; 
1389     rationale: 
1390       return code means success in finding work; we enter this function
1391       if there is no local work, thus have to send a fish which takes
1392       time until it arrives with work; in the meantime we should process
1393       messages in the main loop;
1394  */
1395 }
1396 #endif // PARALLEL_HASKELL
1397
1398 /* ----------------------------------------------------------------------------
1399  * PAR/GRAN: Report stats & debugging info(?)
1400  * ------------------------------------------------------------------------- */
1401
1402 #if defined(PAR) || defined(GRAN)
1403 static void
1404 scheduleGranParReport(void)
1405 {
1406   ASSERT(run_queue_hd != END_TSO_QUEUE);
1407
1408   /* Take a thread from the run queue, if we have work */
1409   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1410
1411     /* If this TSO has got its outport closed in the meantime, 
1412      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1413      * It has to be marked as TH_DEAD for this purpose.
1414      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1415
1416 JB: TODO: investigate wether state change field could be nuked
1417      entirely and replaced by the normal tso state (whatnext
1418      field). All we want to do is to kill tsos from outside.
1419      */
1420
1421     /* ToDo: write something to the log-file
1422     if (RTSflags.ParFlags.granSimStats && !sameThread)
1423         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1424
1425     CurrentTSO = t;
1426     */
1427     /* the spark pool for the current PE */
1428     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1429
1430     IF_DEBUG(scheduler, 
1431              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1432                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1433
1434     IF_PAR_DEBUG(fish,
1435              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1436                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1437
1438     if (RtsFlags.ParFlags.ParStats.Full && 
1439         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1440         (emitSchedule || // forced emit
1441          (t && LastTSO && t->id != LastTSO->id))) {
1442       /* 
1443          we are running a different TSO, so write a schedule event to log file
1444          NB: If we use fair scheduling we also have to write  a deschedule 
1445              event for LastTSO; with unfair scheduling we know that the
1446              previous tso has blocked whenever we switch to another tso, so
1447              we don't need it in GUM for now
1448       */
1449       IF_PAR_DEBUG(fish, // schedule,
1450                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1451
1452       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1453                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1454       emitSchedule = rtsFalse;
1455     }
1456 }     
1457 #endif
1458
1459 /* ----------------------------------------------------------------------------
1460  * After running a thread...
1461  * ------------------------------------------------------------------------- */
1462
1463 static void
1464 schedulePostRunThread(void)
1465 {
1466 #if defined(PAR)
1467     /* HACK 675: if the last thread didn't yield, make sure to print a 
1468        SCHEDULE event to the log file when StgRunning the next thread, even
1469        if it is the same one as before */
1470     LastTSO = t; 
1471     TimeOfLastYield = CURRENT_TIME;
1472 #endif
1473
1474   /* some statistics gathering in the parallel case */
1475
1476 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1477   switch (ret) {
1478     case HeapOverflow:
1479 # if defined(GRAN)
1480       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1481       globalGranStats.tot_heapover++;
1482 # elif defined(PAR)
1483       globalParStats.tot_heapover++;
1484 # endif
1485       break;
1486
1487      case StackOverflow:
1488 # if defined(GRAN)
1489       IF_DEBUG(gran, 
1490                DumpGranEvent(GR_DESCHEDULE, t));
1491       globalGranStats.tot_stackover++;
1492 # elif defined(PAR)
1493       // IF_DEBUG(par, 
1494       // DumpGranEvent(GR_DESCHEDULE, t);
1495       globalParStats.tot_stackover++;
1496 # endif
1497       break;
1498
1499     case ThreadYielding:
1500 # if defined(GRAN)
1501       IF_DEBUG(gran, 
1502                DumpGranEvent(GR_DESCHEDULE, t));
1503       globalGranStats.tot_yields++;
1504 # elif defined(PAR)
1505       // IF_DEBUG(par, 
1506       // DumpGranEvent(GR_DESCHEDULE, t);
1507       globalParStats.tot_yields++;
1508 # endif
1509       break; 
1510
1511     case ThreadBlocked:
1512 # if defined(GRAN)
1513       IF_DEBUG(scheduler,
1514                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1515                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1516                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1517                if (t->block_info.closure!=(StgClosure*)NULL)
1518                  print_bq(t->block_info.closure);
1519                debugBelch("\n"));
1520
1521       // ??? needed; should emit block before
1522       IF_DEBUG(gran, 
1523                DumpGranEvent(GR_DESCHEDULE, t)); 
1524       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1525       /*
1526         ngoq Dogh!
1527       ASSERT(procStatus[CurrentProc]==Busy || 
1528               ((procStatus[CurrentProc]==Fetching) && 
1529               (t->block_info.closure!=(StgClosure*)NULL)));
1530       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1531           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1532             procStatus[CurrentProc]==Fetching)) 
1533         procStatus[CurrentProc] = Idle;
1534       */
1535 # elif defined(PAR)
1536 //++PAR++  blockThread() writes the event (change?)
1537 # endif
1538     break;
1539
1540   case ThreadFinished:
1541     break;
1542
1543   default:
1544     barf("parGlobalStats: unknown return code");
1545     break;
1546     }
1547 #endif
1548 }
1549
1550 /* -----------------------------------------------------------------------------
1551  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1552  * -------------------------------------------------------------------------- */
1553
1554 static rtsBool
1555 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1556 {
1557     // did the task ask for a large block?
1558     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1559         // if so, get one and push it on the front of the nursery.
1560         bdescr *bd;
1561         lnat blocks;
1562         
1563         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1564         
1565         IF_DEBUG(scheduler,
1566                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1567                             (long)t->id, whatNext_strs[t->what_next], blocks));
1568         
1569         // don't do this if the nursery is (nearly) full, we'll GC first.
1570         if (cap->r.rCurrentNursery->link != NULL ||
1571             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1572                                                // if the nursery has only one block.
1573             
1574             ACQUIRE_SM_LOCK
1575             bd = allocGroup( blocks );
1576             RELEASE_SM_LOCK
1577             cap->r.rNursery->n_blocks += blocks;
1578             
1579             // link the new group into the list
1580             bd->link = cap->r.rCurrentNursery;
1581             bd->u.back = cap->r.rCurrentNursery->u.back;
1582             if (cap->r.rCurrentNursery->u.back != NULL) {
1583                 cap->r.rCurrentNursery->u.back->link = bd;
1584             } else {
1585 #if !defined(THREADED_RTS)
1586                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1587                        g0s0 == cap->r.rNursery);
1588 #endif
1589                 cap->r.rNursery->blocks = bd;
1590             }             
1591             cap->r.rCurrentNursery->u.back = bd;
1592             
1593             // initialise it as a nursery block.  We initialise the
1594             // step, gen_no, and flags field of *every* sub-block in
1595             // this large block, because this is easier than making
1596             // sure that we always find the block head of a large
1597             // block whenever we call Bdescr() (eg. evacuate() and
1598             // isAlive() in the GC would both have to do this, at
1599             // least).
1600             { 
1601                 bdescr *x;
1602                 for (x = bd; x < bd + blocks; x++) {
1603                     x->step = cap->r.rNursery;
1604                     x->gen_no = 0;
1605                     x->flags = 0;
1606                 }
1607             }
1608             
1609             // This assert can be a killer if the app is doing lots
1610             // of large block allocations.
1611             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1612             
1613             // now update the nursery to point to the new block
1614             cap->r.rCurrentNursery = bd;
1615             
1616             // we might be unlucky and have another thread get on the
1617             // run queue before us and steal the large block, but in that
1618             // case the thread will just end up requesting another large
1619             // block.
1620             pushOnRunQueue(cap,t);
1621             return rtsFalse;  /* not actually GC'ing */
1622         }
1623     }
1624     
1625     IF_DEBUG(scheduler,
1626              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1627                         (long)t->id, whatNext_strs[t->what_next]));
1628 #if defined(GRAN)
1629     ASSERT(!is_on_queue(t,CurrentProc));
1630 #elif defined(PARALLEL_HASKELL)
1631     /* Currently we emit a DESCHEDULE event before GC in GUM.
1632        ToDo: either add separate event to distinguish SYSTEM time from rest
1633        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1634     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1635         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1636                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1637         emitSchedule = rtsTrue;
1638     }
1639 #endif
1640       
1641     pushOnRunQueue(cap,t);
1642     return rtsTrue;
1643     /* actual GC is done at the end of the while loop in schedule() */
1644 }
1645
1646 /* -----------------------------------------------------------------------------
1647  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1648  * -------------------------------------------------------------------------- */
1649
1650 static void
1651 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1652 {
1653     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1654                                   (long)t->id, whatNext_strs[t->what_next]));
1655     /* just adjust the stack for this thread, then pop it back
1656      * on the run queue.
1657      */
1658     { 
1659         /* enlarge the stack */
1660         StgTSO *new_t = threadStackOverflow(cap, t);
1661         
1662         /* The TSO attached to this Task may have moved, so update the
1663          * pointer to it.
1664          */
1665         if (task->tso == t) {
1666             task->tso = new_t;
1667         }
1668         pushOnRunQueue(cap,new_t);
1669     }
1670 }
1671
1672 /* -----------------------------------------------------------------------------
1673  * Handle a thread that returned to the scheduler with ThreadYielding
1674  * -------------------------------------------------------------------------- */
1675
1676 static rtsBool
1677 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1678 {
1679     // Reset the context switch flag.  We don't do this just before
1680     // running the thread, because that would mean we would lose ticks
1681     // during GC, which can lead to unfair scheduling (a thread hogs
1682     // the CPU because the tick always arrives during GC).  This way
1683     // penalises threads that do a lot of allocation, but that seems
1684     // better than the alternative.
1685     context_switch = 0;
1686     
1687     /* put the thread back on the run queue.  Then, if we're ready to
1688      * GC, check whether this is the last task to stop.  If so, wake
1689      * up the GC thread.  getThread will block during a GC until the
1690      * GC is finished.
1691      */
1692     IF_DEBUG(scheduler,
1693              if (t->what_next != prev_what_next) {
1694                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1695                             (long)t->id, whatNext_strs[t->what_next]);
1696              } else {
1697                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1698                             (long)t->id, whatNext_strs[t->what_next]);
1699              }
1700         );
1701     
1702     IF_DEBUG(sanity,
1703              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1704              checkTSO(t));
1705     ASSERT(t->link == END_TSO_QUEUE);
1706     
1707     // Shortcut if we're just switching evaluators: don't bother
1708     // doing stack squeezing (which can be expensive), just run the
1709     // thread.
1710     if (t->what_next != prev_what_next) {
1711         return rtsTrue;
1712     }
1713     
1714 #if defined(GRAN)
1715     ASSERT(!is_on_queue(t,CurrentProc));
1716       
1717     IF_DEBUG(sanity,
1718              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1719              checkThreadQsSanity(rtsTrue));
1720
1721 #endif
1722
1723     addToRunQueue(cap,t);
1724
1725 #if defined(GRAN)
1726     /* add a ContinueThread event to actually process the thread */
1727     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1728               ContinueThread,
1729               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1730     IF_GRAN_DEBUG(bq, 
1731                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1732                   G_EVENTQ(0);
1733                   G_CURR_THREADQ(0));
1734 #endif
1735     return rtsFalse;
1736 }
1737
1738 /* -----------------------------------------------------------------------------
1739  * Handle a thread that returned to the scheduler with ThreadBlocked
1740  * -------------------------------------------------------------------------- */
1741
1742 static void
1743 scheduleHandleThreadBlocked( StgTSO *t
1744 #if !defined(GRAN) && !defined(DEBUG)
1745     STG_UNUSED
1746 #endif
1747     )
1748 {
1749 #if defined(GRAN)
1750     IF_DEBUG(scheduler,
1751              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1752                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1753              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1754     
1755     // ??? needed; should emit block before
1756     IF_DEBUG(gran, 
1757              DumpGranEvent(GR_DESCHEDULE, t)); 
1758     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1759     /*
1760       ngoq Dogh!
1761       ASSERT(procStatus[CurrentProc]==Busy || 
1762       ((procStatus[CurrentProc]==Fetching) && 
1763       (t->block_info.closure!=(StgClosure*)NULL)));
1764       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1765       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1766       procStatus[CurrentProc]==Fetching)) 
1767       procStatus[CurrentProc] = Idle;
1768     */
1769 #elif defined(PAR)
1770     IF_DEBUG(scheduler,
1771              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1772                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1773     IF_PAR_DEBUG(bq,
1774                  
1775                  if (t->block_info.closure!=(StgClosure*)NULL) 
1776                  print_bq(t->block_info.closure));
1777     
1778     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1779     blockThread(t);
1780     
1781     /* whatever we schedule next, we must log that schedule */
1782     emitSchedule = rtsTrue;
1783     
1784 #else /* !GRAN */
1785
1786       // We don't need to do anything.  The thread is blocked, and it
1787       // has tidied up its stack and placed itself on whatever queue
1788       // it needs to be on.
1789
1790 #if !defined(THREADED_RTS)
1791     ASSERT(t->why_blocked != NotBlocked);
1792              // This might not be true under THREADED_RTS: we don't have
1793              // exclusive access to this TSO, so someone might have
1794              // woken it up by now.  This actually happens: try
1795              // conc023 +RTS -N2.
1796 #endif
1797
1798     IF_DEBUG(scheduler,
1799              debugBelch("--<< thread %d (%s) stopped: ", 
1800                         t->id, whatNext_strs[t->what_next]);
1801              printThreadBlockage(t);
1802              debugBelch("\n"));
1803     
1804     /* Only for dumping event to log file 
1805        ToDo: do I need this in GranSim, too?
1806        blockThread(t);
1807     */
1808 #endif
1809 }
1810
1811 /* -----------------------------------------------------------------------------
1812  * Handle a thread that returned to the scheduler with ThreadFinished
1813  * -------------------------------------------------------------------------- */
1814
1815 static rtsBool
1816 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1817 {
1818     /* Need to check whether this was a main thread, and if so,
1819      * return with the return value.
1820      *
1821      * We also end up here if the thread kills itself with an
1822      * uncaught exception, see Exception.cmm.
1823      */
1824     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1825                                   t->id, whatNext_strs[t->what_next]));
1826
1827 #if defined(GRAN)
1828       endThread(t, CurrentProc); // clean-up the thread
1829 #elif defined(PARALLEL_HASKELL)
1830       /* For now all are advisory -- HWL */
1831       //if(t->priority==AdvisoryPriority) ??
1832       advisory_thread_count--; // JB: Caution with this counter, buggy!
1833       
1834 # if defined(DIST)
1835       if(t->dist.priority==RevalPriority)
1836         FinishReval(t);
1837 # endif
1838     
1839 # if defined(EDENOLD)
1840       // the thread could still have an outport... (BUG)
1841       if (t->eden.outport != -1) {
1842       // delete the outport for the tso which has finished...
1843         IF_PAR_DEBUG(eden_ports,
1844                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1845                               t->eden.outport, t->id));
1846         deleteOPT(t);
1847       }
1848       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1849       if (t->eden.epid != -1) {
1850         IF_PAR_DEBUG(eden_ports,
1851                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1852                            t->id, t->eden.epid));
1853         removeTSOfromProcess(t);
1854       }
1855 # endif 
1856
1857 # if defined(PAR)
1858       if (RtsFlags.ParFlags.ParStats.Full &&
1859           !RtsFlags.ParFlags.ParStats.Suppressed) 
1860         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1861
1862       //  t->par only contains statistics: left out for now...
1863       IF_PAR_DEBUG(fish,
1864                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1865                               t->id,t,t->par.sparkname));
1866 # endif
1867 #endif // PARALLEL_HASKELL
1868
1869       //
1870       // Check whether the thread that just completed was a bound
1871       // thread, and if so return with the result.  
1872       //
1873       // There is an assumption here that all thread completion goes
1874       // through this point; we need to make sure that if a thread
1875       // ends up in the ThreadKilled state, that it stays on the run
1876       // queue so it can be dealt with here.
1877       //
1878
1879       if (t->bound) {
1880
1881           if (t->bound != task) {
1882 #if !defined(THREADED_RTS)
1883               // Must be a bound thread that is not the topmost one.  Leave
1884               // it on the run queue until the stack has unwound to the
1885               // point where we can deal with this.  Leaving it on the run
1886               // queue also ensures that the garbage collector knows about
1887               // this thread and its return value (it gets dropped from the
1888               // all_threads list so there's no other way to find it).
1889               appendToRunQueue(cap,t);
1890               return rtsFalse;
1891 #else
1892               // this cannot happen in the threaded RTS, because a
1893               // bound thread can only be run by the appropriate Task.
1894               barf("finished bound thread that isn't mine");
1895 #endif
1896           }
1897
1898           ASSERT(task->tso == t);
1899
1900           if (t->what_next == ThreadComplete) {
1901               if (task->ret) {
1902                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1903                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1904               }
1905               task->stat = Success;
1906           } else {
1907               if (task->ret) {
1908                   *(task->ret) = NULL;
1909               }
1910               if (sched_state >= SCHED_INTERRUPTING) {
1911                   task->stat = Interrupted;
1912               } else {
1913                   task->stat = Killed;
1914               }
1915           }
1916 #ifdef DEBUG
1917           removeThreadLabel((StgWord)task->tso->id);
1918 #endif
1919           return rtsTrue; // tells schedule() to return
1920       }
1921
1922       return rtsFalse;
1923 }
1924
1925 /* -----------------------------------------------------------------------------
1926  * Perform a heap census, if PROFILING
1927  * -------------------------------------------------------------------------- */
1928
1929 static rtsBool
1930 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1931 {
1932 #if defined(PROFILING)
1933     // When we have +RTS -i0 and we're heap profiling, do a census at
1934     // every GC.  This lets us get repeatable runs for debugging.
1935     if (performHeapProfile ||
1936         (RtsFlags.ProfFlags.profileInterval==0 &&
1937          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1938
1939         // checking black holes is necessary before GC, otherwise
1940         // there may be threads that are unreachable except by the
1941         // blackhole queue, which the GC will consider to be
1942         // deadlocked.
1943         scheduleCheckBlackHoles(&MainCapability);
1944
1945         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1946         GarbageCollect(GetRoots, rtsTrue);
1947
1948         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1949         heapCensus();
1950
1951         performHeapProfile = rtsFalse;
1952         return rtsTrue;  // true <=> we already GC'd
1953     }
1954 #endif
1955     return rtsFalse;
1956 }
1957
1958 /* -----------------------------------------------------------------------------
1959  * Perform a garbage collection if necessary
1960  * -------------------------------------------------------------------------- */
1961
1962 static Capability *
1963 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1964               rtsBool force_major, void (*get_roots)(evac_fn))
1965 {
1966     StgTSO *t;
1967 #ifdef THREADED_RTS
1968     static volatile StgWord waiting_for_gc;
1969     rtsBool was_waiting;
1970     nat i;
1971 #endif
1972
1973 #ifdef THREADED_RTS
1974     // In order to GC, there must be no threads running Haskell code.
1975     // Therefore, the GC thread needs to hold *all* the capabilities,
1976     // and release them after the GC has completed.  
1977     //
1978     // This seems to be the simplest way: previous attempts involved
1979     // making all the threads with capabilities give up their
1980     // capabilities and sleep except for the *last* one, which
1981     // actually did the GC.  But it's quite hard to arrange for all
1982     // the other tasks to sleep and stay asleep.
1983     //
1984         
1985     was_waiting = cas(&waiting_for_gc, 0, 1);
1986     if (was_waiting) {
1987         do {
1988             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1989             if (cap) yieldCapability(&cap,task);
1990         } while (waiting_for_gc);
1991         return cap;  // NOTE: task->cap might have changed here
1992     }
1993
1994     for (i=0; i < n_capabilities; i++) {
1995         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1996         if (cap != &capabilities[i]) {
1997             Capability *pcap = &capabilities[i];
1998             // we better hope this task doesn't get migrated to
1999             // another Capability while we're waiting for this one.
2000             // It won't, because load balancing happens while we have
2001             // all the Capabilities, but even so it's a slightly
2002             // unsavoury invariant.
2003             task->cap = pcap;
2004             context_switch = 1;
2005             waitForReturnCapability(&pcap, task);
2006             if (pcap != &capabilities[i]) {
2007                 barf("scheduleDoGC: got the wrong capability");
2008             }
2009         }
2010     }
2011
2012     waiting_for_gc = rtsFalse;
2013 #endif
2014
2015     /* Kick any transactions which are invalid back to their
2016      * atomically frames.  When next scheduled they will try to
2017      * commit, this commit will fail and they will retry.
2018      */
2019     { 
2020         StgTSO *next;
2021
2022         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2023             if (t->what_next == ThreadRelocated) {
2024                 next = t->link;
2025             } else {
2026                 next = t->global_link;
2027                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2028                     if (!stmValidateNestOfTransactions (t -> trec)) {
2029                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2030                         
2031                         // strip the stack back to the
2032                         // ATOMICALLY_FRAME, aborting the (nested)
2033                         // transaction, and saving the stack of any
2034                         // partially-evaluated thunks on the heap.
2035                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2036                         
2037 #ifdef REG_R1
2038                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2039 #endif
2040                     }
2041                 }
2042             }
2043         }
2044     }
2045     
2046     // so this happens periodically:
2047     if (cap) scheduleCheckBlackHoles(cap);
2048     
2049     IF_DEBUG(scheduler, printAllThreads());
2050
2051     /*
2052      * We now have all the capabilities; if we're in an interrupting
2053      * state, then we should take the opportunity to delete all the
2054      * threads in the system.
2055      */
2056     if (sched_state >= SCHED_INTERRUPTING) {
2057         deleteAllThreads(&capabilities[0]);
2058         sched_state = SCHED_SHUTTING_DOWN;
2059     }
2060
2061     /* everybody back, start the GC.
2062      * Could do it in this thread, or signal a condition var
2063      * to do it in another thread.  Either way, we need to
2064      * broadcast on gc_pending_cond afterward.
2065      */
2066 #if defined(THREADED_RTS)
2067     IF_DEBUG(scheduler,sched_belch("doing GC"));
2068 #endif
2069     GarbageCollect(get_roots, force_major);
2070     
2071 #if defined(THREADED_RTS)
2072     // release our stash of capabilities.
2073     for (i = 0; i < n_capabilities; i++) {
2074         if (cap != &capabilities[i]) {
2075             task->cap = &capabilities[i];
2076             releaseCapability(&capabilities[i]);
2077         }
2078     }
2079     if (cap) {
2080         task->cap = cap;
2081     } else {
2082         task->cap = NULL;
2083     }
2084 #endif
2085
2086 #if defined(GRAN)
2087     /* add a ContinueThread event to continue execution of current thread */
2088     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2089               ContinueThread,
2090               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2091     IF_GRAN_DEBUG(bq, 
2092                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2093                   G_EVENTQ(0);
2094                   G_CURR_THREADQ(0));
2095 #endif /* GRAN */
2096
2097     return cap;
2098 }
2099
2100 /* ---------------------------------------------------------------------------
2101  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2102  * used by Control.Concurrent for error checking.
2103  * ------------------------------------------------------------------------- */
2104  
2105 StgBool
2106 rtsSupportsBoundThreads(void)
2107 {
2108 #if defined(THREADED_RTS)
2109   return rtsTrue;
2110 #else
2111   return rtsFalse;
2112 #endif
2113 }
2114
2115 /* ---------------------------------------------------------------------------
2116  * isThreadBound(tso): check whether tso is bound to an OS thread.
2117  * ------------------------------------------------------------------------- */
2118  
2119 StgBool
2120 isThreadBound(StgTSO* tso USED_IF_THREADS)
2121 {
2122 #if defined(THREADED_RTS)
2123   return (tso->bound != NULL);
2124 #endif
2125   return rtsFalse;
2126 }
2127
2128 /* ---------------------------------------------------------------------------
2129  * Singleton fork(). Do not copy any running threads.
2130  * ------------------------------------------------------------------------- */
2131
2132 #if !defined(mingw32_HOST_OS)
2133 #define FORKPROCESS_PRIMOP_SUPPORTED
2134 #endif
2135
2136 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2137 static void 
2138 deleteThread_(Capability *cap, StgTSO *tso);
2139 #endif
2140 StgInt
2141 forkProcess(HsStablePtr *entry
2142 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2143             STG_UNUSED
2144 #endif
2145            )
2146 {
2147 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2148     Task *task;
2149     pid_t pid;
2150     StgTSO* t,*next;
2151     Capability *cap;
2152     
2153 #if defined(THREADED_RTS)
2154     if (RtsFlags.ParFlags.nNodes > 1) {
2155         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2156         stg_exit(EXIT_FAILURE);
2157     }
2158 #endif
2159
2160     IF_DEBUG(scheduler,sched_belch("forking!"));
2161     
2162     // ToDo: for SMP, we should probably acquire *all* the capabilities
2163     cap = rts_lock();
2164     
2165     pid = fork();
2166     
2167     if (pid) { // parent
2168         
2169         // just return the pid
2170         rts_unlock(cap);
2171         return pid;
2172         
2173     } else { // child
2174         
2175         // Now, all OS threads except the thread that forked are
2176         // stopped.  We need to stop all Haskell threads, including
2177         // those involved in foreign calls.  Also we need to delete
2178         // all Tasks, because they correspond to OS threads that are
2179         // now gone.
2180
2181         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2182             if (t->what_next == ThreadRelocated) {
2183                 next = t->link;
2184             } else {
2185                 next = t->global_link;
2186                 // don't allow threads to catch the ThreadKilled
2187                 // exception, but we do want to raiseAsync() because these
2188                 // threads may be evaluating thunks that we need later.
2189                 deleteThread_(cap,t);
2190             }
2191         }
2192         
2193         // Empty the run queue.  It seems tempting to let all the
2194         // killed threads stay on the run queue as zombies to be
2195         // cleaned up later, but some of them correspond to bound
2196         // threads for which the corresponding Task does not exist.
2197         cap->run_queue_hd = END_TSO_QUEUE;
2198         cap->run_queue_tl = END_TSO_QUEUE;
2199
2200         // Any suspended C-calling Tasks are no more, their OS threads
2201         // don't exist now:
2202         cap->suspended_ccalling_tasks = NULL;
2203
2204         // Empty the all_threads list.  Otherwise, the garbage
2205         // collector may attempt to resurrect some of these threads.
2206         all_threads = END_TSO_QUEUE;
2207
2208         // Wipe the task list, except the current Task.
2209         ACQUIRE_LOCK(&sched_mutex);
2210         for (task = all_tasks; task != NULL; task=task->all_link) {
2211             if (task != cap->running_task) {
2212                 discardTask(task);
2213             }
2214         }
2215         RELEASE_LOCK(&sched_mutex);
2216
2217 #if defined(THREADED_RTS)
2218         // Wipe our spare workers list, they no longer exist.  New
2219         // workers will be created if necessary.
2220         cap->spare_workers = NULL;
2221         cap->returning_tasks_hd = NULL;
2222         cap->returning_tasks_tl = NULL;
2223 #endif
2224
2225         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2226         rts_checkSchedStatus("forkProcess",cap);
2227         
2228         rts_unlock(cap);
2229         hs_exit();                      // clean up and exit
2230         stg_exit(EXIT_SUCCESS);
2231     }
2232 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2233     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2234     return -1;
2235 #endif
2236 }
2237
2238 /* ---------------------------------------------------------------------------
2239  * Delete all the threads in the system
2240  * ------------------------------------------------------------------------- */
2241    
2242 static void
2243 deleteAllThreads ( Capability *cap )
2244 {
2245   StgTSO* t, *next;
2246   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2247   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2248       if (t->what_next == ThreadRelocated) {
2249           next = t->link;
2250       } else {
2251           next = t->global_link;
2252           deleteThread(cap,t);
2253       }
2254   }      
2255
2256   // The run queue now contains a bunch of ThreadKilled threads.  We
2257   // must not throw these away: the main thread(s) will be in there
2258   // somewhere, and the main scheduler loop has to deal with it.
2259   // Also, the run queue is the only thing keeping these threads from
2260   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2261
2262 #if !defined(THREADED_RTS)
2263   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2264   ASSERT(sleeping_queue == END_TSO_QUEUE);
2265 #endif
2266 }
2267
2268 /* -----------------------------------------------------------------------------
2269    Managing the suspended_ccalling_tasks list.
2270    Locks required: sched_mutex
2271    -------------------------------------------------------------------------- */
2272
2273 STATIC_INLINE void
2274 suspendTask (Capability *cap, Task *task)
2275 {
2276     ASSERT(task->next == NULL && task->prev == NULL);
2277     task->next = cap->suspended_ccalling_tasks;
2278     task->prev = NULL;
2279     if (cap->suspended_ccalling_tasks) {
2280         cap->suspended_ccalling_tasks->prev = task;
2281     }
2282     cap->suspended_ccalling_tasks = task;
2283 }
2284
2285 STATIC_INLINE void
2286 recoverSuspendedTask (Capability *cap, Task *task)
2287 {
2288     if (task->prev) {
2289         task->prev->next = task->next;
2290     } else {
2291         ASSERT(cap->suspended_ccalling_tasks == task);
2292         cap->suspended_ccalling_tasks = task->next;
2293     }
2294     if (task->next) {
2295         task->next->prev = task->prev;
2296     }
2297     task->next = task->prev = NULL;
2298 }
2299
2300 /* ---------------------------------------------------------------------------
2301  * Suspending & resuming Haskell threads.
2302  * 
2303  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2304  * its capability before calling the C function.  This allows another
2305  * task to pick up the capability and carry on running Haskell
2306  * threads.  It also means that if the C call blocks, it won't lock
2307  * the whole system.
2308  *
2309  * The Haskell thread making the C call is put to sleep for the
2310  * duration of the call, on the susepended_ccalling_threads queue.  We
2311  * give out a token to the task, which it can use to resume the thread
2312  * on return from the C function.
2313  * ------------------------------------------------------------------------- */
2314    
2315 void *
2316 suspendThread (StgRegTable *reg)
2317 {
2318   Capability *cap;
2319   int saved_errno = errno;
2320   StgTSO *tso;
2321   Task *task;
2322
2323   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2324    */
2325   cap = regTableToCapability(reg);
2326
2327   task = cap->running_task;
2328   tso = cap->r.rCurrentTSO;
2329
2330   IF_DEBUG(scheduler,
2331            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2332
2333   // XXX this might not be necessary --SDM
2334   tso->what_next = ThreadRunGHC;
2335
2336   threadPaused(cap,tso);
2337
2338   if(tso->blocked_exceptions == NULL)  {
2339       tso->why_blocked = BlockedOnCCall;
2340       tso->blocked_exceptions = END_TSO_QUEUE;
2341   } else {
2342       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2343   }
2344
2345   // Hand back capability
2346   task->suspended_tso = tso;
2347
2348   ACQUIRE_LOCK(&cap->lock);
2349
2350   suspendTask(cap,task);
2351   cap->in_haskell = rtsFalse;
2352   releaseCapability_(cap);
2353   
2354   RELEASE_LOCK(&cap->lock);
2355
2356 #if defined(THREADED_RTS)
2357   /* Preparing to leave the RTS, so ensure there's a native thread/task
2358      waiting to take over.
2359   */
2360   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2361 #endif
2362
2363   errno = saved_errno;
2364   return task;
2365 }
2366
2367 StgRegTable *
2368 resumeThread (void *task_)
2369 {
2370     StgTSO *tso;
2371     Capability *cap;
2372     int saved_errno = errno;
2373     Task *task = task_;
2374
2375     cap = task->cap;
2376     // Wait for permission to re-enter the RTS with the result.
2377     waitForReturnCapability(&cap,task);
2378     // we might be on a different capability now... but if so, our
2379     // entry on the suspended_ccalling_tasks list will also have been
2380     // migrated.
2381
2382     // Remove the thread from the suspended list
2383     recoverSuspendedTask(cap,task);
2384
2385     tso = task->suspended_tso;
2386     task->suspended_tso = NULL;
2387     tso->link = END_TSO_QUEUE;
2388     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2389     
2390     if (tso->why_blocked == BlockedOnCCall) {
2391         awakenBlockedQueue(cap,tso->blocked_exceptions);
2392         tso->blocked_exceptions = NULL;
2393     }
2394     
2395     /* Reset blocking status */
2396     tso->why_blocked  = NotBlocked;
2397     
2398     cap->r.rCurrentTSO = tso;
2399     cap->in_haskell = rtsTrue;
2400     errno = saved_errno;
2401
2402     /* We might have GC'd, mark the TSO dirty again */
2403     dirtyTSO(tso);
2404
2405     IF_DEBUG(sanity, checkTSO(tso));
2406
2407     return &cap->r;
2408 }
2409
2410 /* ---------------------------------------------------------------------------
2411  * Comparing Thread ids.
2412  *
2413  * This is used from STG land in the implementation of the
2414  * instances of Eq/Ord for ThreadIds.
2415  * ------------------------------------------------------------------------ */
2416
2417 int
2418 cmp_thread(StgPtr tso1, StgPtr tso2) 
2419
2420   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2421   StgThreadID id2 = ((StgTSO *)tso2)->id;
2422  
2423   if (id1 < id2) return (-1);
2424   if (id1 > id2) return 1;
2425   return 0;
2426 }
2427
2428 /* ---------------------------------------------------------------------------
2429  * Fetching the ThreadID from an StgTSO.
2430  *
2431  * This is used in the implementation of Show for ThreadIds.
2432  * ------------------------------------------------------------------------ */
2433 int
2434 rts_getThreadId(StgPtr tso) 
2435 {
2436   return ((StgTSO *)tso)->id;
2437 }
2438
2439 #ifdef DEBUG
2440 void
2441 labelThread(StgPtr tso, char *label)
2442 {
2443   int len;
2444   void *buf;
2445
2446   /* Caveat: Once set, you can only set the thread name to "" */
2447   len = strlen(label)+1;
2448   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2449   strncpy(buf,label,len);
2450   /* Update will free the old memory for us */
2451   updateThreadLabel(((StgTSO *)tso)->id,buf);
2452 }
2453 #endif /* DEBUG */
2454
2455 /* ---------------------------------------------------------------------------
2456    Create a new thread.
2457
2458    The new thread starts with the given stack size.  Before the
2459    scheduler can run, however, this thread needs to have a closure
2460    (and possibly some arguments) pushed on its stack.  See
2461    pushClosure() in Schedule.h.
2462
2463    createGenThread() and createIOThread() (in SchedAPI.h) are
2464    convenient packaged versions of this function.
2465
2466    currently pri (priority) is only used in a GRAN setup -- HWL
2467    ------------------------------------------------------------------------ */
2468 #if defined(GRAN)
2469 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2470 StgTSO *
2471 createThread(nat size, StgInt pri)
2472 #else
2473 StgTSO *
2474 createThread(Capability *cap, nat size)
2475 #endif
2476 {
2477     StgTSO *tso;
2478     nat stack_size;
2479
2480     /* sched_mutex is *not* required */
2481
2482     /* First check whether we should create a thread at all */
2483 #if defined(PARALLEL_HASKELL)
2484     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2485     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2486         threadsIgnored++;
2487         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2488                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2489         return END_TSO_QUEUE;
2490     }
2491     threadsCreated++;
2492 #endif
2493
2494 #if defined(GRAN)
2495     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2496 #endif
2497
2498     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2499
2500     /* catch ridiculously small stack sizes */
2501     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2502         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2503     }
2504
2505     stack_size = size - TSO_STRUCT_SIZEW;
2506     
2507     tso = (StgTSO *)allocateLocal(cap, size);
2508     TICK_ALLOC_TSO(stack_size, 0);
2509
2510     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2511 #if defined(GRAN)
2512     SET_GRAN_HDR(tso, ThisPE);
2513 #endif
2514
2515     // Always start with the compiled code evaluator
2516     tso->what_next = ThreadRunGHC;
2517
2518     tso->why_blocked  = NotBlocked;
2519     tso->blocked_exceptions = NULL;
2520     tso->flags = TSO_DIRTY;
2521     
2522     tso->saved_errno = 0;
2523     tso->bound = NULL;
2524     tso->cap = cap;
2525     
2526     tso->stack_size     = stack_size;
2527     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2528                           - TSO_STRUCT_SIZEW;
2529     tso->sp             = (P_)&(tso->stack) + stack_size;
2530
2531     tso->trec = NO_TREC;
2532     
2533 #ifdef PROFILING
2534     tso->prof.CCCS = CCS_MAIN;
2535 #endif
2536     
2537   /* put a stop frame on the stack */
2538     tso->sp -= sizeofW(StgStopFrame);
2539     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2540     tso->link = END_TSO_QUEUE;
2541     
2542   // ToDo: check this
2543 #if defined(GRAN)
2544     /* uses more flexible routine in GranSim */
2545     insertThread(tso, CurrentProc);
2546 #else
2547     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2548      * from its creation
2549      */
2550 #endif
2551     
2552 #if defined(GRAN) 
2553     if (RtsFlags.GranFlags.GranSimStats.Full) 
2554         DumpGranEvent(GR_START,tso);
2555 #elif defined(PARALLEL_HASKELL)
2556     if (RtsFlags.ParFlags.ParStats.Full) 
2557         DumpGranEvent(GR_STARTQ,tso);
2558     /* HACk to avoid SCHEDULE 
2559        LastTSO = tso; */
2560 #endif
2561     
2562     /* Link the new thread on the global thread list.
2563      */
2564     ACQUIRE_LOCK(&sched_mutex);
2565     tso->id = next_thread_id++;  // while we have the mutex
2566     tso->global_link = all_threads;
2567     all_threads = tso;
2568     RELEASE_LOCK(&sched_mutex);
2569     
2570 #if defined(DIST)
2571     tso->dist.priority = MandatoryPriority; //by default that is...
2572 #endif
2573     
2574 #if defined(GRAN)
2575     tso->gran.pri = pri;
2576 # if defined(DEBUG)
2577     tso->gran.magic = TSO_MAGIC; // debugging only
2578 # endif
2579     tso->gran.sparkname   = 0;
2580     tso->gran.startedat   = CURRENT_TIME; 
2581     tso->gran.exported    = 0;
2582     tso->gran.basicblocks = 0;
2583     tso->gran.allocs      = 0;
2584     tso->gran.exectime    = 0;
2585     tso->gran.fetchtime   = 0;
2586     tso->gran.fetchcount  = 0;
2587     tso->gran.blocktime   = 0;
2588     tso->gran.blockcount  = 0;
2589     tso->gran.blockedat   = 0;
2590     tso->gran.globalsparks = 0;
2591     tso->gran.localsparks  = 0;
2592     if (RtsFlags.GranFlags.Light)
2593         tso->gran.clock  = Now; /* local clock */
2594     else
2595         tso->gran.clock  = 0;
2596     
2597     IF_DEBUG(gran,printTSO(tso));
2598 #elif defined(PARALLEL_HASKELL)
2599 # if defined(DEBUG)
2600     tso->par.magic = TSO_MAGIC; // debugging only
2601 # endif
2602     tso->par.sparkname   = 0;
2603     tso->par.startedat   = CURRENT_TIME; 
2604     tso->par.exported    = 0;
2605     tso->par.basicblocks = 0;
2606     tso->par.allocs      = 0;
2607     tso->par.exectime    = 0;
2608     tso->par.fetchtime   = 0;
2609     tso->par.fetchcount  = 0;
2610     tso->par.blocktime   = 0;
2611     tso->par.blockcount  = 0;
2612     tso->par.blockedat   = 0;
2613     tso->par.globalsparks = 0;
2614     tso->par.localsparks  = 0;
2615 #endif
2616     
2617 #if defined(GRAN)
2618     globalGranStats.tot_threads_created++;
2619     globalGranStats.threads_created_on_PE[CurrentProc]++;
2620     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2621     globalGranStats.tot_sq_probes++;
2622 #elif defined(PARALLEL_HASKELL)
2623     // collect parallel global statistics (currently done together with GC stats)
2624     if (RtsFlags.ParFlags.ParStats.Global &&
2625         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2626         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2627         globalParStats.tot_threads_created++;
2628     }
2629 #endif 
2630     
2631 #if defined(GRAN)
2632     IF_GRAN_DEBUG(pri,
2633                   sched_belch("==__ schedule: Created TSO %d (%p);",
2634                               CurrentProc, tso, tso->id));
2635 #elif defined(PARALLEL_HASKELL)
2636     IF_PAR_DEBUG(verbose,
2637                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2638                              (long)tso->id, tso, advisory_thread_count));
2639 #else
2640     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2641                                    (long)tso->id, (long)tso->stack_size));
2642 #endif    
2643     return tso;
2644 }
2645
2646 #if defined(PAR)
2647 /* RFP:
2648    all parallel thread creation calls should fall through the following routine.
2649 */
2650 StgTSO *
2651 createThreadFromSpark(rtsSpark spark) 
2652 { StgTSO *tso;
2653   ASSERT(spark != (rtsSpark)NULL);
2654 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2655   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2656   { threadsIgnored++;
2657     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2658           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2659     return END_TSO_QUEUE;
2660   }
2661   else
2662   { threadsCreated++;
2663     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2664     if (tso==END_TSO_QUEUE)     
2665       barf("createSparkThread: Cannot create TSO");
2666 #if defined(DIST)
2667     tso->priority = AdvisoryPriority;
2668 #endif
2669     pushClosure(tso,spark);
2670     addToRunQueue(tso);
2671     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2672   }
2673   return tso;
2674 }
2675 #endif
2676
2677 /*
2678   Turn a spark into a thread.
2679   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2680 */
2681 #if 0
2682 StgTSO *
2683 activateSpark (rtsSpark spark) 
2684 {
2685   StgTSO *tso;
2686
2687   tso = createSparkThread(spark);
2688   if (RtsFlags.ParFlags.ParStats.Full) {   
2689     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2690       IF_PAR_DEBUG(verbose,
2691                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2692                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2693   }
2694   // ToDo: fwd info on local/global spark to thread -- HWL
2695   // tso->gran.exported =  spark->exported;
2696   // tso->gran.locked =   !spark->global;
2697   // tso->gran.sparkname = spark->name;
2698
2699   return tso;
2700 }
2701 #endif
2702
2703 /* ---------------------------------------------------------------------------
2704  * scheduleThread()
2705  *
2706  * scheduleThread puts a thread on the end  of the runnable queue.
2707  * This will usually be done immediately after a thread is created.
2708  * The caller of scheduleThread must create the thread using e.g.
2709  * createThread and push an appropriate closure
2710  * on this thread's stack before the scheduler is invoked.
2711  * ------------------------------------------------------------------------ */
2712
2713 void
2714 scheduleThread(Capability *cap, StgTSO *tso)
2715 {
2716     // The thread goes at the *end* of the run-queue, to avoid possible
2717     // starvation of any threads already on the queue.
2718     appendToRunQueue(cap,tso);
2719 }
2720
2721 void
2722 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2723 {
2724 #if defined(THREADED_RTS)
2725     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2726                               // move this thread from now on.
2727     cpu %= RtsFlags.ParFlags.nNodes;
2728     if (cpu == cap->no) {
2729         appendToRunQueue(cap,tso);
2730     } else {
2731         Capability *target_cap = &capabilities[cpu];
2732         if (tso->bound) {
2733             tso->bound->cap = target_cap;
2734         }
2735         tso->cap = target_cap;
2736         wakeupThreadOnCapability(target_cap,tso);
2737     }
2738 #else
2739     appendToRunQueue(cap,tso);
2740 #endif
2741 }
2742
2743 Capability *
2744 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2745 {
2746     Task *task;
2747
2748     // We already created/initialised the Task
2749     task = cap->running_task;
2750
2751     // This TSO is now a bound thread; make the Task and TSO
2752     // point to each other.
2753     tso->bound = task;
2754     tso->cap = cap;
2755
2756     task->tso = tso;
2757     task->ret = ret;
2758     task->stat = NoStatus;
2759
2760     appendToRunQueue(cap,tso);
2761
2762     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2763
2764 #if defined(GRAN)
2765     /* GranSim specific init */
2766     CurrentTSO = m->tso;                // the TSO to run
2767     procStatus[MainProc] = Busy;        // status of main PE
2768     CurrentProc = MainProc;             // PE to run it on
2769 #endif
2770
2771     cap = schedule(cap,task);
2772
2773     ASSERT(task->stat != NoStatus);
2774     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2775
2776     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2777     return cap;
2778 }
2779
2780 /* ----------------------------------------------------------------------------
2781  * Starting Tasks
2782  * ------------------------------------------------------------------------- */
2783
2784 #if defined(THREADED_RTS)
2785 void
2786 workerStart(Task *task)
2787 {
2788     Capability *cap;
2789
2790     // See startWorkerTask().
2791     ACQUIRE_LOCK(&task->lock);
2792     cap = task->cap;
2793     RELEASE_LOCK(&task->lock);
2794
2795     // set the thread-local pointer to the Task:
2796     taskEnter(task);
2797
2798     // schedule() runs without a lock.
2799     cap = schedule(cap,task);
2800
2801     // On exit from schedule(), we have a Capability.
2802     releaseCapability(cap);
2803     taskStop(task);
2804 }
2805 #endif
2806
2807 /* ---------------------------------------------------------------------------
2808  * initScheduler()
2809  *
2810  * Initialise the scheduler.  This resets all the queues - if the
2811  * queues contained any threads, they'll be garbage collected at the
2812  * next pass.
2813  *
2814  * ------------------------------------------------------------------------ */
2815
2816 void 
2817 initScheduler(void)
2818 {
2819 #if defined(GRAN)
2820   nat i;
2821   for (i=0; i<=MAX_PROC; i++) {
2822     run_queue_hds[i]      = END_TSO_QUEUE;
2823     run_queue_tls[i]      = END_TSO_QUEUE;
2824     blocked_queue_hds[i]  = END_TSO_QUEUE;
2825     blocked_queue_tls[i]  = END_TSO_QUEUE;
2826     ccalling_threadss[i]  = END_TSO_QUEUE;
2827     blackhole_queue[i]    = END_TSO_QUEUE;
2828     sleeping_queue        = END_TSO_QUEUE;
2829   }
2830 #elif !defined(THREADED_RTS)
2831   blocked_queue_hd  = END_TSO_QUEUE;
2832   blocked_queue_tl  = END_TSO_QUEUE;
2833   sleeping_queue    = END_TSO_QUEUE;
2834 #endif
2835
2836   blackhole_queue   = END_TSO_QUEUE;
2837   all_threads       = END_TSO_QUEUE;
2838
2839   context_switch = 0;
2840   sched_state    = SCHED_RUNNING;
2841
2842   RtsFlags.ConcFlags.ctxtSwitchTicks =
2843       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2844       
2845 #if defined(THREADED_RTS)
2846   /* Initialise the mutex and condition variables used by
2847    * the scheduler. */
2848   initMutex(&sched_mutex);
2849 #endif
2850   
2851   ACQUIRE_LOCK(&sched_mutex);
2852
2853   /* A capability holds the state a native thread needs in
2854    * order to execute STG code. At least one capability is
2855    * floating around (only THREADED_RTS builds have more than one).
2856    */
2857   initCapabilities();
2858
2859   initTaskManager();
2860
2861 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2862   initSparkPools();
2863 #endif
2864
2865 #if defined(THREADED_RTS)
2866   /*
2867    * Eagerly start one worker to run each Capability, except for
2868    * Capability 0.  The idea is that we're probably going to start a
2869    * bound thread on Capability 0 pretty soon, so we don't want a
2870    * worker task hogging it.
2871    */
2872   { 
2873       nat i;
2874       Capability *cap;
2875       for (i = 1; i < n_capabilities; i++) {
2876           cap = &capabilities[i];
2877           ACQUIRE_LOCK(&cap->lock);
2878           startWorkerTask(cap, workerStart);
2879           RELEASE_LOCK(&cap->lock);
2880       }
2881   }
2882 #endif
2883
2884   RELEASE_LOCK(&sched_mutex);
2885 }
2886
2887 void
2888 exitScheduler( void )
2889 {
2890     Task *task = NULL;
2891
2892 #if defined(THREADED_RTS)
2893     ACQUIRE_LOCK(&sched_mutex);
2894     task = newBoundTask();
2895     RELEASE_LOCK(&sched_mutex);
2896 #endif
2897
2898     // If we haven't killed all the threads yet, do it now.
2899     if (sched_state < SCHED_SHUTTING_DOWN) {
2900         sched_state = SCHED_INTERRUPTING;
2901         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2902     }
2903     sched_state = SCHED_SHUTTING_DOWN;
2904
2905 #if defined(THREADED_RTS)
2906     { 
2907         nat i;
2908         
2909         for (i = 0; i < n_capabilities; i++) {
2910             shutdownCapability(&capabilities[i], task);
2911         }
2912         boundTaskExiting(task);
2913         stopTaskManager();
2914     }
2915 #endif
2916 }
2917
2918 /* ---------------------------------------------------------------------------
2919    Where are the roots that we know about?
2920
2921         - all the threads on the runnable queue
2922         - all the threads on the blocked queue
2923         - all the threads on the sleeping queue
2924         - all the thread currently executing a _ccall_GC
2925         - all the "main threads"
2926      
2927    ------------------------------------------------------------------------ */
2928
2929 /* This has to be protected either by the scheduler monitor, or by the
2930         garbage collection monitor (probably the latter).
2931         KH @ 25/10/99
2932 */
2933
2934 void
2935 GetRoots( evac_fn evac )
2936 {
2937     nat i;
2938     Capability *cap;
2939     Task *task;
2940
2941 #if defined(GRAN)
2942     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2943         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2944             evac((StgClosure **)&run_queue_hds[i]);
2945         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2946             evac((StgClosure **)&run_queue_tls[i]);
2947         
2948         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2949             evac((StgClosure **)&blocked_queue_hds[i]);
2950         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2951             evac((StgClosure **)&blocked_queue_tls[i]);
2952         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2953             evac((StgClosure **)&ccalling_threads[i]);
2954     }
2955
2956     markEventQueue();
2957
2958 #else /* !GRAN */
2959
2960     for (i = 0; i < n_capabilities; i++) {
2961         cap = &capabilities[i];
2962         evac((StgClosure **)(void *)&cap->run_queue_hd);
2963         evac((StgClosure **)(void *)&cap->run_queue_tl);
2964 #if defined(THREADED_RTS)
2965         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2966         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2967 #endif
2968         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2969              task=task->next) {
2970             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2971             evac((StgClosure **)(void *)&task->suspended_tso);
2972         }
2973
2974     }
2975     
2976
2977 #if !defined(THREADED_RTS)
2978     evac((StgClosure **)(void *)&blocked_queue_hd);
2979     evac((StgClosure **)(void *)&blocked_queue_tl);
2980     evac((StgClosure **)(void *)&sleeping_queue);
2981 #endif 
2982 #endif
2983
2984     // evac((StgClosure **)&blackhole_queue);
2985
2986 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2987     markSparkQueue(evac);
2988 #endif
2989     
2990 #if defined(RTS_USER_SIGNALS)
2991     // mark the signal handlers (signals should be already blocked)
2992     markSignalHandlers(evac);
2993 #endif
2994 }
2995
2996 /* -----------------------------------------------------------------------------
2997    performGC
2998
2999    This is the interface to the garbage collector from Haskell land.
3000    We provide this so that external C code can allocate and garbage
3001    collect when called from Haskell via _ccall_GC.
3002
3003    It might be useful to provide an interface whereby the programmer
3004    can specify more roots (ToDo).
3005    
3006    This needs to be protected by the GC condition variable above.  KH.
3007    -------------------------------------------------------------------------- */
3008
3009 static void (*extra_roots)(evac_fn);
3010
3011 static void
3012 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
3013 {
3014     Task *task;
3015     // We must grab a new Task here, because the existing Task may be
3016     // associated with a particular Capability, and chained onto the 
3017     // suspended_ccalling_tasks queue.
3018     ACQUIRE_LOCK(&sched_mutex);
3019     task = newBoundTask();
3020     RELEASE_LOCK(&sched_mutex);
3021     scheduleDoGC(NULL,task,force_major, get_roots);
3022     boundTaskExiting(task);
3023 }
3024
3025 void
3026 performGC(void)
3027 {
3028     performGC_(rtsFalse, GetRoots);
3029 }
3030
3031 void
3032 performMajorGC(void)
3033 {
3034     performGC_(rtsTrue, GetRoots);
3035 }
3036
3037 static void
3038 AllRoots(evac_fn evac)
3039 {
3040     GetRoots(evac);             // the scheduler's roots
3041     extra_roots(evac);          // the user's roots
3042 }
3043
3044 void
3045 performGCWithRoots(void (*get_roots)(evac_fn))
3046 {
3047     extra_roots = get_roots;
3048     performGC_(rtsFalse, AllRoots);
3049 }
3050
3051 /* -----------------------------------------------------------------------------
3052    Stack overflow
3053
3054    If the thread has reached its maximum stack size, then raise the
3055    StackOverflow exception in the offending thread.  Otherwise
3056    relocate the TSO into a larger chunk of memory and adjust its stack
3057    size appropriately.
3058    -------------------------------------------------------------------------- */
3059
3060 static StgTSO *
3061 threadStackOverflow(Capability *cap, StgTSO *tso)
3062 {
3063   nat new_stack_size, stack_words;
3064   lnat new_tso_size;
3065   StgPtr new_sp;
3066   StgTSO *dest;
3067
3068   IF_DEBUG(sanity,checkTSO(tso));
3069   if (tso->stack_size >= tso->max_stack_size) {
3070
3071     IF_DEBUG(gc,
3072              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3073                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3074              /* If we're debugging, just print out the top of the stack */
3075              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3076                                               tso->sp+64)));
3077
3078     /* Send this thread the StackOverflow exception */
3079     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3080     return tso;
3081   }
3082
3083   /* Try to double the current stack size.  If that takes us over the
3084    * maximum stack size for this thread, then use the maximum instead.
3085    * Finally round up so the TSO ends up as a whole number of blocks.
3086    */
3087   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3088   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3089                                        TSO_STRUCT_SIZE)/sizeof(W_);
3090   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3091   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3092
3093   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3094
3095   dest = (StgTSO *)allocate(new_tso_size);
3096   TICK_ALLOC_TSO(new_stack_size,0);
3097
3098   /* copy the TSO block and the old stack into the new area */
3099   memcpy(dest,tso,TSO_STRUCT_SIZE);
3100   stack_words = tso->stack + tso->stack_size - tso->sp;
3101   new_sp = (P_)dest + new_tso_size - stack_words;
3102   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3103
3104   /* relocate the stack pointers... */
3105   dest->sp         = new_sp;
3106   dest->stack_size = new_stack_size;
3107         
3108   /* Mark the old TSO as relocated.  We have to check for relocated
3109    * TSOs in the garbage collector and any primops that deal with TSOs.
3110    *
3111    * It's important to set the sp value to just beyond the end
3112    * of the stack, so we don't attempt to scavenge any part of the
3113    * dead TSO's stack.
3114    */
3115   tso->what_next = ThreadRelocated;
3116   tso->link = dest;
3117   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3118   tso->why_blocked = NotBlocked;
3119
3120   IF_PAR_DEBUG(verbose,
3121                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3122                      tso->id, tso, tso->stack_size);
3123                /* If we're debugging, just print out the top of the stack */
3124                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3125                                                 tso->sp+64)));
3126   
3127   IF_DEBUG(sanity,checkTSO(tso));
3128 #if 0
3129   IF_DEBUG(scheduler,printTSO(dest));
3130 #endif
3131
3132   return dest;
3133 }
3134
3135 /* ---------------------------------------------------------------------------
3136    Wake up a queue that was blocked on some resource.
3137    ------------------------------------------------------------------------ */
3138
3139 #if defined(GRAN)
3140 STATIC_INLINE void
3141 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3142 {
3143 }
3144 #elif defined(PARALLEL_HASKELL)
3145 STATIC_INLINE void
3146 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3147 {
3148   /* write RESUME events to log file and
3149      update blocked and fetch time (depending on type of the orig closure) */
3150   if (RtsFlags.ParFlags.ParStats.Full) {
3151     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3152                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3153                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3154     if (emptyRunQueue())
3155       emitSchedule = rtsTrue;
3156
3157     switch (get_itbl(node)->type) {
3158         case FETCH_ME_BQ:
3159           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3160           break;
3161         case RBH:
3162         case FETCH_ME:
3163         case BLACKHOLE_BQ:
3164           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3165           break;
3166 #ifdef DIST
3167         case MVAR:
3168           break;
3169 #endif    
3170         default:
3171           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3172         }
3173       }
3174 }
3175 #endif
3176
3177 #if defined(GRAN)
3178 StgBlockingQueueElement *
3179 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3180 {
3181     StgTSO *tso;
3182     PEs node_loc, tso_loc;
3183
3184     node_loc = where_is(node); // should be lifted out of loop
3185     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3186     tso_loc = where_is((StgClosure *)tso);
3187     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3188       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3189       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3190       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3191       // insertThread(tso, node_loc);
3192       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3193                 ResumeThread,
3194                 tso, node, (rtsSpark*)NULL);
3195       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3196       // len_local++;
3197       // len++;
3198     } else { // TSO is remote (actually should be FMBQ)
3199       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3200                                   RtsFlags.GranFlags.Costs.gunblocktime +
3201                                   RtsFlags.GranFlags.Costs.latency;
3202       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3203                 UnblockThread,
3204                 tso, node, (rtsSpark*)NULL);
3205       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3206       // len++;
3207     }
3208     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3209     IF_GRAN_DEBUG(bq,
3210                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3211                           (node_loc==tso_loc ? "Local" : "Global"), 
3212                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3213     tso->block_info.closure = NULL;
3214     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3215                              tso->id, tso));
3216 }
3217 #elif defined(PARALLEL_HASKELL)
3218 StgBlockingQueueElement *
3219 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3220 {
3221     StgBlockingQueueElement *next;
3222
3223     switch (get_itbl(bqe)->type) {
3224     case TSO:
3225       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3226       /* if it's a TSO just push it onto the run_queue */
3227       next = bqe->link;
3228       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3229       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3230       threadRunnable();
3231       unblockCount(bqe, node);
3232       /* reset blocking status after dumping event */
3233       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3234       break;
3235
3236     case BLOCKED_FETCH:
3237       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3238       next = bqe->link;
3239       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3240       PendingFetches = (StgBlockedFetch *)bqe;
3241       break;
3242
3243 # if defined(DEBUG)
3244       /* can ignore this case in a non-debugging setup; 
3245          see comments on RBHSave closures above */
3246     case CONSTR:
3247       /* check that the closure is an RBHSave closure */
3248       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3249              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3250              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3251       break;
3252
3253     default:
3254       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3255            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3256            (StgClosure *)bqe);
3257 # endif
3258     }
3259   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3260   return next;
3261 }
3262 #endif
3263
3264 StgTSO *
3265 unblockOne(Capability *cap, StgTSO *tso)
3266 {
3267   StgTSO *next;
3268
3269   ASSERT(get_itbl(tso)->type == TSO);
3270   ASSERT(tso->why_blocked != NotBlocked);
3271
3272   tso->why_blocked = NotBlocked;
3273   next = tso->link;
3274   tso->link = END_TSO_QUEUE;
3275
3276 #if defined(THREADED_RTS)
3277   if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
3278       // We are waking up this thread on the current Capability, which
3279       // might involve migrating it from the Capability it was last on.
3280       if (tso->bound) {
3281           ASSERT(tso->bound->cap == tso->cap);
3282           tso->bound->cap = cap;
3283       }
3284       tso->cap = cap;
3285       appendToRunQueue(cap,tso);
3286       // we're holding a newly woken thread, make sure we context switch
3287       // quickly so we can migrate it if necessary.
3288       context_switch = 1;
3289   } else {
3290       // we'll try to wake it up on the Capability it was last on.
3291       wakeupThreadOnCapability(tso->cap, tso);
3292   }
3293 #else
3294   appendToRunQueue(cap,tso);
3295   context_switch = 1;
3296 #endif
3297
3298   IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
3299   return next;
3300 }
3301
3302
3303 #if defined(GRAN)
3304 void 
3305 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3306 {
3307   StgBlockingQueueElement *bqe;
3308   PEs node_loc;
3309   nat len = 0; 
3310
3311   IF_GRAN_DEBUG(bq, 
3312                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3313                       node, CurrentProc, CurrentTime[CurrentProc], 
3314                       CurrentTSO->id, CurrentTSO));
3315
3316   node_loc = where_is(node);
3317
3318   ASSERT(q == END_BQ_QUEUE ||
3319          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3320          get_itbl(q)->type == CONSTR); // closure (type constructor)
3321   ASSERT(is_unique(node));
3322
3323   /* FAKE FETCH: magically copy the node to the tso's proc;
3324      no Fetch necessary because in reality the node should not have been 
3325      moved to the other PE in the first place
3326   */
3327   if (CurrentProc!=node_loc) {
3328     IF_GRAN_DEBUG(bq, 
3329                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3330                         node, node_loc, CurrentProc, CurrentTSO->id, 
3331                         // CurrentTSO, where_is(CurrentTSO),
3332                         node->header.gran.procs));
3333     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3334     IF_GRAN_DEBUG(bq, 
3335                   debugBelch("## new bitmask of node %p is %#x\n",
3336                         node, node->header.gran.procs));
3337     if (RtsFlags.GranFlags.GranSimStats.Global) {
3338       globalGranStats.tot_fake_fetches++;
3339     }
3340   }
3341
3342   bqe = q;
3343   // ToDo: check: ASSERT(CurrentProc==node_loc);
3344   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3345     //next = bqe->link;
3346     /* 
3347        bqe points to the current element in the queue
3348        next points to the next element in the queue
3349     */
3350     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3351     //tso_loc = where_is(tso);
3352     len++;
3353     bqe = unblockOne(bqe, node);
3354   }
3355
3356   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3357      the closure to make room for the anchor of the BQ */
3358   if (bqe!=END_BQ_QUEUE) {
3359     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3360     /*
3361     ASSERT((info_ptr==&RBH_Save_0_info) ||
3362            (info_ptr==&RBH_Save_1_info) ||
3363            (info_ptr==&RBH_Save_2_info));
3364     */
3365     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3366     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3367     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3368
3369     IF_GRAN_DEBUG(bq,
3370                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3371                         node, info_type(node)));
3372   }
3373
3374   /* statistics gathering */
3375   if (RtsFlags.GranFlags.GranSimStats.Global) {
3376     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3377     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3378     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3379     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3380   }
3381   IF_GRAN_DEBUG(bq,
3382                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3383                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3384 }
3385 #elif defined(PARALLEL_HASKELL)
3386 void 
3387 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3388 {
3389   StgBlockingQueueElement *bqe;
3390
3391   IF_PAR_DEBUG(verbose, 
3392                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3393                      node, mytid));
3394 #ifdef DIST  
3395   //RFP
3396   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3397     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3398     return;
3399   }
3400 #endif
3401   
3402   ASSERT(q == END_BQ_QUEUE ||
3403          get_itbl(q)->type == TSO ||           
3404          get_itbl(q)->type == BLOCKED_FETCH || 
3405          get_itbl(q)->type == CONSTR); 
3406
3407   bqe = q;
3408   while (get_itbl(bqe)->type==TSO || 
3409          get_itbl(bqe)->type==BLOCKED_FETCH) {
3410     bqe = unblockOne(bqe, node);
3411   }
3412 }
3413
3414 #else   /* !GRAN && !PARALLEL_HASKELL */
3415
3416 void
3417 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3418 {
3419     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3420                              // Exception.cmm
3421     while (tso != END_TSO_QUEUE) {
3422         tso = unblockOne(cap,tso);
3423     }
3424 }
3425 #endif
3426
3427 /* ---------------------------------------------------------------------------
3428    Interrupt execution
3429    - usually called inside a signal handler so it mustn't do anything fancy.   
3430    ------------------------------------------------------------------------ */
3431
3432 void
3433 interruptStgRts(void)
3434 {
3435     sched_state = SCHED_INTERRUPTING;
3436     context_switch = 1;
3437 #if defined(THREADED_RTS)
3438     prodAllCapabilities();
3439 #endif
3440 }
3441
3442 /* -----------------------------------------------------------------------------
3443    Unblock a thread
3444
3445    This is for use when we raise an exception in another thread, which
3446    may be blocked.
3447    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3448    -------------------------------------------------------------------------- */
3449
3450 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3451 /*
3452   NB: only the type of the blocking queue is different in GranSim and GUM
3453       the operations on the queue-elements are the same
3454       long live polymorphism!
3455
3456   Locks: sched_mutex is held upon entry and exit.
3457
3458 */
3459 static void
3460 unblockThread(Capability *cap, StgTSO *tso)
3461 {
3462   StgBlockingQueueElement *t, **last;
3463
3464   switch (tso->why_blocked) {
3465
3466   case NotBlocked:
3467     return;  /* not blocked */
3468
3469   case BlockedOnSTM:
3470     // Be careful: nothing to do here!  We tell the scheduler that the thread
3471     // is runnable and we leave it to the stack-walking code to abort the 
3472     // transaction while unwinding the stack.  We should perhaps have a debugging
3473     // test to make sure that this really happens and that the 'zombie' transaction
3474     // does not get committed.
3475     goto done;
3476
3477   case BlockedOnMVar:
3478     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3479     {
3480       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3481       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3482
3483       last = (StgBlockingQueueElement **)&mvar->head;
3484       for (t = (StgBlockingQueueElement *)mvar->head; 
3485            t != END_BQ_QUEUE; 
3486            last = &t->link, last_tso = t, t = t->link) {
3487         if (t == (StgBlockingQueueElement *)tso) {
3488           *last = (StgBlockingQueueElement *)tso->link;
3489           if (mvar->tail == tso) {
3490             mvar->tail = (StgTSO *)last_tso;
3491           }
3492           goto done;
3493         }
3494       }
3495       barf("unblockThread (MVAR): TSO not found");
3496     }
3497
3498   case BlockedOnBlackHole:
3499     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3500     {
3501       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3502
3503       last = &bq->blocking_queue;
3504       for (t = bq->blocking_queue; 
3505            t != END_BQ_QUEUE; 
3506            last = &t->link, t = t->link) {
3507         if (t == (StgBlockingQueueElement *)tso) {
3508           *last = (StgBlockingQueueElement *)tso->link;
3509           goto done;
3510         }
3511       }
3512       barf("unblockThread (BLACKHOLE): TSO not found");
3513     }
3514
3515   case BlockedOnException:
3516     {
3517       StgTSO *target  = tso->block_info.tso;
3518
3519       ASSERT(get_itbl(target)->type == TSO);
3520
3521       if (target->what_next == ThreadRelocated) {
3522           target = target->link;
3523           ASSERT(get_itbl(target)->type == TSO);
3524       }
3525
3526       ASSERT(target->blocked_exceptions != NULL);
3527
3528       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3529       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3530            t != END_BQ_QUEUE; 
3531            last = &t->link, t = t->link) {
3532         ASSERT(get_itbl(t)->type == TSO);
3533         if (t == (StgBlockingQueueElement *)tso) {
3534           *last = (StgBlockingQueueElement *)tso->link;
3535           goto done;
3536         }
3537       }
3538       barf("unblockThread (Exception): TSO not found");
3539     }
3540
3541   case BlockedOnRead:
3542   case BlockedOnWrite:
3543 #if defined(mingw32_HOST_OS)
3544   case BlockedOnDoProc:
3545 #endif
3546     {
3547       /* take TSO off blocked_queue */
3548       StgBlockingQueueElement *prev = NULL;
3549       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3550            prev = t, t = t->link) {
3551         if (t == (StgBlockingQueueElement *)tso) {
3552           if (prev == NULL) {
3553             blocked_queue_hd = (StgTSO *)t->link;
3554             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3555               blocked_queue_tl = END_TSO_QUEUE;
3556             }
3557           } else {
3558             prev->link = t->link;
3559             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3560               blocked_queue_tl = (StgTSO *)prev;
3561             }
3562           }
3563 #if defined(mingw32_HOST_OS)
3564           /* (Cooperatively) signal that the worker thread should abort
3565            * the request.
3566            */
3567           abandonWorkRequest(tso->block_info.async_result->reqID);
3568 #endif
3569           goto done;
3570         }
3571       }
3572       barf("unblockThread (I/O): TSO not found");
3573     }
3574
3575   case BlockedOnDelay:
3576     {
3577       /* take TSO off sleeping_queue */
3578       StgBlockingQueueElement *prev = NULL;
3579       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3580            prev = t, t = t->link) {
3581         if (t == (StgBlockingQueueElement *)tso) {
3582           if (prev == NULL) {
3583             sleeping_queue = (StgTSO *)t->link;
3584           } else {
3585             prev->link = t->link;
3586           }
3587           goto done;
3588         }
3589       }
3590       barf("unblockThread (delay): TSO not found");
3591     }
3592
3593   default:
3594     barf("unblockThread");
3595   }
3596
3597  done:
3598   tso->link = END_TSO_QUEUE;
3599   tso->why_blocked = NotBlocked;
3600   tso->block_info.closure = NULL;
3601   pushOnRunQueue(cap,tso);
3602 }
3603 #else
3604 static void
3605 unblockThread(Capability *cap, StgTSO *tso)
3606 {
3607   StgTSO *t, **last;
3608   
3609   /* To avoid locking unnecessarily. */
3610   if (tso->why_blocked == NotBlocked) {
3611     return;
3612   }
3613
3614   switch (tso->why_blocked) {
3615
3616   case BlockedOnSTM:
3617     // Be careful: nothing to do here!  We tell the scheduler that the thread
3618     // is runnable and we leave it to the stack-walking code to abort the 
3619     // transaction while unwinding the stack.  We should perhaps have a debugging
3620     // test to make sure that this really happens and that the 'zombie' transaction
3621     // does not get committed.
3622     goto done;
3623
3624   case BlockedOnMVar:
3625     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3626     {
3627       StgTSO *last_tso = END_TSO_QUEUE;
3628       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3629
3630       last = &mvar->head;
3631       for (t = mvar->head; t != END_TSO_QUEUE; 
3632            last = &t->link, last_tso = t, t = t->link) {
3633         if (t == tso) {
3634           *last = tso->link;
3635           if (mvar->tail == tso) {
3636             mvar->tail = last_tso;
3637           }
3638           goto done;
3639         }
3640       }
3641       barf("unblockThread (MVAR): TSO not found");
3642     }
3643
3644   case BlockedOnBlackHole:
3645     {
3646       last = &blackhole_queue;
3647       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3648            last = &t->link, t = t->link) {
3649         if (t == tso) {
3650           *last = tso->link;
3651           goto done;
3652         }
3653       }
3654       barf("unblockThread (BLACKHOLE): TSO not found");
3655     }
3656
3657   case BlockedOnException:
3658     {
3659       StgTSO *target  = tso->block_info.tso;
3660
3661       ASSERT(get_itbl(target)->type == TSO);
3662
3663       while (target->what_next == ThreadRelocated) {
3664           target = target->link;
3665           ASSERT(get_itbl(target)->type == TSO);
3666       }
3667       
3668       ASSERT(target->blocked_exceptions != NULL);
3669
3670       last = &target->blocked_exceptions;
3671       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3672            last = &t->link, t = t->link) {
3673         ASSERT(get_itbl(t)->type == TSO);
3674         if (t == tso) {
3675           *last = tso->link;
3676           goto done;
3677         }
3678       }
3679       barf("unblockThread (Exception): TSO not found");
3680     }
3681
3682 #if !defined(THREADED_RTS)
3683   case BlockedOnRead:
3684   case BlockedOnWrite:
3685 #if defined(mingw32_HOST_OS)
3686   case BlockedOnDoProc:
3687 #endif
3688     {
3689       StgTSO *prev = NULL;
3690       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3691            prev = t, t = t->link) {
3692         if (t == tso) {
3693           if (prev == NULL) {
3694             blocked_queue_hd = t->link;
3695             if (blocked_queue_tl == t) {
3696               blocked_queue_tl = END_TSO_QUEUE;
3697             }
3698           } else {
3699             prev->link = t->link;
3700             if (blocked_queue_tl == t) {
3701               blocked_queue_tl = prev;
3702             }
3703           }
3704 #if defined(mingw32_HOST_OS)
3705           /* (Cooperatively) signal that the worker thread should abort
3706            * the request.
3707            */
3708           abandonWorkRequest(tso->block_info.async_result->reqID);
3709 #endif
3710           goto done;
3711         }
3712       }
3713       barf("unblockThread (I/O): TSO not found");
3714     }
3715
3716   case BlockedOnDelay:
3717     {
3718       StgTSO *prev = NULL;
3719       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3720            prev = t, t = t->link) {
3721         if (t == tso) {
3722           if (prev == NULL) {
3723             sleeping_queue = t->link;
3724           } else {
3725             prev->link = t->link;
3726           }
3727           goto done;
3728         }
3729       }
3730       barf("unblockThread (delay): TSO not found");
3731     }
3732 #endif
3733
3734   default:
3735     barf("unblockThread");
3736   }
3737
3738  done:
3739   tso->link = END_TSO_QUEUE;
3740   tso->why_blocked = NotBlocked;
3741   tso->block_info.closure = NULL;
3742   appendToRunQueue(cap,tso);
3743
3744   // We might have just migrated this TSO to our Capability:
3745   if (tso->bound) {
3746       tso->bound->cap = cap;
3747   }
3748   tso->cap = cap;
3749 }
3750 #endif
3751
3752 /* -----------------------------------------------------------------------------
3753  * checkBlackHoles()
3754  *
3755  * Check the blackhole_queue for threads that can be woken up.  We do
3756  * this periodically: before every GC, and whenever the run queue is
3757  * empty.
3758  *
3759  * An elegant solution might be to just wake up all the blocked
3760  * threads with awakenBlockedQueue occasionally: they'll go back to
3761  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3762  * doesn't give us a way to tell whether we've actually managed to
3763  * wake up any threads, so we would be busy-waiting.
3764  *
3765  * -------------------------------------------------------------------------- */
3766
3767 static rtsBool
3768 checkBlackHoles (Capability *cap)
3769 {
3770     StgTSO **prev, *t;
3771     rtsBool any_woke_up = rtsFalse;
3772     StgHalfWord type;
3773
3774     // blackhole_queue is global:
3775     ASSERT_LOCK_HELD(&sched_mutex);
3776
3777     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3778
3779     // ASSUMES: sched_mutex
3780     prev = &blackhole_queue;
3781     t = blackhole_queue;
3782     while (t != END_TSO_QUEUE) {
3783         ASSERT(t->why_blocked == BlockedOnBlackHole);
3784         type = get_itbl(t->block_info.closure)->type;
3785         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3786             IF_DEBUG(sanity,checkTSO(t));
3787             t = unblockOne(cap, t);
3788             // urk, the threads migrate to the current capability
3789             // here, but we'd like to keep them on the original one.
3790             *prev = t;
3791             any_woke_up = rtsTrue;
3792         } else {
3793             prev = &t->link;
3794             t = t->link;
3795         }
3796     }
3797
3798     return any_woke_up;
3799 }
3800
3801 /* -----------------------------------------------------------------------------
3802  * raiseAsync()
3803  *
3804  * The following function implements the magic for raising an
3805  * asynchronous exception in an existing thread.
3806  *
3807  * We first remove the thread from any queue on which it might be
3808  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3809  *
3810  * We strip the stack down to the innermost CATCH_FRAME, building
3811  * thunks in the heap for all the active computations, so they can 
3812  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3813  * an application of the handler to the exception, and push it on
3814  * the top of the stack.
3815  * 
3816  * How exactly do we save all the active computations?  We create an
3817  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3818  * AP_STACKs pushes everything from the corresponding update frame
3819  * upwards onto the stack.  (Actually, it pushes everything up to the
3820  * next update frame plus a pointer to the next AP_STACK object.
3821  * Entering the next AP_STACK object pushes more onto the stack until we
3822  * reach the last AP_STACK object - at which point the stack should look
3823  * exactly as it did when we killed the TSO and we can continue
3824  * execution by entering the closure on top of the stack.
3825  *
3826  * We can also kill a thread entirely - this happens if either (a) the 
3827  * exception passed to raiseAsync is NULL, or (b) there's no
3828  * CATCH_FRAME on the stack.  In either case, we strip the entire
3829  * stack and replace the thread with a zombie.
3830  *
3831  * ToDo: in THREADED_RTS mode, this function is only safe if either
3832  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3833  * one Capability), or (b) we own the Capability that the TSO is
3834  * currently blocked on or on the run queue of.
3835  *
3836  * -------------------------------------------------------------------------- */
3837  
3838 void
3839 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3840 {
3841     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3842 }
3843
3844 void
3845 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3846 {
3847     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3848 }
3849
3850 static void
3851 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3852             rtsBool stop_at_atomically, StgPtr stop_here)
3853 {
3854     StgRetInfoTable *info;
3855     StgPtr sp, frame;
3856     nat i;
3857   
3858     // Thread already dead?
3859     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3860         return;
3861     }
3862
3863     IF_DEBUG(scheduler, 
3864              sched_belch("raising exception in thread %ld.", (long)tso->id));
3865     
3866     // Remove it from any blocking queues
3867     unblockThread(cap,tso);
3868
3869     // mark it dirty; we're about to change its stack.
3870     dirtyTSO(tso);
3871
3872     sp = tso->sp;
3873     
3874     // The stack freezing code assumes there's a closure pointer on
3875     // the top of the stack, so we have to arrange that this is the case...
3876     //
3877     if (sp[0] == (W_)&stg_enter_info) {
3878         sp++;
3879     } else {
3880         sp--;
3881         sp[0] = (W_)&stg_dummy_ret_closure;
3882     }
3883
3884     frame = sp + 1;
3885     while (stop_here == NULL || frame < stop_here) {
3886
3887         // 1. Let the top of the stack be the "current closure"
3888         //
3889         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3890         // CATCH_FRAME.
3891         //
3892         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3893         // current closure applied to the chunk of stack up to (but not
3894         // including) the update frame.  This closure becomes the "current
3895         // closure".  Go back to step 2.
3896         //
3897         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3898         // top of the stack applied to the exception.
3899         // 
3900         // 5. If it's a STOP_FRAME, then kill the thread.
3901         // 
3902         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3903         // transaction
3904        
3905         info = get_ret_itbl((StgClosure *)frame);
3906
3907         switch (info->i.type) {
3908
3909         case UPDATE_FRAME:
3910         {
3911             StgAP_STACK * ap;
3912             nat words;
3913             
3914             // First build an AP_STACK consisting of the stack chunk above the
3915             // current update frame, with the top word on the stack as the
3916             // fun field.
3917             //
3918             words = frame - sp - 1;
3919             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3920             
3921             ap->size = words;
3922             ap->fun  = (StgClosure *)sp[0];
3923             sp++;
3924             for(i=0; i < (nat)words; ++i) {
3925                 ap->payload[i] = (StgClosure *)*sp++;
3926             }
3927             
3928             SET_HDR(ap,&stg_AP_STACK_info,
3929                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3930             TICK_ALLOC_UP_THK(words+1,0);
3931             
3932             IF_DEBUG(scheduler,
3933                      debugBelch("sched: Updating ");
3934                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3935                      debugBelch(" with ");
3936                      printObj((StgClosure *)ap);
3937                 );
3938
3939             // Replace the updatee with an indirection
3940             //
3941             // Warning: if we're in a loop, more than one update frame on
3942             // the stack may point to the same object.  Be careful not to
3943             // overwrite an IND_OLDGEN in this case, because we'll screw
3944             // up the mutable lists.  To be on the safe side, don't
3945             // overwrite any kind of indirection at all.  See also
3946             // threadSqueezeStack in GC.c, where we have to make a similar
3947             // check.
3948             //
3949             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3950                 // revert the black hole
3951                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3952                                (StgClosure *)ap);
3953             }
3954             sp += sizeofW(StgUpdateFrame) - 1;
3955             sp[0] = (W_)ap; // push onto stack
3956             frame = sp + 1;
3957             continue; //no need to bump frame
3958         }
3959
3960         case STOP_FRAME:
3961             // We've stripped the entire stack, the thread is now dead.
3962             tso->what_next = ThreadKilled;
3963             tso->sp = frame + sizeofW(StgStopFrame);
3964             return;
3965
3966         case CATCH_FRAME:
3967             // If we find a CATCH_FRAME, and we've got an exception to raise,
3968             // then build the THUNK raise(exception), and leave it on
3969             // top of the CATCH_FRAME ready to enter.
3970             //
3971         {
3972 #ifdef PROFILING
3973             StgCatchFrame *cf = (StgCatchFrame *)frame;
3974 #endif
3975             StgThunk *raise;
3976             
3977             if (exception == NULL) break;
3978
3979             // we've got an exception to raise, so let's pass it to the
3980             // handler in this frame.
3981             //
3982             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3983             TICK_ALLOC_SE_THK(1,0);
3984             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3985             raise->payload[0] = exception;
3986             
3987             // throw away the stack from Sp up to the CATCH_FRAME.
3988             //
3989             sp = frame - 1;
3990             
3991             /* Ensure that async excpetions are blocked now, so we don't get
3992              * a surprise exception before we get around to executing the
3993              * handler.
3994              */
3995             if (tso->blocked_exceptions == NULL) {
3996                 tso->blocked_exceptions = END_TSO_QUEUE;
3997             }
3998
3999             /* Put the newly-built THUNK on top of the stack, ready to execute
4000              * when the thread restarts.
4001              */
4002             sp[0] = (W_)raise;
4003             sp[-1] = (W_)&stg_enter_info;
4004             tso->sp = sp-1;
4005             tso->what_next = ThreadRunGHC;
4006             IF_DEBUG(sanity, checkTSO(tso));
4007             return;
4008         }
4009             
4010         case ATOMICALLY_FRAME:
4011             if (stop_at_atomically) {
4012                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
4013                 stmCondemnTransaction(cap, tso -> trec);
4014 #ifdef REG_R1
4015                 tso->sp = frame;
4016 #else
4017                 // R1 is not a register: the return convention for IO in
4018                 // this case puts the return value on the stack, so we
4019                 // need to set up the stack to return to the atomically
4020                 // frame properly...
4021                 tso->sp = frame - 2;
4022                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
4023                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
4024 #endif
4025                 tso->what_next = ThreadRunGHC;
4026                 return;
4027             }
4028             // Not stop_at_atomically... fall through and abort the
4029             // transaction.
4030             
4031         case CATCH_RETRY_FRAME:
4032             // IF we find an ATOMICALLY_FRAME then we abort the
4033             // current transaction and propagate the exception.  In
4034             // this case (unlike ordinary exceptions) we do not care
4035             // whether the transaction is valid or not because its
4036             // possible validity cannot have caused the exception
4037             // and will not be visible after the abort.
4038             IF_DEBUG(stm,
4039                      debugBelch("Found atomically block delivering async exception\n"));
4040             StgTRecHeader *trec = tso -> trec;
4041             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4042             stmAbortTransaction(cap, trec);
4043             tso -> trec = outer;
4044             break;
4045             
4046         default:
4047             break;
4048         }
4049
4050         // move on to the next stack frame
4051         frame += stack_frame_sizeW((StgClosure *)frame);
4052     }
4053
4054     // if we got here, then we stopped at stop_here
4055     ASSERT(stop_here != NULL);
4056 }
4057
4058 /* -----------------------------------------------------------------------------
4059    Deleting threads
4060
4061    This is used for interruption (^C) and forking, and corresponds to
4062    raising an exception but without letting the thread catch the
4063    exception.
4064    -------------------------------------------------------------------------- */
4065
4066 static void 
4067 deleteThread (Capability *cap, StgTSO *tso)
4068 {
4069   if (tso->why_blocked != BlockedOnCCall &&
4070       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4071       raiseAsync(cap,tso,NULL);
4072   }
4073 }
4074
4075 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4076 static void 
4077 deleteThread_(Capability *cap, StgTSO *tso)
4078 { // for forkProcess only:
4079   // like deleteThread(), but we delete threads in foreign calls, too.
4080
4081     if (tso->why_blocked == BlockedOnCCall ||
4082         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4083         unblockOne(cap,tso);
4084         tso->what_next = ThreadKilled;
4085     } else {
4086         deleteThread(cap,tso);
4087     }
4088 }
4089 #endif
4090
4091 /* -----------------------------------------------------------------------------
4092    raiseExceptionHelper
4093    
4094    This function is called by the raise# primitve, just so that we can
4095    move some of the tricky bits of raising an exception from C-- into
4096    C.  Who knows, it might be a useful re-useable thing here too.
4097    -------------------------------------------------------------------------- */
4098
4099 StgWord
4100 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4101 {
4102     Capability *cap = regTableToCapability(reg);
4103     StgThunk *raise_closure = NULL;
4104     StgPtr p, next;
4105     StgRetInfoTable *info;
4106     //
4107     // This closure represents the expression 'raise# E' where E
4108     // is the exception raise.  It is used to overwrite all the
4109     // thunks which are currently under evaluataion.
4110     //
4111
4112     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4113     // LDV profiling: stg_raise_info has THUNK as its closure
4114     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4115     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
4116     // 1 does not cause any problem unless profiling is performed.
4117     // However, when LDV profiling goes on, we need to linearly scan
4118     // small object pool, where raise_closure is stored, so we should
4119     // use MIN_UPD_SIZE.
4120     //
4121     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4122     //                                 sizeofW(StgClosure)+1);
4123     //
4124
4125     //
4126     // Walk up the stack, looking for the catch frame.  On the way,
4127     // we update any closures pointed to from update frames with the
4128     // raise closure that we just built.
4129     //
4130     p = tso->sp;
4131     while(1) {
4132         info = get_ret_itbl((StgClosure *)p);
4133         next = p + stack_frame_sizeW((StgClosure *)p);
4134         switch (info->i.type) {
4135             
4136         case UPDATE_FRAME:
4137             // Only create raise_closure if we need to.
4138             if (raise_closure == NULL) {
4139                 raise_closure = 
4140                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4141                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4142                 raise_closure->payload[0] = exception;
4143             }
4144             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4145             p = next;
4146             continue;
4147
4148         case ATOMICALLY_FRAME:
4149             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4150             tso->sp = p;
4151             return ATOMICALLY_FRAME;
4152             
4153         case CATCH_FRAME:
4154             tso->sp = p;
4155             return CATCH_FRAME;
4156
4157         case CATCH_STM_FRAME:
4158             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4159             tso->sp = p;
4160             return CATCH_STM_FRAME;
4161             
4162         case STOP_FRAME:
4163             tso->sp = p;
4164             return STOP_FRAME;
4165
4166         case CATCH_RETRY_FRAME:
4167         default:
4168             p = next; 
4169             continue;
4170         }
4171     }
4172 }
4173
4174
4175 /* -----------------------------------------------------------------------------
4176    findRetryFrameHelper
4177
4178    This function is called by the retry# primitive.  It traverses the stack
4179    leaving tso->sp referring to the frame which should handle the retry.  
4180
4181    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4182    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4183
4184    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4185    despite the similar implementation.
4186
4187    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4188    not be created within memory transactions.
4189    -------------------------------------------------------------------------- */
4190
4191 StgWord
4192 findRetryFrameHelper (StgTSO *tso)
4193 {
4194   StgPtr           p, next;
4195   StgRetInfoTable *info;
4196
4197   p = tso -> sp;
4198   while (1) {
4199     info = get_ret_itbl((StgClosure *)p);
4200     next = p + stack_frame_sizeW((StgClosure *)p);
4201     switch (info->i.type) {
4202       
4203     case ATOMICALLY_FRAME:
4204       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4205       tso->sp = p;
4206       return ATOMICALLY_FRAME;
4207       
4208     case CATCH_RETRY_FRAME:
4209       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4210       tso->sp = p;
4211       return CATCH_RETRY_FRAME;
4212       
4213     case CATCH_STM_FRAME:
4214     default:
4215       ASSERT(info->i.type != CATCH_FRAME);
4216       ASSERT(info->i.type != STOP_FRAME);
4217       p = next; 
4218       continue;
4219     }
4220   }
4221 }
4222
4223 /* -----------------------------------------------------------------------------
4224    resurrectThreads is called after garbage collection on the list of
4225    threads found to be garbage.  Each of these threads will be woken
4226    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4227    on an MVar, or NonTermination if the thread was blocked on a Black
4228    Hole.
4229
4230    Locks: assumes we hold *all* the capabilities.
4231    -------------------------------------------------------------------------- */
4232
4233 void
4234 resurrectThreads (StgTSO *threads)
4235 {
4236     StgTSO *tso, *next;
4237     Capability *cap;
4238
4239     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4240         next = tso->global_link;
4241         tso->global_link = all_threads;
4242         all_threads = tso;
4243         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4244         
4245         // Wake up the thread on the Capability it was last on
4246         cap = tso->cap;
4247         
4248         switch (tso->why_blocked) {
4249         case BlockedOnMVar:
4250         case BlockedOnException:
4251             /* Called by GC - sched_mutex lock is currently held. */
4252             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4253             break;
4254         case BlockedOnBlackHole:
4255             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4256             break;
4257         case BlockedOnSTM:
4258             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4259             break;
4260         case NotBlocked:
4261             /* This might happen if the thread was blocked on a black hole
4262              * belonging to a thread that we've just woken up (raiseAsync
4263              * can wake up threads, remember...).
4264              */
4265             continue;
4266         default:
4267             barf("resurrectThreads: thread blocked in a strange way");
4268         }
4269     }
4270 }
4271
4272 /* ----------------------------------------------------------------------------
4273  * Debugging: why is a thread blocked
4274  * [Also provides useful information when debugging threaded programs
4275  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4276    ------------------------------------------------------------------------- */
4277
4278 #if DEBUG
4279 static void
4280 printThreadBlockage(StgTSO *tso)
4281 {
4282   switch (tso->why_blocked) {
4283   case BlockedOnRead:
4284     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4285     break;
4286   case BlockedOnWrite:
4287     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4288     break;
4289 #if defined(mingw32_HOST_OS)
4290     case BlockedOnDoProc:
4291     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4292     break;
4293 #endif
4294   case BlockedOnDelay:
4295     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4296     break;
4297   case BlockedOnMVar:
4298     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4299     break;
4300   case BlockedOnException:
4301     debugBelch("is blocked on delivering an exception to thread %d",
4302             tso->block_info.tso->id);
4303     break;
4304   case BlockedOnBlackHole:
4305     debugBelch("is blocked on a black hole");
4306     break;
4307   case NotBlocked:
4308     debugBelch("is not blocked");
4309     break;
4310 #if defined(PARALLEL_HASKELL)
4311   case BlockedOnGA:
4312     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4313             tso->block_info.closure, info_type(tso->block_info.closure));
4314     break;
4315   case BlockedOnGA_NoSend:
4316     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4317             tso->block_info.closure, info_type(tso->block_info.closure));
4318     break;
4319 #endif
4320   case BlockedOnCCall:
4321     debugBelch("is blocked on an external call");
4322     break;
4323   case BlockedOnCCall_NoUnblockExc:
4324     debugBelch("is blocked on an external call (exceptions were already blocked)");
4325     break;
4326   case BlockedOnSTM:
4327     debugBelch("is blocked on an STM operation");
4328     break;
4329   default:
4330     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4331          tso->why_blocked, tso->id, tso);
4332   }
4333 }
4334
4335 void
4336 printThreadStatus(StgTSO *t)
4337 {
4338     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4339     {
4340       void *label = lookupThreadLabel(t->id);
4341       if (label) debugBelch("[\"%s\"] ",(char *)label);
4342     }
4343     if (t->what_next == ThreadRelocated) {
4344         debugBelch("has been relocated...\n");
4345     } else {
4346         switch (t->what_next) {
4347         case ThreadKilled:
4348             debugBelch("has been killed");
4349             break;
4350         case ThreadComplete:
4351             debugBelch("has completed");
4352             break;
4353         default:
4354             printThreadBlockage(t);
4355         }
4356         debugBelch("\n");
4357     }
4358 }
4359
4360 void
4361 printAllThreads(void)
4362 {
4363   StgTSO *t, *next;
4364   nat i;
4365   Capability *cap;
4366
4367 # if defined(GRAN)
4368   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4369   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4370                        time_string, rtsFalse/*no commas!*/);
4371
4372   debugBelch("all threads at [%s]:\n", time_string);
4373 # elif defined(PARALLEL_HASKELL)
4374   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4375   ullong_format_string(CURRENT_TIME,
4376                        time_string, rtsFalse/*no commas!*/);
4377
4378   debugBelch("all threads at [%s]:\n", time_string);
4379 # else
4380   debugBelch("all threads:\n");
4381 # endif
4382
4383   for (i = 0; i < n_capabilities; i++) {
4384       cap = &capabilities[i];
4385       debugBelch("threads on capability %d:\n", cap->no);
4386       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4387           printThreadStatus(t);
4388       }
4389   }
4390
4391   debugBelch("other threads:\n");
4392   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4393       if (t->why_blocked != NotBlocked) {
4394           printThreadStatus(t);
4395       }
4396       if (t->what_next == ThreadRelocated) {
4397           next = t->link;
4398       } else {
4399           next = t->global_link;
4400       }
4401   }
4402 }
4403
4404 // useful from gdb
4405 void 
4406 printThreadQueue(StgTSO *t)
4407 {
4408     nat i = 0;
4409     for (; t != END_TSO_QUEUE; t = t->link) {
4410         printThreadStatus(t);
4411         i++;
4412     }
4413     debugBelch("%d threads on queue\n", i);
4414 }
4415
4416 /* 
4417    Print a whole blocking queue attached to node (debugging only).
4418 */
4419 # if defined(PARALLEL_HASKELL)
4420 void 
4421 print_bq (StgClosure *node)
4422 {
4423   StgBlockingQueueElement *bqe;
4424   StgTSO *tso;
4425   rtsBool end;
4426
4427   debugBelch("## BQ of closure %p (%s): ",
4428           node, info_type(node));
4429
4430   /* should cover all closures that may have a blocking queue */
4431   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4432          get_itbl(node)->type == FETCH_ME_BQ ||
4433          get_itbl(node)->type == RBH ||
4434          get_itbl(node)->type == MVAR);
4435     
4436   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4437
4438   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4439 }
4440
4441 /* 
4442    Print a whole blocking queue starting with the element bqe.
4443 */
4444 void 
4445 print_bqe (StgBlockingQueueElement *bqe)
4446 {
4447   rtsBool end;
4448
4449   /* 
4450      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4451   */
4452   for (end = (bqe==END_BQ_QUEUE);
4453        !end; // iterate until bqe points to a CONSTR
4454        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4455        bqe = end ? END_BQ_QUEUE : bqe->link) {
4456     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4457     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4458     /* types of closures that may appear in a blocking queue */
4459     ASSERT(get_itbl(bqe)->type == TSO ||           
4460            get_itbl(bqe)->type == BLOCKED_FETCH || 
4461            get_itbl(bqe)->type == CONSTR); 
4462     /* only BQs of an RBH end with an RBH_Save closure */
4463     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4464
4465     switch (get_itbl(bqe)->type) {
4466     case TSO:
4467       debugBelch(" TSO %u (%x),",
4468               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4469       break;
4470     case BLOCKED_FETCH:
4471       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4472               ((StgBlockedFetch *)bqe)->node, 
4473               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4474               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4475               ((StgBlockedFetch *)bqe)->ga.weight);
4476       break;
4477     case CONSTR:
4478       debugBelch(" %s (IP %p),",
4479               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4480                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4481                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4482                "RBH_Save_?"), get_itbl(bqe));
4483       break;
4484     default:
4485       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4486            info_type((StgClosure *)bqe)); // , node, info_type(node));
4487       break;
4488     }
4489   } /* for */
4490   debugBelch("\n");
4491 }
4492 # elif defined(GRAN)
4493 void 
4494 print_bq (StgClosure *node)
4495 {
4496   StgBlockingQueueElement *bqe;
4497   PEs node_loc, tso_loc;
4498   rtsBool end;
4499
4500   /* should cover all closures that may have a blocking queue */
4501   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4502          get_itbl(node)->type == FETCH_ME_BQ ||
4503          get_itbl(node)->type == RBH);
4504     
4505   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4506   node_loc = where_is(node);
4507
4508   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4509           node, info_type(node), node_loc);
4510
4511   /* 
4512      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4513   */
4514   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4515        !end; // iterate until bqe points to a CONSTR
4516        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4517     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4518     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4519     /* types of closures that may appear in a blocking queue */
4520     ASSERT(get_itbl(bqe)->type == TSO ||           
4521            get_itbl(bqe)->type == CONSTR); 
4522     /* only BQs of an RBH end with an RBH_Save closure */
4523     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4524
4525     tso_loc = where_is((StgClosure *)bqe);
4526     switch (get_itbl(bqe)->type) {
4527     case TSO:
4528       debugBelch(" TSO %d (%p) on [PE %d],",
4529               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4530       break;
4531     case CONSTR:
4532       debugBelch(" %s (IP %p),",
4533               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4534                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4535                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4536                "RBH_Save_?"), get_itbl(bqe));
4537       break;
4538     default:
4539       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4540            info_type((StgClosure *)bqe), node, info_type(node));
4541       break;
4542     }
4543   } /* for */
4544   debugBelch("\n");
4545 }
4546 # endif
4547
4548 #if defined(PARALLEL_HASKELL)
4549 static nat
4550 run_queue_len(void)
4551 {
4552     nat i;
4553     StgTSO *tso;
4554     
4555     for (i=0, tso=run_queue_hd; 
4556          tso != END_TSO_QUEUE;
4557          i++, tso=tso->link) {
4558         /* nothing */
4559     }
4560         
4561     return i;
4562 }
4563 #endif
4564
4565 void
4566 sched_belch(char *s, ...)
4567 {
4568     va_list ap;
4569     va_start(ap,s);
4570 #ifdef THREADED_RTS
4571     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4572 #elif defined(PARALLEL_HASKELL)
4573     debugBelch("== ");
4574 #else
4575     debugBelch("sched: ");
4576 #endif
4577     vdebugBelch(s, ap);
4578     debugBelch("\n");
4579     va_end(ap);
4580 }
4581
4582 #endif /* DEBUG */