fix possible ^C problems
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53 #include "Trace.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122  * Used for detecting garbage collected threads.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128  * LOCK: none (just an advisory flag)
129  */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133  * LOCK: currently none, perhaps we should lock (but needs to be
134  * updated in the fast path of the scheduler).
135  */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* if this flag is set as well, give up execution
139  * LOCK: none (changes once, from false->true)
140  */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 /* Next thread ID to allocate.
144  * LOCK: sched_mutex
145  */
146 static StgThreadID next_thread_id = 1;
147
148 /* The smallest stack size that makes any sense is:
149  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
150  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
151  *  + 1                       (the closure to enter)
152  *  + 1                       (stg_ap_v_ret)
153  *  + 1                       (spare slot req'd by stg_ap_v_ret)
154  *
155  * A thread with this stack will bomb immediately with a stack
156  * overflow, which will increase its stack size.  
157  */
158 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
159
160 #if defined(GRAN)
161 StgTSO *CurrentTSO;
162 #endif
163
164 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
165  *  exists - earlier gccs apparently didn't.
166  *  -= chak
167  */
168 StgTSO dummy_tso;
169
170 /*
171  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
172  * in an MT setting, needed to signal that a worker thread shouldn't hang around
173  * in the scheduler when it is out of work.
174  */
175 rtsBool shutting_down_scheduler = rtsFalse;
176
177 /*
178  * This mutex protects most of the global scheduler data in
179  * the THREADED_RTS runtime.
180  */
181 #if defined(THREADED_RTS)
182 Mutex sched_mutex;
183 #endif
184
185 #if defined(PARALLEL_HASKELL)
186 StgTSO *LastTSO;
187 rtsTime TimeOfLastYield;
188 rtsBool emitSchedule = rtsTrue;
189 #endif
190
191 /* -----------------------------------------------------------------------------
192  * static function prototypes
193  * -------------------------------------------------------------------------- */
194
195 static Capability *schedule (Capability *initialCapability, Task *task);
196
197 //
198 // These function all encapsulate parts of the scheduler loop, and are
199 // abstracted only to make the structure and control flow of the
200 // scheduler clearer.
201 //
202 static void schedulePreLoop (void);
203 #if defined(THREADED_RTS)
204 static void schedulePushWork(Capability *cap, Task *task);
205 #endif
206 static void scheduleStartSignalHandlers (Capability *cap);
207 static void scheduleCheckBlockedThreads (Capability *cap);
208 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
209 static void scheduleCheckBlackHoles (Capability *cap);
210 static void scheduleDetectDeadlock (Capability *cap, Task *task);
211 #if defined(GRAN)
212 static StgTSO *scheduleProcessEvent(rtsEvent *event);
213 #endif
214 #if defined(PARALLEL_HASKELL)
215 static StgTSO *scheduleSendPendingMessages(void);
216 static void scheduleActivateSpark(void);
217 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
218 #endif
219 #if defined(PAR) || defined(GRAN)
220 static void scheduleGranParReport(void);
221 #endif
222 static void schedulePostRunThread(void);
223 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
224 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
225                                          StgTSO *t);
226 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
227                                     nat prev_what_next );
228 static void scheduleHandleThreadBlocked( StgTSO *t );
229 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
230                                              StgTSO *t );
231 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
232 static Capability *scheduleDoGC(Capability *cap, Task *task,
233                                 rtsBool force_major, 
234                                 void (*get_roots)(evac_fn));
235
236 static void unblockThread(Capability *cap, StgTSO *tso);
237 static rtsBool checkBlackHoles(Capability *cap);
238 static void AllRoots(evac_fn evac);
239
240 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
241
242 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
243                         rtsBool stop_at_atomically, StgPtr stop_here);
244
245 static void deleteThread (Capability *cap, StgTSO *tso);
246 static void deleteAllThreads (Capability *cap);
247
248 #ifdef DEBUG
249 static void printThreadBlockage(StgTSO *tso);
250 static void printThreadStatus(StgTSO *tso);
251 void printThreadQueue(StgTSO *tso);
252 #endif
253
254 #if defined(PARALLEL_HASKELL)
255 StgTSO * createSparkThread(rtsSpark spark);
256 StgTSO * activateSpark (rtsSpark spark);  
257 #endif
258
259 #ifdef DEBUG
260 static char *whatNext_strs[] = {
261   "(unknown)",
262   "ThreadRunGHC",
263   "ThreadInterpret",
264   "ThreadKilled",
265   "ThreadRelocated",
266   "ThreadComplete"
267 };
268 #endif
269
270 /* -----------------------------------------------------------------------------
271  * Putting a thread on the run queue: different scheduling policies
272  * -------------------------------------------------------------------------- */
273
274 STATIC_INLINE void
275 addToRunQueue( Capability *cap, StgTSO *t )
276 {
277 #if defined(PARALLEL_HASKELL)
278     if (RtsFlags.ParFlags.doFairScheduling) { 
279         // this does round-robin scheduling; good for concurrency
280         appendToRunQueue(cap,t);
281     } else {
282         // this does unfair scheduling; good for parallelism
283         pushOnRunQueue(cap,t);
284     }
285 #else
286     // this does round-robin scheduling; good for concurrency
287     appendToRunQueue(cap,t);
288 #endif
289 }
290
291 /* ---------------------------------------------------------------------------
292    Main scheduling loop.
293
294    We use round-robin scheduling, each thread returning to the
295    scheduler loop when one of these conditions is detected:
296
297       * out of heap space
298       * timer expires (thread yields)
299       * thread blocks
300       * thread ends
301       * stack overflow
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319
320 static Capability *
321 schedule (Capability *initialCapability, Task *task)
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PARALLEL_HASKELL)
329   StgTSO *tso;
330   GlobalTaskId pe;
331   rtsBool receivedFinish = rtsFalse;
332 # if defined(DEBUG)
333   nat tp_size, sp_size; // stats only
334 # endif
335 #endif
336   nat prev_what_next;
337   rtsBool ready_to_gc;
338 #if defined(THREADED_RTS)
339   rtsBool first = rtsTrue;
340 #endif
341   
342   cap = initialCapability;
343
344   // Pre-condition: this task owns initialCapability.
345   // The sched_mutex is *NOT* held
346   // NB. on return, we still hold a capability.
347
348   debugTrace (DEBUG_sched, 
349               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
350               task, initialCapability);
351
352   schedulePreLoop();
353
354   // -----------------------------------------------------------
355   // Scheduler loop starts here:
356
357 #if defined(PARALLEL_HASKELL)
358 #define TERMINATION_CONDITION        (!receivedFinish)
359 #elif defined(GRAN)
360 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
361 #else
362 #define TERMINATION_CONDITION        rtsTrue
363 #endif
364
365   while (TERMINATION_CONDITION) {
366
367 #if defined(GRAN)
368       /* Choose the processor with the next event */
369       CurrentProc = event->proc;
370       CurrentTSO = event->tso;
371 #endif
372
373 #if defined(THREADED_RTS)
374       if (first) {
375           // don't yield the first time, we want a chance to run this
376           // thread for a bit, even if there are others banging at the
377           // door.
378           first = rtsFalse;
379           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
380       } else {
381           // Yield the capability to higher-priority tasks if necessary.
382           yieldCapability(&cap, task);
383       }
384 #endif
385       
386 #if defined(THREADED_RTS)
387       schedulePushWork(cap,task);
388 #endif
389
390     // Check whether we have re-entered the RTS from Haskell without
391     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
392     // call).
393     if (cap->in_haskell) {
394           errorBelch("schedule: re-entered unsafely.\n"
395                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
396           stg_exit(EXIT_FAILURE);
397     }
398
399     // The interruption / shutdown sequence.
400     // 
401     // In order to cleanly shut down the runtime, we want to:
402     //   * make sure that all main threads return to their callers
403     //     with the state 'Interrupted'.
404     //   * clean up all OS threads assocated with the runtime
405     //   * free all memory etc.
406     //
407     // So the sequence for ^C goes like this:
408     //
409     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
410     //     arranges for some Capability to wake up
411     //
412     //   * all threads in the system are halted, and the zombies are
413     //     placed on the run queue for cleaning up.  We acquire all
414     //     the capabilities in order to delete the threads, this is
415     //     done by scheduleDoGC() for convenience (because GC already
416     //     needs to acquire all the capabilities).  We can't kill
417     //     threads involved in foreign calls.
418     // 
419     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
420     //
421     //   * sched_state := SCHED_SHUTTING_DOWN
422     //
423     //   * all workers exit when the run queue on their capability
424     //     drains.  All main threads will also exit when their TSO
425     //     reaches the head of the run queue and they can return.
426     //
427     //   * eventually all Capabilities will shut down, and the RTS can
428     //     exit.
429     //
430     //   * We might be left with threads blocked in foreign calls, 
431     //     we should really attempt to kill these somehow (TODO);
432     
433     switch (sched_state) {
434     case SCHED_RUNNING:
435         break;
436     case SCHED_INTERRUPTING:
437         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
438 #if defined(THREADED_RTS)
439         discardSparksCap(cap);
440 #endif
441         /* scheduleDoGC() deletes all the threads */
442         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
443         break;
444     case SCHED_SHUTTING_DOWN:
445         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
446         // If we are a worker, just exit.  If we're a bound thread
447         // then we will exit below when we've removed our TSO from
448         // the run queue.
449         if (task->tso == NULL && emptyRunQueue(cap)) {
450             return cap;
451         }
452         break;
453     default:
454         barf("sched_state: %d", sched_state);
455     }
456
457 #if defined(THREADED_RTS)
458     // If the run queue is empty, take a spark and turn it into a thread.
459     {
460         if (emptyRunQueue(cap)) {
461             StgClosure *spark;
462             spark = findSpark(cap);
463             if (spark != NULL) {
464                 debugTrace(DEBUG_sched,
465                            "turning spark of closure %p into a thread",
466                            (StgClosure *)spark);
467                 createSparkThread(cap,spark);     
468             }
469         }
470     }
471 #endif // THREADED_RTS
472
473     scheduleStartSignalHandlers(cap);
474
475     // Only check the black holes here if we've nothing else to do.
476     // During normal execution, the black hole list only gets checked
477     // at GC time, to avoid repeatedly traversing this possibly long
478     // list each time around the scheduler.
479     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
480
481     scheduleCheckWakeupThreads(cap);
482
483     scheduleCheckBlockedThreads(cap);
484
485     scheduleDetectDeadlock(cap,task);
486 #if defined(THREADED_RTS)
487     cap = task->cap;    // reload cap, it might have changed
488 #endif
489
490     // Normally, the only way we can get here with no threads to
491     // run is if a keyboard interrupt received during 
492     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
493     // Additionally, it is not fatal for the
494     // threaded RTS to reach here with no threads to run.
495     //
496     // win32: might be here due to awaitEvent() being abandoned
497     // as a result of a console event having been delivered.
498     if ( emptyRunQueue(cap) ) {
499 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
500         ASSERT(sched_state >= SCHED_INTERRUPTING);
501 #endif
502         continue; // nothing to do
503     }
504
505 #if defined(PARALLEL_HASKELL)
506     scheduleSendPendingMessages();
507     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
508         continue;
509
510 #if defined(SPARKS)
511     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
512 #endif
513
514     /* If we still have no work we need to send a FISH to get a spark
515        from another PE */
516     if (emptyRunQueue(cap)) {
517         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
518         ASSERT(rtsFalse); // should not happen at the moment
519     }
520     // from here: non-empty run queue.
521     //  TODO: merge above case with this, only one call processMessages() !
522     if (PacketsWaiting()) {  /* process incoming messages, if
523                                 any pending...  only in else
524                                 because getRemoteWork waits for
525                                 messages as well */
526         receivedFinish = processMessages();
527     }
528 #endif
529
530 #if defined(GRAN)
531     scheduleProcessEvent(event);
532 #endif
533
534     // 
535     // Get a thread to run
536     //
537     t = popRunQueue(cap);
538
539 #if defined(GRAN) || defined(PAR)
540     scheduleGranParReport(); // some kind of debuging output
541 #else
542     // Sanity check the thread we're about to run.  This can be
543     // expensive if there is lots of thread switching going on...
544     IF_DEBUG(sanity,checkTSO(t));
545 #endif
546
547 #if defined(THREADED_RTS)
548     // Check whether we can run this thread in the current task.
549     // If not, we have to pass our capability to the right task.
550     {
551         Task *bound = t->bound;
552       
553         if (bound) {
554             if (bound == task) {
555                 debugTrace(DEBUG_sched,
556                            "### Running thread %d in bound thread", t->id);
557                 // yes, the Haskell thread is bound to the current native thread
558             } else {
559                 debugTrace(DEBUG_sched,
560                            "### thread %d bound to another OS thread", t->id);
561                 // no, bound to a different Haskell thread: pass to that thread
562                 pushOnRunQueue(cap,t);
563                 continue;
564             }
565         } else {
566             // The thread we want to run is unbound.
567             if (task->tso) { 
568                 debugTrace(DEBUG_sched,
569                            "### this OS thread cannot run thread %d", t->id);
570                 // no, the current native thread is bound to a different
571                 // Haskell thread, so pass it to any worker thread
572                 pushOnRunQueue(cap,t);
573                 continue; 
574             }
575         }
576     }
577 #endif
578
579     cap->r.rCurrentTSO = t;
580     
581     /* context switches are initiated by the timer signal, unless
582      * the user specified "context switch as often as possible", with
583      * +RTS -C0
584      */
585     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
586         && !emptyThreadQueues(cap)) {
587         context_switch = 1;
588     }
589          
590 run_thread:
591
592     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
593                               (long)t->id, whatNext_strs[t->what_next]);
594
595 #if defined(PROFILING)
596     startHeapProfTimer();
597 #endif
598
599     // ----------------------------------------------------------------------
600     // Run the current thread 
601
602     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
603     ASSERT(t->cap == cap);
604
605     prev_what_next = t->what_next;
606
607     errno = t->saved_errno;
608     cap->in_haskell = rtsTrue;
609
610     dirtyTSO(t);
611
612     recent_activity = ACTIVITY_YES;
613
614     switch (prev_what_next) {
615         
616     case ThreadKilled:
617     case ThreadComplete:
618         /* Thread already finished, return to scheduler. */
619         ret = ThreadFinished;
620         break;
621         
622     case ThreadRunGHC:
623     {
624         StgRegTable *r;
625         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
626         cap = regTableToCapability(r);
627         ret = r->rRet;
628         break;
629     }
630     
631     case ThreadInterpret:
632         cap = interpretBCO(cap);
633         ret = cap->r.rRet;
634         break;
635         
636     default:
637         barf("schedule: invalid what_next field");
638     }
639
640     cap->in_haskell = rtsFalse;
641
642     // The TSO might have moved, eg. if it re-entered the RTS and a GC
643     // happened.  So find the new location:
644     t = cap->r.rCurrentTSO;
645
646     // We have run some Haskell code: there might be blackhole-blocked
647     // threads to wake up now.
648     // Lock-free test here should be ok, we're just setting a flag.
649     if ( blackhole_queue != END_TSO_QUEUE ) {
650         blackholes_need_checking = rtsTrue;
651     }
652     
653     // And save the current errno in this thread.
654     // XXX: possibly bogus for SMP because this thread might already
655     // be running again, see code below.
656     t->saved_errno = errno;
657
658 #if defined(THREADED_RTS)
659     // If ret is ThreadBlocked, and this Task is bound to the TSO that
660     // blocked, we are in limbo - the TSO is now owned by whatever it
661     // is blocked on, and may in fact already have been woken up,
662     // perhaps even on a different Capability.  It may be the case
663     // that task->cap != cap.  We better yield this Capability
664     // immediately and return to normaility.
665     if (ret == ThreadBlocked) {
666         debugTrace(DEBUG_sched,
667                    "--<< thread %d (%s) stopped: blocked",
668                    t->id, whatNext_strs[t->what_next]);
669         continue;
670     }
671 #endif
672
673     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674     ASSERT(t->cap == cap);
675
676     // ----------------------------------------------------------------------
677     
678     // Costs for the scheduler are assigned to CCS_SYSTEM
679 #if defined(PROFILING)
680     stopHeapProfTimer();
681     CCCS = CCS_SYSTEM;
682 #endif
683     
684     schedulePostRunThread();
685
686     ready_to_gc = rtsFalse;
687
688     switch (ret) {
689     case HeapOverflow:
690         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
691         break;
692
693     case StackOverflow:
694         scheduleHandleStackOverflow(cap,task,t);
695         break;
696
697     case ThreadYielding:
698         if (scheduleHandleYield(cap, t, prev_what_next)) {
699             // shortcut for switching between compiler/interpreter:
700             goto run_thread; 
701         }
702         break;
703
704     case ThreadBlocked:
705         scheduleHandleThreadBlocked(t);
706         break;
707
708     case ThreadFinished:
709         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
711         break;
712
713     default:
714       barf("schedule: invalid thread return code %d", (int)ret);
715     }
716
717     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
718     if (ready_to_gc) {
719       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
720     }
721   } /* end of while() */
722
723   debugTrace(PAR_DEBUG_verbose,
724              "== Leaving schedule() after having received Finish");
725 }
726
727 /* ----------------------------------------------------------------------------
728  * Setting up the scheduler loop
729  * ------------------------------------------------------------------------- */
730
731 static void
732 schedulePreLoop(void)
733 {
734 #if defined(GRAN) 
735     /* set up first event to get things going */
736     /* ToDo: assign costs for system setup and init MainTSO ! */
737     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
738               ContinueThread, 
739               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
740     
741     debugTrace (DEBUG_gran,
742                 "GRAN: Init CurrentTSO (in schedule) = %p", 
743                 CurrentTSO);
744     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
745     
746     if (RtsFlags.GranFlags.Light) {
747         /* Save current time; GranSim Light only */
748         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
749     }      
750 #endif
751 }
752
753 /* -----------------------------------------------------------------------------
754  * schedulePushWork()
755  *
756  * Push work to other Capabilities if we have some.
757  * -------------------------------------------------------------------------- */
758
759 #if defined(THREADED_RTS)
760 static void
761 schedulePushWork(Capability *cap USED_IF_THREADS, 
762                  Task *task      USED_IF_THREADS)
763 {
764     Capability *free_caps[n_capabilities], *cap0;
765     nat i, n_free_caps;
766
767     // migration can be turned off with +RTS -qg
768     if (!RtsFlags.ParFlags.migrate) return;
769
770     // Check whether we have more threads on our run queue, or sparks
771     // in our pool, that we could hand to another Capability.
772     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
773         && sparkPoolSizeCap(cap) < 2) {
774         return;
775     }
776
777     // First grab as many free Capabilities as we can.
778     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
779         cap0 = &capabilities[i];
780         if (cap != cap0 && tryGrabCapability(cap0,task)) {
781             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
782                 // it already has some work, we just grabbed it at 
783                 // the wrong moment.  Or maybe it's deadlocked!
784                 releaseCapability(cap0);
785             } else {
786                 free_caps[n_free_caps++] = cap0;
787             }
788         }
789     }
790
791     // we now have n_free_caps free capabilities stashed in
792     // free_caps[].  Share our run queue equally with them.  This is
793     // probably the simplest thing we could do; improvements we might
794     // want to do include:
795     //
796     //   - giving high priority to moving relatively new threads, on 
797     //     the gournds that they haven't had time to build up a
798     //     working set in the cache on this CPU/Capability.
799     //
800     //   - giving low priority to moving long-lived threads
801
802     if (n_free_caps > 0) {
803         StgTSO *prev, *t, *next;
804         rtsBool pushed_to_all;
805
806         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
807
808         i = 0;
809         pushed_to_all = rtsFalse;
810
811         if (cap->run_queue_hd != END_TSO_QUEUE) {
812             prev = cap->run_queue_hd;
813             t = prev->link;
814             prev->link = END_TSO_QUEUE;
815             for (; t != END_TSO_QUEUE; t = next) {
816                 next = t->link;
817                 t->link = END_TSO_QUEUE;
818                 if (t->what_next == ThreadRelocated
819                     || t->bound == task // don't move my bound thread
820                     || tsoLocked(t)) {  // don't move a locked thread
821                     prev->link = t;
822                     prev = t;
823                 } else if (i == n_free_caps) {
824                     pushed_to_all = rtsTrue;
825                     i = 0;
826                     // keep one for us
827                     prev->link = t;
828                     prev = t;
829                 } else {
830                     debugTrace(DEBUG_sched, "pushing thread %d to capability %d", t->id, free_caps[i]->no);
831                     appendToRunQueue(free_caps[i],t);
832                     if (t->bound) { t->bound->cap = free_caps[i]; }
833                     t->cap = free_caps[i];
834                     i++;
835                 }
836             }
837             cap->run_queue_tl = prev;
838         }
839
840         // If there are some free capabilities that we didn't push any
841         // threads to, then try to push a spark to each one.
842         if (!pushed_to_all) {
843             StgClosure *spark;
844             // i is the next free capability to push to
845             for (; i < n_free_caps; i++) {
846                 if (emptySparkPoolCap(free_caps[i])) {
847                     spark = findSpark(cap);
848                     if (spark != NULL) {
849                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
850                         newSpark(&(free_caps[i]->r), spark);
851                     }
852                 }
853             }
854         }
855
856         // release the capabilities
857         for (i = 0; i < n_free_caps; i++) {
858             task->cap = free_caps[i];
859             releaseCapability(free_caps[i]);
860         }
861     }
862     task->cap = cap; // reset to point to our Capability.
863 }
864 #endif
865
866 /* ----------------------------------------------------------------------------
867  * Start any pending signal handlers
868  * ------------------------------------------------------------------------- */
869
870 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
871 static void
872 scheduleStartSignalHandlers(Capability *cap)
873 {
874     if (signals_pending()) { // safe outside the lock
875         startSignalHandlers(cap);
876     }
877 }
878 #else
879 static void
880 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
881 {
882 }
883 #endif
884
885 /* ----------------------------------------------------------------------------
886  * Check for blocked threads that can be woken up.
887  * ------------------------------------------------------------------------- */
888
889 static void
890 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
891 {
892 #if !defined(THREADED_RTS)
893     //
894     // Check whether any waiting threads need to be woken up.  If the
895     // run queue is empty, and there are no other tasks running, we
896     // can wait indefinitely for something to happen.
897     //
898     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
899     {
900         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
901     }
902 #endif
903 }
904
905
906 /* ----------------------------------------------------------------------------
907  * Check for threads woken up by other Capabilities
908  * ------------------------------------------------------------------------- */
909
910 static void
911 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
912 {
913 #if defined(THREADED_RTS)
914     // Any threads that were woken up by other Capabilities get
915     // appended to our run queue.
916     if (!emptyWakeupQueue(cap)) {
917         ACQUIRE_LOCK(&cap->lock);
918         if (emptyRunQueue(cap)) {
919             cap->run_queue_hd = cap->wakeup_queue_hd;
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         } else {
922             cap->run_queue_tl->link = cap->wakeup_queue_hd;
923             cap->run_queue_tl = cap->wakeup_queue_tl;
924         }
925         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
926         RELEASE_LOCK(&cap->lock);
927     }
928 #endif
929 }
930
931 /* ----------------------------------------------------------------------------
932  * Check for threads blocked on BLACKHOLEs that can be woken up
933  * ------------------------------------------------------------------------- */
934 static void
935 scheduleCheckBlackHoles (Capability *cap)
936 {
937     if ( blackholes_need_checking ) // check without the lock first
938     {
939         ACQUIRE_LOCK(&sched_mutex);
940         if ( blackholes_need_checking ) {
941             checkBlackHoles(cap);
942             blackholes_need_checking = rtsFalse;
943         }
944         RELEASE_LOCK(&sched_mutex);
945     }
946 }
947
948 /* ----------------------------------------------------------------------------
949  * Detect deadlock conditions and attempt to resolve them.
950  * ------------------------------------------------------------------------- */
951
952 static void
953 scheduleDetectDeadlock (Capability *cap, Task *task)
954 {
955
956 #if defined(PARALLEL_HASKELL)
957     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
958     return;
959 #endif
960
961     /* 
962      * Detect deadlock: when we have no threads to run, there are no
963      * threads blocked, waiting for I/O, or sleeping, and all the
964      * other tasks are waiting for work, we must have a deadlock of
965      * some description.
966      */
967     if ( emptyThreadQueues(cap) )
968     {
969 #if defined(THREADED_RTS)
970         /* 
971          * In the threaded RTS, we only check for deadlock if there
972          * has been no activity in a complete timeslice.  This means
973          * we won't eagerly start a full GC just because we don't have
974          * any threads to run currently.
975          */
976         if (recent_activity != ACTIVITY_INACTIVE) return;
977 #endif
978
979         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
980
981         // Garbage collection can release some new threads due to
982         // either (a) finalizers or (b) threads resurrected because
983         // they are unreachable and will therefore be sent an
984         // exception.  Any threads thus released will be immediately
985         // runnable.
986         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
987
988         recent_activity = ACTIVITY_DONE_GC;
989         
990         if ( !emptyRunQueue(cap) ) return;
991
992 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
993         /* If we have user-installed signal handlers, then wait
994          * for signals to arrive rather then bombing out with a
995          * deadlock.
996          */
997         if ( anyUserHandlers() ) {
998             debugTrace(DEBUG_sched,
999                        "still deadlocked, waiting for signals...");
1000
1001             awaitUserSignals();
1002
1003             if (signals_pending()) {
1004                 startSignalHandlers(cap);
1005             }
1006
1007             // either we have threads to run, or we were interrupted:
1008             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1009         }
1010 #endif
1011
1012 #if !defined(THREADED_RTS)
1013         /* Probably a real deadlock.  Send the current main thread the
1014          * Deadlock exception.
1015          */
1016         if (task->tso) {
1017             switch (task->tso->why_blocked) {
1018             case BlockedOnSTM:
1019             case BlockedOnBlackHole:
1020             case BlockedOnException:
1021             case BlockedOnMVar:
1022                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1023                 return;
1024             default:
1025                 barf("deadlock: main thread blocked in a strange way");
1026             }
1027         }
1028         return;
1029 #endif
1030     }
1031 }
1032
1033 /* ----------------------------------------------------------------------------
1034  * Process an event (GRAN only)
1035  * ------------------------------------------------------------------------- */
1036
1037 #if defined(GRAN)
1038 static StgTSO *
1039 scheduleProcessEvent(rtsEvent *event)
1040 {
1041     StgTSO *t;
1042
1043     if (RtsFlags.GranFlags.Light)
1044       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1045
1046     /* adjust time based on time-stamp */
1047     if (event->time > CurrentTime[CurrentProc] &&
1048         event->evttype != ContinueThread)
1049       CurrentTime[CurrentProc] = event->time;
1050     
1051     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1052     if (!RtsFlags.GranFlags.Light)
1053       handleIdlePEs();
1054
1055     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1056
1057     /* main event dispatcher in GranSim */
1058     switch (event->evttype) {
1059       /* Should just be continuing execution */
1060     case ContinueThread:
1061       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1062       /* ToDo: check assertion
1063       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1064              run_queue_hd != END_TSO_QUEUE);
1065       */
1066       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1067       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1068           procStatus[CurrentProc]==Fetching) {
1069         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1070               CurrentTSO->id, CurrentTSO, CurrentProc);
1071         goto next_thread;
1072       } 
1073       /* Ignore ContinueThreads for completed threads */
1074       if (CurrentTSO->what_next == ThreadComplete) {
1075         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1076               CurrentTSO->id, CurrentTSO, CurrentProc);
1077         goto next_thread;
1078       } 
1079       /* Ignore ContinueThreads for threads that are being migrated */
1080       if (PROCS(CurrentTSO)==Nowhere) { 
1081         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1082               CurrentTSO->id, CurrentTSO, CurrentProc);
1083         goto next_thread;
1084       }
1085       /* The thread should be at the beginning of the run queue */
1086       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1087         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1088               CurrentTSO->id, CurrentTSO, CurrentProc);
1089         break; // run the thread anyway
1090       }
1091       /*
1092       new_event(proc, proc, CurrentTime[proc],
1093                 FindWork,
1094                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1095       goto next_thread; 
1096       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1097       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1098
1099     case FetchNode:
1100       do_the_fetchnode(event);
1101       goto next_thread;             /* handle next event in event queue  */
1102       
1103     case GlobalBlock:
1104       do_the_globalblock(event);
1105       goto next_thread;             /* handle next event in event queue  */
1106       
1107     case FetchReply:
1108       do_the_fetchreply(event);
1109       goto next_thread;             /* handle next event in event queue  */
1110       
1111     case UnblockThread:   /* Move from the blocked queue to the tail of */
1112       do_the_unblock(event);
1113       goto next_thread;             /* handle next event in event queue  */
1114       
1115     case ResumeThread:  /* Move from the blocked queue to the tail of */
1116       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1117       event->tso->gran.blocktime += 
1118         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1119       do_the_startthread(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     case StartThread:
1123       do_the_startthread(event);
1124       goto next_thread;             /* handle next event in event queue  */
1125       
1126     case MoveThread:
1127       do_the_movethread(event);
1128       goto next_thread;             /* handle next event in event queue  */
1129       
1130     case MoveSpark:
1131       do_the_movespark(event);
1132       goto next_thread;             /* handle next event in event queue  */
1133       
1134     case FindWork:
1135       do_the_findwork(event);
1136       goto next_thread;             /* handle next event in event queue  */
1137       
1138     default:
1139       barf("Illegal event type %u\n", event->evttype);
1140     }  /* switch */
1141     
1142     /* This point was scheduler_loop in the old RTS */
1143
1144     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1145
1146     TimeOfLastEvent = CurrentTime[CurrentProc];
1147     TimeOfNextEvent = get_time_of_next_event();
1148     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1149     // CurrentTSO = ThreadQueueHd;
1150
1151     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1152                          TimeOfNextEvent));
1153
1154     if (RtsFlags.GranFlags.Light) 
1155       GranSimLight_leave_system(event, &ActiveTSO); 
1156
1157     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1158
1159     IF_DEBUG(gran, 
1160              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1161
1162     /* in a GranSim setup the TSO stays on the run queue */
1163     t = CurrentTSO;
1164     /* Take a thread from the run queue. */
1165     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1166
1167     IF_DEBUG(gran, 
1168              debugBelch("GRAN: About to run current thread, which is\n");
1169              G_TSO(t,5));
1170
1171     context_switch = 0; // turned on via GranYield, checking events and time slice
1172
1173     IF_DEBUG(gran, 
1174              DumpGranEvent(GR_SCHEDULE, t));
1175
1176     procStatus[CurrentProc] = Busy;
1177 }
1178 #endif // GRAN
1179
1180 /* ----------------------------------------------------------------------------
1181  * Send pending messages (PARALLEL_HASKELL only)
1182  * ------------------------------------------------------------------------- */
1183
1184 #if defined(PARALLEL_HASKELL)
1185 static StgTSO *
1186 scheduleSendPendingMessages(void)
1187 {
1188     StgSparkPool *pool;
1189     rtsSpark spark;
1190     StgTSO *t;
1191
1192 # if defined(PAR) // global Mem.Mgmt., omit for now
1193     if (PendingFetches != END_BF_QUEUE) {
1194         processFetches();
1195     }
1196 # endif
1197     
1198     if (RtsFlags.ParFlags.BufferTime) {
1199         // if we use message buffering, we must send away all message
1200         // packets which have become too old...
1201         sendOldBuffers(); 
1202     }
1203 }
1204 #endif
1205
1206 /* ----------------------------------------------------------------------------
1207  * Activate spark threads (PARALLEL_HASKELL only)
1208  * ------------------------------------------------------------------------- */
1209
1210 #if defined(PARALLEL_HASKELL)
1211 static void
1212 scheduleActivateSpark(void)
1213 {
1214 #if defined(SPARKS)
1215   ASSERT(emptyRunQueue());
1216 /* We get here if the run queue is empty and want some work.
1217    We try to turn a spark into a thread, and add it to the run queue,
1218    from where it will be picked up in the next iteration of the scheduler
1219    loop.
1220 */
1221
1222       /* :-[  no local threads => look out for local sparks */
1223       /* the spark pool for the current PE */
1224       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1225       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1226           pool->hd < pool->tl) {
1227         /* 
1228          * ToDo: add GC code check that we really have enough heap afterwards!!
1229          * Old comment:
1230          * If we're here (no runnable threads) and we have pending
1231          * sparks, we must have a space problem.  Get enough space
1232          * to turn one of those pending sparks into a
1233          * thread... 
1234          */
1235
1236         spark = findSpark(rtsFalse);            /* get a spark */
1237         if (spark != (rtsSpark) NULL) {
1238           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1239           IF_PAR_DEBUG(fish, // schedule,
1240                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1241                              tso->id, tso, advisory_thread_count));
1242
1243           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1244             IF_PAR_DEBUG(fish, // schedule,
1245                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1246                             spark));
1247             return rtsFalse; /* failed to generate a thread */
1248           }                  /* otherwise fall through & pick-up new tso */
1249         } else {
1250           IF_PAR_DEBUG(fish, // schedule,
1251                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1252                              spark_queue_len(pool)));
1253           return rtsFalse;  /* failed to generate a thread */
1254         }
1255         return rtsTrue;  /* success in generating a thread */
1256   } else { /* no more threads permitted or pool empty */
1257     return rtsFalse;  /* failed to generateThread */
1258   }
1259 #else
1260   tso = NULL; // avoid compiler warning only
1261   return rtsFalse;  /* dummy in non-PAR setup */
1262 #endif // SPARKS
1263 }
1264 #endif // PARALLEL_HASKELL
1265
1266 /* ----------------------------------------------------------------------------
1267  * Get work from a remote node (PARALLEL_HASKELL only)
1268  * ------------------------------------------------------------------------- */
1269     
1270 #if defined(PARALLEL_HASKELL)
1271 static rtsBool
1272 scheduleGetRemoteWork(rtsBool *receivedFinish)
1273 {
1274   ASSERT(emptyRunQueue());
1275
1276   if (RtsFlags.ParFlags.BufferTime) {
1277         IF_PAR_DEBUG(verbose, 
1278                 debugBelch("...send all pending data,"));
1279         {
1280           nat i;
1281           for (i=1; i<=nPEs; i++)
1282             sendImmediately(i); // send all messages away immediately
1283         }
1284   }
1285 # ifndef SPARKS
1286         //++EDEN++ idle() , i.e. send all buffers, wait for work
1287         // suppress fishing in EDEN... just look for incoming messages
1288         // (blocking receive)
1289   IF_PAR_DEBUG(verbose, 
1290                debugBelch("...wait for incoming messages...\n"));
1291   *receivedFinish = processMessages(); // blocking receive...
1292
1293         // and reenter scheduling loop after having received something
1294         // (return rtsFalse below)
1295
1296 # else /* activate SPARKS machinery */
1297 /* We get here, if we have no work, tried to activate a local spark, but still
1298    have no work. We try to get a remote spark, by sending a FISH message.
1299    Thread migration should be added here, and triggered when a sequence of 
1300    fishes returns without work. */
1301         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1302
1303       /* =8-[  no local sparks => look for work on other PEs */
1304         /*
1305          * We really have absolutely no work.  Send out a fish
1306          * (there may be some out there already), and wait for
1307          * something to arrive.  We clearly can't run any threads
1308          * until a SCHEDULE or RESUME arrives, and so that's what
1309          * we're hoping to see.  (Of course, we still have to
1310          * respond to other types of messages.)
1311          */
1312         rtsTime now = msTime() /*CURRENT_TIME*/;
1313         IF_PAR_DEBUG(verbose, 
1314                      debugBelch("--  now=%ld\n", now));
1315         IF_PAR_DEBUG(fish, // verbose,
1316              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1317                  (last_fish_arrived_at!=0 &&
1318                   last_fish_arrived_at+delay > now)) {
1319                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1320                      now, last_fish_arrived_at+delay, 
1321                      last_fish_arrived_at,
1322                      delay);
1323              });
1324   
1325         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1326             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1327           if (last_fish_arrived_at==0 ||
1328               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1329             /* outstandingFishes is set in sendFish, processFish;
1330                avoid flooding system with fishes via delay */
1331     next_fish_to_send_at = 0;  
1332   } else {
1333     /* ToDo: this should be done in the main scheduling loop to avoid the
1334              busy wait here; not so bad if fish delay is very small  */
1335     int iq = 0; // DEBUGGING -- HWL
1336     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1337     /* send a fish when ready, but process messages that arrive in the meantime */
1338     do {
1339       if (PacketsWaiting()) {
1340         iq++; // DEBUGGING
1341         *receivedFinish = processMessages();
1342       }
1343       now = msTime();
1344     } while (!*receivedFinish || now<next_fish_to_send_at);
1345     // JB: This means the fish could become obsolete, if we receive
1346     // work. Better check for work again? 
1347     // last line: while (!receivedFinish || !haveWork || now<...)
1348     // next line: if (receivedFinish || haveWork )
1349
1350     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1351       return rtsFalse;  // NB: this will leave scheduler loop
1352                         // immediately after return!
1353                           
1354     IF_PAR_DEBUG(fish, // verbose,
1355                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1356
1357   }
1358
1359     // JB: IMHO, this should all be hidden inside sendFish(...)
1360     /* pe = choosePE(); 
1361        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1362                 NEW_FISH_HUNGER);
1363
1364     // Global statistics: count no. of fishes
1365     if (RtsFlags.ParFlags.ParStats.Global &&
1366          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1367            globalParStats.tot_fish_mess++;
1368            }
1369     */ 
1370
1371   /* delayed fishes must have been sent by now! */
1372   next_fish_to_send_at = 0;  
1373   }
1374       
1375   *receivedFinish = processMessages();
1376 # endif /* SPARKS */
1377
1378  return rtsFalse;
1379  /* NB: this function always returns rtsFalse, meaning the scheduler
1380     loop continues with the next iteration; 
1381     rationale: 
1382       return code means success in finding work; we enter this function
1383       if there is no local work, thus have to send a fish which takes
1384       time until it arrives with work; in the meantime we should process
1385       messages in the main loop;
1386  */
1387 }
1388 #endif // PARALLEL_HASKELL
1389
1390 /* ----------------------------------------------------------------------------
1391  * PAR/GRAN: Report stats & debugging info(?)
1392  * ------------------------------------------------------------------------- */
1393
1394 #if defined(PAR) || defined(GRAN)
1395 static void
1396 scheduleGranParReport(void)
1397 {
1398   ASSERT(run_queue_hd != END_TSO_QUEUE);
1399
1400   /* Take a thread from the run queue, if we have work */
1401   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1402
1403     /* If this TSO has got its outport closed in the meantime, 
1404      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1405      * It has to be marked as TH_DEAD for this purpose.
1406      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1407
1408 JB: TODO: investigate wether state change field could be nuked
1409      entirely and replaced by the normal tso state (whatnext
1410      field). All we want to do is to kill tsos from outside.
1411      */
1412
1413     /* ToDo: write something to the log-file
1414     if (RTSflags.ParFlags.granSimStats && !sameThread)
1415         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1416
1417     CurrentTSO = t;
1418     */
1419     /* the spark pool for the current PE */
1420     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1421
1422     IF_DEBUG(scheduler, 
1423              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1424                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1425
1426     IF_PAR_DEBUG(fish,
1427              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1428                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1429
1430     if (RtsFlags.ParFlags.ParStats.Full && 
1431         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1432         (emitSchedule || // forced emit
1433          (t && LastTSO && t->id != LastTSO->id))) {
1434       /* 
1435          we are running a different TSO, so write a schedule event to log file
1436          NB: If we use fair scheduling we also have to write  a deschedule 
1437              event for LastTSO; with unfair scheduling we know that the
1438              previous tso has blocked whenever we switch to another tso, so
1439              we don't need it in GUM for now
1440       */
1441       IF_PAR_DEBUG(fish, // schedule,
1442                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1443
1444       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1445                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1446       emitSchedule = rtsFalse;
1447     }
1448 }     
1449 #endif
1450
1451 /* ----------------------------------------------------------------------------
1452  * After running a thread...
1453  * ------------------------------------------------------------------------- */
1454
1455 static void
1456 schedulePostRunThread(void)
1457 {
1458 #if defined(PAR)
1459     /* HACK 675: if the last thread didn't yield, make sure to print a 
1460        SCHEDULE event to the log file when StgRunning the next thread, even
1461        if it is the same one as before */
1462     LastTSO = t; 
1463     TimeOfLastYield = CURRENT_TIME;
1464 #endif
1465
1466   /* some statistics gathering in the parallel case */
1467
1468 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1469   switch (ret) {
1470     case HeapOverflow:
1471 # if defined(GRAN)
1472       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1473       globalGranStats.tot_heapover++;
1474 # elif defined(PAR)
1475       globalParStats.tot_heapover++;
1476 # endif
1477       break;
1478
1479      case StackOverflow:
1480 # if defined(GRAN)
1481       IF_DEBUG(gran, 
1482                DumpGranEvent(GR_DESCHEDULE, t));
1483       globalGranStats.tot_stackover++;
1484 # elif defined(PAR)
1485       // IF_DEBUG(par, 
1486       // DumpGranEvent(GR_DESCHEDULE, t);
1487       globalParStats.tot_stackover++;
1488 # endif
1489       break;
1490
1491     case ThreadYielding:
1492 # if defined(GRAN)
1493       IF_DEBUG(gran, 
1494                DumpGranEvent(GR_DESCHEDULE, t));
1495       globalGranStats.tot_yields++;
1496 # elif defined(PAR)
1497       // IF_DEBUG(par, 
1498       // DumpGranEvent(GR_DESCHEDULE, t);
1499       globalParStats.tot_yields++;
1500 # endif
1501       break; 
1502
1503     case ThreadBlocked:
1504 # if defined(GRAN)
1505         debugTrace(DEBUG_sched, 
1506                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1507                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1508                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1509                if (t->block_info.closure!=(StgClosure*)NULL)
1510                  print_bq(t->block_info.closure);
1511                debugBelch("\n"));
1512
1513       // ??? needed; should emit block before
1514       IF_DEBUG(gran, 
1515                DumpGranEvent(GR_DESCHEDULE, t)); 
1516       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1517       /*
1518         ngoq Dogh!
1519       ASSERT(procStatus[CurrentProc]==Busy || 
1520               ((procStatus[CurrentProc]==Fetching) && 
1521               (t->block_info.closure!=(StgClosure*)NULL)));
1522       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1523           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1524             procStatus[CurrentProc]==Fetching)) 
1525         procStatus[CurrentProc] = Idle;
1526       */
1527 # elif defined(PAR)
1528 //++PAR++  blockThread() writes the event (change?)
1529 # endif
1530     break;
1531
1532   case ThreadFinished:
1533     break;
1534
1535   default:
1536     barf("parGlobalStats: unknown return code");
1537     break;
1538     }
1539 #endif
1540 }
1541
1542 /* -----------------------------------------------------------------------------
1543  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1544  * -------------------------------------------------------------------------- */
1545
1546 static rtsBool
1547 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1548 {
1549     // did the task ask for a large block?
1550     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1551         // if so, get one and push it on the front of the nursery.
1552         bdescr *bd;
1553         lnat blocks;
1554         
1555         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1556         
1557         debugTrace(DEBUG_sched,
1558                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1559                    (long)t->id, whatNext_strs[t->what_next], blocks);
1560     
1561         // don't do this if the nursery is (nearly) full, we'll GC first.
1562         if (cap->r.rCurrentNursery->link != NULL ||
1563             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1564                                                // if the nursery has only one block.
1565             
1566             ACQUIRE_SM_LOCK
1567             bd = allocGroup( blocks );
1568             RELEASE_SM_LOCK
1569             cap->r.rNursery->n_blocks += blocks;
1570             
1571             // link the new group into the list
1572             bd->link = cap->r.rCurrentNursery;
1573             bd->u.back = cap->r.rCurrentNursery->u.back;
1574             if (cap->r.rCurrentNursery->u.back != NULL) {
1575                 cap->r.rCurrentNursery->u.back->link = bd;
1576             } else {
1577 #if !defined(THREADED_RTS)
1578                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1579                        g0s0 == cap->r.rNursery);
1580 #endif
1581                 cap->r.rNursery->blocks = bd;
1582             }             
1583             cap->r.rCurrentNursery->u.back = bd;
1584             
1585             // initialise it as a nursery block.  We initialise the
1586             // step, gen_no, and flags field of *every* sub-block in
1587             // this large block, because this is easier than making
1588             // sure that we always find the block head of a large
1589             // block whenever we call Bdescr() (eg. evacuate() and
1590             // isAlive() in the GC would both have to do this, at
1591             // least).
1592             { 
1593                 bdescr *x;
1594                 for (x = bd; x < bd + blocks; x++) {
1595                     x->step = cap->r.rNursery;
1596                     x->gen_no = 0;
1597                     x->flags = 0;
1598                 }
1599             }
1600             
1601             // This assert can be a killer if the app is doing lots
1602             // of large block allocations.
1603             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1604             
1605             // now update the nursery to point to the new block
1606             cap->r.rCurrentNursery = bd;
1607             
1608             // we might be unlucky and have another thread get on the
1609             // run queue before us and steal the large block, but in that
1610             // case the thread will just end up requesting another large
1611             // block.
1612             pushOnRunQueue(cap,t);
1613             return rtsFalse;  /* not actually GC'ing */
1614         }
1615     }
1616     
1617     debugTrace(DEBUG_sched,
1618                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1619                (long)t->id, whatNext_strs[t->what_next]);
1620
1621 #if defined(GRAN)
1622     ASSERT(!is_on_queue(t,CurrentProc));
1623 #elif defined(PARALLEL_HASKELL)
1624     /* Currently we emit a DESCHEDULE event before GC in GUM.
1625        ToDo: either add separate event to distinguish SYSTEM time from rest
1626        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1627     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1628         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1629                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1630         emitSchedule = rtsTrue;
1631     }
1632 #endif
1633       
1634     pushOnRunQueue(cap,t);
1635     return rtsTrue;
1636     /* actual GC is done at the end of the while loop in schedule() */
1637 }
1638
1639 /* -----------------------------------------------------------------------------
1640  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1641  * -------------------------------------------------------------------------- */
1642
1643 static void
1644 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1645 {
1646     debugTrace (DEBUG_sched,
1647                 "--<< thread %ld (%s) stopped, StackOverflow\n", 
1648                 (long)t->id, whatNext_strs[t->what_next]);
1649
1650     /* just adjust the stack for this thread, then pop it back
1651      * on the run queue.
1652      */
1653     { 
1654         /* enlarge the stack */
1655         StgTSO *new_t = threadStackOverflow(cap, t);
1656         
1657         /* The TSO attached to this Task may have moved, so update the
1658          * pointer to it.
1659          */
1660         if (task->tso == t) {
1661             task->tso = new_t;
1662         }
1663         pushOnRunQueue(cap,new_t);
1664     }
1665 }
1666
1667 /* -----------------------------------------------------------------------------
1668  * Handle a thread that returned to the scheduler with ThreadYielding
1669  * -------------------------------------------------------------------------- */
1670
1671 static rtsBool
1672 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1673 {
1674     // Reset the context switch flag.  We don't do this just before
1675     // running the thread, because that would mean we would lose ticks
1676     // during GC, which can lead to unfair scheduling (a thread hogs
1677     // the CPU because the tick always arrives during GC).  This way
1678     // penalises threads that do a lot of allocation, but that seems
1679     // better than the alternative.
1680     context_switch = 0;
1681     
1682     /* put the thread back on the run queue.  Then, if we're ready to
1683      * GC, check whether this is the last task to stop.  If so, wake
1684      * up the GC thread.  getThread will block during a GC until the
1685      * GC is finished.
1686      */
1687 #ifdef DEBUG
1688     if (t->what_next != prev_what_next) {
1689         debugTrace(DEBUG_sched,
1690                    "--<< thread %ld (%s) stopped to switch evaluators\n", 
1691                    (long)t->id, whatNext_strs[t->what_next]);
1692     } else {
1693         debugTrace(DEBUG_sched,
1694                    "--<< thread %ld (%s) stopped, yielding\n",
1695                    (long)t->id, whatNext_strs[t->what_next]);
1696     }
1697 #endif
1698     
1699     IF_DEBUG(sanity,
1700              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1701              checkTSO(t));
1702     ASSERT(t->link == END_TSO_QUEUE);
1703     
1704     // Shortcut if we're just switching evaluators: don't bother
1705     // doing stack squeezing (which can be expensive), just run the
1706     // thread.
1707     if (t->what_next != prev_what_next) {
1708         return rtsTrue;
1709     }
1710     
1711 #if defined(GRAN)
1712     ASSERT(!is_on_queue(t,CurrentProc));
1713       
1714     IF_DEBUG(sanity,
1715              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1716              checkThreadQsSanity(rtsTrue));
1717
1718 #endif
1719
1720     addToRunQueue(cap,t);
1721
1722 #if defined(GRAN)
1723     /* add a ContinueThread event to actually process the thread */
1724     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1725               ContinueThread,
1726               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1727     IF_GRAN_DEBUG(bq, 
1728                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1729                   G_EVENTQ(0);
1730                   G_CURR_THREADQ(0));
1731 #endif
1732     return rtsFalse;
1733 }
1734
1735 /* -----------------------------------------------------------------------------
1736  * Handle a thread that returned to the scheduler with ThreadBlocked
1737  * -------------------------------------------------------------------------- */
1738
1739 static void
1740 scheduleHandleThreadBlocked( StgTSO *t
1741 #if !defined(GRAN) && !defined(DEBUG)
1742     STG_UNUSED
1743 #endif
1744     )
1745 {
1746 #if defined(GRAN)
1747     IF_DEBUG(scheduler,
1748              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1749                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1750              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1751     
1752     // ??? needed; should emit block before
1753     IF_DEBUG(gran, 
1754              DumpGranEvent(GR_DESCHEDULE, t)); 
1755     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1756     /*
1757       ngoq Dogh!
1758       ASSERT(procStatus[CurrentProc]==Busy || 
1759       ((procStatus[CurrentProc]==Fetching) && 
1760       (t->block_info.closure!=(StgClosure*)NULL)));
1761       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1762       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1763       procStatus[CurrentProc]==Fetching)) 
1764       procStatus[CurrentProc] = Idle;
1765     */
1766 #elif defined(PAR)
1767     IF_DEBUG(scheduler,
1768              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1769                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1770     IF_PAR_DEBUG(bq,
1771                  
1772                  if (t->block_info.closure!=(StgClosure*)NULL) 
1773                  print_bq(t->block_info.closure));
1774     
1775     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1776     blockThread(t);
1777     
1778     /* whatever we schedule next, we must log that schedule */
1779     emitSchedule = rtsTrue;
1780     
1781 #else /* !GRAN */
1782
1783       // We don't need to do anything.  The thread is blocked, and it
1784       // has tidied up its stack and placed itself on whatever queue
1785       // it needs to be on.
1786
1787 #if !defined(THREADED_RTS)
1788     ASSERT(t->why_blocked != NotBlocked);
1789              // This might not be true under THREADED_RTS: we don't have
1790              // exclusive access to this TSO, so someone might have
1791              // woken it up by now.  This actually happens: try
1792              // conc023 +RTS -N2.
1793 #endif
1794
1795 #ifdef DEBUG
1796     if (traceClass(DEBUG_sched)) {
1797         debugTraceBegin("--<< thread %d (%s) stopped: ", 
1798                    t->id, whatNext_strs[t->what_next]);
1799         printThreadBlockage(t);
1800         debugTraceEnd();
1801     }
1802 #endif
1803     
1804     /* Only for dumping event to log file 
1805        ToDo: do I need this in GranSim, too?
1806        blockThread(t);
1807     */
1808 #endif
1809 }
1810
1811 /* -----------------------------------------------------------------------------
1812  * Handle a thread that returned to the scheduler with ThreadFinished
1813  * -------------------------------------------------------------------------- */
1814
1815 static rtsBool
1816 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1817 {
1818     /* Need to check whether this was a main thread, and if so,
1819      * return with the return value.
1820      *
1821      * We also end up here if the thread kills itself with an
1822      * uncaught exception, see Exception.cmm.
1823      */
1824     debugTrace(DEBUG_sched, "--++ thread %d (%s) finished", 
1825                t->id, whatNext_strs[t->what_next]);
1826
1827 #if defined(GRAN)
1828       endThread(t, CurrentProc); // clean-up the thread
1829 #elif defined(PARALLEL_HASKELL)
1830       /* For now all are advisory -- HWL */
1831       //if(t->priority==AdvisoryPriority) ??
1832       advisory_thread_count--; // JB: Caution with this counter, buggy!
1833       
1834 # if defined(DIST)
1835       if(t->dist.priority==RevalPriority)
1836         FinishReval(t);
1837 # endif
1838     
1839 # if defined(EDENOLD)
1840       // the thread could still have an outport... (BUG)
1841       if (t->eden.outport != -1) {
1842       // delete the outport for the tso which has finished...
1843         IF_PAR_DEBUG(eden_ports,
1844                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1845                               t->eden.outport, t->id));
1846         deleteOPT(t);
1847       }
1848       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1849       if (t->eden.epid != -1) {
1850         IF_PAR_DEBUG(eden_ports,
1851                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1852                            t->id, t->eden.epid));
1853         removeTSOfromProcess(t);
1854       }
1855 # endif 
1856
1857 # if defined(PAR)
1858       if (RtsFlags.ParFlags.ParStats.Full &&
1859           !RtsFlags.ParFlags.ParStats.Suppressed) 
1860         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1861
1862       //  t->par only contains statistics: left out for now...
1863       IF_PAR_DEBUG(fish,
1864                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1865                               t->id,t,t->par.sparkname));
1866 # endif
1867 #endif // PARALLEL_HASKELL
1868
1869       //
1870       // Check whether the thread that just completed was a bound
1871       // thread, and if so return with the result.  
1872       //
1873       // There is an assumption here that all thread completion goes
1874       // through this point; we need to make sure that if a thread
1875       // ends up in the ThreadKilled state, that it stays on the run
1876       // queue so it can be dealt with here.
1877       //
1878
1879       if (t->bound) {
1880
1881           if (t->bound != task) {
1882 #if !defined(THREADED_RTS)
1883               // Must be a bound thread that is not the topmost one.  Leave
1884               // it on the run queue until the stack has unwound to the
1885               // point where we can deal with this.  Leaving it on the run
1886               // queue also ensures that the garbage collector knows about
1887               // this thread and its return value (it gets dropped from the
1888               // all_threads list so there's no other way to find it).
1889               appendToRunQueue(cap,t);
1890               return rtsFalse;
1891 #else
1892               // this cannot happen in the threaded RTS, because a
1893               // bound thread can only be run by the appropriate Task.
1894               barf("finished bound thread that isn't mine");
1895 #endif
1896           }
1897
1898           ASSERT(task->tso == t);
1899
1900           if (t->what_next == ThreadComplete) {
1901               if (task->ret) {
1902                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1903                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1904               }
1905               task->stat = Success;
1906           } else {
1907               if (task->ret) {
1908                   *(task->ret) = NULL;
1909               }
1910               if (sched_state >= SCHED_INTERRUPTING) {
1911                   task->stat = Interrupted;
1912               } else {
1913                   task->stat = Killed;
1914               }
1915           }
1916 #ifdef DEBUG
1917           removeThreadLabel((StgWord)task->tso->id);
1918 #endif
1919           return rtsTrue; // tells schedule() to return
1920       }
1921
1922       return rtsFalse;
1923 }
1924
1925 /* -----------------------------------------------------------------------------
1926  * Perform a heap census, if PROFILING
1927  * -------------------------------------------------------------------------- */
1928
1929 static rtsBool
1930 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1931 {
1932 #if defined(PROFILING)
1933     // When we have +RTS -i0 and we're heap profiling, do a census at
1934     // every GC.  This lets us get repeatable runs for debugging.
1935     if (performHeapProfile ||
1936         (RtsFlags.ProfFlags.profileInterval==0 &&
1937          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1938
1939         // checking black holes is necessary before GC, otherwise
1940         // there may be threads that are unreachable except by the
1941         // blackhole queue, which the GC will consider to be
1942         // deadlocked.
1943         scheduleCheckBlackHoles(&MainCapability);
1944
1945         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1946         GarbageCollect(GetRoots, rtsTrue);
1947
1948         debugTrace(DEBUG_sched, "performing heap census");
1949         heapCensus();
1950
1951         performHeapProfile = rtsFalse;
1952         return rtsTrue;  // true <=> we already GC'd
1953     }
1954 #endif
1955     return rtsFalse;
1956 }
1957
1958 /* -----------------------------------------------------------------------------
1959  * Perform a garbage collection if necessary
1960  * -------------------------------------------------------------------------- */
1961
1962 static Capability *
1963 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1964               rtsBool force_major, void (*get_roots)(evac_fn))
1965 {
1966     StgTSO *t;
1967 #ifdef THREADED_RTS
1968     static volatile StgWord waiting_for_gc;
1969     rtsBool was_waiting;
1970     nat i;
1971 #endif
1972
1973 #ifdef THREADED_RTS
1974     // In order to GC, there must be no threads running Haskell code.
1975     // Therefore, the GC thread needs to hold *all* the capabilities,
1976     // and release them after the GC has completed.  
1977     //
1978     // This seems to be the simplest way: previous attempts involved
1979     // making all the threads with capabilities give up their
1980     // capabilities and sleep except for the *last* one, which
1981     // actually did the GC.  But it's quite hard to arrange for all
1982     // the other tasks to sleep and stay asleep.
1983     //
1984         
1985     was_waiting = cas(&waiting_for_gc, 0, 1);
1986     if (was_waiting) {
1987         do {
1988             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1989             if (cap) yieldCapability(&cap,task);
1990         } while (waiting_for_gc);
1991         return cap;  // NOTE: task->cap might have changed here
1992     }
1993
1994     for (i=0; i < n_capabilities; i++) {
1995         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1996         if (cap != &capabilities[i]) {
1997             Capability *pcap = &capabilities[i];
1998             // we better hope this task doesn't get migrated to
1999             // another Capability while we're waiting for this one.
2000             // It won't, because load balancing happens while we have
2001             // all the Capabilities, but even so it's a slightly
2002             // unsavoury invariant.
2003             task->cap = pcap;
2004             context_switch = 1;
2005             waitForReturnCapability(&pcap, task);
2006             if (pcap != &capabilities[i]) {
2007                 barf("scheduleDoGC: got the wrong capability");
2008             }
2009         }
2010     }
2011
2012     waiting_for_gc = rtsFalse;
2013 #endif
2014
2015     /* Kick any transactions which are invalid back to their
2016      * atomically frames.  When next scheduled they will try to
2017      * commit, this commit will fail and they will retry.
2018      */
2019     { 
2020         StgTSO *next;
2021
2022         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2023             if (t->what_next == ThreadRelocated) {
2024                 next = t->link;
2025             } else {
2026                 next = t->global_link;
2027                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2028                     if (!stmValidateNestOfTransactions (t -> trec)) {
2029                         debugTrace(DEBUG_sched | DEBUG_stm,
2030                                    "trec %p found wasting its time", t);
2031                         
2032                         // strip the stack back to the
2033                         // ATOMICALLY_FRAME, aborting the (nested)
2034                         // transaction, and saving the stack of any
2035                         // partially-evaluated thunks on the heap.
2036                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2037                         
2038 #ifdef REG_R1
2039                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2040 #endif
2041                     }
2042                 }
2043             }
2044         }
2045     }
2046     
2047     // so this happens periodically:
2048     if (cap) scheduleCheckBlackHoles(cap);
2049     
2050     IF_DEBUG(scheduler, printAllThreads());
2051
2052     /*
2053      * We now have all the capabilities; if we're in an interrupting
2054      * state, then we should take the opportunity to delete all the
2055      * threads in the system.
2056      */
2057     if (sched_state >= SCHED_INTERRUPTING) {
2058         deleteAllThreads(&capabilities[0]);
2059         sched_state = SCHED_SHUTTING_DOWN;
2060     }
2061
2062     /* everybody back, start the GC.
2063      * Could do it in this thread, or signal a condition var
2064      * to do it in another thread.  Either way, we need to
2065      * broadcast on gc_pending_cond afterward.
2066      */
2067 #if defined(THREADED_RTS)
2068     debugTrace(DEBUG_sched, "doing GC");
2069 #endif
2070     GarbageCollect(get_roots, force_major);
2071     
2072 #if defined(THREADED_RTS)
2073     // release our stash of capabilities.
2074     for (i = 0; i < n_capabilities; i++) {
2075         if (cap != &capabilities[i]) {
2076             task->cap = &capabilities[i];
2077             releaseCapability(&capabilities[i]);
2078         }
2079     }
2080     if (cap) {
2081         task->cap = cap;
2082     } else {
2083         task->cap = NULL;
2084     }
2085 #endif
2086
2087 #if defined(GRAN)
2088     /* add a ContinueThread event to continue execution of current thread */
2089     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2090               ContinueThread,
2091               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2092     IF_GRAN_DEBUG(bq, 
2093                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2094                   G_EVENTQ(0);
2095                   G_CURR_THREADQ(0));
2096 #endif /* GRAN */
2097
2098     return cap;
2099 }
2100
2101 /* ---------------------------------------------------------------------------
2102  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2103  * used by Control.Concurrent for error checking.
2104  * ------------------------------------------------------------------------- */
2105  
2106 StgBool
2107 rtsSupportsBoundThreads(void)
2108 {
2109 #if defined(THREADED_RTS)
2110   return rtsTrue;
2111 #else
2112   return rtsFalse;
2113 #endif
2114 }
2115
2116 /* ---------------------------------------------------------------------------
2117  * isThreadBound(tso): check whether tso is bound to an OS thread.
2118  * ------------------------------------------------------------------------- */
2119  
2120 StgBool
2121 isThreadBound(StgTSO* tso USED_IF_THREADS)
2122 {
2123 #if defined(THREADED_RTS)
2124   return (tso->bound != NULL);
2125 #endif
2126   return rtsFalse;
2127 }
2128
2129 /* ---------------------------------------------------------------------------
2130  * Singleton fork(). Do not copy any running threads.
2131  * ------------------------------------------------------------------------- */
2132
2133 #if !defined(mingw32_HOST_OS)
2134 #define FORKPROCESS_PRIMOP_SUPPORTED
2135 #endif
2136
2137 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2138 static void 
2139 deleteThread_(Capability *cap, StgTSO *tso);
2140 #endif
2141 StgInt
2142 forkProcess(HsStablePtr *entry
2143 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2144             STG_UNUSED
2145 #endif
2146            )
2147 {
2148 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2149     Task *task;
2150     pid_t pid;
2151     StgTSO* t,*next;
2152     Capability *cap;
2153     
2154 #if defined(THREADED_RTS)
2155     if (RtsFlags.ParFlags.nNodes > 1) {
2156         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2157         stg_exit(EXIT_FAILURE);
2158     }
2159 #endif
2160
2161     debugTrace(DEBUG_sched, "forking!");
2162     
2163     // ToDo: for SMP, we should probably acquire *all* the capabilities
2164     cap = rts_lock();
2165     
2166     pid = fork();
2167     
2168     if (pid) { // parent
2169         
2170         // just return the pid
2171         rts_unlock(cap);
2172         return pid;
2173         
2174     } else { // child
2175         
2176         // Now, all OS threads except the thread that forked are
2177         // stopped.  We need to stop all Haskell threads, including
2178         // those involved in foreign calls.  Also we need to delete
2179         // all Tasks, because they correspond to OS threads that are
2180         // now gone.
2181
2182         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2183             if (t->what_next == ThreadRelocated) {
2184                 next = t->link;
2185             } else {
2186                 next = t->global_link;
2187                 // don't allow threads to catch the ThreadKilled
2188                 // exception, but we do want to raiseAsync() because these
2189                 // threads may be evaluating thunks that we need later.
2190                 deleteThread_(cap,t);
2191             }
2192         }
2193         
2194         // Empty the run queue.  It seems tempting to let all the
2195         // killed threads stay on the run queue as zombies to be
2196         // cleaned up later, but some of them correspond to bound
2197         // threads for which the corresponding Task does not exist.
2198         cap->run_queue_hd = END_TSO_QUEUE;
2199         cap->run_queue_tl = END_TSO_QUEUE;
2200
2201         // Any suspended C-calling Tasks are no more, their OS threads
2202         // don't exist now:
2203         cap->suspended_ccalling_tasks = NULL;
2204
2205         // Empty the all_threads list.  Otherwise, the garbage
2206         // collector may attempt to resurrect some of these threads.
2207         all_threads = END_TSO_QUEUE;
2208
2209         // Wipe the task list, except the current Task.
2210         ACQUIRE_LOCK(&sched_mutex);
2211         for (task = all_tasks; task != NULL; task=task->all_link) {
2212             if (task != cap->running_task) {
2213                 discardTask(task);
2214             }
2215         }
2216         RELEASE_LOCK(&sched_mutex);
2217
2218 #if defined(THREADED_RTS)
2219         // Wipe our spare workers list, they no longer exist.  New
2220         // workers will be created if necessary.
2221         cap->spare_workers = NULL;
2222         cap->returning_tasks_hd = NULL;
2223         cap->returning_tasks_tl = NULL;
2224 #endif
2225
2226         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2227         rts_checkSchedStatus("forkProcess",cap);
2228         
2229         rts_unlock(cap);
2230         hs_exit();                      // clean up and exit
2231         stg_exit(EXIT_SUCCESS);
2232     }
2233 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2234     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2235     return -1;
2236 #endif
2237 }
2238
2239 /* ---------------------------------------------------------------------------
2240  * Delete all the threads in the system
2241  * ------------------------------------------------------------------------- */
2242    
2243 static void
2244 deleteAllThreads ( Capability *cap )
2245 {
2246   StgTSO* t, *next;
2247   debugTrace(DEBUG_sched,"deleting all threads");
2248   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2249       if (t->what_next == ThreadRelocated) {
2250           next = t->link;
2251       } else {
2252           next = t->global_link;
2253           deleteThread(cap,t);
2254       }
2255   }      
2256
2257   // The run queue now contains a bunch of ThreadKilled threads.  We
2258   // must not throw these away: the main thread(s) will be in there
2259   // somewhere, and the main scheduler loop has to deal with it.
2260   // Also, the run queue is the only thing keeping these threads from
2261   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2262
2263 #if !defined(THREADED_RTS)
2264   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2265   ASSERT(sleeping_queue == END_TSO_QUEUE);
2266 #endif
2267 }
2268
2269 /* -----------------------------------------------------------------------------
2270    Managing the suspended_ccalling_tasks list.
2271    Locks required: sched_mutex
2272    -------------------------------------------------------------------------- */
2273
2274 STATIC_INLINE void
2275 suspendTask (Capability *cap, Task *task)
2276 {
2277     ASSERT(task->next == NULL && task->prev == NULL);
2278     task->next = cap->suspended_ccalling_tasks;
2279     task->prev = NULL;
2280     if (cap->suspended_ccalling_tasks) {
2281         cap->suspended_ccalling_tasks->prev = task;
2282     }
2283     cap->suspended_ccalling_tasks = task;
2284 }
2285
2286 STATIC_INLINE void
2287 recoverSuspendedTask (Capability *cap, Task *task)
2288 {
2289     if (task->prev) {
2290         task->prev->next = task->next;
2291     } else {
2292         ASSERT(cap->suspended_ccalling_tasks == task);
2293         cap->suspended_ccalling_tasks = task->next;
2294     }
2295     if (task->next) {
2296         task->next->prev = task->prev;
2297     }
2298     task->next = task->prev = NULL;
2299 }
2300
2301 /* ---------------------------------------------------------------------------
2302  * Suspending & resuming Haskell threads.
2303  * 
2304  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2305  * its capability before calling the C function.  This allows another
2306  * task to pick up the capability and carry on running Haskell
2307  * threads.  It also means that if the C call blocks, it won't lock
2308  * the whole system.
2309  *
2310  * The Haskell thread making the C call is put to sleep for the
2311  * duration of the call, on the susepended_ccalling_threads queue.  We
2312  * give out a token to the task, which it can use to resume the thread
2313  * on return from the C function.
2314  * ------------------------------------------------------------------------- */
2315    
2316 void *
2317 suspendThread (StgRegTable *reg)
2318 {
2319   Capability *cap;
2320   int saved_errno = errno;
2321   StgTSO *tso;
2322   Task *task;
2323
2324   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2325    */
2326   cap = regTableToCapability(reg);
2327
2328   task = cap->running_task;
2329   tso = cap->r.rCurrentTSO;
2330
2331   debugTrace(DEBUG_sched, 
2332              "thread %d did a safe foreign call", 
2333              cap->r.rCurrentTSO->id);
2334
2335   // XXX this might not be necessary --SDM
2336   tso->what_next = ThreadRunGHC;
2337
2338   threadPaused(cap,tso);
2339
2340   if(tso->blocked_exceptions == NULL)  {
2341       tso->why_blocked = BlockedOnCCall;
2342       tso->blocked_exceptions = END_TSO_QUEUE;
2343   } else {
2344       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2345   }
2346
2347   // Hand back capability
2348   task->suspended_tso = tso;
2349
2350   ACQUIRE_LOCK(&cap->lock);
2351
2352   suspendTask(cap,task);
2353   cap->in_haskell = rtsFalse;
2354   releaseCapability_(cap);
2355   
2356   RELEASE_LOCK(&cap->lock);
2357
2358 #if defined(THREADED_RTS)
2359   /* Preparing to leave the RTS, so ensure there's a native thread/task
2360      waiting to take over.
2361   */
2362   debugTrace(DEBUG_sched, "thread %d: leaving RTS", tso->id);
2363 #endif
2364
2365   errno = saved_errno;
2366   return task;
2367 }
2368
2369 StgRegTable *
2370 resumeThread (void *task_)
2371 {
2372     StgTSO *tso;
2373     Capability *cap;
2374     int saved_errno = errno;
2375     Task *task = task_;
2376
2377     cap = task->cap;
2378     // Wait for permission to re-enter the RTS with the result.
2379     waitForReturnCapability(&cap,task);
2380     // we might be on a different capability now... but if so, our
2381     // entry on the suspended_ccalling_tasks list will also have been
2382     // migrated.
2383
2384     // Remove the thread from the suspended list
2385     recoverSuspendedTask(cap,task);
2386
2387     tso = task->suspended_tso;
2388     task->suspended_tso = NULL;
2389     tso->link = END_TSO_QUEUE;
2390     debugTrace(DEBUG_sched, "thread %d: re-entering RTS", tso->id);
2391     
2392     if (tso->why_blocked == BlockedOnCCall) {
2393         awakenBlockedQueue(cap,tso->blocked_exceptions);
2394         tso->blocked_exceptions = NULL;
2395     }
2396     
2397     /* Reset blocking status */
2398     tso->why_blocked  = NotBlocked;
2399     
2400     cap->r.rCurrentTSO = tso;
2401     cap->in_haskell = rtsTrue;
2402     errno = saved_errno;
2403
2404     /* We might have GC'd, mark the TSO dirty again */
2405     dirtyTSO(tso);
2406
2407     IF_DEBUG(sanity, checkTSO(tso));
2408
2409     return &cap->r;
2410 }
2411
2412 /* ---------------------------------------------------------------------------
2413  * Comparing Thread ids.
2414  *
2415  * This is used from STG land in the implementation of the
2416  * instances of Eq/Ord for ThreadIds.
2417  * ------------------------------------------------------------------------ */
2418
2419 int
2420 cmp_thread(StgPtr tso1, StgPtr tso2) 
2421
2422   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2423   StgThreadID id2 = ((StgTSO *)tso2)->id;
2424  
2425   if (id1 < id2) return (-1);
2426   if (id1 > id2) return 1;
2427   return 0;
2428 }
2429
2430 /* ---------------------------------------------------------------------------
2431  * Fetching the ThreadID from an StgTSO.
2432  *
2433  * This is used in the implementation of Show for ThreadIds.
2434  * ------------------------------------------------------------------------ */
2435 int
2436 rts_getThreadId(StgPtr tso) 
2437 {
2438   return ((StgTSO *)tso)->id;
2439 }
2440
2441 #ifdef DEBUG
2442 void
2443 labelThread(StgPtr tso, char *label)
2444 {
2445   int len;
2446   void *buf;
2447
2448   /* Caveat: Once set, you can only set the thread name to "" */
2449   len = strlen(label)+1;
2450   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2451   strncpy(buf,label,len);
2452   /* Update will free the old memory for us */
2453   updateThreadLabel(((StgTSO *)tso)->id,buf);
2454 }
2455 #endif /* DEBUG */
2456
2457 /* ---------------------------------------------------------------------------
2458    Create a new thread.
2459
2460    The new thread starts with the given stack size.  Before the
2461    scheduler can run, however, this thread needs to have a closure
2462    (and possibly some arguments) pushed on its stack.  See
2463    pushClosure() in Schedule.h.
2464
2465    createGenThread() and createIOThread() (in SchedAPI.h) are
2466    convenient packaged versions of this function.
2467
2468    currently pri (priority) is only used in a GRAN setup -- HWL
2469    ------------------------------------------------------------------------ */
2470 #if defined(GRAN)
2471 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2472 StgTSO *
2473 createThread(nat size, StgInt pri)
2474 #else
2475 StgTSO *
2476 createThread(Capability *cap, nat size)
2477 #endif
2478 {
2479     StgTSO *tso;
2480     nat stack_size;
2481
2482     /* sched_mutex is *not* required */
2483
2484     /* First check whether we should create a thread at all */
2485 #if defined(PARALLEL_HASKELL)
2486     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2487     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2488         threadsIgnored++;
2489         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2490                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2491         return END_TSO_QUEUE;
2492     }
2493     threadsCreated++;
2494 #endif
2495
2496 #if defined(GRAN)
2497     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2498 #endif
2499
2500     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2501
2502     /* catch ridiculously small stack sizes */
2503     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2504         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2505     }
2506
2507     stack_size = size - TSO_STRUCT_SIZEW;
2508     
2509     tso = (StgTSO *)allocateLocal(cap, size);
2510     TICK_ALLOC_TSO(stack_size, 0);
2511
2512     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2513 #if defined(GRAN)
2514     SET_GRAN_HDR(tso, ThisPE);
2515 #endif
2516
2517     // Always start with the compiled code evaluator
2518     tso->what_next = ThreadRunGHC;
2519
2520     tso->why_blocked  = NotBlocked;
2521     tso->blocked_exceptions = NULL;
2522     tso->flags = TSO_DIRTY;
2523     
2524     tso->saved_errno = 0;
2525     tso->bound = NULL;
2526     tso->cap = cap;
2527     
2528     tso->stack_size     = stack_size;
2529     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2530                           - TSO_STRUCT_SIZEW;
2531     tso->sp             = (P_)&(tso->stack) + stack_size;
2532
2533     tso->trec = NO_TREC;
2534     
2535 #ifdef PROFILING
2536     tso->prof.CCCS = CCS_MAIN;
2537 #endif
2538     
2539   /* put a stop frame on the stack */
2540     tso->sp -= sizeofW(StgStopFrame);
2541     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2542     tso->link = END_TSO_QUEUE;
2543     
2544   // ToDo: check this
2545 #if defined(GRAN)
2546     /* uses more flexible routine in GranSim */
2547     insertThread(tso, CurrentProc);
2548 #else
2549     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2550      * from its creation
2551      */
2552 #endif
2553     
2554 #if defined(GRAN) 
2555     if (RtsFlags.GranFlags.GranSimStats.Full) 
2556         DumpGranEvent(GR_START,tso);
2557 #elif defined(PARALLEL_HASKELL)
2558     if (RtsFlags.ParFlags.ParStats.Full) 
2559         DumpGranEvent(GR_STARTQ,tso);
2560     /* HACk to avoid SCHEDULE 
2561        LastTSO = tso; */
2562 #endif
2563     
2564     /* Link the new thread on the global thread list.
2565      */
2566     ACQUIRE_LOCK(&sched_mutex);
2567     tso->id = next_thread_id++;  // while we have the mutex
2568     tso->global_link = all_threads;
2569     all_threads = tso;
2570     RELEASE_LOCK(&sched_mutex);
2571     
2572 #if defined(DIST)
2573     tso->dist.priority = MandatoryPriority; //by default that is...
2574 #endif
2575     
2576 #if defined(GRAN)
2577     tso->gran.pri = pri;
2578 # if defined(DEBUG)
2579     tso->gran.magic = TSO_MAGIC; // debugging only
2580 # endif
2581     tso->gran.sparkname   = 0;
2582     tso->gran.startedat   = CURRENT_TIME; 
2583     tso->gran.exported    = 0;
2584     tso->gran.basicblocks = 0;
2585     tso->gran.allocs      = 0;
2586     tso->gran.exectime    = 0;
2587     tso->gran.fetchtime   = 0;
2588     tso->gran.fetchcount  = 0;
2589     tso->gran.blocktime   = 0;
2590     tso->gran.blockcount  = 0;
2591     tso->gran.blockedat   = 0;
2592     tso->gran.globalsparks = 0;
2593     tso->gran.localsparks  = 0;
2594     if (RtsFlags.GranFlags.Light)
2595         tso->gran.clock  = Now; /* local clock */
2596     else
2597         tso->gran.clock  = 0;
2598     
2599     IF_DEBUG(gran,printTSO(tso));
2600 #elif defined(PARALLEL_HASKELL)
2601 # if defined(DEBUG)
2602     tso->par.magic = TSO_MAGIC; // debugging only
2603 # endif
2604     tso->par.sparkname   = 0;
2605     tso->par.startedat   = CURRENT_TIME; 
2606     tso->par.exported    = 0;
2607     tso->par.basicblocks = 0;
2608     tso->par.allocs      = 0;
2609     tso->par.exectime    = 0;
2610     tso->par.fetchtime   = 0;
2611     tso->par.fetchcount  = 0;
2612     tso->par.blocktime   = 0;
2613     tso->par.blockcount  = 0;
2614     tso->par.blockedat   = 0;
2615     tso->par.globalsparks = 0;
2616     tso->par.localsparks  = 0;
2617 #endif
2618     
2619 #if defined(GRAN)
2620     globalGranStats.tot_threads_created++;
2621     globalGranStats.threads_created_on_PE[CurrentProc]++;
2622     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2623     globalGranStats.tot_sq_probes++;
2624 #elif defined(PARALLEL_HASKELL)
2625     // collect parallel global statistics (currently done together with GC stats)
2626     if (RtsFlags.ParFlags.ParStats.Global &&
2627         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2628         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2629         globalParStats.tot_threads_created++;
2630     }
2631 #endif 
2632     
2633 #if defined(GRAN)
2634     debugTrace(GRAN_DEBUG_pri,
2635                "==__ schedule: Created TSO %d (%p);",
2636                CurrentProc, tso, tso->id);
2637 #elif defined(PARALLEL_HASKELL)
2638     debugTrace(PAR_DEBUG_verbose,
2639                "==__ schedule: Created TSO %d (%p); %d threads active",
2640                (long)tso->id, tso, advisory_thread_count);
2641 #else
2642     debugTrace(DEBUG_sched,
2643                "created thread %ld, stack size = %lx words", 
2644                (long)tso->id, (long)tso->stack_size);
2645 #endif    
2646     return tso;
2647 }
2648
2649 #if defined(PAR)
2650 /* RFP:
2651    all parallel thread creation calls should fall through the following routine.
2652 */
2653 StgTSO *
2654 createThreadFromSpark(rtsSpark spark) 
2655 { StgTSO *tso;
2656   ASSERT(spark != (rtsSpark)NULL);
2657 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2658   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2659   { threadsIgnored++;
2660     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2661           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2662     return END_TSO_QUEUE;
2663   }
2664   else
2665   { threadsCreated++;
2666     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2667     if (tso==END_TSO_QUEUE)     
2668       barf("createSparkThread: Cannot create TSO");
2669 #if defined(DIST)
2670     tso->priority = AdvisoryPriority;
2671 #endif
2672     pushClosure(tso,spark);
2673     addToRunQueue(tso);
2674     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2675   }
2676   return tso;
2677 }
2678 #endif
2679
2680 /*
2681   Turn a spark into a thread.
2682   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2683 */
2684 #if 0
2685 StgTSO *
2686 activateSpark (rtsSpark spark) 
2687 {
2688   StgTSO *tso;
2689
2690   tso = createSparkThread(spark);
2691   if (RtsFlags.ParFlags.ParStats.Full) {   
2692     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2693       IF_PAR_DEBUG(verbose,
2694                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2695                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2696   }
2697   // ToDo: fwd info on local/global spark to thread -- HWL
2698   // tso->gran.exported =  spark->exported;
2699   // tso->gran.locked =   !spark->global;
2700   // tso->gran.sparkname = spark->name;
2701
2702   return tso;
2703 }
2704 #endif
2705
2706 /* ---------------------------------------------------------------------------
2707  * scheduleThread()
2708  *
2709  * scheduleThread puts a thread on the end  of the runnable queue.
2710  * This will usually be done immediately after a thread is created.
2711  * The caller of scheduleThread must create the thread using e.g.
2712  * createThread and push an appropriate closure
2713  * on this thread's stack before the scheduler is invoked.
2714  * ------------------------------------------------------------------------ */
2715
2716 void
2717 scheduleThread(Capability *cap, StgTSO *tso)
2718 {
2719     // The thread goes at the *end* of the run-queue, to avoid possible
2720     // starvation of any threads already on the queue.
2721     appendToRunQueue(cap,tso);
2722 }
2723
2724 void
2725 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2726 {
2727 #if defined(THREADED_RTS)
2728     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2729                               // move this thread from now on.
2730     cpu %= RtsFlags.ParFlags.nNodes;
2731     if (cpu == cap->no) {
2732         appendToRunQueue(cap,tso);
2733     } else {
2734         Capability *target_cap = &capabilities[cpu];
2735         if (tso->bound) {
2736             tso->bound->cap = target_cap;
2737         }
2738         tso->cap = target_cap;
2739         wakeupThreadOnCapability(target_cap,tso);
2740     }
2741 #else
2742     appendToRunQueue(cap,tso);
2743 #endif
2744 }
2745
2746 Capability *
2747 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2748 {
2749     Task *task;
2750
2751     // We already created/initialised the Task
2752     task = cap->running_task;
2753
2754     // This TSO is now a bound thread; make the Task and TSO
2755     // point to each other.
2756     tso->bound = task;
2757     tso->cap = cap;
2758
2759     task->tso = tso;
2760     task->ret = ret;
2761     task->stat = NoStatus;
2762
2763     appendToRunQueue(cap,tso);
2764
2765     debugTrace(DEBUG_sched, "new bound thread (%d)", tso->id);
2766
2767 #if defined(GRAN)
2768     /* GranSim specific init */
2769     CurrentTSO = m->tso;                // the TSO to run
2770     procStatus[MainProc] = Busy;        // status of main PE
2771     CurrentProc = MainProc;             // PE to run it on
2772 #endif
2773
2774     cap = schedule(cap,task);
2775
2776     ASSERT(task->stat != NoStatus);
2777     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2778
2779     debugTrace(DEBUG_sched, "bound thread (%d) finished", task->tso->id);
2780     return cap;
2781 }
2782
2783 /* ----------------------------------------------------------------------------
2784  * Starting Tasks
2785  * ------------------------------------------------------------------------- */
2786
2787 #if defined(THREADED_RTS)
2788 void
2789 workerStart(Task *task)
2790 {
2791     Capability *cap;
2792
2793     // See startWorkerTask().
2794     ACQUIRE_LOCK(&task->lock);
2795     cap = task->cap;
2796     RELEASE_LOCK(&task->lock);
2797
2798     // set the thread-local pointer to the Task:
2799     taskEnter(task);
2800
2801     // schedule() runs without a lock.
2802     cap = schedule(cap,task);
2803
2804     // On exit from schedule(), we have a Capability.
2805     releaseCapability(cap);
2806     workerTaskStop(task);
2807 }
2808 #endif
2809
2810 /* ---------------------------------------------------------------------------
2811  * initScheduler()
2812  *
2813  * Initialise the scheduler.  This resets all the queues - if the
2814  * queues contained any threads, they'll be garbage collected at the
2815  * next pass.
2816  *
2817  * ------------------------------------------------------------------------ */
2818
2819 void 
2820 initScheduler(void)
2821 {
2822 #if defined(GRAN)
2823   nat i;
2824   for (i=0; i<=MAX_PROC; i++) {
2825     run_queue_hds[i]      = END_TSO_QUEUE;
2826     run_queue_tls[i]      = END_TSO_QUEUE;
2827     blocked_queue_hds[i]  = END_TSO_QUEUE;
2828     blocked_queue_tls[i]  = END_TSO_QUEUE;
2829     ccalling_threadss[i]  = END_TSO_QUEUE;
2830     blackhole_queue[i]    = END_TSO_QUEUE;
2831     sleeping_queue        = END_TSO_QUEUE;
2832   }
2833 #elif !defined(THREADED_RTS)
2834   blocked_queue_hd  = END_TSO_QUEUE;
2835   blocked_queue_tl  = END_TSO_QUEUE;
2836   sleeping_queue    = END_TSO_QUEUE;
2837 #endif
2838
2839   blackhole_queue   = END_TSO_QUEUE;
2840   all_threads       = END_TSO_QUEUE;
2841
2842   context_switch = 0;
2843   sched_state    = SCHED_RUNNING;
2844
2845   RtsFlags.ConcFlags.ctxtSwitchTicks =
2846       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2847       
2848 #if defined(THREADED_RTS)
2849   /* Initialise the mutex and condition variables used by
2850    * the scheduler. */
2851   initMutex(&sched_mutex);
2852 #endif
2853   
2854   ACQUIRE_LOCK(&sched_mutex);
2855
2856   /* A capability holds the state a native thread needs in
2857    * order to execute STG code. At least one capability is
2858    * floating around (only THREADED_RTS builds have more than one).
2859    */
2860   initCapabilities();
2861
2862   initTaskManager();
2863
2864 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2865   initSparkPools();
2866 #endif
2867
2868 #if defined(THREADED_RTS)
2869   /*
2870    * Eagerly start one worker to run each Capability, except for
2871    * Capability 0.  The idea is that we're probably going to start a
2872    * bound thread on Capability 0 pretty soon, so we don't want a
2873    * worker task hogging it.
2874    */
2875   { 
2876       nat i;
2877       Capability *cap;
2878       for (i = 1; i < n_capabilities; i++) {
2879           cap = &capabilities[i];
2880           ACQUIRE_LOCK(&cap->lock);
2881           startWorkerTask(cap, workerStart);
2882           RELEASE_LOCK(&cap->lock);
2883       }
2884   }
2885 #endif
2886
2887   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2888
2889   RELEASE_LOCK(&sched_mutex);
2890 }
2891
2892 void
2893 exitScheduler( void )
2894 {
2895     Task *task = NULL;
2896
2897 #if defined(THREADED_RTS)
2898     ACQUIRE_LOCK(&sched_mutex);
2899     task = newBoundTask();
2900     RELEASE_LOCK(&sched_mutex);
2901 #endif
2902
2903     // If we haven't killed all the threads yet, do it now.
2904     if (sched_state < SCHED_SHUTTING_DOWN) {
2905         sched_state = SCHED_INTERRUPTING;
2906         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2907     }
2908     sched_state = SCHED_SHUTTING_DOWN;
2909
2910 #if defined(THREADED_RTS)
2911     { 
2912         nat i;
2913         
2914         for (i = 0; i < n_capabilities; i++) {
2915             shutdownCapability(&capabilities[i], task);
2916         }
2917         boundTaskExiting(task);
2918         stopTaskManager();
2919     }
2920 #endif
2921 }
2922
2923 /* ---------------------------------------------------------------------------
2924    Where are the roots that we know about?
2925
2926         - all the threads on the runnable queue
2927         - all the threads on the blocked queue
2928         - all the threads on the sleeping queue
2929         - all the thread currently executing a _ccall_GC
2930         - all the "main threads"
2931      
2932    ------------------------------------------------------------------------ */
2933
2934 /* This has to be protected either by the scheduler monitor, or by the
2935         garbage collection monitor (probably the latter).
2936         KH @ 25/10/99
2937 */
2938
2939 void
2940 GetRoots( evac_fn evac )
2941 {
2942     nat i;
2943     Capability *cap;
2944     Task *task;
2945
2946 #if defined(GRAN)
2947     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2948         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2949             evac((StgClosure **)&run_queue_hds[i]);
2950         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2951             evac((StgClosure **)&run_queue_tls[i]);
2952         
2953         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2954             evac((StgClosure **)&blocked_queue_hds[i]);
2955         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2956             evac((StgClosure **)&blocked_queue_tls[i]);
2957         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2958             evac((StgClosure **)&ccalling_threads[i]);
2959     }
2960
2961     markEventQueue();
2962
2963 #else /* !GRAN */
2964
2965     for (i = 0; i < n_capabilities; i++) {
2966         cap = &capabilities[i];
2967         evac((StgClosure **)(void *)&cap->run_queue_hd);
2968         evac((StgClosure **)(void *)&cap->run_queue_tl);
2969 #if defined(THREADED_RTS)
2970         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2971         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2972 #endif
2973         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2974              task=task->next) {
2975             debugTrace(DEBUG_sched,
2976                        "evac'ing suspended TSO %d", task->suspended_tso->id);
2977             evac((StgClosure **)(void *)&task->suspended_tso);
2978         }
2979
2980     }
2981     
2982
2983 #if !defined(THREADED_RTS)
2984     evac((StgClosure **)(void *)&blocked_queue_hd);
2985     evac((StgClosure **)(void *)&blocked_queue_tl);
2986     evac((StgClosure **)(void *)&sleeping_queue);
2987 #endif 
2988 #endif
2989
2990     // evac((StgClosure **)&blackhole_queue);
2991
2992 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2993     markSparkQueue(evac);
2994 #endif
2995     
2996 #if defined(RTS_USER_SIGNALS)
2997     // mark the signal handlers (signals should be already blocked)
2998     markSignalHandlers(evac);
2999 #endif
3000 }
3001
3002 /* -----------------------------------------------------------------------------
3003    performGC
3004
3005    This is the interface to the garbage collector from Haskell land.
3006    We provide this so that external C code can allocate and garbage
3007    collect when called from Haskell via _ccall_GC.
3008
3009    It might be useful to provide an interface whereby the programmer
3010    can specify more roots (ToDo).
3011    
3012    This needs to be protected by the GC condition variable above.  KH.
3013    -------------------------------------------------------------------------- */
3014
3015 static void (*extra_roots)(evac_fn);
3016
3017 static void
3018 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
3019 {
3020     Task *task;
3021     // We must grab a new Task here, because the existing Task may be
3022     // associated with a particular Capability, and chained onto the 
3023     // suspended_ccalling_tasks queue.
3024     ACQUIRE_LOCK(&sched_mutex);
3025     task = newBoundTask();
3026     RELEASE_LOCK(&sched_mutex);
3027     scheduleDoGC(NULL,task,force_major, get_roots);
3028     boundTaskExiting(task);
3029 }
3030
3031 void
3032 performGC(void)
3033 {
3034     performGC_(rtsFalse, GetRoots);
3035 }
3036
3037 void
3038 performMajorGC(void)
3039 {
3040     performGC_(rtsTrue, GetRoots);
3041 }
3042
3043 static void
3044 AllRoots(evac_fn evac)
3045 {
3046     GetRoots(evac);             // the scheduler's roots
3047     extra_roots(evac);          // the user's roots
3048 }
3049
3050 void
3051 performGCWithRoots(void (*get_roots)(evac_fn))
3052 {
3053     extra_roots = get_roots;
3054     performGC_(rtsFalse, AllRoots);
3055 }
3056
3057 /* -----------------------------------------------------------------------------
3058    Stack overflow
3059
3060    If the thread has reached its maximum stack size, then raise the
3061    StackOverflow exception in the offending thread.  Otherwise
3062    relocate the TSO into a larger chunk of memory and adjust its stack
3063    size appropriately.
3064    -------------------------------------------------------------------------- */
3065
3066 static StgTSO *
3067 threadStackOverflow(Capability *cap, StgTSO *tso)
3068 {
3069   nat new_stack_size, stack_words;
3070   lnat new_tso_size;
3071   StgPtr new_sp;
3072   StgTSO *dest;
3073
3074   IF_DEBUG(sanity,checkTSO(tso));
3075   if (tso->stack_size >= tso->max_stack_size) {
3076
3077       debugTrace(DEBUG_gc,
3078                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3079                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3080       IF_DEBUG(gc,
3081                /* If we're debugging, just print out the top of the stack */
3082                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3083                                                 tso->sp+64)));
3084
3085     /* Send this thread the StackOverflow exception */
3086     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3087     return tso;
3088   }
3089
3090   /* Try to double the current stack size.  If that takes us over the
3091    * maximum stack size for this thread, then use the maximum instead.
3092    * Finally round up so the TSO ends up as a whole number of blocks.
3093    */
3094   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3095   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3096                                        TSO_STRUCT_SIZE)/sizeof(W_);
3097   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3098   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3099
3100   debugTrace(DEBUG_sched, 
3101              "increasing stack size from %ld words to %d.\n",
3102              (long)tso->stack_size, new_stack_size);
3103
3104   dest = (StgTSO *)allocate(new_tso_size);
3105   TICK_ALLOC_TSO(new_stack_size,0);
3106
3107   /* copy the TSO block and the old stack into the new area */
3108   memcpy(dest,tso,TSO_STRUCT_SIZE);
3109   stack_words = tso->stack + tso->stack_size - tso->sp;
3110   new_sp = (P_)dest + new_tso_size - stack_words;
3111   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3112
3113   /* relocate the stack pointers... */
3114   dest->sp         = new_sp;
3115   dest->stack_size = new_stack_size;
3116         
3117   /* Mark the old TSO as relocated.  We have to check for relocated
3118    * TSOs in the garbage collector and any primops that deal with TSOs.
3119    *
3120    * It's important to set the sp value to just beyond the end
3121    * of the stack, so we don't attempt to scavenge any part of the
3122    * dead TSO's stack.
3123    */
3124   tso->what_next = ThreadRelocated;
3125   tso->link = dest;
3126   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3127   tso->why_blocked = NotBlocked;
3128
3129   IF_PAR_DEBUG(verbose,
3130                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3131                      tso->id, tso, tso->stack_size);
3132                /* If we're debugging, just print out the top of the stack */
3133                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3134                                                 tso->sp+64)));
3135   
3136   IF_DEBUG(sanity,checkTSO(tso));
3137 #if 0
3138   IF_DEBUG(scheduler,printTSO(dest));
3139 #endif
3140
3141   return dest;
3142 }
3143
3144 /* ---------------------------------------------------------------------------
3145    Wake up a queue that was blocked on some resource.
3146    ------------------------------------------------------------------------ */
3147
3148 #if defined(GRAN)
3149 STATIC_INLINE void
3150 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3151 {
3152 }
3153 #elif defined(PARALLEL_HASKELL)
3154 STATIC_INLINE void
3155 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3156 {
3157   /* write RESUME events to log file and
3158      update blocked and fetch time (depending on type of the orig closure) */
3159   if (RtsFlags.ParFlags.ParStats.Full) {
3160     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3161                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3162                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3163     if (emptyRunQueue())
3164       emitSchedule = rtsTrue;
3165
3166     switch (get_itbl(node)->type) {
3167         case FETCH_ME_BQ:
3168           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3169           break;
3170         case RBH:
3171         case FETCH_ME:
3172         case BLACKHOLE_BQ:
3173           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3174           break;
3175 #ifdef DIST
3176         case MVAR:
3177           break;
3178 #endif    
3179         default:
3180           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3181         }
3182       }
3183 }
3184 #endif
3185
3186 #if defined(GRAN)
3187 StgBlockingQueueElement *
3188 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3189 {
3190     StgTSO *tso;
3191     PEs node_loc, tso_loc;
3192
3193     node_loc = where_is(node); // should be lifted out of loop
3194     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3195     tso_loc = where_is((StgClosure *)tso);
3196     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3197       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3198       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3199       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3200       // insertThread(tso, node_loc);
3201       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3202                 ResumeThread,
3203                 tso, node, (rtsSpark*)NULL);
3204       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3205       // len_local++;
3206       // len++;
3207     } else { // TSO is remote (actually should be FMBQ)
3208       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3209                                   RtsFlags.GranFlags.Costs.gunblocktime +
3210                                   RtsFlags.GranFlags.Costs.latency;
3211       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3212                 UnblockThread,
3213                 tso, node, (rtsSpark*)NULL);
3214       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3215       // len++;
3216     }
3217     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3218     IF_GRAN_DEBUG(bq,
3219                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3220                           (node_loc==tso_loc ? "Local" : "Global"), 
3221                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3222     tso->block_info.closure = NULL;
3223     debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)\n", 
3224                tso->id, tso));
3225 }
3226 #elif defined(PARALLEL_HASKELL)
3227 StgBlockingQueueElement *
3228 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3229 {
3230     StgBlockingQueueElement *next;
3231
3232     switch (get_itbl(bqe)->type) {
3233     case TSO:
3234       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3235       /* if it's a TSO just push it onto the run_queue */
3236       next = bqe->link;
3237       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3238       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3239       threadRunnable();
3240       unblockCount(bqe, node);
3241       /* reset blocking status after dumping event */
3242       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3243       break;
3244
3245     case BLOCKED_FETCH:
3246       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3247       next = bqe->link;
3248       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3249       PendingFetches = (StgBlockedFetch *)bqe;
3250       break;
3251
3252 # if defined(DEBUG)
3253       /* can ignore this case in a non-debugging setup; 
3254          see comments on RBHSave closures above */
3255     case CONSTR:
3256       /* check that the closure is an RBHSave closure */
3257       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3258              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3259              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3260       break;
3261
3262     default:
3263       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3264            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3265            (StgClosure *)bqe);
3266 # endif
3267     }
3268   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3269   return next;
3270 }
3271 #endif
3272
3273 StgTSO *
3274 unblockOne(Capability *cap, StgTSO *tso)
3275 {
3276   StgTSO *next;
3277
3278   ASSERT(get_itbl(tso)->type == TSO);
3279   ASSERT(tso->why_blocked != NotBlocked);
3280
3281   tso->why_blocked = NotBlocked;
3282   next = tso->link;
3283   tso->link = END_TSO_QUEUE;
3284
3285 #if defined(THREADED_RTS)
3286   if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
3287       // We are waking up this thread on the current Capability, which
3288       // might involve migrating it from the Capability it was last on.
3289       if (tso->bound) {
3290           ASSERT(tso->bound->cap == tso->cap);
3291           tso->bound->cap = cap;
3292       }
3293       tso->cap = cap;
3294       appendToRunQueue(cap,tso);
3295       // we're holding a newly woken thread, make sure we context switch
3296       // quickly so we can migrate it if necessary.
3297       context_switch = 1;
3298   } else {
3299       // we'll try to wake it up on the Capability it was last on.
3300       wakeupThreadOnCapability(tso->cap, tso);
3301   }
3302 #else
3303   appendToRunQueue(cap,tso);
3304   context_switch = 1;
3305 #endif
3306
3307   debugTrace(DEBUG_sched,
3308              "waking up thread %ld on cap %d",
3309              (long)tso->id, tso->cap->no);
3310
3311   return next;
3312 }
3313
3314
3315 #if defined(GRAN)
3316 void 
3317 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3318 {
3319   StgBlockingQueueElement *bqe;
3320   PEs node_loc;
3321   nat len = 0; 
3322
3323   IF_GRAN_DEBUG(bq, 
3324                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3325                       node, CurrentProc, CurrentTime[CurrentProc], 
3326                       CurrentTSO->id, CurrentTSO));
3327
3328   node_loc = where_is(node);
3329
3330   ASSERT(q == END_BQ_QUEUE ||
3331          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3332          get_itbl(q)->type == CONSTR); // closure (type constructor)
3333   ASSERT(is_unique(node));
3334
3335   /* FAKE FETCH: magically copy the node to the tso's proc;
3336      no Fetch necessary because in reality the node should not have been 
3337      moved to the other PE in the first place
3338   */
3339   if (CurrentProc!=node_loc) {
3340     IF_GRAN_DEBUG(bq, 
3341                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3342                         node, node_loc, CurrentProc, CurrentTSO->id, 
3343                         // CurrentTSO, where_is(CurrentTSO),
3344                         node->header.gran.procs));
3345     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3346     IF_GRAN_DEBUG(bq, 
3347                   debugBelch("## new bitmask of node %p is %#x\n",
3348                         node, node->header.gran.procs));
3349     if (RtsFlags.GranFlags.GranSimStats.Global) {
3350       globalGranStats.tot_fake_fetches++;
3351     }
3352   }
3353
3354   bqe = q;
3355   // ToDo: check: ASSERT(CurrentProc==node_loc);
3356   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3357     //next = bqe->link;
3358     /* 
3359        bqe points to the current element in the queue
3360        next points to the next element in the queue
3361     */
3362     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3363     //tso_loc = where_is(tso);
3364     len++;
3365     bqe = unblockOne(bqe, node);
3366   }
3367
3368   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3369      the closure to make room for the anchor of the BQ */
3370   if (bqe!=END_BQ_QUEUE) {
3371     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3372     /*
3373     ASSERT((info_ptr==&RBH_Save_0_info) ||
3374            (info_ptr==&RBH_Save_1_info) ||
3375            (info_ptr==&RBH_Save_2_info));
3376     */
3377     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3378     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3379     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3380
3381     IF_GRAN_DEBUG(bq,
3382                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3383                         node, info_type(node)));
3384   }
3385
3386   /* statistics gathering */
3387   if (RtsFlags.GranFlags.GranSimStats.Global) {
3388     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3389     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3390     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3391     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3392   }
3393   IF_GRAN_DEBUG(bq,
3394                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3395                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3396 }
3397 #elif defined(PARALLEL_HASKELL)
3398 void 
3399 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3400 {
3401   StgBlockingQueueElement *bqe;
3402
3403   IF_PAR_DEBUG(verbose, 
3404                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3405                      node, mytid));
3406 #ifdef DIST  
3407   //RFP
3408   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3409     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3410     return;
3411   }
3412 #endif
3413   
3414   ASSERT(q == END_BQ_QUEUE ||
3415          get_itbl(q)->type == TSO ||           
3416          get_itbl(q)->type == BLOCKED_FETCH || 
3417          get_itbl(q)->type == CONSTR); 
3418
3419   bqe = q;
3420   while (get_itbl(bqe)->type==TSO || 
3421          get_itbl(bqe)->type==BLOCKED_FETCH) {
3422     bqe = unblockOne(bqe, node);
3423   }
3424 }
3425
3426 #else   /* !GRAN && !PARALLEL_HASKELL */
3427
3428 void
3429 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3430 {
3431     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3432                              // Exception.cmm
3433     while (tso != END_TSO_QUEUE) {
3434         tso = unblockOne(cap,tso);
3435     }
3436 }
3437 #endif
3438
3439 /* ---------------------------------------------------------------------------
3440    Interrupt execution
3441    - usually called inside a signal handler so it mustn't do anything fancy.   
3442    ------------------------------------------------------------------------ */
3443
3444 void
3445 interruptStgRts(void)
3446 {
3447     sched_state = SCHED_INTERRUPTING;
3448     context_switch = 1;
3449     wakeUpRts();
3450 }
3451
3452 /* -----------------------------------------------------------------------------
3453    Wake up the RTS
3454    
3455    This function causes at least one OS thread to wake up and run the
3456    scheduler loop.  It is invoked when the RTS might be deadlocked, or
3457    an external event has arrived that may need servicing (eg. a
3458    keyboard interrupt).
3459
3460    In the single-threaded RTS we don't do anything here; we only have
3461    one thread anyway, and the event that caused us to want to wake up
3462    will have interrupted any blocking system call in progress anyway.
3463    -------------------------------------------------------------------------- */
3464
3465 void
3466 wakeUpRts(void)
3467 {
3468 #if defined(THREADED_RTS)
3469 #if !defined(mingw32_HOST_OS)
3470     // This forces the IO Manager thread to wakeup, which will
3471     // in turn ensure that some OS thread wakes up and runs the
3472     // scheduler loop, which will cause a GC and deadlock check.
3473     ioManagerWakeup();
3474 #else
3475     // On Windows this might be safe enough, because we aren't
3476     // in a signal handler.  Later we should use the IO Manager,
3477     // though.
3478     prodOneCapability();
3479 #endif
3480 #endif
3481 }
3482
3483 /* -----------------------------------------------------------------------------
3484    Unblock a thread
3485
3486    This is for use when we raise an exception in another thread, which
3487    may be blocked.
3488    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3489    -------------------------------------------------------------------------- */
3490
3491 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3492 /*
3493   NB: only the type of the blocking queue is different in GranSim and GUM
3494       the operations on the queue-elements are the same
3495       long live polymorphism!
3496
3497   Locks: sched_mutex is held upon entry and exit.
3498
3499 */
3500 static void
3501 unblockThread(Capability *cap, StgTSO *tso)
3502 {
3503   StgBlockingQueueElement *t, **last;
3504
3505   switch (tso->why_blocked) {
3506
3507   case NotBlocked:
3508     return;  /* not blocked */
3509
3510   case BlockedOnSTM:
3511     // Be careful: nothing to do here!  We tell the scheduler that the thread
3512     // is runnable and we leave it to the stack-walking code to abort the 
3513     // transaction while unwinding the stack.  We should perhaps have a debugging
3514     // test to make sure that this really happens and that the 'zombie' transaction
3515     // does not get committed.
3516     goto done;
3517
3518   case BlockedOnMVar:
3519     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3520     {
3521       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3522       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3523
3524       last = (StgBlockingQueueElement **)&mvar->head;
3525       for (t = (StgBlockingQueueElement *)mvar->head; 
3526            t != END_BQ_QUEUE; 
3527            last = &t->link, last_tso = t, t = t->link) {
3528         if (t == (StgBlockingQueueElement *)tso) {
3529           *last = (StgBlockingQueueElement *)tso->link;
3530           if (mvar->tail == tso) {
3531             mvar->tail = (StgTSO *)last_tso;
3532           }
3533           goto done;
3534         }
3535       }
3536       barf("unblockThread (MVAR): TSO not found");
3537     }
3538
3539   case BlockedOnBlackHole:
3540     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3541     {
3542       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3543
3544       last = &bq->blocking_queue;
3545       for (t = bq->blocking_queue; 
3546            t != END_BQ_QUEUE; 
3547            last = &t->link, t = t->link) {
3548         if (t == (StgBlockingQueueElement *)tso) {
3549           *last = (StgBlockingQueueElement *)tso->link;
3550           goto done;
3551         }
3552       }
3553       barf("unblockThread (BLACKHOLE): TSO not found");
3554     }
3555
3556   case BlockedOnException:
3557     {
3558       StgTSO *target  = tso->block_info.tso;
3559
3560       ASSERT(get_itbl(target)->type == TSO);
3561
3562       if (target->what_next == ThreadRelocated) {
3563           target = target->link;
3564           ASSERT(get_itbl(target)->type == TSO);
3565       }
3566
3567       ASSERT(target->blocked_exceptions != NULL);
3568
3569       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3570       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3571            t != END_BQ_QUEUE; 
3572            last = &t->link, t = t->link) {
3573         ASSERT(get_itbl(t)->type == TSO);
3574         if (t == (StgBlockingQueueElement *)tso) {
3575           *last = (StgBlockingQueueElement *)tso->link;
3576           goto done;
3577         }
3578       }
3579       barf("unblockThread (Exception): TSO not found");
3580     }
3581
3582   case BlockedOnRead:
3583   case BlockedOnWrite:
3584 #if defined(mingw32_HOST_OS)
3585   case BlockedOnDoProc:
3586 #endif
3587     {
3588       /* take TSO off blocked_queue */
3589       StgBlockingQueueElement *prev = NULL;
3590       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3591            prev = t, t = t->link) {
3592         if (t == (StgBlockingQueueElement *)tso) {
3593           if (prev == NULL) {
3594             blocked_queue_hd = (StgTSO *)t->link;
3595             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3596               blocked_queue_tl = END_TSO_QUEUE;
3597             }
3598           } else {
3599             prev->link = t->link;
3600             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3601               blocked_queue_tl = (StgTSO *)prev;
3602             }
3603           }
3604 #if defined(mingw32_HOST_OS)
3605           /* (Cooperatively) signal that the worker thread should abort
3606            * the request.
3607            */
3608           abandonWorkRequest(tso->block_info.async_result->reqID);
3609 #endif
3610           goto done;
3611         }
3612       }
3613       barf("unblockThread (I/O): TSO not found");
3614     }
3615
3616   case BlockedOnDelay:
3617     {
3618       /* take TSO off sleeping_queue */
3619       StgBlockingQueueElement *prev = NULL;
3620       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3621            prev = t, t = t->link) {
3622         if (t == (StgBlockingQueueElement *)tso) {
3623           if (prev == NULL) {
3624             sleeping_queue = (StgTSO *)t->link;
3625           } else {
3626             prev->link = t->link;
3627           }
3628           goto done;
3629         }
3630       }
3631       barf("unblockThread (delay): TSO not found");
3632     }
3633
3634   default:
3635     barf("unblockThread");
3636   }
3637
3638  done:
3639   tso->link = END_TSO_QUEUE;
3640   tso->why_blocked = NotBlocked;
3641   tso->block_info.closure = NULL;
3642   pushOnRunQueue(cap,tso);
3643 }
3644 #else
3645 static void
3646 unblockThread(Capability *cap, StgTSO *tso)
3647 {
3648   StgTSO *t, **last;
3649   
3650   /* To avoid locking unnecessarily. */
3651   if (tso->why_blocked == NotBlocked) {
3652     return;
3653   }
3654
3655   switch (tso->why_blocked) {
3656
3657   case BlockedOnSTM:
3658     // Be careful: nothing to do here!  We tell the scheduler that the thread
3659     // is runnable and we leave it to the stack-walking code to abort the 
3660     // transaction while unwinding the stack.  We should perhaps have a debugging
3661     // test to make sure that this really happens and that the 'zombie' transaction
3662     // does not get committed.
3663     goto done;
3664
3665   case BlockedOnMVar:
3666     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3667     {
3668       StgTSO *last_tso = END_TSO_QUEUE;
3669       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3670
3671       last = &mvar->head;
3672       for (t = mvar->head; t != END_TSO_QUEUE; 
3673            last = &t->link, last_tso = t, t = t->link) {
3674         if (t == tso) {
3675           *last = tso->link;
3676           if (mvar->tail == tso) {
3677             mvar->tail = last_tso;
3678           }
3679           goto done;
3680         }
3681       }
3682       barf("unblockThread (MVAR): TSO not found");
3683     }
3684
3685   case BlockedOnBlackHole:
3686     {
3687       last = &blackhole_queue;
3688       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3689            last = &t->link, t = t->link) {
3690         if (t == tso) {
3691           *last = tso->link;
3692           goto done;
3693         }
3694       }
3695       barf("unblockThread (BLACKHOLE): TSO not found");
3696     }
3697
3698   case BlockedOnException:
3699     {
3700       StgTSO *target  = tso->block_info.tso;
3701
3702       ASSERT(get_itbl(target)->type == TSO);
3703
3704       while (target->what_next == ThreadRelocated) {
3705           target = target->link;
3706           ASSERT(get_itbl(target)->type == TSO);
3707       }
3708       
3709       ASSERT(target->blocked_exceptions != NULL);
3710
3711       last = &target->blocked_exceptions;
3712       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3713            last = &t->link, t = t->link) {
3714         ASSERT(get_itbl(t)->type == TSO);
3715         if (t == tso) {
3716           *last = tso->link;
3717           goto done;
3718         }
3719       }
3720       barf("unblockThread (Exception): TSO not found");
3721     }
3722
3723 #if !defined(THREADED_RTS)
3724   case BlockedOnRead:
3725   case BlockedOnWrite:
3726 #if defined(mingw32_HOST_OS)
3727   case BlockedOnDoProc:
3728 #endif
3729     {
3730       StgTSO *prev = NULL;
3731       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3732            prev = t, t = t->link) {
3733         if (t == tso) {
3734           if (prev == NULL) {
3735             blocked_queue_hd = t->link;
3736             if (blocked_queue_tl == t) {
3737               blocked_queue_tl = END_TSO_QUEUE;
3738             }
3739           } else {
3740             prev->link = t->link;
3741             if (blocked_queue_tl == t) {
3742               blocked_queue_tl = prev;
3743             }
3744           }
3745 #if defined(mingw32_HOST_OS)
3746           /* (Cooperatively) signal that the worker thread should abort
3747            * the request.
3748            */
3749           abandonWorkRequest(tso->block_info.async_result->reqID);
3750 #endif
3751           goto done;
3752         }
3753       }
3754       barf("unblockThread (I/O): TSO not found");
3755     }
3756
3757   case BlockedOnDelay:
3758     {
3759       StgTSO *prev = NULL;
3760       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3761            prev = t, t = t->link) {
3762         if (t == tso) {
3763           if (prev == NULL) {
3764             sleeping_queue = t->link;
3765           } else {
3766             prev->link = t->link;
3767           }
3768           goto done;
3769         }
3770       }
3771       barf("unblockThread (delay): TSO not found");
3772     }
3773 #endif
3774
3775   default:
3776     barf("unblockThread");
3777   }
3778
3779  done:
3780   tso->link = END_TSO_QUEUE;
3781   tso->why_blocked = NotBlocked;
3782   tso->block_info.closure = NULL;
3783   appendToRunQueue(cap,tso);
3784
3785   // We might have just migrated this TSO to our Capability:
3786   if (tso->bound) {
3787       tso->bound->cap = cap;
3788   }
3789   tso->cap = cap;
3790 }
3791 #endif
3792
3793 /* -----------------------------------------------------------------------------
3794  * checkBlackHoles()
3795  *
3796  * Check the blackhole_queue for threads that can be woken up.  We do
3797  * this periodically: before every GC, and whenever the run queue is
3798  * empty.
3799  *
3800  * An elegant solution might be to just wake up all the blocked
3801  * threads with awakenBlockedQueue occasionally: they'll go back to
3802  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3803  * doesn't give us a way to tell whether we've actually managed to
3804  * wake up any threads, so we would be busy-waiting.
3805  *
3806  * -------------------------------------------------------------------------- */
3807
3808 static rtsBool
3809 checkBlackHoles (Capability *cap)
3810 {
3811     StgTSO **prev, *t;
3812     rtsBool any_woke_up = rtsFalse;
3813     StgHalfWord type;
3814
3815     // blackhole_queue is global:
3816     ASSERT_LOCK_HELD(&sched_mutex);
3817
3818     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
3819
3820     // ASSUMES: sched_mutex
3821     prev = &blackhole_queue;
3822     t = blackhole_queue;
3823     while (t != END_TSO_QUEUE) {
3824         ASSERT(t->why_blocked == BlockedOnBlackHole);
3825         type = get_itbl(t->block_info.closure)->type;
3826         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3827             IF_DEBUG(sanity,checkTSO(t));
3828             t = unblockOne(cap, t);
3829             // urk, the threads migrate to the current capability
3830             // here, but we'd like to keep them on the original one.
3831             *prev = t;
3832             any_woke_up = rtsTrue;
3833         } else {
3834             prev = &t->link;
3835             t = t->link;
3836         }
3837     }
3838
3839     return any_woke_up;
3840 }
3841
3842 /* -----------------------------------------------------------------------------
3843  * raiseAsync()
3844  *
3845  * The following function implements the magic for raising an
3846  * asynchronous exception in an existing thread.
3847  *
3848  * We first remove the thread from any queue on which it might be
3849  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3850  *
3851  * We strip the stack down to the innermost CATCH_FRAME, building
3852  * thunks in the heap for all the active computations, so they can 
3853  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3854  * an application of the handler to the exception, and push it on
3855  * the top of the stack.
3856  * 
3857  * How exactly do we save all the active computations?  We create an
3858  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3859  * AP_STACKs pushes everything from the corresponding update frame
3860  * upwards onto the stack.  (Actually, it pushes everything up to the
3861  * next update frame plus a pointer to the next AP_STACK object.
3862  * Entering the next AP_STACK object pushes more onto the stack until we
3863  * reach the last AP_STACK object - at which point the stack should look
3864  * exactly as it did when we killed the TSO and we can continue
3865  * execution by entering the closure on top of the stack.
3866  *
3867  * We can also kill a thread entirely - this happens if either (a) the 
3868  * exception passed to raiseAsync is NULL, or (b) there's no
3869  * CATCH_FRAME on the stack.  In either case, we strip the entire
3870  * stack and replace the thread with a zombie.
3871  *
3872  * ToDo: in THREADED_RTS mode, this function is only safe if either
3873  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3874  * one Capability), or (b) we own the Capability that the TSO is
3875  * currently blocked on or on the run queue of.
3876  *
3877  * -------------------------------------------------------------------------- */
3878  
3879 void
3880 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3881 {
3882     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3883 }
3884
3885 void
3886 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3887 {
3888     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3889 }
3890
3891 static void
3892 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3893             rtsBool stop_at_atomically, StgPtr stop_here)
3894 {
3895     StgRetInfoTable *info;
3896     StgPtr sp, frame;
3897     nat i;
3898   
3899     // Thread already dead?
3900     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3901         return;
3902     }
3903
3904     debugTrace(DEBUG_sched,
3905                "raising exception in thread %ld.", (long)tso->id);
3906     
3907     // Remove it from any blocking queues
3908     unblockThread(cap,tso);
3909
3910     // mark it dirty; we're about to change its stack.
3911     dirtyTSO(tso);
3912
3913     sp = tso->sp;
3914     
3915     // The stack freezing code assumes there's a closure pointer on
3916     // the top of the stack, so we have to arrange that this is the case...
3917     //
3918     if (sp[0] == (W_)&stg_enter_info) {
3919         sp++;
3920     } else {
3921         sp--;
3922         sp[0] = (W_)&stg_dummy_ret_closure;
3923     }
3924
3925     frame = sp + 1;
3926     while (stop_here == NULL || frame < stop_here) {
3927
3928         // 1. Let the top of the stack be the "current closure"
3929         //
3930         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3931         // CATCH_FRAME.
3932         //
3933         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3934         // current closure applied to the chunk of stack up to (but not
3935         // including) the update frame.  This closure becomes the "current
3936         // closure".  Go back to step 2.
3937         //
3938         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3939         // top of the stack applied to the exception.
3940         // 
3941         // 5. If it's a STOP_FRAME, then kill the thread.
3942         // 
3943         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3944         // transaction
3945        
3946         info = get_ret_itbl((StgClosure *)frame);
3947
3948         switch (info->i.type) {
3949
3950         case UPDATE_FRAME:
3951         {
3952             StgAP_STACK * ap;
3953             nat words;
3954             
3955             // First build an AP_STACK consisting of the stack chunk above the
3956             // current update frame, with the top word on the stack as the
3957             // fun field.
3958             //
3959             words = frame - sp - 1;
3960             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3961             
3962             ap->size = words;
3963             ap->fun  = (StgClosure *)sp[0];
3964             sp++;
3965             for(i=0; i < (nat)words; ++i) {
3966                 ap->payload[i] = (StgClosure *)*sp++;
3967             }
3968             
3969             SET_HDR(ap,&stg_AP_STACK_info,
3970                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3971             TICK_ALLOC_UP_THK(words+1,0);
3972             
3973             //IF_DEBUG(scheduler,
3974             //       debugBelch("sched: Updating ");
3975             //       printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3976             //       debugBelch(" with ");
3977             //       printObj((StgClosure *)ap);
3978             //  );
3979
3980             // Replace the updatee with an indirection
3981             //
3982             // Warning: if we're in a loop, more than one update frame on
3983             // the stack may point to the same object.  Be careful not to
3984             // overwrite an IND_OLDGEN in this case, because we'll screw
3985             // up the mutable lists.  To be on the safe side, don't
3986             // overwrite any kind of indirection at all.  See also
3987             // threadSqueezeStack in GC.c, where we have to make a similar
3988             // check.
3989             //
3990             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3991                 // revert the black hole
3992                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3993                                (StgClosure *)ap);
3994             }
3995             sp += sizeofW(StgUpdateFrame) - 1;
3996             sp[0] = (W_)ap; // push onto stack
3997             frame = sp + 1;
3998             continue; //no need to bump frame
3999         }
4000
4001         case STOP_FRAME:
4002             // We've stripped the entire stack, the thread is now dead.
4003             tso->what_next = ThreadKilled;
4004             tso->sp = frame + sizeofW(StgStopFrame);
4005             return;
4006
4007         case CATCH_FRAME:
4008             // If we find a CATCH_FRAME, and we've got an exception to raise,
4009             // then build the THUNK raise(exception), and leave it on
4010             // top of the CATCH_FRAME ready to enter.
4011             //
4012         {
4013 #ifdef PROFILING
4014             StgCatchFrame *cf = (StgCatchFrame *)frame;
4015 #endif
4016             StgThunk *raise;
4017             
4018             if (exception == NULL) break;
4019
4020             // we've got an exception to raise, so let's pass it to the
4021             // handler in this frame.
4022             //
4023             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4024             TICK_ALLOC_SE_THK(1,0);
4025             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
4026             raise->payload[0] = exception;
4027             
4028             // throw away the stack from Sp up to the CATCH_FRAME.
4029             //
4030             sp = frame - 1;
4031             
4032             /* Ensure that async excpetions are blocked now, so we don't get
4033              * a surprise exception before we get around to executing the
4034              * handler.
4035              */
4036             if (tso->blocked_exceptions == NULL) {
4037                 tso->blocked_exceptions = END_TSO_QUEUE;
4038             }
4039
4040             /* Put the newly-built THUNK on top of the stack, ready to execute
4041              * when the thread restarts.
4042              */
4043             sp[0] = (W_)raise;
4044             sp[-1] = (W_)&stg_enter_info;
4045             tso->sp = sp-1;
4046             tso->what_next = ThreadRunGHC;
4047             IF_DEBUG(sanity, checkTSO(tso));
4048             return;
4049         }
4050             
4051         case ATOMICALLY_FRAME:
4052             if (stop_at_atomically) {
4053                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
4054                 stmCondemnTransaction(cap, tso -> trec);
4055 #ifdef REG_R1
4056                 tso->sp = frame;
4057 #else
4058                 // R1 is not a register: the return convention for IO in
4059                 // this case puts the return value on the stack, so we
4060                 // need to set up the stack to return to the atomically
4061                 // frame properly...
4062                 tso->sp = frame - 2;
4063                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
4064                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
4065 #endif
4066                 tso->what_next = ThreadRunGHC;
4067                 return;
4068             }
4069             // Not stop_at_atomically... fall through and abort the
4070             // transaction.
4071             
4072         case CATCH_RETRY_FRAME:
4073             // IF we find an ATOMICALLY_FRAME then we abort the
4074             // current transaction and propagate the exception.  In
4075             // this case (unlike ordinary exceptions) we do not care
4076             // whether the transaction is valid or not because its
4077             // possible validity cannot have caused the exception
4078             // and will not be visible after the abort.
4079             debugTrace(DEBUG_stm, 
4080                        "found atomically block delivering async exception");
4081
4082             StgTRecHeader *trec = tso -> trec;
4083             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
4084             stmAbortTransaction(cap, trec);
4085             tso -> trec = outer;
4086             break;
4087             
4088         default:
4089             break;
4090         }
4091
4092         // move on to the next stack frame
4093         frame += stack_frame_sizeW((StgClosure *)frame);
4094     }
4095
4096     // if we got here, then we stopped at stop_here
4097     ASSERT(stop_here != NULL);
4098 }
4099
4100 /* -----------------------------------------------------------------------------
4101    Deleting threads
4102
4103    This is used for interruption (^C) and forking, and corresponds to
4104    raising an exception but without letting the thread catch the
4105    exception.
4106    -------------------------------------------------------------------------- */
4107
4108 static void 
4109 deleteThread (Capability *cap, StgTSO *tso)
4110 {
4111   if (tso->why_blocked != BlockedOnCCall &&
4112       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
4113       raiseAsync(cap,tso,NULL);
4114   }
4115 }
4116
4117 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4118 static void 
4119 deleteThread_(Capability *cap, StgTSO *tso)
4120 { // for forkProcess only:
4121   // like deleteThread(), but we delete threads in foreign calls, too.
4122
4123     if (tso->why_blocked == BlockedOnCCall ||
4124         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4125         unblockOne(cap,tso);
4126         tso->what_next = ThreadKilled;
4127     } else {
4128         deleteThread(cap,tso);
4129     }
4130 }
4131 #endif
4132
4133 /* -----------------------------------------------------------------------------
4134    raiseExceptionHelper
4135    
4136    This function is called by the raise# primitve, just so that we can
4137    move some of the tricky bits of raising an exception from C-- into
4138    C.  Who knows, it might be a useful re-useable thing here too.
4139    -------------------------------------------------------------------------- */
4140
4141 StgWord
4142 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4143 {
4144     Capability *cap = regTableToCapability(reg);
4145     StgThunk *raise_closure = NULL;
4146     StgPtr p, next;
4147     StgRetInfoTable *info;
4148     //
4149     // This closure represents the expression 'raise# E' where E
4150     // is the exception raise.  It is used to overwrite all the
4151     // thunks which are currently under evaluataion.
4152     //
4153
4154     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4155     // LDV profiling: stg_raise_info has THUNK as its closure
4156     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4157     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
4158     // 1 does not cause any problem unless profiling is performed.
4159     // However, when LDV profiling goes on, we need to linearly scan
4160     // small object pool, where raise_closure is stored, so we should
4161     // use MIN_UPD_SIZE.
4162     //
4163     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4164     //                                 sizeofW(StgClosure)+1);
4165     //
4166
4167     //
4168     // Walk up the stack, looking for the catch frame.  On the way,
4169     // we update any closures pointed to from update frames with the
4170     // raise closure that we just built.
4171     //
4172     p = tso->sp;
4173     while(1) {
4174         info = get_ret_itbl((StgClosure *)p);
4175         next = p + stack_frame_sizeW((StgClosure *)p);
4176         switch (info->i.type) {
4177             
4178         case UPDATE_FRAME:
4179             // Only create raise_closure if we need to.
4180             if (raise_closure == NULL) {
4181                 raise_closure = 
4182                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4183                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4184                 raise_closure->payload[0] = exception;
4185             }
4186             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4187             p = next;
4188             continue;
4189
4190         case ATOMICALLY_FRAME:
4191             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
4192             tso->sp = p;
4193             return ATOMICALLY_FRAME;
4194             
4195         case CATCH_FRAME:
4196             tso->sp = p;
4197             return CATCH_FRAME;
4198
4199         case CATCH_STM_FRAME:
4200             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
4201             tso->sp = p;
4202             return CATCH_STM_FRAME;
4203             
4204         case STOP_FRAME:
4205             tso->sp = p;
4206             return STOP_FRAME;
4207
4208         case CATCH_RETRY_FRAME:
4209         default:
4210             p = next; 
4211             continue;
4212         }
4213     }
4214 }
4215
4216
4217 /* -----------------------------------------------------------------------------
4218    findRetryFrameHelper
4219
4220    This function is called by the retry# primitive.  It traverses the stack
4221    leaving tso->sp referring to the frame which should handle the retry.  
4222
4223    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4224    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4225
4226    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4227    despite the similar implementation.
4228
4229    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4230    not be created within memory transactions.
4231    -------------------------------------------------------------------------- */
4232
4233 StgWord
4234 findRetryFrameHelper (StgTSO *tso)
4235 {
4236   StgPtr           p, next;
4237   StgRetInfoTable *info;
4238
4239   p = tso -> sp;
4240   while (1) {
4241     info = get_ret_itbl((StgClosure *)p);
4242     next = p + stack_frame_sizeW((StgClosure *)p);
4243     switch (info->i.type) {
4244       
4245     case ATOMICALLY_FRAME:
4246         debugTrace(DEBUG_stm,
4247                    "found ATOMICALLY_FRAME at %p during retrry", p);
4248         tso->sp = p;
4249         return ATOMICALLY_FRAME;
4250       
4251     case CATCH_RETRY_FRAME:
4252         debugTrace(DEBUG_stm,
4253                    "found CATCH_RETRY_FRAME at %p during retrry", p);
4254         tso->sp = p;
4255         return CATCH_RETRY_FRAME;
4256       
4257     case CATCH_STM_FRAME:
4258     default:
4259       ASSERT(info->i.type != CATCH_FRAME);
4260       ASSERT(info->i.type != STOP_FRAME);
4261       p = next; 
4262       continue;
4263     }
4264   }
4265 }
4266
4267 /* -----------------------------------------------------------------------------
4268    resurrectThreads is called after garbage collection on the list of
4269    threads found to be garbage.  Each of these threads will be woken
4270    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4271    on an MVar, or NonTermination if the thread was blocked on a Black
4272    Hole.
4273
4274    Locks: assumes we hold *all* the capabilities.
4275    -------------------------------------------------------------------------- */
4276
4277 void
4278 resurrectThreads (StgTSO *threads)
4279 {
4280     StgTSO *tso, *next;
4281     Capability *cap;
4282
4283     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4284         next = tso->global_link;
4285         tso->global_link = all_threads;
4286         all_threads = tso;
4287         debugTrace(DEBUG_sched, "resurrecting thread %d", tso->id);
4288         
4289         // Wake up the thread on the Capability it was last on
4290         cap = tso->cap;
4291         
4292         switch (tso->why_blocked) {
4293         case BlockedOnMVar:
4294         case BlockedOnException:
4295             /* Called by GC - sched_mutex lock is currently held. */
4296             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4297             break;
4298         case BlockedOnBlackHole:
4299             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4300             break;
4301         case BlockedOnSTM:
4302             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4303             break;
4304         case NotBlocked:
4305             /* This might happen if the thread was blocked on a black hole
4306              * belonging to a thread that we've just woken up (raiseAsync
4307              * can wake up threads, remember...).
4308              */
4309             continue;
4310         default:
4311             barf("resurrectThreads: thread blocked in a strange way");
4312         }
4313     }
4314 }
4315
4316 /* ----------------------------------------------------------------------------
4317  * Debugging: why is a thread blocked
4318  * [Also provides useful information when debugging threaded programs
4319  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4320    ------------------------------------------------------------------------- */
4321
4322 #if DEBUG
4323 static void
4324 printThreadBlockage(StgTSO *tso)
4325 {
4326   switch (tso->why_blocked) {
4327   case BlockedOnRead:
4328     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4329     break;
4330   case BlockedOnWrite:
4331     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4332     break;
4333 #if defined(mingw32_HOST_OS)
4334     case BlockedOnDoProc:
4335     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4336     break;
4337 #endif
4338   case BlockedOnDelay:
4339     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4340     break;
4341   case BlockedOnMVar:
4342     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4343     break;
4344   case BlockedOnException:
4345     debugBelch("is blocked on delivering an exception to thread %d",
4346             tso->block_info.tso->id);
4347     break;
4348   case BlockedOnBlackHole:
4349     debugBelch("is blocked on a black hole");
4350     break;
4351   case NotBlocked:
4352     debugBelch("is not blocked");
4353     break;
4354 #if defined(PARALLEL_HASKELL)
4355   case BlockedOnGA:
4356     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4357             tso->block_info.closure, info_type(tso->block_info.closure));
4358     break;
4359   case BlockedOnGA_NoSend:
4360     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4361             tso->block_info.closure, info_type(tso->block_info.closure));
4362     break;
4363 #endif
4364   case BlockedOnCCall:
4365     debugBelch("is blocked on an external call");
4366     break;
4367   case BlockedOnCCall_NoUnblockExc:
4368     debugBelch("is blocked on an external call (exceptions were already blocked)");
4369     break;
4370   case BlockedOnSTM:
4371     debugBelch("is blocked on an STM operation");
4372     break;
4373   default:
4374     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4375          tso->why_blocked, tso->id, tso);
4376   }
4377 }
4378
4379 void
4380 printThreadStatus(StgTSO *t)
4381 {
4382     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4383     {
4384       void *label = lookupThreadLabel(t->id);
4385       if (label) debugBelch("[\"%s\"] ",(char *)label);
4386     }
4387     if (t->what_next == ThreadRelocated) {
4388         debugBelch("has been relocated...\n");
4389     } else {
4390         switch (t->what_next) {
4391         case ThreadKilled:
4392             debugBelch("has been killed");
4393             break;
4394         case ThreadComplete:
4395             debugBelch("has completed");
4396             break;
4397         default:
4398             printThreadBlockage(t);
4399         }
4400         debugBelch("\n");
4401     }
4402 }
4403
4404 void
4405 printAllThreads(void)
4406 {
4407   StgTSO *t, *next;
4408   nat i;
4409   Capability *cap;
4410
4411 # if defined(GRAN)
4412   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4413   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4414                        time_string, rtsFalse/*no commas!*/);
4415
4416   debugBelch("all threads at [%s]:\n", time_string);
4417 # elif defined(PARALLEL_HASKELL)
4418   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4419   ullong_format_string(CURRENT_TIME,
4420                        time_string, rtsFalse/*no commas!*/);
4421
4422   debugBelch("all threads at [%s]:\n", time_string);
4423 # else
4424   debugBelch("all threads:\n");
4425 # endif
4426
4427   for (i = 0; i < n_capabilities; i++) {
4428       cap = &capabilities[i];
4429       debugBelch("threads on capability %d:\n", cap->no);
4430       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4431           printThreadStatus(t);
4432       }
4433   }
4434
4435   debugBelch("other threads:\n");
4436   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4437       if (t->why_blocked != NotBlocked) {
4438           printThreadStatus(t);
4439       }
4440       if (t->what_next == ThreadRelocated) {
4441           next = t->link;
4442       } else {
4443           next = t->global_link;
4444       }
4445   }
4446 }
4447
4448 // useful from gdb
4449 void 
4450 printThreadQueue(StgTSO *t)
4451 {
4452     nat i = 0;
4453     for (; t != END_TSO_QUEUE; t = t->link) {
4454         printThreadStatus(t);
4455         i++;
4456     }
4457     debugBelch("%d threads on queue\n", i);
4458 }
4459
4460 /* 
4461    Print a whole blocking queue attached to node (debugging only).
4462 */
4463 # if defined(PARALLEL_HASKELL)
4464 void 
4465 print_bq (StgClosure *node)
4466 {
4467   StgBlockingQueueElement *bqe;
4468   StgTSO *tso;
4469   rtsBool end;
4470
4471   debugBelch("## BQ of closure %p (%s): ",
4472           node, info_type(node));
4473
4474   /* should cover all closures that may have a blocking queue */
4475   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4476          get_itbl(node)->type == FETCH_ME_BQ ||
4477          get_itbl(node)->type == RBH ||
4478          get_itbl(node)->type == MVAR);
4479     
4480   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4481
4482   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4483 }
4484
4485 /* 
4486    Print a whole blocking queue starting with the element bqe.
4487 */
4488 void 
4489 print_bqe (StgBlockingQueueElement *bqe)
4490 {
4491   rtsBool end;
4492
4493   /* 
4494      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4495   */
4496   for (end = (bqe==END_BQ_QUEUE);
4497        !end; // iterate until bqe points to a CONSTR
4498        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4499        bqe = end ? END_BQ_QUEUE : bqe->link) {
4500     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4501     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4502     /* types of closures that may appear in a blocking queue */
4503     ASSERT(get_itbl(bqe)->type == TSO ||           
4504            get_itbl(bqe)->type == BLOCKED_FETCH || 
4505            get_itbl(bqe)->type == CONSTR); 
4506     /* only BQs of an RBH end with an RBH_Save closure */
4507     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4508
4509     switch (get_itbl(bqe)->type) {
4510     case TSO:
4511       debugBelch(" TSO %u (%x),",
4512               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4513       break;
4514     case BLOCKED_FETCH:
4515       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4516               ((StgBlockedFetch *)bqe)->node, 
4517               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4518               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4519               ((StgBlockedFetch *)bqe)->ga.weight);
4520       break;
4521     case CONSTR:
4522       debugBelch(" %s (IP %p),",
4523               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4524                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4525                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4526                "RBH_Save_?"), get_itbl(bqe));
4527       break;
4528     default:
4529       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4530            info_type((StgClosure *)bqe)); // , node, info_type(node));
4531       break;
4532     }
4533   } /* for */
4534   debugBelch("\n");
4535 }
4536 # elif defined(GRAN)
4537 void 
4538 print_bq (StgClosure *node)
4539 {
4540   StgBlockingQueueElement *bqe;
4541   PEs node_loc, tso_loc;
4542   rtsBool end;
4543
4544   /* should cover all closures that may have a blocking queue */
4545   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4546          get_itbl(node)->type == FETCH_ME_BQ ||
4547          get_itbl(node)->type == RBH);
4548     
4549   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4550   node_loc = where_is(node);
4551
4552   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4553           node, info_type(node), node_loc);
4554
4555   /* 
4556      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4557   */
4558   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4559        !end; // iterate until bqe points to a CONSTR
4560        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4561     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4562     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4563     /* types of closures that may appear in a blocking queue */
4564     ASSERT(get_itbl(bqe)->type == TSO ||           
4565            get_itbl(bqe)->type == CONSTR); 
4566     /* only BQs of an RBH end with an RBH_Save closure */
4567     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4568
4569     tso_loc = where_is((StgClosure *)bqe);
4570     switch (get_itbl(bqe)->type) {
4571     case TSO:
4572       debugBelch(" TSO %d (%p) on [PE %d],",
4573               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4574       break;
4575     case CONSTR:
4576       debugBelch(" %s (IP %p),",
4577               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4578                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4579                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4580                "RBH_Save_?"), get_itbl(bqe));
4581       break;
4582     default:
4583       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4584            info_type((StgClosure *)bqe), node, info_type(node));
4585       break;
4586     }
4587   } /* for */
4588   debugBelch("\n");
4589 }
4590 # endif
4591
4592 #if defined(PARALLEL_HASKELL)
4593 static nat
4594 run_queue_len(void)
4595 {
4596     nat i;
4597     StgTSO *tso;
4598     
4599     for (i=0, tso=run_queue_hd; 
4600          tso != END_TSO_QUEUE;
4601          i++, tso=tso->link) {
4602         /* nothing */
4603     }
4604         
4605     return i;
4606 }
4607 #endif
4608
4609 #endif /* DEBUG */