270a7d8715dd87b201e628d926dfceaf3d1cf9a2
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53 #include "Trace.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122  * Used for detecting garbage collected threads.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128  * LOCK: none (just an advisory flag)
129  */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133  * LOCK: currently none, perhaps we should lock (but needs to be
134  * updated in the fast path of the scheduler).
135  */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* if this flag is set as well, give up execution
139  * LOCK: none (changes once, from false->true)
140  */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 /* Next thread ID to allocate.
144  * LOCK: sched_mutex
145  */
146 static StgThreadID next_thread_id = 1;
147
148 /* The smallest stack size that makes any sense is:
149  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
150  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
151  *  + 1                       (the closure to enter)
152  *  + 1                       (stg_ap_v_ret)
153  *  + 1                       (spare slot req'd by stg_ap_v_ret)
154  *
155  * A thread with this stack will bomb immediately with a stack
156  * overflow, which will increase its stack size.  
157  */
158 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
159
160 #if defined(GRAN)
161 StgTSO *CurrentTSO;
162 #endif
163
164 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
165  *  exists - earlier gccs apparently didn't.
166  *  -= chak
167  */
168 StgTSO dummy_tso;
169
170 /*
171  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
172  * in an MT setting, needed to signal that a worker thread shouldn't hang around
173  * in the scheduler when it is out of work.
174  */
175 rtsBool shutting_down_scheduler = rtsFalse;
176
177 /*
178  * This mutex protects most of the global scheduler data in
179  * the THREADED_RTS runtime.
180  */
181 #if defined(THREADED_RTS)
182 Mutex sched_mutex;
183 #endif
184
185 #if defined(PARALLEL_HASKELL)
186 StgTSO *LastTSO;
187 rtsTime TimeOfLastYield;
188 rtsBool emitSchedule = rtsTrue;
189 #endif
190
191 /* -----------------------------------------------------------------------------
192  * static function prototypes
193  * -------------------------------------------------------------------------- */
194
195 static Capability *schedule (Capability *initialCapability, Task *task);
196
197 //
198 // These function all encapsulate parts of the scheduler loop, and are
199 // abstracted only to make the structure and control flow of the
200 // scheduler clearer.
201 //
202 static void schedulePreLoop (void);
203 #if defined(THREADED_RTS)
204 static void schedulePushWork(Capability *cap, Task *task);
205 #endif
206 static void scheduleStartSignalHandlers (Capability *cap);
207 static void scheduleCheckBlockedThreads (Capability *cap);
208 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
209 static void scheduleCheckBlackHoles (Capability *cap);
210 static void scheduleDetectDeadlock (Capability *cap, Task *task);
211 #if defined(GRAN)
212 static StgTSO *scheduleProcessEvent(rtsEvent *event);
213 #endif
214 #if defined(PARALLEL_HASKELL)
215 static StgTSO *scheduleSendPendingMessages(void);
216 static void scheduleActivateSpark(void);
217 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
218 #endif
219 #if defined(PAR) || defined(GRAN)
220 static void scheduleGranParReport(void);
221 #endif
222 static void schedulePostRunThread(void);
223 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
224 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
225                                          StgTSO *t);
226 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
227                                     nat prev_what_next );
228 static void scheduleHandleThreadBlocked( StgTSO *t );
229 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
230                                              StgTSO *t );
231 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
232 static Capability *scheduleDoGC(Capability *cap, Task *task,
233                                 rtsBool force_major, 
234                                 void (*get_roots)(evac_fn));
235
236 static void unblockThread(Capability *cap, StgTSO *tso);
237 static rtsBool checkBlackHoles(Capability *cap);
238 static void AllRoots(evac_fn evac);
239
240 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
241
242 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
243                         rtsBool stop_at_atomically, StgPtr stop_here);
244
245 static void deleteThread (Capability *cap, StgTSO *tso);
246 static void deleteAllThreads (Capability *cap);
247
248 #ifdef DEBUG
249 static void printThreadBlockage(StgTSO *tso);
250 static void printThreadStatus(StgTSO *tso);
251 void printThreadQueue(StgTSO *tso);
252 #endif
253
254 #if defined(PARALLEL_HASKELL)
255 StgTSO * createSparkThread(rtsSpark spark);
256 StgTSO * activateSpark (rtsSpark spark);  
257 #endif
258
259 #ifdef DEBUG
260 static char *whatNext_strs[] = {
261   "(unknown)",
262   "ThreadRunGHC",
263   "ThreadInterpret",
264   "ThreadKilled",
265   "ThreadRelocated",
266   "ThreadComplete"
267 };
268 #endif
269
270 /* -----------------------------------------------------------------------------
271  * Putting a thread on the run queue: different scheduling policies
272  * -------------------------------------------------------------------------- */
273
274 STATIC_INLINE void
275 addToRunQueue( Capability *cap, StgTSO *t )
276 {
277 #if defined(PARALLEL_HASKELL)
278     if (RtsFlags.ParFlags.doFairScheduling) { 
279         // this does round-robin scheduling; good for concurrency
280         appendToRunQueue(cap,t);
281     } else {
282         // this does unfair scheduling; good for parallelism
283         pushOnRunQueue(cap,t);
284     }
285 #else
286     // this does round-robin scheduling; good for concurrency
287     appendToRunQueue(cap,t);
288 #endif
289 }
290
291 /* ---------------------------------------------------------------------------
292    Main scheduling loop.
293
294    We use round-robin scheduling, each thread returning to the
295    scheduler loop when one of these conditions is detected:
296
297       * out of heap space
298       * timer expires (thread yields)
299       * thread blocks
300       * thread ends
301       * stack overflow
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319
320 static Capability *
321 schedule (Capability *initialCapability, Task *task)
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PARALLEL_HASKELL)
329   StgTSO *tso;
330   GlobalTaskId pe;
331   rtsBool receivedFinish = rtsFalse;
332 # if defined(DEBUG)
333   nat tp_size, sp_size; // stats only
334 # endif
335 #endif
336   nat prev_what_next;
337   rtsBool ready_to_gc;
338 #if defined(THREADED_RTS)
339   rtsBool first = rtsTrue;
340 #endif
341   
342   cap = initialCapability;
343
344   // Pre-condition: this task owns initialCapability.
345   // The sched_mutex is *NOT* held
346   // NB. on return, we still hold a capability.
347
348   debugTrace (DEBUG_sched, 
349               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
350               task, initialCapability);
351
352   schedulePreLoop();
353
354   // -----------------------------------------------------------
355   // Scheduler loop starts here:
356
357 #if defined(PARALLEL_HASKELL)
358 #define TERMINATION_CONDITION        (!receivedFinish)
359 #elif defined(GRAN)
360 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
361 #else
362 #define TERMINATION_CONDITION        rtsTrue
363 #endif
364
365   while (TERMINATION_CONDITION) {
366
367 #if defined(GRAN)
368       /* Choose the processor with the next event */
369       CurrentProc = event->proc;
370       CurrentTSO = event->tso;
371 #endif
372
373 #if defined(THREADED_RTS)
374       if (first) {
375           // don't yield the first time, we want a chance to run this
376           // thread for a bit, even if there are others banging at the
377           // door.
378           first = rtsFalse;
379           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
380       } else {
381           // Yield the capability to higher-priority tasks if necessary.
382           yieldCapability(&cap, task);
383       }
384 #endif
385       
386 #if defined(THREADED_RTS)
387       schedulePushWork(cap,task);
388 #endif
389
390     // Check whether we have re-entered the RTS from Haskell without
391     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
392     // call).
393     if (cap->in_haskell) {
394           errorBelch("schedule: re-entered unsafely.\n"
395                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
396           stg_exit(EXIT_FAILURE);
397     }
398
399     // The interruption / shutdown sequence.
400     // 
401     // In order to cleanly shut down the runtime, we want to:
402     //   * make sure that all main threads return to their callers
403     //     with the state 'Interrupted'.
404     //   * clean up all OS threads assocated with the runtime
405     //   * free all memory etc.
406     //
407     // So the sequence for ^C goes like this:
408     //
409     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
410     //     arranges for some Capability to wake up
411     //
412     //   * all threads in the system are halted, and the zombies are
413     //     placed on the run queue for cleaning up.  We acquire all
414     //     the capabilities in order to delete the threads, this is
415     //     done by scheduleDoGC() for convenience (because GC already
416     //     needs to acquire all the capabilities).  We can't kill
417     //     threads involved in foreign calls.
418     // 
419     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
420     //
421     //   * sched_state := SCHED_SHUTTING_DOWN
422     //
423     //   * all workers exit when the run queue on their capability
424     //     drains.  All main threads will also exit when their TSO
425     //     reaches the head of the run queue and they can return.
426     //
427     //   * eventually all Capabilities will shut down, and the RTS can
428     //     exit.
429     //
430     //   * We might be left with threads blocked in foreign calls, 
431     //     we should really attempt to kill these somehow (TODO);
432     
433     switch (sched_state) {
434     case SCHED_RUNNING:
435         break;
436     case SCHED_INTERRUPTING:
437         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
438 #if defined(THREADED_RTS)
439         discardSparksCap(cap);
440 #endif
441         /* scheduleDoGC() deletes all the threads */
442         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
443         break;
444     case SCHED_SHUTTING_DOWN:
445         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
446         // If we are a worker, just exit.  If we're a bound thread
447         // then we will exit below when we've removed our TSO from
448         // the run queue.
449         if (task->tso == NULL && emptyRunQueue(cap)) {
450             return cap;
451         }
452         break;
453     default:
454         barf("sched_state: %d", sched_state);
455     }
456
457 #if defined(THREADED_RTS)
458     // If the run queue is empty, take a spark and turn it into a thread.
459     {
460         if (emptyRunQueue(cap)) {
461             StgClosure *spark;
462             spark = findSpark(cap);
463             if (spark != NULL) {
464                 debugTrace(DEBUG_sched,
465                            "turning spark of closure %p into a thread",
466                            (StgClosure *)spark);
467                 createSparkThread(cap,spark);     
468             }
469         }
470     }
471 #endif // THREADED_RTS
472
473     scheduleStartSignalHandlers(cap);
474
475     // Only check the black holes here if we've nothing else to do.
476     // During normal execution, the black hole list only gets checked
477     // at GC time, to avoid repeatedly traversing this possibly long
478     // list each time around the scheduler.
479     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
480
481     scheduleCheckWakeupThreads(cap);
482
483     scheduleCheckBlockedThreads(cap);
484
485     scheduleDetectDeadlock(cap,task);
486 #if defined(THREADED_RTS)
487     cap = task->cap;    // reload cap, it might have changed
488 #endif
489
490     // Normally, the only way we can get here with no threads to
491     // run is if a keyboard interrupt received during 
492     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
493     // Additionally, it is not fatal for the
494     // threaded RTS to reach here with no threads to run.
495     //
496     // win32: might be here due to awaitEvent() being abandoned
497     // as a result of a console event having been delivered.
498     if ( emptyRunQueue(cap) ) {
499 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
500         ASSERT(sched_state >= SCHED_INTERRUPTING);
501 #endif
502         continue; // nothing to do
503     }
504
505 #if defined(PARALLEL_HASKELL)
506     scheduleSendPendingMessages();
507     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
508         continue;
509
510 #if defined(SPARKS)
511     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
512 #endif
513
514     /* If we still have no work we need to send a FISH to get a spark
515        from another PE */
516     if (emptyRunQueue(cap)) {
517         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
518         ASSERT(rtsFalse); // should not happen at the moment
519     }
520     // from here: non-empty run queue.
521     //  TODO: merge above case with this, only one call processMessages() !
522     if (PacketsWaiting()) {  /* process incoming messages, if
523                                 any pending...  only in else
524                                 because getRemoteWork waits for
525                                 messages as well */
526         receivedFinish = processMessages();
527     }
528 #endif
529
530 #if defined(GRAN)
531     scheduleProcessEvent(event);
532 #endif
533
534     // 
535     // Get a thread to run
536     //
537     t = popRunQueue(cap);
538
539 #if defined(GRAN) || defined(PAR)
540     scheduleGranParReport(); // some kind of debuging output
541 #else
542     // Sanity check the thread we're about to run.  This can be
543     // expensive if there is lots of thread switching going on...
544     IF_DEBUG(sanity,checkTSO(t));
545 #endif
546
547 #if defined(THREADED_RTS)
548     // Check whether we can run this thread in the current task.
549     // If not, we have to pass our capability to the right task.
550     {
551         Task *bound = t->bound;
552       
553         if (bound) {
554             if (bound == task) {
555                 debugTrace(DEBUG_sched,
556                            "### Running thread %d in bound thread", t->id);
557                 // yes, the Haskell thread is bound to the current native thread
558             } else {
559                 debugTrace(DEBUG_sched,
560                            "### thread %d bound to another OS thread", t->id);
561                 // no, bound to a different Haskell thread: pass to that thread
562                 pushOnRunQueue(cap,t);
563                 continue;
564             }
565         } else {
566             // The thread we want to run is unbound.
567             if (task->tso) { 
568                 debugTrace(DEBUG_sched,
569                            "### this OS thread cannot run thread %d", t->id);
570                 // no, the current native thread is bound to a different
571                 // Haskell thread, so pass it to any worker thread
572                 pushOnRunQueue(cap,t);
573                 continue; 
574             }
575         }
576     }
577 #endif
578
579     cap->r.rCurrentTSO = t;
580     
581     /* context switches are initiated by the timer signal, unless
582      * the user specified "context switch as often as possible", with
583      * +RTS -C0
584      */
585     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
586         && !emptyThreadQueues(cap)) {
587         context_switch = 1;
588     }
589          
590 run_thread:
591
592     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
593                               (long)t->id, whatNext_strs[t->what_next]);
594
595 #if defined(PROFILING)
596     startHeapProfTimer();
597 #endif
598
599     // ----------------------------------------------------------------------
600     // Run the current thread 
601
602     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
603     ASSERT(t->cap == cap);
604
605     prev_what_next = t->what_next;
606
607     errno = t->saved_errno;
608     cap->in_haskell = rtsTrue;
609
610     dirtyTSO(t);
611
612     recent_activity = ACTIVITY_YES;
613
614     switch (prev_what_next) {
615         
616     case ThreadKilled:
617     case ThreadComplete:
618         /* Thread already finished, return to scheduler. */
619         ret = ThreadFinished;
620         break;
621         
622     case ThreadRunGHC:
623     {
624         StgRegTable *r;
625         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
626         cap = regTableToCapability(r);
627         ret = r->rRet;
628         break;
629     }
630     
631     case ThreadInterpret:
632         cap = interpretBCO(cap);
633         ret = cap->r.rRet;
634         break;
635         
636     default:
637         barf("schedule: invalid what_next field");
638     }
639
640     cap->in_haskell = rtsFalse;
641
642     // The TSO might have moved, eg. if it re-entered the RTS and a GC
643     // happened.  So find the new location:
644     t = cap->r.rCurrentTSO;
645
646     // We have run some Haskell code: there might be blackhole-blocked
647     // threads to wake up now.
648     // Lock-free test here should be ok, we're just setting a flag.
649     if ( blackhole_queue != END_TSO_QUEUE ) {
650         blackholes_need_checking = rtsTrue;
651     }
652     
653     // And save the current errno in this thread.
654     // XXX: possibly bogus for SMP because this thread might already
655     // be running again, see code below.
656     t->saved_errno = errno;
657
658 #if defined(THREADED_RTS)
659     // If ret is ThreadBlocked, and this Task is bound to the TSO that
660     // blocked, we are in limbo - the TSO is now owned by whatever it
661     // is blocked on, and may in fact already have been woken up,
662     // perhaps even on a different Capability.  It may be the case
663     // that task->cap != cap.  We better yield this Capability
664     // immediately and return to normaility.
665     if (ret == ThreadBlocked) {
666         debugTrace(DEBUG_sched,
667                    "--<< thread %d (%s) stopped: blocked",
668                    t->id, whatNext_strs[t->what_next]);
669         continue;
670     }
671 #endif
672
673     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674     ASSERT(t->cap == cap);
675
676     // ----------------------------------------------------------------------
677     
678     // Costs for the scheduler are assigned to CCS_SYSTEM
679 #if defined(PROFILING)
680     stopHeapProfTimer();
681     CCCS = CCS_SYSTEM;
682 #endif
683     
684     schedulePostRunThread();
685
686     ready_to_gc = rtsFalse;
687
688     switch (ret) {
689     case HeapOverflow:
690         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
691         break;
692
693     case StackOverflow:
694         scheduleHandleStackOverflow(cap,task,t);
695         break;
696
697     case ThreadYielding:
698         if (scheduleHandleYield(cap, t, prev_what_next)) {
699             // shortcut for switching between compiler/interpreter:
700             goto run_thread; 
701         }
702         break;
703
704     case ThreadBlocked:
705         scheduleHandleThreadBlocked(t);
706         break;
707
708     case ThreadFinished:
709         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
711         break;
712
713     default:
714       barf("schedule: invalid thread return code %d", (int)ret);
715     }
716
717     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
718     if (ready_to_gc) {
719       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
720     }
721   } /* end of while() */
722
723   debugTrace(PAR_DEBUG_verbose,
724              "== Leaving schedule() after having received Finish");
725 }
726
727 /* ----------------------------------------------------------------------------
728  * Setting up the scheduler loop
729  * ------------------------------------------------------------------------- */
730
731 static void
732 schedulePreLoop(void)
733 {
734 #if defined(GRAN) 
735     /* set up first event to get things going */
736     /* ToDo: assign costs for system setup and init MainTSO ! */
737     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
738               ContinueThread, 
739               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
740     
741     debugTrace (DEBUG_gran,
742                 "GRAN: Init CurrentTSO (in schedule) = %p", 
743                 CurrentTSO);
744     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
745     
746     if (RtsFlags.GranFlags.Light) {
747         /* Save current time; GranSim Light only */
748         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
749     }      
750 #endif
751 }
752
753 /* -----------------------------------------------------------------------------
754  * schedulePushWork()
755  *
756  * Push work to other Capabilities if we have some.
757  * -------------------------------------------------------------------------- */
758
759 #if defined(THREADED_RTS)
760 static void
761 schedulePushWork(Capability *cap USED_IF_THREADS, 
762                  Task *task      USED_IF_THREADS)
763 {
764     Capability *free_caps[n_capabilities], *cap0;
765     nat i, n_free_caps;
766
767     // migration can be turned off with +RTS -qg
768     if (!RtsFlags.ParFlags.migrate) return;
769
770     // Check whether we have more threads on our run queue, or sparks
771     // in our pool, that we could hand to another Capability.
772     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
773         && sparkPoolSizeCap(cap) < 2) {
774         return;
775     }
776
777     // First grab as many free Capabilities as we can.
778     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
779         cap0 = &capabilities[i];
780         if (cap != cap0 && tryGrabCapability(cap0,task)) {
781             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
782                 // it already has some work, we just grabbed it at 
783                 // the wrong moment.  Or maybe it's deadlocked!
784                 releaseCapability(cap0);
785             } else {
786                 free_caps[n_free_caps++] = cap0;
787             }
788         }
789     }
790
791     // we now have n_free_caps free capabilities stashed in
792     // free_caps[].  Share our run queue equally with them.  This is
793     // probably the simplest thing we could do; improvements we might
794     // want to do include:
795     //
796     //   - giving high priority to moving relatively new threads, on 
797     //     the gournds that they haven't had time to build up a
798     //     working set in the cache on this CPU/Capability.
799     //
800     //   - giving low priority to moving long-lived threads
801
802     if (n_free_caps > 0) {
803         StgTSO *prev, *t, *next;
804         rtsBool pushed_to_all;
805
806         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
807
808         i = 0;
809         pushed_to_all = rtsFalse;
810
811         if (cap->run_queue_hd != END_TSO_QUEUE) {
812             prev = cap->run_queue_hd;
813             t = prev->link;
814             prev->link = END_TSO_QUEUE;
815             for (; t != END_TSO_QUEUE; t = next) {
816                 next = t->link;
817                 t->link = END_TSO_QUEUE;
818                 if (t->what_next == ThreadRelocated
819                     || t->bound == task // don't move my bound thread
820                     || tsoLocked(t)) {  // don't move a locked thread
821                     prev->link = t;
822                     prev = t;
823                 } else if (i == n_free_caps) {
824                     pushed_to_all = rtsTrue;
825                     i = 0;
826                     // keep one for us
827                     prev->link = t;
828                     prev = t;
829                 } else {
830                     debugTrace(DEBUG_sched, "pushing thread %d to capability %d", t->id, free_caps[i]->no);
831                     appendToRunQueue(free_caps[i],t);
832                     if (t->bound) { t->bound->cap = free_caps[i]; }
833                     t->cap = free_caps[i];
834                     i++;
835                 }
836             }
837             cap->run_queue_tl = prev;
838         }
839
840         // If there are some free capabilities that we didn't push any
841         // threads to, then try to push a spark to each one.
842         if (!pushed_to_all) {
843             StgClosure *spark;
844             // i is the next free capability to push to
845             for (; i < n_free_caps; i++) {
846                 if (emptySparkPoolCap(free_caps[i])) {
847                     spark = findSpark(cap);
848                     if (spark != NULL) {
849                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
850                         newSpark(&(free_caps[i]->r), spark);
851                     }
852                 }
853             }
854         }
855
856         // release the capabilities
857         for (i = 0; i < n_free_caps; i++) {
858             task->cap = free_caps[i];
859             releaseCapability(free_caps[i]);
860         }
861     }
862     task->cap = cap; // reset to point to our Capability.
863 }
864 #endif
865
866 /* ----------------------------------------------------------------------------
867  * Start any pending signal handlers
868  * ------------------------------------------------------------------------- */
869
870 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
871 static void
872 scheduleStartSignalHandlers(Capability *cap)
873 {
874     if (signals_pending()) { // safe outside the lock
875         startSignalHandlers(cap);
876     }
877 }
878 #else
879 static void
880 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
881 {
882 }
883 #endif
884
885 /* ----------------------------------------------------------------------------
886  * Check for blocked threads that can be woken up.
887  * ------------------------------------------------------------------------- */
888
889 static void
890 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
891 {
892 #if !defined(THREADED_RTS)
893     //
894     // Check whether any waiting threads need to be woken up.  If the
895     // run queue is empty, and there are no other tasks running, we
896     // can wait indefinitely for something to happen.
897     //
898     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
899     {
900         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
901     }
902 #endif
903 }
904
905
906 /* ----------------------------------------------------------------------------
907  * Check for threads woken up by other Capabilities
908  * ------------------------------------------------------------------------- */
909
910 static void
911 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
912 {
913 #if defined(THREADED_RTS)
914     // Any threads that were woken up by other Capabilities get
915     // appended to our run queue.
916     if (!emptyWakeupQueue(cap)) {
917         ACQUIRE_LOCK(&cap->lock);
918         if (emptyRunQueue(cap)) {
919             cap->run_queue_hd = cap->wakeup_queue_hd;
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         } else {
922             cap->run_queue_tl->link = cap->wakeup_queue_hd;
923             cap->run_queue_tl = cap->wakeup_queue_tl;
924         }
925         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
926         RELEASE_LOCK(&cap->lock);
927     }
928 #endif
929 }
930
931 /* ----------------------------------------------------------------------------
932  * Check for threads blocked on BLACKHOLEs that can be woken up
933  * ------------------------------------------------------------------------- */
934 static void
935 scheduleCheckBlackHoles (Capability *cap)
936 {
937     if ( blackholes_need_checking ) // check without the lock first
938     {
939         ACQUIRE_LOCK(&sched_mutex);
940         if ( blackholes_need_checking ) {
941             checkBlackHoles(cap);
942             blackholes_need_checking = rtsFalse;
943         }
944         RELEASE_LOCK(&sched_mutex);
945     }
946 }
947
948 /* ----------------------------------------------------------------------------
949  * Detect deadlock conditions and attempt to resolve them.
950  * ------------------------------------------------------------------------- */
951
952 static void
953 scheduleDetectDeadlock (Capability *cap, Task *task)
954 {
955
956 #if defined(PARALLEL_HASKELL)
957     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
958     return;
959 #endif
960
961     /* 
962      * Detect deadlock: when we have no threads to run, there are no
963      * threads blocked, waiting for I/O, or sleeping, and all the
964      * other tasks are waiting for work, we must have a deadlock of
965      * some description.
966      */
967     if ( emptyThreadQueues(cap) )
968     {
969 #if defined(THREADED_RTS)
970         /* 
971          * In the threaded RTS, we only check for deadlock if there
972          * has been no activity in a complete timeslice.  This means
973          * we won't eagerly start a full GC just because we don't have
974          * any threads to run currently.
975          */
976         if (recent_activity != ACTIVITY_INACTIVE) return;
977 #endif
978
979         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
980
981         // Garbage collection can release some new threads due to
982         // either (a) finalizers or (b) threads resurrected because
983         // they are unreachable and will therefore be sent an
984         // exception.  Any threads thus released will be immediately
985         // runnable.
986         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
987
988         recent_activity = ACTIVITY_DONE_GC;
989         
990         if ( !emptyRunQueue(cap) ) return;
991
992 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
993         /* If we have user-installed signal handlers, then wait
994          * for signals to arrive rather then bombing out with a
995          * deadlock.
996          */
997         if ( anyUserHandlers() ) {
998             debugTrace(DEBUG_sched,
999                        "still deadlocked, waiting for signals...");
1000
1001             awaitUserSignals();
1002
1003             if (signals_pending()) {
1004                 startSignalHandlers(cap);
1005             }
1006
1007             // either we have threads to run, or we were interrupted:
1008             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1009         }
1010 #endif
1011
1012 #if !defined(THREADED_RTS)
1013         /* Probably a real deadlock.  Send the current main thread the
1014          * Deadlock exception.
1015          */
1016         if (task->tso) {
1017             switch (task->tso->why_blocked) {
1018             case BlockedOnSTM:
1019             case BlockedOnBlackHole:
1020             case BlockedOnException:
1021             case BlockedOnMVar:
1022                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1023                 return;
1024             default:
1025                 barf("deadlock: main thread blocked in a strange way");
1026             }
1027         }
1028         return;
1029 #endif
1030     }
1031 }
1032
1033 /* ----------------------------------------------------------------------------
1034  * Process an event (GRAN only)
1035  * ------------------------------------------------------------------------- */
1036
1037 #if defined(GRAN)
1038 static StgTSO *
1039 scheduleProcessEvent(rtsEvent *event)
1040 {
1041     StgTSO *t;
1042
1043     if (RtsFlags.GranFlags.Light)
1044       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1045
1046     /* adjust time based on time-stamp */
1047     if (event->time > CurrentTime[CurrentProc] &&
1048         event->evttype != ContinueThread)
1049       CurrentTime[CurrentProc] = event->time;
1050     
1051     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1052     if (!RtsFlags.GranFlags.Light)
1053       handleIdlePEs();
1054
1055     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1056
1057     /* main event dispatcher in GranSim */
1058     switch (event->evttype) {
1059       /* Should just be continuing execution */
1060     case ContinueThread:
1061       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1062       /* ToDo: check assertion
1063       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1064              run_queue_hd != END_TSO_QUEUE);
1065       */
1066       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1067       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1068           procStatus[CurrentProc]==Fetching) {
1069         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1070               CurrentTSO->id, CurrentTSO, CurrentProc);
1071         goto next_thread;
1072       } 
1073       /* Ignore ContinueThreads for completed threads */
1074       if (CurrentTSO->what_next == ThreadComplete) {
1075         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1076               CurrentTSO->id, CurrentTSO, CurrentProc);
1077         goto next_thread;
1078       } 
1079       /* Ignore ContinueThreads for threads that are being migrated */
1080       if (PROCS(CurrentTSO)==Nowhere) { 
1081         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1082               CurrentTSO->id, CurrentTSO, CurrentProc);
1083         goto next_thread;
1084       }
1085       /* The thread should be at the beginning of the run queue */
1086       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1087         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1088               CurrentTSO->id, CurrentTSO, CurrentProc);
1089         break; // run the thread anyway
1090       }
1091       /*
1092       new_event(proc, proc, CurrentTime[proc],
1093                 FindWork,
1094                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1095       goto next_thread; 
1096       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1097       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1098
1099     case FetchNode:
1100       do_the_fetchnode(event);
1101       goto next_thread;             /* handle next event in event queue  */
1102       
1103     case GlobalBlock:
1104       do_the_globalblock(event);
1105       goto next_thread;             /* handle next event in event queue  */
1106       
1107     case FetchReply:
1108       do_the_fetchreply(event);
1109       goto next_thread;             /* handle next event in event queue  */
1110       
1111     case UnblockThread:   /* Move from the blocked queue to the tail of */
1112       do_the_unblock(event);
1113       goto next_thread;             /* handle next event in event queue  */
1114       
1115     case ResumeThread:  /* Move from the blocked queue to the tail of */
1116       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1117       event->tso->gran.blocktime += 
1118         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1119       do_the_startthread(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     case StartThread:
1123       do_the_startthread(event);
1124       goto next_thread;             /* handle next event in event queue  */
1125       
1126     case MoveThread:
1127       do_the_movethread(event);
1128       goto next_thread;             /* handle next event in event queue  */
1129       
1130     case MoveSpark:
1131       do_the_movespark(event);
1132       goto next_thread;             /* handle next event in event queue  */
1133       
1134     case FindWork:
1135       do_the_findwork(event);
1136       goto next_thread;             /* handle next event in event queue  */
1137       
1138     default:
1139       barf("Illegal event type %u\n", event->evttype);
1140     }  /* switch */
1141     
1142     /* This point was scheduler_loop in the old RTS */
1143
1144     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1145
1146     TimeOfLastEvent = CurrentTime[CurrentProc];
1147     TimeOfNextEvent = get_time_of_next_event();
1148     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1149     // CurrentTSO = ThreadQueueHd;
1150
1151     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1152                          TimeOfNextEvent));
1153
1154     if (RtsFlags.GranFlags.Light) 
1155       GranSimLight_leave_system(event, &ActiveTSO); 
1156
1157     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1158
1159     IF_DEBUG(gran, 
1160              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1161
1162     /* in a GranSim setup the TSO stays on the run queue */
1163     t = CurrentTSO;
1164     /* Take a thread from the run queue. */
1165     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1166
1167     IF_DEBUG(gran, 
1168              debugBelch("GRAN: About to run current thread, which is\n");
1169              G_TSO(t,5));
1170
1171     context_switch = 0; // turned on via GranYield, checking events and time slice
1172
1173     IF_DEBUG(gran, 
1174              DumpGranEvent(GR_SCHEDULE, t));
1175
1176     procStatus[CurrentProc] = Busy;
1177 }
1178 #endif // GRAN
1179
1180 /* ----------------------------------------------------------------------------
1181  * Send pending messages (PARALLEL_HASKELL only)
1182  * ------------------------------------------------------------------------- */
1183
1184 #if defined(PARALLEL_HASKELL)
1185 static StgTSO *
1186 scheduleSendPendingMessages(void)
1187 {
1188     StgSparkPool *pool;
1189     rtsSpark spark;
1190     StgTSO *t;
1191
1192 # if defined(PAR) // global Mem.Mgmt., omit for now
1193     if (PendingFetches != END_BF_QUEUE) {
1194         processFetches();
1195     }
1196 # endif
1197     
1198     if (RtsFlags.ParFlags.BufferTime) {
1199         // if we use message buffering, we must send away all message
1200         // packets which have become too old...
1201         sendOldBuffers(); 
1202     }
1203 }
1204 #endif
1205
1206 /* ----------------------------------------------------------------------------
1207  * Activate spark threads (PARALLEL_HASKELL only)
1208  * ------------------------------------------------------------------------- */
1209
1210 #if defined(PARALLEL_HASKELL)
1211 static void
1212 scheduleActivateSpark(void)
1213 {
1214 #if defined(SPARKS)
1215   ASSERT(emptyRunQueue());
1216 /* We get here if the run queue is empty and want some work.
1217    We try to turn a spark into a thread, and add it to the run queue,
1218    from where it will be picked up in the next iteration of the scheduler
1219    loop.
1220 */
1221
1222       /* :-[  no local threads => look out for local sparks */
1223       /* the spark pool for the current PE */
1224       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1225       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1226           pool->hd < pool->tl) {
1227         /* 
1228          * ToDo: add GC code check that we really have enough heap afterwards!!
1229          * Old comment:
1230          * If we're here (no runnable threads) and we have pending
1231          * sparks, we must have a space problem.  Get enough space
1232          * to turn one of those pending sparks into a
1233          * thread... 
1234          */
1235
1236         spark = findSpark(rtsFalse);            /* get a spark */
1237         if (spark != (rtsSpark) NULL) {
1238           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1239           IF_PAR_DEBUG(fish, // schedule,
1240                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1241                              tso->id, tso, advisory_thread_count));
1242
1243           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1244             IF_PAR_DEBUG(fish, // schedule,
1245                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1246                             spark));
1247             return rtsFalse; /* failed to generate a thread */
1248           }                  /* otherwise fall through & pick-up new tso */
1249         } else {
1250           IF_PAR_DEBUG(fish, // schedule,
1251                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1252                              spark_queue_len(pool)));
1253           return rtsFalse;  /* failed to generate a thread */
1254         }
1255         return rtsTrue;  /* success in generating a thread */
1256   } else { /* no more threads permitted or pool empty */
1257     return rtsFalse;  /* failed to generateThread */
1258   }
1259 #else
1260   tso = NULL; // avoid compiler warning only
1261   return rtsFalse;  /* dummy in non-PAR setup */
1262 #endif // SPARKS
1263 }
1264 #endif // PARALLEL_HASKELL
1265
1266 /* ----------------------------------------------------------------------------
1267  * Get work from a remote node (PARALLEL_HASKELL only)
1268  * ------------------------------------------------------------------------- */
1269     
1270 #if defined(PARALLEL_HASKELL)
1271 static rtsBool
1272 scheduleGetRemoteWork(rtsBool *receivedFinish)
1273 {
1274   ASSERT(emptyRunQueue());
1275
1276   if (RtsFlags.ParFlags.BufferTime) {
1277         IF_PAR_DEBUG(verbose, 
1278                 debugBelch("...send all pending data,"));
1279         {
1280           nat i;
1281           for (i=1; i<=nPEs; i++)
1282             sendImmediately(i); // send all messages away immediately
1283         }
1284   }
1285 # ifndef SPARKS
1286         //++EDEN++ idle() , i.e. send all buffers, wait for work
1287         // suppress fishing in EDEN... just look for incoming messages
1288         // (blocking receive)
1289   IF_PAR_DEBUG(verbose, 
1290                debugBelch("...wait for incoming messages...\n"));
1291   *receivedFinish = processMessages(); // blocking receive...
1292
1293         // and reenter scheduling loop after having received something
1294         // (return rtsFalse below)
1295
1296 # else /* activate SPARKS machinery */
1297 /* We get here, if we have no work, tried to activate a local spark, but still
1298    have no work. We try to get a remote spark, by sending a FISH message.
1299    Thread migration should be added here, and triggered when a sequence of 
1300    fishes returns without work. */
1301         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1302
1303       /* =8-[  no local sparks => look for work on other PEs */
1304         /*
1305          * We really have absolutely no work.  Send out a fish
1306          * (there may be some out there already), and wait for
1307          * something to arrive.  We clearly can't run any threads
1308          * until a SCHEDULE or RESUME arrives, and so that's what
1309          * we're hoping to see.  (Of course, we still have to
1310          * respond to other types of messages.)
1311          */
1312         rtsTime now = msTime() /*CURRENT_TIME*/;
1313         IF_PAR_DEBUG(verbose, 
1314                      debugBelch("--  now=%ld\n", now));
1315         IF_PAR_DEBUG(fish, // verbose,
1316              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1317                  (last_fish_arrived_at!=0 &&
1318                   last_fish_arrived_at+delay > now)) {
1319                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1320                      now, last_fish_arrived_at+delay, 
1321                      last_fish_arrived_at,
1322                      delay);
1323              });
1324   
1325         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1326             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1327           if (last_fish_arrived_at==0 ||
1328               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1329             /* outstandingFishes is set in sendFish, processFish;
1330                avoid flooding system with fishes via delay */
1331     next_fish_to_send_at = 0;  
1332   } else {
1333     /* ToDo: this should be done in the main scheduling loop to avoid the
1334              busy wait here; not so bad if fish delay is very small  */
1335     int iq = 0; // DEBUGGING -- HWL
1336     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1337     /* send a fish when ready, but process messages that arrive in the meantime */
1338     do {
1339       if (PacketsWaiting()) {
1340         iq++; // DEBUGGING
1341         *receivedFinish = processMessages();
1342       }
1343       now = msTime();
1344     } while (!*receivedFinish || now<next_fish_to_send_at);
1345     // JB: This means the fish could become obsolete, if we receive
1346     // work. Better check for work again? 
1347     // last line: while (!receivedFinish || !haveWork || now<...)
1348     // next line: if (receivedFinish || haveWork )
1349
1350     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1351       return rtsFalse;  // NB: this will leave scheduler loop
1352                         // immediately after return!
1353                           
1354     IF_PAR_DEBUG(fish, // verbose,
1355                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1356
1357   }
1358
1359     // JB: IMHO, this should all be hidden inside sendFish(...)
1360     /* pe = choosePE(); 
1361        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1362                 NEW_FISH_HUNGER);
1363
1364     // Global statistics: count no. of fishes
1365     if (RtsFlags.ParFlags.ParStats.Global &&
1366          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1367            globalParStats.tot_fish_mess++;
1368            }
1369     */ 
1370
1371   /* delayed fishes must have been sent by now! */
1372   next_fish_to_send_at = 0;  
1373   }
1374       
1375   *receivedFinish = processMessages();
1376 # endif /* SPARKS */
1377
1378  return rtsFalse;
1379  /* NB: this function always returns rtsFalse, meaning the scheduler
1380     loop continues with the next iteration; 
1381     rationale: 
1382       return code means success in finding work; we enter this function
1383       if there is no local work, thus have to send a fish which takes
1384       time until it arrives with work; in the meantime we should process
1385       messages in the main loop;
1386  */
1387 }
1388 #endif // PARALLEL_HASKELL
1389
1390 /* ----------------------------------------------------------------------------
1391  * PAR/GRAN: Report stats & debugging info(?)
1392  * ------------------------------------------------------------------------- */
1393
1394 #if defined(PAR) || defined(GRAN)
1395 static void
1396 scheduleGranParReport(void)
1397 {
1398   ASSERT(run_queue_hd != END_TSO_QUEUE);
1399
1400   /* Take a thread from the run queue, if we have work */
1401   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1402
1403     /* If this TSO has got its outport closed in the meantime, 
1404      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1405      * It has to be marked as TH_DEAD for this purpose.
1406      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1407
1408 JB: TODO: investigate wether state change field could be nuked
1409      entirely and replaced by the normal tso state (whatnext
1410      field). All we want to do is to kill tsos from outside.
1411      */
1412
1413     /* ToDo: write something to the log-file
1414     if (RTSflags.ParFlags.granSimStats && !sameThread)
1415         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1416
1417     CurrentTSO = t;
1418     */
1419     /* the spark pool for the current PE */
1420     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1421
1422     IF_DEBUG(scheduler, 
1423              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1424                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1425
1426     IF_PAR_DEBUG(fish,
1427              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1428                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1429
1430     if (RtsFlags.ParFlags.ParStats.Full && 
1431         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1432         (emitSchedule || // forced emit
1433          (t && LastTSO && t->id != LastTSO->id))) {
1434       /* 
1435          we are running a different TSO, so write a schedule event to log file
1436          NB: If we use fair scheduling we also have to write  a deschedule 
1437              event for LastTSO; with unfair scheduling we know that the
1438              previous tso has blocked whenever we switch to another tso, so
1439              we don't need it in GUM for now
1440       */
1441       IF_PAR_DEBUG(fish, // schedule,
1442                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1443
1444       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1445                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1446       emitSchedule = rtsFalse;
1447     }
1448 }     
1449 #endif
1450
1451 /* ----------------------------------------------------------------------------
1452  * After running a thread...
1453  * ------------------------------------------------------------------------- */
1454
1455 static void
1456 schedulePostRunThread(void)
1457 {
1458 #if defined(PAR)
1459     /* HACK 675: if the last thread didn't yield, make sure to print a 
1460        SCHEDULE event to the log file when StgRunning the next thread, even
1461        if it is the same one as before */
1462     LastTSO = t; 
1463     TimeOfLastYield = CURRENT_TIME;
1464 #endif
1465
1466   /* some statistics gathering in the parallel case */
1467
1468 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1469   switch (ret) {
1470     case HeapOverflow:
1471 # if defined(GRAN)
1472       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1473       globalGranStats.tot_heapover++;
1474 # elif defined(PAR)
1475       globalParStats.tot_heapover++;
1476 # endif
1477       break;
1478
1479      case StackOverflow:
1480 # if defined(GRAN)
1481       IF_DEBUG(gran, 
1482                DumpGranEvent(GR_DESCHEDULE, t));
1483       globalGranStats.tot_stackover++;
1484 # elif defined(PAR)
1485       // IF_DEBUG(par, 
1486       // DumpGranEvent(GR_DESCHEDULE, t);
1487       globalParStats.tot_stackover++;
1488 # endif
1489       break;
1490
1491     case ThreadYielding:
1492 # if defined(GRAN)
1493       IF_DEBUG(gran, 
1494                DumpGranEvent(GR_DESCHEDULE, t));
1495       globalGranStats.tot_yields++;
1496 # elif defined(PAR)
1497       // IF_DEBUG(par, 
1498       // DumpGranEvent(GR_DESCHEDULE, t);
1499       globalParStats.tot_yields++;
1500 # endif
1501       break; 
1502
1503     case ThreadBlocked:
1504 # if defined(GRAN)
1505         debugTrace(DEBUG_sched, 
1506                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1507                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1508                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1509                if (t->block_info.closure!=(StgClosure*)NULL)
1510                  print_bq(t->block_info.closure);
1511                debugBelch("\n"));
1512
1513       // ??? needed; should emit block before
1514       IF_DEBUG(gran, 
1515                DumpGranEvent(GR_DESCHEDULE, t)); 
1516       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1517       /*
1518         ngoq Dogh!
1519       ASSERT(procStatus[CurrentProc]==Busy || 
1520               ((procStatus[CurrentProc]==Fetching) && 
1521               (t->block_info.closure!=(StgClosure*)NULL)));
1522       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1523           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1524             procStatus[CurrentProc]==Fetching)) 
1525         procStatus[CurrentProc] = Idle;
1526       */
1527 # elif defined(PAR)
1528 //++PAR++  blockThread() writes the event (change?)
1529 # endif
1530     break;
1531
1532   case ThreadFinished:
1533     break;
1534
1535   default:
1536     barf("parGlobalStats: unknown return code");
1537     break;
1538     }
1539 #endif
1540 }
1541
1542 /* -----------------------------------------------------------------------------
1543  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1544  * -------------------------------------------------------------------------- */
1545
1546 static rtsBool
1547 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1548 {
1549     // did the task ask for a large block?
1550     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1551         // if so, get one and push it on the front of the nursery.
1552         bdescr *bd;
1553         lnat blocks;
1554         
1555         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1556         
1557         debugTrace(DEBUG_sched,
1558                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1559                    (long)t->id, whatNext_strs[t->what_next], blocks);
1560     
1561         // don't do this if the nursery is (nearly) full, we'll GC first.
1562         if (cap->r.rCurrentNursery->link != NULL ||
1563             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1564                                                // if the nursery has only one block.
1565             
1566             ACQUIRE_SM_LOCK
1567             bd = allocGroup( blocks );
1568             RELEASE_SM_LOCK
1569             cap->r.rNursery->n_blocks += blocks;
1570             
1571             // link the new group into the list
1572             bd->link = cap->r.rCurrentNursery;
1573             bd->u.back = cap->r.rCurrentNursery->u.back;
1574             if (cap->r.rCurrentNursery->u.back != NULL) {
1575                 cap->r.rCurrentNursery->u.back->link = bd;
1576             } else {
1577 #if !defined(THREADED_RTS)
1578                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1579                        g0s0 == cap->r.rNursery);
1580 #endif
1581                 cap->r.rNursery->blocks = bd;
1582             }             
1583             cap->r.rCurrentNursery->u.back = bd;
1584             
1585             // initialise it as a nursery block.  We initialise the
1586             // step, gen_no, and flags field of *every* sub-block in
1587             // this large block, because this is easier than making
1588             // sure that we always find the block head of a large
1589             // block whenever we call Bdescr() (eg. evacuate() and
1590             // isAlive() in the GC would both have to do this, at
1591             // least).
1592             { 
1593                 bdescr *x;
1594                 for (x = bd; x < bd + blocks; x++) {
1595                     x->step = cap->r.rNursery;
1596                     x->gen_no = 0;
1597                     x->flags = 0;
1598                 }
1599             }
1600             
1601             // This assert can be a killer if the app is doing lots
1602             // of large block allocations.
1603             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1604             
1605             // now update the nursery to point to the new block
1606             cap->r.rCurrentNursery = bd;
1607             
1608             // we might be unlucky and have another thread get on the
1609             // run queue before us and steal the large block, but in that
1610             // case the thread will just end up requesting another large
1611             // block.
1612             pushOnRunQueue(cap,t);
1613             return rtsFalse;  /* not actually GC'ing */
1614         }
1615     }
1616     
1617     debugTrace(DEBUG_sched,
1618                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1619                (long)t->id, whatNext_strs[t->what_next]);
1620
1621 #if defined(GRAN)
1622     ASSERT(!is_on_queue(t,CurrentProc));
1623 #elif defined(PARALLEL_HASKELL)
1624     /* Currently we emit a DESCHEDULE event before GC in GUM.
1625        ToDo: either add separate event to distinguish SYSTEM time from rest
1626        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1627     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1628         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1629                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1630         emitSchedule = rtsTrue;
1631     }
1632 #endif
1633       
1634     pushOnRunQueue(cap,t);
1635     return rtsTrue;
1636     /* actual GC is done at the end of the while loop in schedule() */
1637 }
1638
1639 /* -----------------------------------------------------------------------------
1640  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1641  * -------------------------------------------------------------------------- */
1642
1643 static void
1644 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1645 {
1646     debugTrace (DEBUG_sched,
1647                 "--<< thread %ld (%s) stopped, StackOverflow\n", 
1648                 (long)t->id, whatNext_strs[t->what_next]);
1649
1650     /* just adjust the stack for this thread, then pop it back
1651      * on the run queue.
1652      */
1653     { 
1654         /* enlarge the stack */
1655         StgTSO *new_t = threadStackOverflow(cap, t);
1656         
1657         /* The TSO attached to this Task may have moved, so update the
1658          * pointer to it.
1659          */
1660         if (task->tso == t) {
1661             task->tso = new_t;
1662         }
1663         pushOnRunQueue(cap,new_t);
1664     }
1665 }
1666
1667 /* -----------------------------------------------------------------------------
1668  * Handle a thread that returned to the scheduler with ThreadYielding
1669  * -------------------------------------------------------------------------- */
1670
1671 static rtsBool
1672 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1673 {
1674     // Reset the context switch flag.  We don't do this just before
1675     // running the thread, because that would mean we would lose ticks
1676     // during GC, which can lead to unfair scheduling (a thread hogs
1677     // the CPU because the tick always arrives during GC).  This way
1678     // penalises threads that do a lot of allocation, but that seems
1679     // better than the alternative.
1680     context_switch = 0;
1681     
1682     /* put the thread back on the run queue.  Then, if we're ready to
1683      * GC, check whether this is the last task to stop.  If so, wake
1684      * up the GC thread.  getThread will block during a GC until the
1685      * GC is finished.
1686      */
1687 #ifdef DEBUG
1688     if (t->what_next != prev_what_next) {
1689         debugTrace(DEBUG_sched,
1690                    "--<< thread %ld (%s) stopped to switch evaluators\n", 
1691                    (long)t->id, whatNext_strs[t->what_next]);
1692     } else {
1693         debugTrace(DEBUG_sched,
1694                    "--<< thread %ld (%s) stopped, yielding\n",
1695                    (long)t->id, whatNext_strs[t->what_next]);
1696     }
1697 #endif
1698     
1699     IF_DEBUG(sanity,
1700              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1701              checkTSO(t));
1702     ASSERT(t->link == END_TSO_QUEUE);
1703     
1704     // Shortcut if we're just switching evaluators: don't bother
1705     // doing stack squeezing (which can be expensive), just run the
1706     // thread.
1707     if (t->what_next != prev_what_next) {
1708         return rtsTrue;
1709     }
1710     
1711 #if defined(GRAN)
1712     ASSERT(!is_on_queue(t,CurrentProc));
1713       
1714     IF_DEBUG(sanity,
1715              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1716              checkThreadQsSanity(rtsTrue));
1717
1718 #endif
1719
1720     addToRunQueue(cap,t);
1721
1722 #if defined(GRAN)
1723     /* add a ContinueThread event to actually process the thread */
1724     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1725               ContinueThread,
1726               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1727     IF_GRAN_DEBUG(bq, 
1728                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1729                   G_EVENTQ(0);
1730                   G_CURR_THREADQ(0));
1731 #endif
1732     return rtsFalse;
1733 }
1734
1735 /* -----------------------------------------------------------------------------
1736  * Handle a thread that returned to the scheduler with ThreadBlocked
1737  * -------------------------------------------------------------------------- */
1738
1739 static void
1740 scheduleHandleThreadBlocked( StgTSO *t
1741 #if !defined(GRAN) && !defined(DEBUG)
1742     STG_UNUSED
1743 #endif
1744     )
1745 {
1746 #if defined(GRAN)
1747     IF_DEBUG(scheduler,
1748              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1749                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1750              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1751     
1752     // ??? needed; should emit block before
1753     IF_DEBUG(gran, 
1754              DumpGranEvent(GR_DESCHEDULE, t)); 
1755     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1756     /*
1757       ngoq Dogh!
1758       ASSERT(procStatus[CurrentProc]==Busy || 
1759       ((procStatus[CurrentProc]==Fetching) && 
1760       (t->block_info.closure!=(StgClosure*)NULL)));
1761       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1762       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1763       procStatus[CurrentProc]==Fetching)) 
1764       procStatus[CurrentProc] = Idle;
1765     */
1766 #elif defined(PAR)
1767     IF_DEBUG(scheduler,
1768              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1769                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1770     IF_PAR_DEBUG(bq,
1771                  
1772                  if (t->block_info.closure!=(StgClosure*)NULL) 
1773                  print_bq(t->block_info.closure));
1774     
1775     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1776     blockThread(t);
1777     
1778     /* whatever we schedule next, we must log that schedule */
1779     emitSchedule = rtsTrue;
1780     
1781 #else /* !GRAN */
1782
1783       // We don't need to do anything.  The thread is blocked, and it
1784       // has tidied up its stack and placed itself on whatever queue
1785       // it needs to be on.
1786
1787 #if !defined(THREADED_RTS)
1788     ASSERT(t->why_blocked != NotBlocked);
1789              // This might not be true under THREADED_RTS: we don't have
1790              // exclusive access to this TSO, so someone might have
1791              // woken it up by now.  This actually happens: try
1792              // conc023 +RTS -N2.
1793 #endif
1794
1795 #ifdef DEBUG
1796     if (traceClass(DEBUG_sched)) {
1797         debugTraceBegin("--<< thread %d (%s) stopped: ", 
1798                    t->id, whatNext_strs[t->what_next]);
1799         printThreadBlockage(t);
1800         debugTraceEnd();
1801     }
1802 #endif
1803     
1804     /* Only for dumping event to log file 
1805        ToDo: do I need this in GranSim, too?
1806        blockThread(t);
1807     */
1808 #endif
1809 }
1810
1811 /* -----------------------------------------------------------------------------
1812  * Handle a thread that returned to the scheduler with ThreadFinished
1813  * -------------------------------------------------------------------------- */
1814
1815 static rtsBool
1816 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1817 {
1818     /* Need to check whether this was a main thread, and if so,
1819      * return with the return value.
1820      *
1821      * We also end up here if the thread kills itself with an
1822      * uncaught exception, see Exception.cmm.
1823      */
1824     debugTrace(DEBUG_sched, "--++ thread %d (%s) finished", 
1825                t->id, whatNext_strs[t->what_next]);
1826
1827 #if defined(GRAN)
1828       endThread(t, CurrentProc); // clean-up the thread
1829 #elif defined(PARALLEL_HASKELL)
1830       /* For now all are advisory -- HWL */
1831       //if(t->priority==AdvisoryPriority) ??
1832       advisory_thread_count--; // JB: Caution with this counter, buggy!
1833       
1834 # if defined(DIST)
1835       if(t->dist.priority==RevalPriority)
1836         FinishReval(t);
1837 # endif
1838     
1839 # if defined(EDENOLD)
1840       // the thread could still have an outport... (BUG)
1841       if (t->eden.outport != -1) {
1842       // delete the outport for the tso which has finished...
1843         IF_PAR_DEBUG(eden_ports,
1844                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1845                               t->eden.outport, t->id));
1846         deleteOPT(t);
1847       }
1848       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1849       if (t->eden.epid != -1) {
1850         IF_PAR_DEBUG(eden_ports,
1851                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1852                            t->id, t->eden.epid));
1853         removeTSOfromProcess(t);
1854       }
1855 # endif 
1856
1857 # if defined(PAR)
1858       if (RtsFlags.ParFlags.ParStats.Full &&
1859           !RtsFlags.ParFlags.ParStats.Suppressed) 
1860         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1861
1862       //  t->par only contains statistics: left out for now...
1863       IF_PAR_DEBUG(fish,
1864                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1865                               t->id,t,t->par.sparkname));
1866 # endif
1867 #endif // PARALLEL_HASKELL
1868
1869       //
1870       // Check whether the thread that just completed was a bound
1871       // thread, and if so return with the result.  
1872       //
1873       // There is an assumption here that all thread completion goes
1874       // through this point; we need to make sure that if a thread
1875       // ends up in the ThreadKilled state, that it stays on the run
1876       // queue so it can be dealt with here.
1877       //
1878
1879       if (t->bound) {
1880
1881           if (t->bound != task) {
1882 #if !defined(THREADED_RTS)
1883               // Must be a bound thread that is not the topmost one.  Leave
1884               // it on the run queue until the stack has unwound to the
1885               // point where we can deal with this.  Leaving it on the run
1886               // queue also ensures that the garbage collector knows about
1887               // this thread and its return value (it gets dropped from the
1888               // all_threads list so there's no other way to find it).
1889               appendToRunQueue(cap,t);
1890               return rtsFalse;
1891 #else
1892               // this cannot happen in the threaded RTS, because a
1893               // bound thread can only be run by the appropriate Task.
1894               barf("finished bound thread that isn't mine");
1895 #endif
1896           }
1897
1898           ASSERT(task->tso == t);
1899
1900           if (t->what_next == ThreadComplete) {
1901               if (task->ret) {
1902                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1903                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1904               }
1905               task->stat = Success;
1906           } else {
1907               if (task->ret) {
1908                   *(task->ret) = NULL;
1909               }
1910               if (sched_state >= SCHED_INTERRUPTING) {
1911                   task->stat = Interrupted;
1912               } else {
1913                   task->stat = Killed;
1914               }
1915           }
1916 #ifdef DEBUG
1917           removeThreadLabel((StgWord)task->tso->id);
1918 #endif
1919           return rtsTrue; // tells schedule() to return
1920       }
1921
1922       return rtsFalse;
1923 }
1924
1925 /* -----------------------------------------------------------------------------
1926  * Perform a heap census, if PROFILING
1927  * -------------------------------------------------------------------------- */
1928
1929 static rtsBool
1930 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1931 {
1932 #if defined(PROFILING)
1933     // When we have +RTS -i0 and we're heap profiling, do a census at
1934     // every GC.  This lets us get repeatable runs for debugging.
1935     if (performHeapProfile ||
1936         (RtsFlags.ProfFlags.profileInterval==0 &&
1937          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1938
1939         // checking black holes is necessary before GC, otherwise
1940         // there may be threads that are unreachable except by the
1941         // blackhole queue, which the GC will consider to be
1942         // deadlocked.
1943         scheduleCheckBlackHoles(&MainCapability);
1944
1945         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1946         GarbageCollect(GetRoots, rtsTrue);
1947
1948         debugTrace(DEBUG_sched, "performing heap census");
1949         heapCensus();
1950
1951         performHeapProfile = rtsFalse;
1952         return rtsTrue;  // true <=> we already GC'd
1953     }
1954 #endif
1955     return rtsFalse;
1956 }
1957
1958 /* -----------------------------------------------------------------------------
1959  * Perform a garbage collection if necessary
1960  * -------------------------------------------------------------------------- */
1961
1962 static Capability *
1963 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1964               rtsBool force_major, void (*get_roots)(evac_fn))
1965 {
1966     StgTSO *t;
1967 #ifdef THREADED_RTS
1968     static volatile StgWord waiting_for_gc;
1969     rtsBool was_waiting;
1970     nat i;
1971 #endif
1972
1973 #ifdef THREADED_RTS
1974     // In order to GC, there must be no threads running Haskell code.
1975     // Therefore, the GC thread needs to hold *all* the capabilities,
1976     // and release them after the GC has completed.  
1977     //
1978     // This seems to be the simplest way: previous attempts involved
1979     // making all the threads with capabilities give up their
1980     // capabilities and sleep except for the *last* one, which
1981     // actually did the GC.  But it's quite hard to arrange for all
1982     // the other tasks to sleep and stay asleep.
1983     //
1984         
1985     was_waiting = cas(&waiting_for_gc, 0, 1);
1986     if (was_waiting) {
1987         do {
1988             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1989             if (cap) yieldCapability(&cap,task);
1990         } while (waiting_for_gc);
1991         return cap;  // NOTE: task->cap might have changed here
1992     }
1993
1994     for (i=0; i < n_capabilities; i++) {
1995         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1996         if (cap != &capabilities[i]) {
1997             Capability *pcap = &capabilities[i];
1998             // we better hope this task doesn't get migrated to
1999             // another Capability while we're waiting for this one.
2000             // It won't, because load balancing happens while we have
2001             // all the Capabilities, but even so it's a slightly
2002             // unsavoury invariant.
2003             task->cap = pcap;
2004             context_switch = 1;
2005             waitForReturnCapability(&pcap, task);
2006             if (pcap != &capabilities[i]) {
2007                 barf("scheduleDoGC: got the wrong capability");
2008             }
2009         }
2010     }
2011
2012     waiting_for_gc = rtsFalse;
2013 #endif
2014
2015     /* Kick any transactions which are invalid back to their
2016      * atomically frames.  When next scheduled they will try to
2017      * commit, this commit will fail and they will retry.
2018      */
2019     { 
2020         StgTSO *next;
2021
2022         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2023             if (t->what_next == ThreadRelocated) {
2024                 next = t->link;
2025             } else {
2026                 next = t->global_link;
2027                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2028                     if (!stmValidateNestOfTransactions (t -> trec)) {
2029                         debugTrace(DEBUG_sched | DEBUG_stm,
2030                                    "trec %p found wasting its time", t);
2031                         
2032                         // strip the stack back to the
2033                         // ATOMICALLY_FRAME, aborting the (nested)
2034                         // transaction, and saving the stack of any
2035                         // partially-evaluated thunks on the heap.
2036                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2037                         
2038 #ifdef REG_R1
2039                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2040 #endif
2041                     }
2042                 }
2043             }
2044         }
2045     }
2046     
2047     // so this happens periodically:
2048     if (cap) scheduleCheckBlackHoles(cap);
2049     
2050     IF_DEBUG(scheduler, printAllThreads());
2051
2052     /*
2053      * We now have all the capabilities; if we're in an interrupting
2054      * state, then we should take the opportunity to delete all the
2055      * threads in the system.
2056      */
2057     if (sched_state >= SCHED_INTERRUPTING) {
2058         deleteAllThreads(&capabilities[0]);
2059         sched_state = SCHED_SHUTTING_DOWN;
2060     }
2061
2062     /* everybody back, start the GC.
2063      * Could do it in this thread, or signal a condition var
2064      * to do it in another thread.  Either way, we need to
2065      * broadcast on gc_pending_cond afterward.
2066      */
2067 #if defined(THREADED_RTS)
2068     debugTrace(DEBUG_sched, "doing GC");
2069 #endif
2070     GarbageCollect(get_roots, force_major);
2071     
2072 #if defined(THREADED_RTS)
2073     // release our stash of capabilities.
2074     for (i = 0; i < n_capabilities; i++) {
2075         if (cap != &capabilities[i]) {
2076             task->cap = &capabilities[i];
2077             releaseCapability(&capabilities[i]);
2078         }
2079     }
2080     if (cap) {
2081         task->cap = cap;
2082     } else {
2083         task->cap = NULL;
2084     }
2085 #endif
2086
2087 #if defined(GRAN)
2088     /* add a ContinueThread event to continue execution of current thread */
2089     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2090               ContinueThread,
2091               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2092     IF_GRAN_DEBUG(bq, 
2093                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2094                   G_EVENTQ(0);
2095                   G_CURR_THREADQ(0));
2096 #endif /* GRAN */
2097
2098     return cap;
2099 }
2100
2101 /* ---------------------------------------------------------------------------
2102  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2103  * used by Control.Concurrent for error checking.
2104  * ------------------------------------------------------------------------- */
2105  
2106 StgBool
2107 rtsSupportsBoundThreads(void)
2108 {
2109 #if defined(THREADED_RTS)
2110   return rtsTrue;
2111 #else
2112   return rtsFalse;
2113 #endif
2114 }
2115
2116 /* ---------------------------------------------------------------------------
2117  * isThreadBound(tso): check whether tso is bound to an OS thread.
2118  * ------------------------------------------------------------------------- */
2119  
2120 StgBool
2121 isThreadBound(StgTSO* tso USED_IF_THREADS)
2122 {
2123 #if defined(THREADED_RTS)
2124   return (tso->bound != NULL);
2125 #endif
2126   return rtsFalse;
2127 }
2128
2129 /* ---------------------------------------------------------------------------
2130  * Singleton fork(). Do not copy any running threads.
2131  * ------------------------------------------------------------------------- */
2132
2133 #if !defined(mingw32_HOST_OS)
2134 #define FORKPROCESS_PRIMOP_SUPPORTED
2135 #endif
2136
2137 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2138 static void 
2139 deleteThread_(Capability *cap, StgTSO *tso);
2140 #endif
2141 StgInt
2142 forkProcess(HsStablePtr *entry
2143 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2144             STG_UNUSED
2145 #endif
2146            )
2147 {
2148 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2149     Task *task;
2150     pid_t pid;
2151     StgTSO* t,*next;
2152     Capability *cap;
2153     
2154 #if defined(THREADED_RTS)
2155     if (RtsFlags.ParFlags.nNodes > 1) {
2156         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2157         stg_exit(EXIT_FAILURE);
2158     }
2159 #endif
2160
2161     debugTrace(DEBUG_sched, "forking!");
2162     
2163     // ToDo: for SMP, we should probably acquire *all* the capabilities
2164     cap = rts_lock();
2165     
2166     pid = fork();
2167     
2168     if (pid) { // parent
2169         
2170         // just return the pid
2171         rts_unlock(cap);
2172         return pid;
2173         
2174     } else { // child
2175         
2176         // Now, all OS threads except the thread that forked are
2177         // stopped.  We need to stop all Haskell threads, including
2178         // those involved in foreign calls.  Also we need to delete
2179         // all Tasks, because they correspond to OS threads that are
2180         // now gone.
2181
2182         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2183             if (t->what_next == ThreadRelocated) {
2184                 next = t->link;
2185             } else {
2186                 next = t->global_link;
2187                 // don't allow threads to catch the ThreadKilled
2188                 // exception, but we do want to raiseAsync() because these
2189                 // threads may be evaluating thunks that we need later.
2190                 deleteThread_(cap,t);
2191             }
2192         }
2193         
2194         // Empty the run queue.  It seems tempting to let all the
2195         // killed threads stay on the run queue as zombies to be
2196         // cleaned up later, but some of them correspond to bound
2197         // threads for which the corresponding Task does not exist.
2198         cap->run_queue_hd = END_TSO_QUEUE;
2199         cap->run_queue_tl = END_TSO_QUEUE;
2200
2201         // Any suspended C-calling Tasks are no more, their OS threads
2202         // don't exist now:
2203         cap->suspended_ccalling_tasks = NULL;
2204
2205         // Empty the all_threads list.  Otherwise, the garbage
2206         // collector may attempt to resurrect some of these threads.
2207         all_threads = END_TSO_QUEUE;
2208
2209         // Wipe the task list, except the current Task.
2210         ACQUIRE_LOCK(&sched_mutex);
2211         for (task = all_tasks; task != NULL; task=task->all_link) {
2212             if (task != cap->running_task) {
2213                 discardTask(task);
2214             }
2215         }
2216         RELEASE_LOCK(&sched_mutex);
2217
2218 #if defined(THREADED_RTS)
2219         // Wipe our spare workers list, they no longer exist.  New
2220         // workers will be created if necessary.
2221         cap->spare_workers = NULL;
2222         cap->returning_tasks_hd = NULL;
2223         cap->returning_tasks_tl = NULL;
2224 #endif
2225
2226         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2227         rts_checkSchedStatus("forkProcess",cap);
2228         
2229         rts_unlock(cap);
2230         hs_exit();                      // clean up and exit
2231         stg_exit(EXIT_SUCCESS);
2232     }
2233 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2234     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2235     return -1;
2236 #endif
2237 }
2238
2239 /* ---------------------------------------------------------------------------
2240  * Delete all the threads in the system
2241  * ------------------------------------------------------------------------- */
2242    
2243 static void
2244 deleteAllThreads ( Capability *cap )
2245 {
2246   StgTSO* t, *next;
2247   debugTrace(DEBUG_sched,"deleting all threads");
2248   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2249       if (t->what_next == ThreadRelocated) {
2250           next = t->link;
2251       } else {
2252           next = t->global_link;
2253           deleteThread(cap,t);
2254       }
2255   }      
2256
2257   // The run queue now contains a bunch of ThreadKilled threads.  We
2258   // must not throw these away: the main thread(s) will be in there
2259   // somewhere, and the main scheduler loop has to deal with it.
2260   // Also, the run queue is the only thing keeping these threads from
2261   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2262
2263 #if !defined(THREADED_RTS)
2264   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2265   ASSERT(sleeping_queue == END_TSO_QUEUE);
2266 #endif
2267 }
2268
2269 /* -----------------------------------------------------------------------------
2270    Managing the suspended_ccalling_tasks list.
2271    Locks required: sched_mutex
2272    -------------------------------------------------------------------------- */
2273
2274 STATIC_INLINE void
2275 suspendTask (Capability *cap, Task *task)
2276 {
2277     ASSERT(task->next == NULL && task->prev == NULL);
2278     task->next = cap->suspended_ccalling_tasks;
2279     task->prev = NULL;
2280     if (cap->suspended_ccalling_tasks) {
2281         cap->suspended_ccalling_tasks->prev = task;
2282     }
2283     cap->suspended_ccalling_tasks = task;
2284 }
2285
2286 STATIC_INLINE void
2287 recoverSuspendedTask (Capability *cap, Task *task)
2288 {
2289     if (task->prev) {
2290         task->prev->next = task->next;
2291     } else {
2292         ASSERT(cap->suspended_ccalling_tasks == task);
2293         cap->suspended_ccalling_tasks = task->next;
2294     }
2295     if (task->next) {
2296         task->next->prev = task->prev;
2297     }
2298     task->next = task->prev = NULL;
2299 }
2300
2301 /* ---------------------------------------------------------------------------
2302  * Suspending & resuming Haskell threads.
2303  * 
2304  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2305  * its capability before calling the C function.  This allows another
2306  * task to pick up the capability and carry on running Haskell
2307  * threads.  It also means that if the C call blocks, it won't lock
2308  * the whole system.
2309  *
2310  * The Haskell thread making the C call is put to sleep for the
2311  * duration of the call, on the susepended_ccalling_threads queue.  We
2312  * give out a token to the task, which it can use to resume the thread
2313  * on return from the C function.
2314  * ------------------------------------------------------------------------- */
2315    
2316 void *
2317 suspendThread (StgRegTable *reg)
2318 {
2319   Capability *cap;
2320   int saved_errno = errno;
2321   StgTSO *tso;
2322   Task *task;
2323
2324   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2325    */
2326   cap = regTableToCapability(reg);
2327
2328   task = cap->running_task;
2329   tso = cap->r.rCurrentTSO;
2330
2331   debugTrace(DEBUG_sched, 
2332              "thread %d did a safe foreign call", 
2333              cap->r.rCurrentTSO->id);
2334
2335   // XXX this might not be necessary --SDM
2336   tso->what_next = ThreadRunGHC;
2337
2338   threadPaused(cap,tso);
2339
2340   if(tso->blocked_exceptions == NULL)  {
2341       tso->why_blocked = BlockedOnCCall;
2342       tso->blocked_exceptions = END_TSO_QUEUE;
2343   } else {
2344       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2345   }
2346
2347   // Hand back capability
2348   task->suspended_tso = tso;
2349
2350   ACQUIRE_LOCK(&cap->lock);
2351
2352   suspendTask(cap,task);
2353   cap->in_haskell = rtsFalse;
2354   releaseCapability_(cap);
2355   
2356   RELEASE_LOCK(&cap->lock);
2357
2358 #if defined(THREADED_RTS)
2359   /* Preparing to leave the RTS, so ensure there's a native thread/task
2360      waiting to take over.
2361   */
2362   debugTrace(DEBUG_sched, "thread %d: leaving RTS", tso->id);
2363 #endif
2364
2365   errno = saved_errno;
2366   return task;
2367 }
2368
2369 StgRegTable *
2370 resumeThread (void *task_)
2371 {
2372     StgTSO *tso;
2373     Capability *cap;
2374     int saved_errno = errno;
2375     Task *task = task_;
2376
2377     cap = task->cap;
2378     // Wait for permission to re-enter the RTS with the result.
2379     waitForReturnCapability(&cap,task);
2380     // we might be on a different capability now... but if so, our
2381     // entry on the suspended_ccalling_tasks list will also have been
2382     // migrated.
2383
2384     // Remove the thread from the suspended list
2385     recoverSuspendedTask(cap,task);
2386
2387     tso = task->suspended_tso;
2388     task->suspended_tso = NULL;
2389     tso->link = END_TSO_QUEUE;
2390     debugTrace(DEBUG_sched, "thread %d: re-entering RTS", tso->id);
2391     
2392     if (tso->why_blocked == BlockedOnCCall) {
2393         awakenBlockedQueue(cap,tso->blocked_exceptions);
2394         tso->blocked_exceptions = NULL;
2395     }
2396     
2397     /* Reset blocking status */
2398     tso->why_blocked  = NotBlocked;
2399     
2400     cap->r.rCurrentTSO = tso;
2401     cap->in_haskell = rtsTrue;
2402     errno = saved_errno;
2403
2404     /* We might have GC'd, mark the TSO dirty again */
2405     dirtyTSO(tso);
2406
2407     IF_DEBUG(sanity, checkTSO(tso));
2408
2409     return &cap->r;
2410 }
2411
2412 /* ---------------------------------------------------------------------------
2413  * Comparing Thread ids.
2414  *
2415  * This is used from STG land in the implementation of the
2416  * instances of Eq/Ord for ThreadIds.
2417  * ------------------------------------------------------------------------ */
2418
2419 int
2420 cmp_thread(StgPtr tso1, StgPtr tso2) 
2421
2422   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2423   StgThreadID id2 = ((StgTSO *)tso2)->id;
2424  
2425   if (id1 < id2) return (-1);
2426   if (id1 > id2) return 1;
2427   return 0;
2428 }
2429
2430 /* ---------------------------------------------------------------------------
2431  * Fetching the ThreadID from an StgTSO.
2432  *
2433  * This is used in the implementation of Show for ThreadIds.
2434  * ------------------------------------------------------------------------ */
2435 int
2436 rts_getThreadId(StgPtr tso) 
2437 {
2438   return ((StgTSO *)tso)->id;
2439 }
2440
2441 #ifdef DEBUG
2442 void
2443 labelThread(StgPtr tso, char *label)
2444 {
2445   int len;
2446   void *buf;
2447
2448   /* Caveat: Once set, you can only set the thread name to "" */
2449   len = strlen(label)+1;
2450   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2451   strncpy(buf,label,len);
2452   /* Update will free the old memory for us */
2453   updateThreadLabel(((StgTSO *)tso)->id,buf);
2454 }
2455 #endif /* DEBUG */
2456
2457 /* ---------------------------------------------------------------------------
2458    Create a new thread.
2459
2460    The new thread starts with the given stack size.  Before the
2461    scheduler can run, however, this thread needs to have a closure
2462    (and possibly some arguments) pushed on its stack.  See
2463    pushClosure() in Schedule.h.
2464
2465    createGenThread() and createIOThread() (in SchedAPI.h) are
2466    convenient packaged versions of this function.
2467
2468    currently pri (priority) is only used in a GRAN setup -- HWL
2469    ------------------------------------------------------------------------ */
2470 #if defined(GRAN)
2471 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2472 StgTSO *
2473 createThread(nat size, StgInt pri)
2474 #else
2475 StgTSO *
2476 createThread(Capability *cap, nat size)
2477 #endif
2478 {
2479     StgTSO *tso;
2480     nat stack_size;
2481
2482     /* sched_mutex is *not* required */
2483
2484     /* First check whether we should create a thread at all */
2485 #if defined(PARALLEL_HASKELL)
2486     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2487     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2488         threadsIgnored++;
2489         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2490                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2491         return END_TSO_QUEUE;
2492     }
2493     threadsCreated++;
2494 #endif
2495
2496 #if defined(GRAN)
2497     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2498 #endif
2499
2500     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2501
2502     /* catch ridiculously small stack sizes */
2503     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2504         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2505     }
2506
2507     stack_size = size - TSO_STRUCT_SIZEW;
2508     
2509     tso = (StgTSO *)allocateLocal(cap, size);
2510     TICK_ALLOC_TSO(stack_size, 0);
2511
2512     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2513 #if defined(GRAN)
2514     SET_GRAN_HDR(tso, ThisPE);
2515 #endif
2516
2517     // Always start with the compiled code evaluator
2518     tso->what_next = ThreadRunGHC;
2519
2520     tso->why_blocked  = NotBlocked;
2521     tso->blocked_exceptions = NULL;
2522     tso->flags = TSO_DIRTY;
2523     
2524     tso->saved_errno = 0;
2525     tso->bound = NULL;
2526     tso->cap = cap;
2527     
2528     tso->stack_size     = stack_size;
2529     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2530                           - TSO_STRUCT_SIZEW;
2531     tso->sp             = (P_)&(tso->stack) + stack_size;
2532
2533     tso->trec = NO_TREC;
2534     
2535 #ifdef PROFILING
2536     tso->prof.CCCS = CCS_MAIN;
2537 #endif
2538     
2539   /* put a stop frame on the stack */
2540     tso->sp -= sizeofW(StgStopFrame);
2541     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2542     tso->link = END_TSO_QUEUE;
2543     
2544   // ToDo: check this
2545 #if defined(GRAN)
2546     /* uses more flexible routine in GranSim */
2547     insertThread(tso, CurrentProc);
2548 #else
2549     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2550      * from its creation
2551      */
2552 #endif
2553     
2554 #if defined(GRAN) 
2555     if (RtsFlags.GranFlags.GranSimStats.Full) 
2556         DumpGranEvent(GR_START,tso);
2557 #elif defined(PARALLEL_HASKELL)
2558     if (RtsFlags.ParFlags.ParStats.Full) 
2559         DumpGranEvent(GR_STARTQ,tso);
2560     /* HACk to avoid SCHEDULE 
2561        LastTSO = tso; */
2562 #endif
2563     
2564     /* Link the new thread on the global thread list.
2565      */
2566     ACQUIRE_LOCK(&sched_mutex);
2567     tso->id = next_thread_id++;  // while we have the mutex
2568     tso->global_link = all_threads;
2569     all_threads = tso;
2570     RELEASE_LOCK(&sched_mutex);
2571     
2572 #if defined(DIST)
2573     tso->dist.priority = MandatoryPriority; //by default that is...
2574 #endif
2575     
2576 #if defined(GRAN)
2577     tso->gran.pri = pri;
2578 # if defined(DEBUG)
2579     tso->gran.magic = TSO_MAGIC; // debugging only
2580 # endif
2581     tso->gran.sparkname   = 0;
2582     tso->gran.startedat   = CURRENT_TIME; 
2583     tso->gran.exported    = 0;
2584     tso->gran.basicblocks = 0;
2585     tso->gran.allocs      = 0;
2586     tso->gran.exectime    = 0;
2587     tso->gran.fetchtime   = 0;
2588     tso->gran.fetchcount  = 0;
2589     tso->gran.blocktime   = 0;
2590     tso->gran.blockcount  = 0;
2591     tso->gran.blockedat   = 0;
2592     tso->gran.globalsparks = 0;
2593     tso->gran.localsparks  = 0;
2594     if (RtsFlags.GranFlags.Light)
2595         tso->gran.clock  = Now; /* local clock */
2596     else
2597         tso->gran.clock  = 0;
2598     
2599     IF_DEBUG(gran,printTSO(tso));
2600 #elif defined(PARALLEL_HASKELL)
2601 # if defined(DEBUG)
2602     tso->par.magic = TSO_MAGIC; // debugging only
2603 # endif
2604     tso->par.sparkname   = 0;
2605     tso->par.startedat   = CURRENT_TIME; 
2606     tso->par.exported    = 0;
2607     tso->par.basicblocks = 0;
2608     tso->par.allocs      = 0;
2609     tso->par.exectime    = 0;
2610     tso->par.fetchtime   = 0;
2611     tso->par.fetchcount  = 0;
2612     tso->par.blocktime   = 0;
2613     tso->par.blockcount  = 0;
2614     tso->par.blockedat   = 0;
2615     tso->par.globalsparks = 0;
2616     tso->par.localsparks  = 0;
2617 #endif
2618     
2619 #if defined(GRAN)
2620     globalGranStats.tot_threads_created++;
2621     globalGranStats.threads_created_on_PE[CurrentProc]++;
2622     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2623     globalGranStats.tot_sq_probes++;
2624 #elif defined(PARALLEL_HASKELL)
2625     // collect parallel global statistics (currently done together with GC stats)
2626     if (RtsFlags.ParFlags.ParStats.Global &&
2627         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2628         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2629         globalParStats.tot_threads_created++;
2630     }
2631 #endif 
2632     
2633 #if defined(GRAN)
2634     debugTrace(GRAN_DEBUG_pri,
2635                "==__ schedule: Created TSO %d (%p);",
2636                CurrentProc, tso, tso->id);
2637 #elif defined(PARALLEL_HASKELL)
2638     debugTrace(PAR_DEBUG_verbose,
2639                "==__ schedule: Created TSO %d (%p); %d threads active",
2640                (long)tso->id, tso, advisory_thread_count);
2641 #else
2642     debugTrace(DEBUG_sched,
2643                "created thread %ld, stack size = %lx words", 
2644                (long)tso->id, (long)tso->stack_size);
2645 #endif    
2646     return tso;
2647 }
2648
2649 #if defined(PAR)
2650 /* RFP:
2651    all parallel thread creation calls should fall through the following routine.
2652 */
2653 StgTSO *
2654 createThreadFromSpark(rtsSpark spark) 
2655 { StgTSO *tso;
2656   ASSERT(spark != (rtsSpark)NULL);
2657 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2658   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2659   { threadsIgnored++;
2660     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2661           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2662     return END_TSO_QUEUE;
2663   }
2664   else
2665   { threadsCreated++;
2666     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2667     if (tso==END_TSO_QUEUE)     
2668       barf("createSparkThread: Cannot create TSO");
2669 #if defined(DIST)
2670     tso->priority = AdvisoryPriority;
2671 #endif
2672     pushClosure(tso,spark);
2673     addToRunQueue(tso);
2674     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2675   }
2676   return tso;
2677 }
2678 #endif
2679
2680 /*
2681   Turn a spark into a thread.
2682   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2683 */
2684 #if 0
2685 StgTSO *
2686 activateSpark (rtsSpark spark) 
2687 {
2688   StgTSO *tso;
2689
2690   tso = createSparkThread(spark);
2691   if (RtsFlags.ParFlags.ParStats.Full) {   
2692     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2693       IF_PAR_DEBUG(verbose,
2694                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2695                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2696   }
2697   // ToDo: fwd info on local/global spark to thread -- HWL
2698   // tso->gran.exported =  spark->exported;
2699   // tso->gran.locked =   !spark->global;
2700   // tso->gran.sparkname = spark->name;
2701
2702   return tso;
2703 }
2704 #endif
2705
2706 /* ---------------------------------------------------------------------------
2707  * scheduleThread()
2708  *
2709  * scheduleThread puts a thread on the end  of the runnable queue.
2710  * This will usually be done immediately after a thread is created.
2711  * The caller of scheduleThread must create the thread using e.g.
2712  * createThread and push an appropriate closure
2713  * on this thread's stack before the scheduler is invoked.
2714  * ------------------------------------------------------------------------ */
2715
2716 void
2717 scheduleThread(Capability *cap, StgTSO *tso)
2718 {
2719     // The thread goes at the *end* of the run-queue, to avoid possible
2720     // starvation of any threads already on the queue.
2721     appendToRunQueue(cap,tso);
2722 }
2723
2724 void
2725 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2726 {
2727 #if defined(THREADED_RTS)
2728     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2729                               // move this thread from now on.
2730     cpu %= RtsFlags.ParFlags.nNodes;
2731     if (cpu == cap->no) {
2732         appendToRunQueue(cap,tso);
2733     } else {
2734         Capability *target_cap = &capabilities[cpu];
2735         if (tso->bound) {
2736             tso->bound->cap = target_cap;
2737         }
2738         tso->cap = target_cap;
2739         wakeupThreadOnCapability(target_cap,tso);
2740     }
2741 #else
2742     appendToRunQueue(cap,tso);
2743 #endif
2744 }
2745
2746 Capability *
2747 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2748 {
2749     Task *task;
2750
2751     // We already created/initialised the Task
2752     task = cap->running_task;
2753
2754     // This TSO is now a bound thread; make the Task and TSO
2755     // point to each other.
2756     tso->bound = task;
2757     tso->cap = cap;
2758
2759     task->tso = tso;
2760     task->ret = ret;
2761     task->stat = NoStatus;
2762
2763     appendToRunQueue(cap,tso);
2764
2765     debugTrace(DEBUG_sched, "new bound thread (%d)", tso->id);
2766
2767 #if defined(GRAN)
2768     /* GranSim specific init */
2769     CurrentTSO = m->tso;                // the TSO to run
2770     procStatus[MainProc] = Busy;        // status of main PE
2771     CurrentProc = MainProc;             // PE to run it on
2772 #endif
2773
2774     cap = schedule(cap,task);
2775
2776     ASSERT(task->stat != NoStatus);
2777     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2778
2779     debugTrace(DEBUG_sched, "bound thread (%d) finished", task->tso->id);
2780     return cap;
2781 }
2782
2783 /* ----------------------------------------------------------------------------
2784  * Starting Tasks
2785  * ------------------------------------------------------------------------- */
2786
2787 #if defined(THREADED_RTS)
2788 void
2789 workerStart(Task *task)
2790 {
2791     Capability *cap;
2792
2793     // See startWorkerTask().
2794     ACQUIRE_LOCK(&task->lock);
2795     cap = task->cap;
2796     RELEASE_LOCK(&task->lock);
2797
2798     // set the thread-local pointer to the Task:
2799     taskEnter(task);
2800
2801     // schedule() runs without a lock.
2802     cap = schedule(cap,task);
2803
2804     // On exit from schedule(), we have a Capability.
2805     releaseCapability(cap);
2806     workerTaskStop(task);
2807 }
2808 #endif
2809
2810 /* ---------------------------------------------------------------------------
2811  * initScheduler()
2812  *
2813  * Initialise the scheduler.  This resets all the queues - if the
2814  * queues contained any threads, they'll be garbage collected at the
2815  * next pass.
2816  *
2817  * ------------------------------------------------------------------------ */
2818
2819 void 
2820 initScheduler(void)
2821 {
2822 #if defined(GRAN)
2823   nat i;
2824   for (i=0; i<=MAX_PROC; i++) {
2825     run_queue_hds[i]      = END_TSO_QUEUE;
2826     run_queue_tls[i]      = END_TSO_QUEUE;
2827     blocked_queue_hds[i]  = END_TSO_QUEUE;
2828     blocked_queue_tls[i]  = END_TSO_QUEUE;
2829     ccalling_threadss[i]  = END_TSO_QUEUE;
2830     blackhole_queue[i]    = END_TSO_QUEUE;
2831     sleeping_queue        = END_TSO_QUEUE;
2832   }
2833 #elif !defined(THREADED_RTS)
2834   blocked_queue_hd  = END_TSO_QUEUE;
2835   blocked_queue_tl  = END_TSO_QUEUE;
2836   sleeping_queue    = END_TSO_QUEUE;
2837 #endif
2838
2839   blackhole_queue   = END_TSO_QUEUE;
2840   all_threads       = END_TSO_QUEUE;
2841
2842   context_switch = 0;
2843   sched_state    = SCHED_RUNNING;
2844
2845   RtsFlags.ConcFlags.ctxtSwitchTicks =
2846       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2847       
2848 #if defined(THREADED_RTS)
2849   /* Initialise the mutex and condition variables used by
2850    * the scheduler. */
2851   initMutex(&sched_mutex);
2852 #endif
2853   
2854   ACQUIRE_LOCK(&sched_mutex);
2855
2856   /* A capability holds the state a native thread needs in
2857    * order to execute STG code. At least one capability is
2858    * floating around (only THREADED_RTS builds have more than one).
2859    */
2860   initCapabilities();
2861
2862   initTaskManager();
2863
2864 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2865   initSparkPools();
2866 #endif
2867
2868 #if defined(THREADED_RTS)
2869   /*
2870    * Eagerly start one worker to run each Capability, except for
2871    * Capability 0.  The idea is that we're probably going to start a
2872    * bound thread on Capability 0 pretty soon, so we don't want a
2873    * worker task hogging it.
2874    */
2875   { 
2876       nat i;
2877       Capability *cap;
2878       for (i = 1; i < n_capabilities; i++) {
2879           cap = &capabilities[i];
2880           ACQUIRE_LOCK(&cap->lock);
2881           startWorkerTask(cap, workerStart);
2882           RELEASE_LOCK(&cap->lock);
2883       }
2884   }
2885 #endif
2886
2887   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2888
2889   RELEASE_LOCK(&sched_mutex);
2890 }
2891
2892 void
2893 exitScheduler( void )
2894 {
2895     Task *task = NULL;
2896
2897 #if defined(THREADED_RTS)
2898     ACQUIRE_LOCK(&sched_mutex);
2899     task = newBoundTask();
2900     RELEASE_LOCK(&sched_mutex);
2901 #endif
2902
2903     // If we haven't killed all the threads yet, do it now.
2904     if (sched_state < SCHED_SHUTTING_DOWN) {
2905         sched_state = SCHED_INTERRUPTING;
2906         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2907     }
2908     sched_state = SCHED_SHUTTING_DOWN;
2909
2910 #if defined(THREADED_RTS)
2911     { 
2912         nat i;
2913         
2914         for (i = 0; i < n_capabilities; i++) {
2915             shutdownCapability(&capabilities[i], task);
2916         }
2917         boundTaskExiting(task);
2918         stopTaskManager();
2919     }
2920 #endif
2921 }
2922
2923 /* ---------------------------------------------------------------------------
2924    Where are the roots that we know about?
2925
2926         - all the threads on the runnable queue
2927         - all the threads on the blocked queue
2928         - all the threads on the sleeping queue
2929         - all the thread currently executing a _ccall_GC
2930         - all the "main threads"
2931      
2932    ------------------------------------------------------------------------ */
2933
2934 /* This has to be protected either by the scheduler monitor, or by the
2935         garbage collection monitor (probably the latter).
2936         KH @ 25/10/99
2937 */
2938
2939 void
2940 GetRoots( evac_fn evac )
2941 {
2942     nat i;
2943     Capability *cap;
2944     Task *task;
2945
2946 #if defined(GRAN)
2947     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2948         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2949             evac((StgClosure **)&run_queue_hds[i]);
2950         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2951             evac((StgClosure **)&run_queue_tls[i]);
2952         
2953         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2954             evac((StgClosure **)&blocked_queue_hds[i]);
2955         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2956             evac((StgClosure **)&blocked_queue_tls[i]);
2957         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2958             evac((StgClosure **)&ccalling_threads[i]);
2959     }
2960
2961     markEventQueue();
2962
2963 #else /* !GRAN */
2964
2965     for (i = 0; i < n_capabilities; i++) {
2966         cap = &capabilities[i];
2967         evac((StgClosure **)(void *)&cap->run_queue_hd);
2968         evac((StgClosure **)(void *)&cap->run_queue_tl);
2969 #if defined(THREADED_RTS)
2970         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2971         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2972 #endif
2973         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2974              task=task->next) {
2975             debugTrace(DEBUG_sched,
2976                        "evac'ing suspended TSO %d", task->suspended_tso->id);
2977             evac((StgClosure **)(void *)&task->suspended_tso);
2978         }
2979
2980     }
2981     
2982
2983 #if !defined(THREADED_RTS)
2984     evac((StgClosure **)(void *)&blocked_queue_hd);
2985     evac((StgClosure **)(void *)&blocked_queue_tl);
2986     evac((StgClosure **)(void *)&sleeping_queue);
2987 #endif 
2988 #endif
2989
2990     // evac((StgClosure **)&blackhole_queue);
2991
2992 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2993     markSparkQueue(evac);
2994 #endif
2995     
2996 #if defined(RTS_USER_SIGNALS)
2997     // mark the signal handlers (signals should be already blocked)
2998     markSignalHandlers(evac);
2999 #endif
3000 }
3001
3002 /* -----------------------------------------------------------------------------
3003    performGC
3004
3005    This is the interface to the garbage collector from Haskell land.
3006    We provide this so that external C code can allocate and garbage
3007    collect when called from Haskell via _ccall_GC.
3008
3009    It might be useful to provide an interface whereby the programmer
3010    can specify more roots (ToDo).
3011    
3012    This needs to be protected by the GC condition variable above.  KH.
3013    -------------------------------------------------------------------------- */
3014
3015 static void (*extra_roots)(evac_fn);
3016
3017 static void
3018 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
3019 {
3020     Task *task;
3021     // We must grab a new Task here, because the existing Task may be
3022     // associated with a particular Capability, and chained onto the 
3023     // suspended_ccalling_tasks queue.
3024     ACQUIRE_LOCK(&sched_mutex);
3025     task = newBoundTask();
3026     RELEASE_LOCK(&sched_mutex);
3027     scheduleDoGC(NULL,task,force_major, get_roots);
3028     boundTaskExiting(task);
3029 }
3030
3031 void
3032 performGC(void)
3033 {
3034     performGC_(rtsFalse, GetRoots);
3035 }
3036
3037 void
3038 performMajorGC(void)
3039 {
3040     performGC_(rtsTrue, GetRoots);
3041 }
3042
3043 static void
3044 AllRoots(evac_fn evac)
3045 {
3046     GetRoots(evac);             // the scheduler's roots
3047     extra_roots(evac);          // the user's roots
3048 }
3049
3050 void
3051 performGCWithRoots(void (*get_roots)(evac_fn))
3052 {
3053     extra_roots = get_roots;
3054     performGC_(rtsFalse, AllRoots);
3055 }
3056
3057 /* -----------------------------------------------------------------------------
3058    Stack overflow
3059
3060    If the thread has reached its maximum stack size, then raise the
3061    StackOverflow exception in the offending thread.  Otherwise
3062    relocate the TSO into a larger chunk of memory and adjust its stack
3063    size appropriately.
3064    -------------------------------------------------------------------------- */
3065
3066 static StgTSO *
3067 threadStackOverflow(Capability *cap, StgTSO *tso)
3068 {
3069   nat new_stack_size, stack_words;
3070   lnat new_tso_size;
3071   StgPtr new_sp;
3072   StgTSO *dest;
3073
3074   IF_DEBUG(sanity,checkTSO(tso));
3075   if (tso->stack_size >= tso->max_stack_size) {
3076
3077       debugTrace(DEBUG_gc,
3078                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3079                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3080       IF_DEBUG(gc,
3081                /* If we're debugging, just print out the top of the stack */
3082                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3083                                                 tso->sp+64)));
3084
3085     /* Send this thread the StackOverflow exception */
3086     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3087     return tso;
3088   }
3089
3090   /* Try to double the current stack size.  If that takes us over the
3091    * maximum stack size for this thread, then use the maximum instead.
3092    * Finally round up so the TSO ends up as a whole number of blocks.
3093    */
3094   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3095   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3096                                        TSO_STRUCT_SIZE)/sizeof(W_);
3097   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3098   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3099
3100   debugTrace(DEBUG_sched, 
3101              "increasing stack size from %ld words to %d.\n",
3102              (long)tso->stack_size, new_stack_size);
3103
3104   dest = (StgTSO *)allocate(new_tso_size);
3105   TICK_ALLOC_TSO(new_stack_size,0);
3106
3107   /* copy the TSO block and the old stack into the new area */
3108   memcpy(dest,tso,TSO_STRUCT_SIZE);
3109   stack_words = tso->stack + tso->stack_size - tso->sp;
3110   new_sp = (P_)dest + new_tso_size - stack_words;
3111   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3112
3113   /* relocate the stack pointers... */
3114   dest->sp         = new_sp;
3115   dest->stack_size = new_stack_size;
3116         
3117   /* Mark the old TSO as relocated.  We have to check for relocated
3118    * TSOs in the garbage collector and any primops that deal with TSOs.
3119    *
3120    * It's important to set the sp value to just beyond the end
3121    * of the stack, so we don't attempt to scavenge any part of the
3122    * dead TSO's stack.
3123    */
3124   tso->what_next = ThreadRelocated;
3125   tso->link = dest;
3126   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3127   tso->why_blocked = NotBlocked;
3128
3129   IF_PAR_DEBUG(verbose,
3130                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3131                      tso->id, tso, tso->stack_size);
3132                /* If we're debugging, just print out the top of the stack */
3133                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3134                                                 tso->sp+64)));
3135   
3136   IF_DEBUG(sanity,checkTSO(tso));
3137 #if 0
3138   IF_DEBUG(scheduler,printTSO(dest));
3139 #endif
3140
3141   return dest;
3142 }
3143
3144 /* ---------------------------------------------------------------------------
3145    Wake up a queue that was blocked on some resource.
3146    ------------------------------------------------------------------------ */
3147
3148 #if defined(GRAN)
3149 STATIC_INLINE void
3150 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3151 {
3152 }
3153 #elif defined(PARALLEL_HASKELL)
3154 STATIC_INLINE void
3155 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3156 {
3157   /* write RESUME events to log file and
3158      update blocked and fetch time (depending on type of the orig closure) */
3159   if (RtsFlags.ParFlags.ParStats.Full) {
3160     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3161                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3162                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3163     if (emptyRunQueue())
3164       emitSchedule = rtsTrue;
3165
3166     switch (get_itbl(node)->type) {
3167         case FETCH_ME_BQ:
3168           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3169           break;
3170         case RBH:
3171         case FETCH_ME:
3172         case BLACKHOLE_BQ:
3173           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3174           break;
3175 #ifdef DIST
3176         case MVAR:
3177           break;
3178 #endif    
3179         default:
3180           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3181         }
3182       }
3183 }
3184 #endif
3185
3186 #if defined(GRAN)
3187 StgBlockingQueueElement *
3188 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3189 {
3190     StgTSO *tso;
3191     PEs node_loc, tso_loc;
3192
3193     node_loc = where_is(node); // should be lifted out of loop
3194     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3195     tso_loc = where_is((StgClosure *)tso);
3196     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3197       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3198       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3199       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3200       // insertThread(tso, node_loc);
3201       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3202                 ResumeThread,
3203                 tso, node, (rtsSpark*)NULL);
3204       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3205       // len_local++;
3206       // len++;
3207     } else { // TSO is remote (actually should be FMBQ)
3208       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3209                                   RtsFlags.GranFlags.Costs.gunblocktime +
3210                                   RtsFlags.GranFlags.Costs.latency;
3211       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3212                 UnblockThread,
3213                 tso, node, (rtsSpark*)NULL);
3214       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3215       // len++;
3216     }
3217     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3218     IF_GRAN_DEBUG(bq,
3219                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3220                           (node_loc==tso_loc ? "Local" : "Global"), 
3221                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3222     tso->block_info.closure = NULL;
3223     debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)\n", 
3224                tso->id, tso));
3225 }
3226 #elif defined(PARALLEL_HASKELL)
3227 StgBlockingQueueElement *
3228 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3229 {
3230     StgBlockingQueueElement *next;
3231
3232     switch (get_itbl(bqe)->type) {
3233     case TSO:
3234       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3235       /* if it's a TSO just push it onto the run_queue */
3236       next = bqe->link;
3237       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3238       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3239       threadRunnable();
3240       unblockCount(bqe, node);
3241       /* reset blocking status after dumping event */
3242       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3243       break;
3244
3245     case BLOCKED_FETCH:
3246       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3247       next = bqe->link;
3248       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3249       PendingFetches = (StgBlockedFetch *)bqe;
3250       break;
3251
3252 # if defined(DEBUG)
3253       /* can ignore this case in a non-debugging setup; 
3254          see comments on RBHSave closures above */
3255     case CONSTR:
3256       /* check that the closure is an RBHSave closure */
3257       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3258              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3259              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3260       break;
3261
3262     default:
3263       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3264            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3265            (StgClosure *)bqe);
3266 # endif
3267     }
3268   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3269   return next;
3270 }
3271 #endif
3272
3273 StgTSO *
3274 unblockOne(Capability *cap, StgTSO *tso)
3275 {
3276   StgTSO *next;
3277
3278   ASSERT(get_itbl(tso)->type == TSO);
3279   ASSERT(tso->why_blocked != NotBlocked);
3280
3281   tso->why_blocked = NotBlocked;
3282   next = tso->link;
3283   tso->link = END_TSO_QUEUE;
3284
3285 #if defined(THREADED_RTS)
3286   if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
3287       // We are waking up this thread on the current Capability, which
3288       // might involve migrating it from the Capability it was last on.
3289       if (tso->bound) {
3290           ASSERT(tso->bound->cap == tso->cap);
3291           tso->bound->cap = cap;
3292       }
3293       tso->cap = cap;
3294       appendToRunQueue(cap,tso);
3295       // we're holding a newly woken thread, make sure we context switch
3296       // quickly so we can migrate it if necessary.
3297       context_switch = 1;
3298   } else {
3299       // we'll try to wake it up on the Capability it was last on.
3300       wakeupThreadOnCapability(tso->cap, tso);
3301   }
3302 #else
3303   appendToRunQueue(cap,tso);
3304   context_switch = 1;
3305 #endif
3306
3307   debugTrace(DEBUG_sched,
3308              "waking up thread %ld on cap %d",
3309              (long)tso->id, tso->cap->no);
3310
3311   return next;
3312 }
3313
3314
3315 #if defined(GRAN)
3316 void 
3317 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3318 {
3319   StgBlockingQueueElement *bqe;
3320   PEs node_loc;
3321   nat len = 0; 
3322
3323   IF_GRAN_DEBUG(bq, 
3324                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3325                       node, CurrentProc, CurrentTime[CurrentProc], 
3326                       CurrentTSO->id, CurrentTSO));
3327
3328   node_loc = where_is(node);
3329
3330   ASSERT(q == END_BQ_QUEUE ||
3331          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3332          get_itbl(q)->type == CONSTR); // closure (type constructor)
3333   ASSERT(is_unique(node));
3334
3335   /* FAKE FETCH: magically copy the node to the tso's proc;
3336      no Fetch necessary because in reality the node should not have been 
3337      moved to the other PE in the first place
3338   */
3339   if (CurrentProc!=node_loc) {
3340     IF_GRAN_DEBUG(bq, 
3341                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3342                         node, node_loc, CurrentProc, CurrentTSO->id, 
3343                         // CurrentTSO, where_is(CurrentTSO),
3344                         node->header.gran.procs));
3345     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3346     IF_GRAN_DEBUG(bq, 
3347                   debugBelch("## new bitmask of node %p is %#x\n",
3348                         node, node->header.gran.procs));
3349     if (RtsFlags.GranFlags.GranSimStats.Global) {
3350       globalGranStats.tot_fake_fetches++;
3351     }
3352   }
3353
3354   bqe = q;
3355   // ToDo: check: ASSERT(CurrentProc==node_loc);
3356   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3357     //next = bqe->link;
3358     /* 
3359        bqe points to the current element in the queue
3360        next points to the next element in the queue
3361     */
3362     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3363     //tso_loc = where_is(tso);
3364     len++;
3365     bqe = unblockOne(bqe, node);
3366   }
3367
3368   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3369      the closure to make room for the anchor of the BQ */
3370   if (bqe!=END_BQ_QUEUE) {
3371     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3372     /*
3373     ASSERT((info_ptr==&RBH_Save_0_info) ||
3374            (info_ptr==&RBH_Save_1_info) ||
3375            (info_ptr==&RBH_Save_2_info));
3376     */
3377     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3378     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3379     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3380
3381     IF_GRAN_DEBUG(bq,
3382                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3383                         node, info_type(node)));
3384   }
3385
3386   /* statistics gathering */
3387   if (RtsFlags.GranFlags.GranSimStats.Global) {
3388     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3389     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3390     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3391     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3392   }
3393   IF_GRAN_DEBUG(bq,
3394                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3395                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3396 }
3397 #elif defined(PARALLEL_HASKELL)
3398 void 
3399 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3400 {
3401   StgBlockingQueueElement *bqe;
3402
3403   IF_PAR_DEBUG(verbose, 
3404                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3405                      node, mytid));
3406 #ifdef DIST  
3407   //RFP
3408   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3409     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3410     return;
3411   }
3412 #endif
3413   
3414   ASSERT(q == END_BQ_QUEUE ||
3415          get_itbl(q)->type == TSO ||           
3416          get_itbl(q)->type == BLOCKED_FETCH || 
3417          get_itbl(q)->type == CONSTR); 
3418
3419   bqe = q;
3420   while (get_itbl(bqe)->type==TSO || 
3421          get_itbl(bqe)->type==BLOCKED_FETCH) {
3422     bqe = unblockOne(bqe, node);
3423   }
3424 }
3425
3426 #else   /* !GRAN && !PARALLEL_HASKELL */
3427
3428 void
3429 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3430 {
3431     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3432                              // Exception.cmm
3433     while (tso != END_TSO_QUEUE) {
3434         tso = unblockOne(cap,tso);
3435     }
3436 }
3437 #endif
3438
3439 /* ---------------------------------------------------------------------------
3440    Interrupt execution
3441    - usually called inside a signal handler so it mustn't do anything fancy.   
3442    ------------------------------------------------------------------------ */
3443
3444 void
3445 interruptStgRts(void)
3446 {
3447     sched_state = SCHED_INTERRUPTING;
3448     context_switch = 1;
3449 #if defined(THREADED_RTS)
3450     prodAllCapabilities();
3451 #endif
3452 }
3453
3454 /* -----------------------------------------------------------------------------
3455    Unblock a thread
3456
3457    This is for use when we raise an exception in another thread, which
3458    may be blocked.
3459    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3460    -------------------------------------------------------------------------- */
3461
3462 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3463 /*
3464   NB: only the type of the blocking queue is different in GranSim and GUM
3465       the operations on the queue-elements are the same
3466       long live polymorphism!
3467
3468   Locks: sched_mutex is held upon entry and exit.
3469
3470 */
3471 static void
3472 unblockThread(Capability *cap, StgTSO *tso)
3473 {
3474   StgBlockingQueueElement *t, **last;
3475
3476   switch (tso->why_blocked) {
3477
3478   case NotBlocked:
3479     return;  /* not blocked */
3480
3481   case BlockedOnSTM:
3482     // Be careful: nothing to do here!  We tell the scheduler that the thread
3483     // is runnable and we leave it to the stack-walking code to abort the 
3484     // transaction while unwinding the stack.  We should perhaps have a debugging
3485     // test to make sure that this really happens and that the 'zombie' transaction
3486     // does not get committed.
3487     goto done;
3488
3489   case BlockedOnMVar:
3490     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3491     {
3492       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3493       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3494
3495       last = (StgBlockingQueueElement **)&mvar->head;
3496       for (t = (StgBlockingQueueElement *)mvar->head; 
3497            t != END_BQ_QUEUE; 
3498            last = &t->link, last_tso = t, t = t->link) {
3499         if (t == (StgBlockingQueueElement *)tso) {
3500           *last = (StgBlockingQueueElement *)tso->link;
3501           if (mvar->tail == tso) {
3502             mvar->tail = (StgTSO *)last_tso;
3503           }
3504           goto done;
3505         }
3506       }
3507       barf("unblockThread (MVAR): TSO not found");
3508     }
3509
3510   case BlockedOnBlackHole:
3511     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3512     {
3513       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3514
3515       last = &bq->blocking_queue;
3516       for (t = bq->blocking_queue; 
3517            t != END_BQ_QUEUE; 
3518            last = &t->link, t = t->link) {
3519         if (t == (StgBlockingQueueElement *)tso) {
3520           *last = (StgBlockingQueueElement *)tso->link;
3521           goto done;
3522         }
3523       }
3524       barf("unblockThread (BLACKHOLE): TSO not found");
3525     }
3526
3527   case BlockedOnException:
3528     {
3529       StgTSO *target  = tso->block_info.tso;
3530
3531       ASSERT(get_itbl(target)->type == TSO);
3532
3533       if (target->what_next == ThreadRelocated) {
3534           target = target->link;
3535           ASSERT(get_itbl(target)->type == TSO);
3536       }
3537
3538       ASSERT(target->blocked_exceptions != NULL);
3539
3540       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3541       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3542            t != END_BQ_QUEUE; 
3543            last = &t->link, t = t->link) {
3544         ASSERT(get_itbl(t)->type == TSO);
3545         if (t == (StgBlockingQueueElement *)tso) {
3546           *last = (StgBlockingQueueElement *)tso->link;
3547           goto done;
3548         }
3549       }
3550       barf("unblockThread (Exception): TSO not found");
3551     }
3552
3553   case BlockedOnRead:
3554   case BlockedOnWrite:
3555 #if defined(mingw32_HOST_OS)
3556   case BlockedOnDoProc:
3557 #endif
3558     {
3559       /* take TSO off blocked_queue */
3560       StgBlockingQueueElement *prev = NULL;
3561       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3562            prev = t, t = t->link) {
3563         if (t == (StgBlockingQueueElement *)tso) {
3564           if (prev == NULL) {
3565             blocked_queue_hd = (StgTSO *)t->link;
3566             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3567               blocked_queue_tl = END_TSO_QUEUE;
3568             }
3569           } else {
3570             prev->link = t->link;
3571             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3572               blocked_queue_tl = (StgTSO *)prev;
3573             }
3574           }
3575 #if defined(mingw32_HOST_OS)
3576           /* (Cooperatively) signal that the worker thread should abort
3577            * the request.
3578            */
3579           abandonWorkRequest(tso->block_info.async_result->reqID);
3580 #endif
3581           goto done;
3582         }
3583       }
3584       barf("unblockThread (I/O): TSO not found");
3585     }
3586
3587   case BlockedOnDelay:
3588     {
3589       /* take TSO off sleeping_queue */
3590       StgBlockingQueueElement *prev = NULL;
3591       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3592            prev = t, t = t->link) {
3593         if (t == (StgBlockingQueueElement *)tso) {
3594           if (prev == NULL) {
3595             sleeping_queue = (StgTSO *)t->link;
3596           } else {
3597             prev->link = t->link;
3598           }
3599           goto done;
3600         }
3601       }
3602       barf("unblockThread (delay): TSO not found");
3603     }
3604
3605   default:
3606     barf("unblockThread");
3607   }
3608
3609  done:
3610   tso->link = END_TSO_QUEUE;
3611   tso->why_blocked = NotBlocked;
3612   tso->block_info.closure = NULL;
3613   pushOnRunQueue(cap,tso);
3614 }
3615 #else
3616 static void
3617 unblockThread(Capability *cap, StgTSO *tso)
3618 {
3619   StgTSO *t, **last;
3620   
3621   /* To avoid locking unnecessarily. */
3622   if (tso->why_blocked == NotBlocked) {
3623     return;
3624   }
3625
3626   switch (tso->why_blocked) {
3627
3628   case BlockedOnSTM:
3629     // Be careful: nothing to do here!  We tell the scheduler that the thread
3630     // is runnable and we leave it to the stack-walking code to abort the 
3631     // transaction while unwinding the stack.  We should perhaps have a debugging
3632     // test to make sure that this really happens and that the 'zombie' transaction
3633     // does not get committed.
3634     goto done;
3635
3636   case BlockedOnMVar:
3637     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3638     {
3639       StgTSO *last_tso = END_TSO_QUEUE;
3640       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3641
3642       last = &mvar->head;
3643       for (t = mvar->head; t != END_TSO_QUEUE; 
3644            last = &t->link, last_tso = t, t = t->link) {
3645         if (t == tso) {
3646           *last = tso->link;
3647           if (mvar->tail == tso) {
3648             mvar->tail = last_tso;
3649           }
3650           goto done;
3651         }
3652       }
3653       barf("unblockThread (MVAR): TSO not found");
3654     }
3655
3656   case BlockedOnBlackHole:
3657     {
3658       last = &blackhole_queue;
3659       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3660            last = &t->link, t = t->link) {
3661         if (t == tso) {
3662           *last = tso->link;
3663           goto done;
3664         }
3665       }
3666       barf("unblockThread (BLACKHOLE): TSO not found");
3667     }
3668
3669   case BlockedOnException:
3670     {
3671       StgTSO *target  = tso->block_info.tso;
3672
3673       ASSERT(get_itbl(target)->type == TSO);
3674
3675       while (target->what_next == ThreadRelocated) {
3676           target = target->link;
3677           ASSERT(get_itbl(target)->type == TSO);
3678       }
3679       
3680       ASSERT(target->blocked_exceptions != NULL);
3681
3682       last = &target->blocked_exceptions;
3683       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3684            last = &t->link, t = t->link) {
3685         ASSERT(get_itbl(t)->type == TSO);
3686         if (t == tso) {
3687           *last = tso->link;
3688           goto done;
3689         }
3690       }
3691       barf("unblockThread (Exception): TSO not found");
3692     }
3693
3694 #if !defined(THREADED_RTS)
3695   case BlockedOnRead:
3696   case BlockedOnWrite:
3697 #if defined(mingw32_HOST_OS)
3698   case BlockedOnDoProc:
3699 #endif
3700     {
3701       StgTSO *prev = NULL;
3702       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3703            prev = t, t = t->link) {
3704         if (t == tso) {
3705           if (prev == NULL) {
3706             blocked_queue_hd = t->link;
3707             if (blocked_queue_tl == t) {
3708               blocked_queue_tl = END_TSO_QUEUE;
3709             }
3710           } else {
3711             prev->link = t->link;
3712             if (blocked_queue_tl == t) {
3713               blocked_queue_tl = prev;
3714             }
3715           }
3716 #if defined(mingw32_HOST_OS)
3717           /* (Cooperatively) signal that the worker thread should abort
3718            * the request.
3719            */
3720           abandonWorkRequest(tso->block_info.async_result->reqID);
3721 #endif
3722           goto done;
3723         }
3724       }
3725       barf("unblockThread (I/O): TSO not found");
3726     }
3727
3728   case BlockedOnDelay:
3729     {
3730       StgTSO *prev = NULL;
3731       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3732            prev = t, t = t->link) {
3733         if (t == tso) {
3734           if (prev == NULL) {
3735             sleeping_queue = t->link;
3736           } else {
3737             prev->link = t->link;
3738           }
3739           goto done;
3740         }
3741       }
3742       barf("unblockThread (delay): TSO not found");
3743     }
3744 #endif
3745
3746   default:
3747     barf("unblockThread");
3748   }
3749
3750  done:
3751   tso->link = END_TSO_QUEUE;
3752   tso->why_blocked = NotBlocked;
3753   tso->block_info.closure = NULL;
3754   appendToRunQueue(cap,tso);
3755
3756   // We might have just migrated this TSO to our Capability:
3757   if (tso->bound) {
3758       tso->bound->cap = cap;
3759   }
3760   tso->cap = cap;
3761 }
3762 #endif
3763
3764 /* -----------------------------------------------------------------------------
3765  * checkBlackHoles()
3766  *
3767  * Check the blackhole_queue for threads that can be woken up.  We do
3768  * this periodically: before every GC, and whenever the run queue is
3769  * empty.
3770  *
3771  * An elegant solution might be to just wake up all the blocked
3772  * threads with awakenBlockedQueue occasionally: they'll go back to
3773  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3774  * doesn't give us a way to tell whether we've actually managed to
3775  * wake up any threads, so we would be busy-waiting.
3776  *
3777  * -------------------------------------------------------------------------- */
3778
3779 static rtsBool
3780 checkBlackHoles (Capability *cap)
3781 {
3782     StgTSO **prev, *t;
3783     rtsBool any_woke_up = rtsFalse;
3784     StgHalfWord type;
3785
3786     // blackhole_queue is global:
3787     ASSERT_LOCK_HELD(&sched_mutex);
3788
3789     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
3790
3791     // ASSUMES: sched_mutex
3792     prev = &blackhole_queue;
3793     t = blackhole_queue;
3794     while (t != END_TSO_QUEUE) {
3795         ASSERT(t->why_blocked == BlockedOnBlackHole);