Massive patch for the first months work adding System FC to GHC #35
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #ifdef PROFILING
33 #include "Proftimer.h"
34 #include "ProfHeap.h"
35 #endif
36 #if defined(GRAN) || defined(PARALLEL_HASKELL)
37 # include "GranSimRts.h"
38 # include "GranSim.h"
39 # include "ParallelRts.h"
40 # include "Parallel.h"
41 # include "ParallelDebug.h"
42 # include "FetchMe.h"
43 # include "HLC.h"
44 #endif
45 #include "Sparks.h"
46 #include "Capability.h"
47 #include "Task.h"
48 #include "AwaitEvent.h"
49 #if defined(mingw32_HOST_OS)
50 #include "win32/IOManager.h"
51 #endif
52 #include "Trace.h"
53 #include "RaiseAsync.h"
54 #include "Threads.h"
55
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
58 #endif
59 #ifdef HAVE_UNISTD_H
60 #include <unistd.h>
61 #endif
62
63 #include <string.h>
64 #include <stdlib.h>
65 #include <stdarg.h>
66
67 #ifdef HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 // Turn off inlining when debugging - it obfuscates things
72 #ifdef DEBUG
73 # undef  STATIC_INLINE
74 # define STATIC_INLINE static
75 #endif
76
77 /* -----------------------------------------------------------------------------
78  * Global variables
79  * -------------------------------------------------------------------------- */
80
81 #if defined(GRAN)
82
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
85
86 /* 
87    In GranSim we have a runnable and a blocked queue for each processor.
88    In order to minimise code changes new arrays run_queue_hds/tls
89    are created. run_queue_hd is then a short cut (macro) for
90    run_queue_hds[CurrentProc] (see GranSim.h).
91    -- HWL
92 */
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97    the std RTS (i.e. we are cheating). However, we don't use this list in
98    the GranSim specific code at the moment (so we are only potentially
99    cheating).  */
100
101 #else /* !GRAN */
102
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping thrads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
108 #endif
109
110 /* Threads blocked on blackholes.
111  * LOCK: sched_mutex+capability, or all capabilities
112  */
113 StgTSO *blackhole_queue = NULL;
114 #endif
115
116 /* The blackhole_queue should be checked for threads to wake up.  See
117  * Schedule.h for more thorough comment.
118  * LOCK: none (doesn't matter if we miss an update)
119  */
120 rtsBool blackholes_need_checking = rtsFalse;
121
122 /* Linked list of all threads.
123  * Used for detecting garbage collected threads.
124  * LOCK: sched_mutex+capability, or all capabilities
125  */
126 StgTSO *all_threads = NULL;
127
128 /* flag set by signal handler to precipitate a context switch
129  * LOCK: none (just an advisory flag)
130  */
131 int context_switch = 0;
132
133 /* flag that tracks whether we have done any execution in this time slice.
134  * LOCK: currently none, perhaps we should lock (but needs to be
135  * updated in the fast path of the scheduler).
136  */
137 nat recent_activity = ACTIVITY_YES;
138
139 /* if this flag is set as well, give up execution
140  * LOCK: none (changes once, from false->true)
141  */
142 rtsBool sched_state = SCHED_RUNNING;
143
144 #if defined(GRAN)
145 StgTSO *CurrentTSO;
146 #endif
147
148 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
149  *  exists - earlier gccs apparently didn't.
150  *  -= chak
151  */
152 StgTSO dummy_tso;
153
154 /*
155  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156  * in an MT setting, needed to signal that a worker thread shouldn't hang around
157  * in the scheduler when it is out of work.
158  */
159 rtsBool shutting_down_scheduler = rtsFalse;
160
161 /*
162  * This mutex protects most of the global scheduler data in
163  * the THREADED_RTS runtime.
164  */
165 #if defined(THREADED_RTS)
166 Mutex sched_mutex;
167 #endif
168
169 #if defined(PARALLEL_HASKELL)
170 StgTSO *LastTSO;
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
173 #endif
174
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
177 #endif
178
179 /* -----------------------------------------------------------------------------
180  * static function prototypes
181  * -------------------------------------------------------------------------- */
182
183 static Capability *schedule (Capability *initialCapability, Task *task);
184
185 //
186 // These function all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
189 //
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
193 #endif
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 #if defined(GRAN)
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #endif
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #endif
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
209 #endif
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
213                                          StgTSO *t);
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
215                                     nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218                                              StgTSO *t );
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221                                 rtsBool force_major, 
222                                 void (*get_roots)(evac_fn));
223
224 static rtsBool checkBlackHoles(Capability *cap);
225 static void AllRoots(evac_fn evac);
226
227 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
228
229 static void deleteThread (Capability *cap, StgTSO *tso);
230 static void deleteAllThreads (Capability *cap);
231
232 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
233 static void deleteThread_(Capability *cap, StgTSO *tso);
234 #endif
235
236 #if defined(PARALLEL_HASKELL)
237 StgTSO * createSparkThread(rtsSpark spark);
238 StgTSO * activateSpark (rtsSpark spark);  
239 #endif
240
241 #ifdef DEBUG
242 static char *whatNext_strs[] = {
243   "(unknown)",
244   "ThreadRunGHC",
245   "ThreadInterpret",
246   "ThreadKilled",
247   "ThreadRelocated",
248   "ThreadComplete"
249 };
250 #endif
251
252 /* -----------------------------------------------------------------------------
253  * Putting a thread on the run queue: different scheduling policies
254  * -------------------------------------------------------------------------- */
255
256 STATIC_INLINE void
257 addToRunQueue( Capability *cap, StgTSO *t )
258 {
259 #if defined(PARALLEL_HASKELL)
260     if (RtsFlags.ParFlags.doFairScheduling) { 
261         // this does round-robin scheduling; good for concurrency
262         appendToRunQueue(cap,t);
263     } else {
264         // this does unfair scheduling; good for parallelism
265         pushOnRunQueue(cap,t);
266     }
267 #else
268     // this does round-robin scheduling; good for concurrency
269     appendToRunQueue(cap,t);
270 #endif
271 }
272
273 /* ---------------------------------------------------------------------------
274    Main scheduling loop.
275
276    We use round-robin scheduling, each thread returning to the
277    scheduler loop when one of these conditions is detected:
278
279       * out of heap space
280       * timer expires (thread yields)
281       * thread blocks
282       * thread ends
283       * stack overflow
284
285    GRAN version:
286      In a GranSim setup this loop iterates over the global event queue.
287      This revolves around the global event queue, which determines what 
288      to do next. Therefore, it's more complicated than either the 
289      concurrent or the parallel (GUM) setup.
290
291    GUM version:
292      GUM iterates over incoming messages.
293      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
294      and sends out a fish whenever it has nothing to do; in-between
295      doing the actual reductions (shared code below) it processes the
296      incoming messages and deals with delayed operations 
297      (see PendingFetches).
298      This is not the ugliest code you could imagine, but it's bloody close.
299
300    ------------------------------------------------------------------------ */
301
302 static Capability *
303 schedule (Capability *initialCapability, Task *task)
304 {
305   StgTSO *t;
306   Capability *cap;
307   StgThreadReturnCode ret;
308 #if defined(GRAN)
309   rtsEvent *event;
310 #elif defined(PARALLEL_HASKELL)
311   StgTSO *tso;
312   GlobalTaskId pe;
313   rtsBool receivedFinish = rtsFalse;
314 # if defined(DEBUG)
315   nat tp_size, sp_size; // stats only
316 # endif
317 #endif
318   nat prev_what_next;
319   rtsBool ready_to_gc;
320 #if defined(THREADED_RTS)
321   rtsBool first = rtsTrue;
322 #endif
323   
324   cap = initialCapability;
325
326   // Pre-condition: this task owns initialCapability.
327   // The sched_mutex is *NOT* held
328   // NB. on return, we still hold a capability.
329
330   debugTrace (DEBUG_sched, 
331               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
332               task, initialCapability);
333
334   schedulePreLoop();
335
336   // -----------------------------------------------------------
337   // Scheduler loop starts here:
338
339 #if defined(PARALLEL_HASKELL)
340 #define TERMINATION_CONDITION        (!receivedFinish)
341 #elif defined(GRAN)
342 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
343 #else
344 #define TERMINATION_CONDITION        rtsTrue
345 #endif
346
347   while (TERMINATION_CONDITION) {
348
349 #if defined(GRAN)
350       /* Choose the processor with the next event */
351       CurrentProc = event->proc;
352       CurrentTSO = event->tso;
353 #endif
354
355 #if defined(THREADED_RTS)
356       if (first) {
357           // don't yield the first time, we want a chance to run this
358           // thread for a bit, even if there are others banging at the
359           // door.
360           first = rtsFalse;
361           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
362       } else {
363           // Yield the capability to higher-priority tasks if necessary.
364           yieldCapability(&cap, task);
365       }
366 #endif
367       
368 #if defined(THREADED_RTS)
369       schedulePushWork(cap,task);
370 #endif
371
372     // Check whether we have re-entered the RTS from Haskell without
373     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
374     // call).
375     if (cap->in_haskell) {
376           errorBelch("schedule: re-entered unsafely.\n"
377                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
378           stg_exit(EXIT_FAILURE);
379     }
380
381     // The interruption / shutdown sequence.
382     // 
383     // In order to cleanly shut down the runtime, we want to:
384     //   * make sure that all main threads return to their callers
385     //     with the state 'Interrupted'.
386     //   * clean up all OS threads assocated with the runtime
387     //   * free all memory etc.
388     //
389     // So the sequence for ^C goes like this:
390     //
391     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
392     //     arranges for some Capability to wake up
393     //
394     //   * all threads in the system are halted, and the zombies are
395     //     placed on the run queue for cleaning up.  We acquire all
396     //     the capabilities in order to delete the threads, this is
397     //     done by scheduleDoGC() for convenience (because GC already
398     //     needs to acquire all the capabilities).  We can't kill
399     //     threads involved in foreign calls.
400     // 
401     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
402     //
403     //   * sched_state := SCHED_SHUTTING_DOWN
404     //
405     //   * all workers exit when the run queue on their capability
406     //     drains.  All main threads will also exit when their TSO
407     //     reaches the head of the run queue and they can return.
408     //
409     //   * eventually all Capabilities will shut down, and the RTS can
410     //     exit.
411     //
412     //   * We might be left with threads blocked in foreign calls, 
413     //     we should really attempt to kill these somehow (TODO);
414     
415     switch (sched_state) {
416     case SCHED_RUNNING:
417         break;
418     case SCHED_INTERRUPTING:
419         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
420 #if defined(THREADED_RTS)
421         discardSparksCap(cap);
422 #endif
423         /* scheduleDoGC() deletes all the threads */
424         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
425         break;
426     case SCHED_SHUTTING_DOWN:
427         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
428         // If we are a worker, just exit.  If we're a bound thread
429         // then we will exit below when we've removed our TSO from
430         // the run queue.
431         if (task->tso == NULL && emptyRunQueue(cap)) {
432             return cap;
433         }
434         break;
435     default:
436         barf("sched_state: %d", sched_state);
437     }
438
439 #if defined(THREADED_RTS)
440     // If the run queue is empty, take a spark and turn it into a thread.
441     {
442         if (emptyRunQueue(cap)) {
443             StgClosure *spark;
444             spark = findSpark(cap);
445             if (spark != NULL) {
446                 debugTrace(DEBUG_sched,
447                            "turning spark of closure %p into a thread",
448                            (StgClosure *)spark);
449                 createSparkThread(cap,spark);     
450             }
451         }
452     }
453 #endif // THREADED_RTS
454
455     scheduleStartSignalHandlers(cap);
456
457     // Only check the black holes here if we've nothing else to do.
458     // During normal execution, the black hole list only gets checked
459     // at GC time, to avoid repeatedly traversing this possibly long
460     // list each time around the scheduler.
461     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
462
463     scheduleCheckWakeupThreads(cap);
464
465     scheduleCheckBlockedThreads(cap);
466
467     scheduleDetectDeadlock(cap,task);
468 #if defined(THREADED_RTS)
469     cap = task->cap;    // reload cap, it might have changed
470 #endif
471
472     // Normally, the only way we can get here with no threads to
473     // run is if a keyboard interrupt received during 
474     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
475     // Additionally, it is not fatal for the
476     // threaded RTS to reach here with no threads to run.
477     //
478     // win32: might be here due to awaitEvent() being abandoned
479     // as a result of a console event having been delivered.
480     if ( emptyRunQueue(cap) ) {
481 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
482         ASSERT(sched_state >= SCHED_INTERRUPTING);
483 #endif
484         continue; // nothing to do
485     }
486
487 #if defined(PARALLEL_HASKELL)
488     scheduleSendPendingMessages();
489     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
490         continue;
491
492 #if defined(SPARKS)
493     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
494 #endif
495
496     /* If we still have no work we need to send a FISH to get a spark
497        from another PE */
498     if (emptyRunQueue(cap)) {
499         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
500         ASSERT(rtsFalse); // should not happen at the moment
501     }
502     // from here: non-empty run queue.
503     //  TODO: merge above case with this, only one call processMessages() !
504     if (PacketsWaiting()) {  /* process incoming messages, if
505                                 any pending...  only in else
506                                 because getRemoteWork waits for
507                                 messages as well */
508         receivedFinish = processMessages();
509     }
510 #endif
511
512 #if defined(GRAN)
513     scheduleProcessEvent(event);
514 #endif
515
516     // 
517     // Get a thread to run
518     //
519     t = popRunQueue(cap);
520
521 #if defined(GRAN) || defined(PAR)
522     scheduleGranParReport(); // some kind of debuging output
523 #else
524     // Sanity check the thread we're about to run.  This can be
525     // expensive if there is lots of thread switching going on...
526     IF_DEBUG(sanity,checkTSO(t));
527 #endif
528
529 #if defined(THREADED_RTS)
530     // Check whether we can run this thread in the current task.
531     // If not, we have to pass our capability to the right task.
532     {
533         Task *bound = t->bound;
534       
535         if (bound) {
536             if (bound == task) {
537                 debugTrace(DEBUG_sched,
538                            "### Running thread %lu in bound thread", (unsigned long)t->id);
539                 // yes, the Haskell thread is bound to the current native thread
540             } else {
541                 debugTrace(DEBUG_sched,
542                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
543                 // no, bound to a different Haskell thread: pass to that thread
544                 pushOnRunQueue(cap,t);
545                 continue;
546             }
547         } else {
548             // The thread we want to run is unbound.
549             if (task->tso) { 
550                 debugTrace(DEBUG_sched,
551                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
552                 // no, the current native thread is bound to a different
553                 // Haskell thread, so pass it to any worker thread
554                 pushOnRunQueue(cap,t);
555                 continue; 
556             }
557         }
558     }
559 #endif
560
561     cap->r.rCurrentTSO = t;
562     
563     /* context switches are initiated by the timer signal, unless
564      * the user specified "context switch as often as possible", with
565      * +RTS -C0
566      */
567     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
568         && !emptyThreadQueues(cap)) {
569         context_switch = 1;
570     }
571          
572 run_thread:
573
574     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
575                               (long)t->id, whatNext_strs[t->what_next]);
576
577 #if defined(PROFILING)
578     startHeapProfTimer();
579 #endif
580
581     // Check for exceptions blocked on this thread
582     maybePerformBlockedException (cap, t);
583
584     // ----------------------------------------------------------------------
585     // Run the current thread 
586
587     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
588     ASSERT(t->cap == cap);
589
590     prev_what_next = t->what_next;
591
592     errno = t->saved_errno;
593     cap->in_haskell = rtsTrue;
594
595     dirtyTSO(t);
596
597     recent_activity = ACTIVITY_YES;
598
599     switch (prev_what_next) {
600         
601     case ThreadKilled:
602     case ThreadComplete:
603         /* Thread already finished, return to scheduler. */
604         ret = ThreadFinished;
605         break;
606         
607     case ThreadRunGHC:
608     {
609         StgRegTable *r;
610         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
611         cap = regTableToCapability(r);
612         ret = r->rRet;
613         break;
614     }
615     
616     case ThreadInterpret:
617         cap = interpretBCO(cap);
618         ret = cap->r.rRet;
619         break;
620         
621     default:
622         barf("schedule: invalid what_next field");
623     }
624
625     cap->in_haskell = rtsFalse;
626
627     // The TSO might have moved, eg. if it re-entered the RTS and a GC
628     // happened.  So find the new location:
629     t = cap->r.rCurrentTSO;
630
631     // We have run some Haskell code: there might be blackhole-blocked
632     // threads to wake up now.
633     // Lock-free test here should be ok, we're just setting a flag.
634     if ( blackhole_queue != END_TSO_QUEUE ) {
635         blackholes_need_checking = rtsTrue;
636     }
637     
638     // And save the current errno in this thread.
639     // XXX: possibly bogus for SMP because this thread might already
640     // be running again, see code below.
641     t->saved_errno = errno;
642
643 #if defined(THREADED_RTS)
644     // If ret is ThreadBlocked, and this Task is bound to the TSO that
645     // blocked, we are in limbo - the TSO is now owned by whatever it
646     // is blocked on, and may in fact already have been woken up,
647     // perhaps even on a different Capability.  It may be the case
648     // that task->cap != cap.  We better yield this Capability
649     // immediately and return to normaility.
650     if (ret == ThreadBlocked) {
651         debugTrace(DEBUG_sched,
652                    "--<< thread %lu (%s) stopped: blocked",
653                    (unsigned long)t->id, whatNext_strs[t->what_next]);
654         continue;
655     }
656 #endif
657
658     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
659     ASSERT(t->cap == cap);
660
661     // ----------------------------------------------------------------------
662     
663     // Costs for the scheduler are assigned to CCS_SYSTEM
664 #if defined(PROFILING)
665     stopHeapProfTimer();
666     CCCS = CCS_SYSTEM;
667 #endif
668     
669     schedulePostRunThread();
670
671     ready_to_gc = rtsFalse;
672
673     switch (ret) {
674     case HeapOverflow:
675         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
676         break;
677
678     case StackOverflow:
679         scheduleHandleStackOverflow(cap,task,t);
680         break;
681
682     case ThreadYielding:
683         if (scheduleHandleYield(cap, t, prev_what_next)) {
684             // shortcut for switching between compiler/interpreter:
685             goto run_thread; 
686         }
687         break;
688
689     case ThreadBlocked:
690         scheduleHandleThreadBlocked(t);
691         break;
692
693     case ThreadFinished:
694         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
695         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
696         break;
697
698     default:
699       barf("schedule: invalid thread return code %d", (int)ret);
700     }
701
702     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
703     if (ready_to_gc) {
704       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
705     }
706   } /* end of while() */
707
708   debugTrace(PAR_DEBUG_verbose,
709              "== Leaving schedule() after having received Finish");
710 }
711
712 /* ----------------------------------------------------------------------------
713  * Setting up the scheduler loop
714  * ------------------------------------------------------------------------- */
715
716 static void
717 schedulePreLoop(void)
718 {
719 #if defined(GRAN) 
720     /* set up first event to get things going */
721     /* ToDo: assign costs for system setup and init MainTSO ! */
722     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
723               ContinueThread, 
724               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
725     
726     debugTrace (DEBUG_gran,
727                 "GRAN: Init CurrentTSO (in schedule) = %p", 
728                 CurrentTSO);
729     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
730     
731     if (RtsFlags.GranFlags.Light) {
732         /* Save current time; GranSim Light only */
733         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
734     }      
735 #endif
736 }
737
738 /* -----------------------------------------------------------------------------
739  * schedulePushWork()
740  *
741  * Push work to other Capabilities if we have some.
742  * -------------------------------------------------------------------------- */
743
744 #if defined(THREADED_RTS)
745 static void
746 schedulePushWork(Capability *cap USED_IF_THREADS, 
747                  Task *task      USED_IF_THREADS)
748 {
749     Capability *free_caps[n_capabilities], *cap0;
750     nat i, n_free_caps;
751
752     // migration can be turned off with +RTS -qg
753     if (!RtsFlags.ParFlags.migrate) return;
754
755     // Check whether we have more threads on our run queue, or sparks
756     // in our pool, that we could hand to another Capability.
757     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
758         && sparkPoolSizeCap(cap) < 2) {
759         return;
760     }
761
762     // First grab as many free Capabilities as we can.
763     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
764         cap0 = &capabilities[i];
765         if (cap != cap0 && tryGrabCapability(cap0,task)) {
766             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
767                 // it already has some work, we just grabbed it at 
768                 // the wrong moment.  Or maybe it's deadlocked!
769                 releaseCapability(cap0);
770             } else {
771                 free_caps[n_free_caps++] = cap0;
772             }
773         }
774     }
775
776     // we now have n_free_caps free capabilities stashed in
777     // free_caps[].  Share our run queue equally with them.  This is
778     // probably the simplest thing we could do; improvements we might
779     // want to do include:
780     //
781     //   - giving high priority to moving relatively new threads, on 
782     //     the gournds that they haven't had time to build up a
783     //     working set in the cache on this CPU/Capability.
784     //
785     //   - giving low priority to moving long-lived threads
786
787     if (n_free_caps > 0) {
788         StgTSO *prev, *t, *next;
789         rtsBool pushed_to_all;
790
791         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
792
793         i = 0;
794         pushed_to_all = rtsFalse;
795
796         if (cap->run_queue_hd != END_TSO_QUEUE) {
797             prev = cap->run_queue_hd;
798             t = prev->link;
799             prev->link = END_TSO_QUEUE;
800             for (; t != END_TSO_QUEUE; t = next) {
801                 next = t->link;
802                 t->link = END_TSO_QUEUE;
803                 if (t->what_next == ThreadRelocated
804                     || t->bound == task // don't move my bound thread
805                     || tsoLocked(t)) {  // don't move a locked thread
806                     prev->link = t;
807                     prev = t;
808                 } else if (i == n_free_caps) {
809                     pushed_to_all = rtsTrue;
810                     i = 0;
811                     // keep one for us
812                     prev->link = t;
813                     prev = t;
814                 } else {
815                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
816                     appendToRunQueue(free_caps[i],t);
817                     if (t->bound) { t->bound->cap = free_caps[i]; }
818                     t->cap = free_caps[i];
819                     i++;
820                 }
821             }
822             cap->run_queue_tl = prev;
823         }
824
825         // If there are some free capabilities that we didn't push any
826         // threads to, then try to push a spark to each one.
827         if (!pushed_to_all) {
828             StgClosure *spark;
829             // i is the next free capability to push to
830             for (; i < n_free_caps; i++) {
831                 if (emptySparkPoolCap(free_caps[i])) {
832                     spark = findSpark(cap);
833                     if (spark != NULL) {
834                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
835                         newSpark(&(free_caps[i]->r), spark);
836                     }
837                 }
838             }
839         }
840
841         // release the capabilities
842         for (i = 0; i < n_free_caps; i++) {
843             task->cap = free_caps[i];
844             releaseCapability(free_caps[i]);
845         }
846     }
847     task->cap = cap; // reset to point to our Capability.
848 }
849 #endif
850
851 /* ----------------------------------------------------------------------------
852  * Start any pending signal handlers
853  * ------------------------------------------------------------------------- */
854
855 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
856 static void
857 scheduleStartSignalHandlers(Capability *cap)
858 {
859     if (signals_pending()) { // safe outside the lock
860         startSignalHandlers(cap);
861     }
862 }
863 #else
864 static void
865 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
866 {
867 }
868 #endif
869
870 /* ----------------------------------------------------------------------------
871  * Check for blocked threads that can be woken up.
872  * ------------------------------------------------------------------------- */
873
874 static void
875 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
876 {
877 #if !defined(THREADED_RTS)
878     //
879     // Check whether any waiting threads need to be woken up.  If the
880     // run queue is empty, and there are no other tasks running, we
881     // can wait indefinitely for something to happen.
882     //
883     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
884     {
885         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
886     }
887 #endif
888 }
889
890
891 /* ----------------------------------------------------------------------------
892  * Check for threads woken up by other Capabilities
893  * ------------------------------------------------------------------------- */
894
895 static void
896 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
897 {
898 #if defined(THREADED_RTS)
899     // Any threads that were woken up by other Capabilities get
900     // appended to our run queue.
901     if (!emptyWakeupQueue(cap)) {
902         ACQUIRE_LOCK(&cap->lock);
903         if (emptyRunQueue(cap)) {
904             cap->run_queue_hd = cap->wakeup_queue_hd;
905             cap->run_queue_tl = cap->wakeup_queue_tl;
906         } else {
907             cap->run_queue_tl->link = cap->wakeup_queue_hd;
908             cap->run_queue_tl = cap->wakeup_queue_tl;
909         }
910         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
911         RELEASE_LOCK(&cap->lock);
912     }
913 #endif
914 }
915
916 /* ----------------------------------------------------------------------------
917  * Check for threads blocked on BLACKHOLEs that can be woken up
918  * ------------------------------------------------------------------------- */
919 static void
920 scheduleCheckBlackHoles (Capability *cap)
921 {
922     if ( blackholes_need_checking ) // check without the lock first
923     {
924         ACQUIRE_LOCK(&sched_mutex);
925         if ( blackholes_need_checking ) {
926             checkBlackHoles(cap);
927             blackholes_need_checking = rtsFalse;
928         }
929         RELEASE_LOCK(&sched_mutex);
930     }
931 }
932
933 /* ----------------------------------------------------------------------------
934  * Detect deadlock conditions and attempt to resolve them.
935  * ------------------------------------------------------------------------- */
936
937 static void
938 scheduleDetectDeadlock (Capability *cap, Task *task)
939 {
940
941 #if defined(PARALLEL_HASKELL)
942     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
943     return;
944 #endif
945
946     /* 
947      * Detect deadlock: when we have no threads to run, there are no
948      * threads blocked, waiting for I/O, or sleeping, and all the
949      * other tasks are waiting for work, we must have a deadlock of
950      * some description.
951      */
952     if ( emptyThreadQueues(cap) )
953     {
954 #if defined(THREADED_RTS)
955         /* 
956          * In the threaded RTS, we only check for deadlock if there
957          * has been no activity in a complete timeslice.  This means
958          * we won't eagerly start a full GC just because we don't have
959          * any threads to run currently.
960          */
961         if (recent_activity != ACTIVITY_INACTIVE) return;
962 #endif
963
964         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
965
966         // Garbage collection can release some new threads due to
967         // either (a) finalizers or (b) threads resurrected because
968         // they are unreachable and will therefore be sent an
969         // exception.  Any threads thus released will be immediately
970         // runnable.
971         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
972
973         recent_activity = ACTIVITY_DONE_GC;
974         
975         if ( !emptyRunQueue(cap) ) return;
976
977 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
978         /* If we have user-installed signal handlers, then wait
979          * for signals to arrive rather then bombing out with a
980          * deadlock.
981          */
982         if ( anyUserHandlers() ) {
983             debugTrace(DEBUG_sched,
984                        "still deadlocked, waiting for signals...");
985
986             awaitUserSignals();
987
988             if (signals_pending()) {
989                 startSignalHandlers(cap);
990             }
991
992             // either we have threads to run, or we were interrupted:
993             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
994         }
995 #endif
996
997 #if !defined(THREADED_RTS)
998         /* Probably a real deadlock.  Send the current main thread the
999          * Deadlock exception.
1000          */
1001         if (task->tso) {
1002             switch (task->tso->why_blocked) {
1003             case BlockedOnSTM:
1004             case BlockedOnBlackHole:
1005             case BlockedOnException:
1006             case BlockedOnMVar:
1007                 throwToSingleThreaded(cap, task->tso, 
1008                                       (StgClosure *)NonTermination_closure);
1009                 return;
1010             default:
1011                 barf("deadlock: main thread blocked in a strange way");
1012             }
1013         }
1014         return;
1015 #endif
1016     }
1017 }
1018
1019 /* ----------------------------------------------------------------------------
1020  * Process an event (GRAN only)
1021  * ------------------------------------------------------------------------- */
1022
1023 #if defined(GRAN)
1024 static StgTSO *
1025 scheduleProcessEvent(rtsEvent *event)
1026 {
1027     StgTSO *t;
1028
1029     if (RtsFlags.GranFlags.Light)
1030       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1031
1032     /* adjust time based on time-stamp */
1033     if (event->time > CurrentTime[CurrentProc] &&
1034         event->evttype != ContinueThread)
1035       CurrentTime[CurrentProc] = event->time;
1036     
1037     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1038     if (!RtsFlags.GranFlags.Light)
1039       handleIdlePEs();
1040
1041     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1042
1043     /* main event dispatcher in GranSim */
1044     switch (event->evttype) {
1045       /* Should just be continuing execution */
1046     case ContinueThread:
1047       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1048       /* ToDo: check assertion
1049       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1050              run_queue_hd != END_TSO_QUEUE);
1051       */
1052       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1053       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1054           procStatus[CurrentProc]==Fetching) {
1055         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1056               CurrentTSO->id, CurrentTSO, CurrentProc);
1057         goto next_thread;
1058       } 
1059       /* Ignore ContinueThreads for completed threads */
1060       if (CurrentTSO->what_next == ThreadComplete) {
1061         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1062               CurrentTSO->id, CurrentTSO, CurrentProc);
1063         goto next_thread;
1064       } 
1065       /* Ignore ContinueThreads for threads that are being migrated */
1066       if (PROCS(CurrentTSO)==Nowhere) { 
1067         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1068               CurrentTSO->id, CurrentTSO, CurrentProc);
1069         goto next_thread;
1070       }
1071       /* The thread should be at the beginning of the run queue */
1072       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1073         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1074               CurrentTSO->id, CurrentTSO, CurrentProc);
1075         break; // run the thread anyway
1076       }
1077       /*
1078       new_event(proc, proc, CurrentTime[proc],
1079                 FindWork,
1080                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1081       goto next_thread; 
1082       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1083       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1084
1085     case FetchNode:
1086       do_the_fetchnode(event);
1087       goto next_thread;             /* handle next event in event queue  */
1088       
1089     case GlobalBlock:
1090       do_the_globalblock(event);
1091       goto next_thread;             /* handle next event in event queue  */
1092       
1093     case FetchReply:
1094       do_the_fetchreply(event);
1095       goto next_thread;             /* handle next event in event queue  */
1096       
1097     case UnblockThread:   /* Move from the blocked queue to the tail of */
1098       do_the_unblock(event);
1099       goto next_thread;             /* handle next event in event queue  */
1100       
1101     case ResumeThread:  /* Move from the blocked queue to the tail of */
1102       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1103       event->tso->gran.blocktime += 
1104         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1105       do_the_startthread(event);
1106       goto next_thread;             /* handle next event in event queue  */
1107       
1108     case StartThread:
1109       do_the_startthread(event);
1110       goto next_thread;             /* handle next event in event queue  */
1111       
1112     case MoveThread:
1113       do_the_movethread(event);
1114       goto next_thread;             /* handle next event in event queue  */
1115       
1116     case MoveSpark:
1117       do_the_movespark(event);
1118       goto next_thread;             /* handle next event in event queue  */
1119       
1120     case FindWork:
1121       do_the_findwork(event);
1122       goto next_thread;             /* handle next event in event queue  */
1123       
1124     default:
1125       barf("Illegal event type %u\n", event->evttype);
1126     }  /* switch */
1127     
1128     /* This point was scheduler_loop in the old RTS */
1129
1130     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1131
1132     TimeOfLastEvent = CurrentTime[CurrentProc];
1133     TimeOfNextEvent = get_time_of_next_event();
1134     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1135     // CurrentTSO = ThreadQueueHd;
1136
1137     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1138                          TimeOfNextEvent));
1139
1140     if (RtsFlags.GranFlags.Light) 
1141       GranSimLight_leave_system(event, &ActiveTSO); 
1142
1143     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1144
1145     IF_DEBUG(gran, 
1146              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1147
1148     /* in a GranSim setup the TSO stays on the run queue */
1149     t = CurrentTSO;
1150     /* Take a thread from the run queue. */
1151     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1152
1153     IF_DEBUG(gran, 
1154              debugBelch("GRAN: About to run current thread, which is\n");
1155              G_TSO(t,5));
1156
1157     context_switch = 0; // turned on via GranYield, checking events and time slice
1158
1159     IF_DEBUG(gran, 
1160              DumpGranEvent(GR_SCHEDULE, t));
1161
1162     procStatus[CurrentProc] = Busy;
1163 }
1164 #endif // GRAN
1165
1166 /* ----------------------------------------------------------------------------
1167  * Send pending messages (PARALLEL_HASKELL only)
1168  * ------------------------------------------------------------------------- */
1169
1170 #if defined(PARALLEL_HASKELL)
1171 static StgTSO *
1172 scheduleSendPendingMessages(void)
1173 {
1174     StgSparkPool *pool;
1175     rtsSpark spark;
1176     StgTSO *t;
1177
1178 # if defined(PAR) // global Mem.Mgmt., omit for now
1179     if (PendingFetches != END_BF_QUEUE) {
1180         processFetches();
1181     }
1182 # endif
1183     
1184     if (RtsFlags.ParFlags.BufferTime) {
1185         // if we use message buffering, we must send away all message
1186         // packets which have become too old...
1187         sendOldBuffers(); 
1188     }
1189 }
1190 #endif
1191
1192 /* ----------------------------------------------------------------------------
1193  * Activate spark threads (PARALLEL_HASKELL only)
1194  * ------------------------------------------------------------------------- */
1195
1196 #if defined(PARALLEL_HASKELL)
1197 static void
1198 scheduleActivateSpark(void)
1199 {
1200 #if defined(SPARKS)
1201   ASSERT(emptyRunQueue());
1202 /* We get here if the run queue is empty and want some work.
1203    We try to turn a spark into a thread, and add it to the run queue,
1204    from where it will be picked up in the next iteration of the scheduler
1205    loop.
1206 */
1207
1208       /* :-[  no local threads => look out for local sparks */
1209       /* the spark pool for the current PE */
1210       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1211       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1212           pool->hd < pool->tl) {
1213         /* 
1214          * ToDo: add GC code check that we really have enough heap afterwards!!
1215          * Old comment:
1216          * If we're here (no runnable threads) and we have pending
1217          * sparks, we must have a space problem.  Get enough space
1218          * to turn one of those pending sparks into a
1219          * thread... 
1220          */
1221
1222         spark = findSpark(rtsFalse);            /* get a spark */
1223         if (spark != (rtsSpark) NULL) {
1224           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1225           IF_PAR_DEBUG(fish, // schedule,
1226                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1227                              tso->id, tso, advisory_thread_count));
1228
1229           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1230             IF_PAR_DEBUG(fish, // schedule,
1231                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1232                             spark));
1233             return rtsFalse; /* failed to generate a thread */
1234           }                  /* otherwise fall through & pick-up new tso */
1235         } else {
1236           IF_PAR_DEBUG(fish, // schedule,
1237                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1238                              spark_queue_len(pool)));
1239           return rtsFalse;  /* failed to generate a thread */
1240         }
1241         return rtsTrue;  /* success in generating a thread */
1242   } else { /* no more threads permitted or pool empty */
1243     return rtsFalse;  /* failed to generateThread */
1244   }
1245 #else
1246   tso = NULL; // avoid compiler warning only
1247   return rtsFalse;  /* dummy in non-PAR setup */
1248 #endif // SPARKS
1249 }
1250 #endif // PARALLEL_HASKELL
1251
1252 /* ----------------------------------------------------------------------------
1253  * Get work from a remote node (PARALLEL_HASKELL only)
1254  * ------------------------------------------------------------------------- */
1255     
1256 #if defined(PARALLEL_HASKELL)
1257 static rtsBool
1258 scheduleGetRemoteWork(rtsBool *receivedFinish)
1259 {
1260   ASSERT(emptyRunQueue());
1261
1262   if (RtsFlags.ParFlags.BufferTime) {
1263         IF_PAR_DEBUG(verbose, 
1264                 debugBelch("...send all pending data,"));
1265         {
1266           nat i;
1267           for (i=1; i<=nPEs; i++)
1268             sendImmediately(i); // send all messages away immediately
1269         }
1270   }
1271 # ifndef SPARKS
1272         //++EDEN++ idle() , i.e. send all buffers, wait for work
1273         // suppress fishing in EDEN... just look for incoming messages
1274         // (blocking receive)
1275   IF_PAR_DEBUG(verbose, 
1276                debugBelch("...wait for incoming messages...\n"));
1277   *receivedFinish = processMessages(); // blocking receive...
1278
1279         // and reenter scheduling loop after having received something
1280         // (return rtsFalse below)
1281
1282 # else /* activate SPARKS machinery */
1283 /* We get here, if we have no work, tried to activate a local spark, but still
1284    have no work. We try to get a remote spark, by sending a FISH message.
1285    Thread migration should be added here, and triggered when a sequence of 
1286    fishes returns without work. */
1287         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1288
1289       /* =8-[  no local sparks => look for work on other PEs */
1290         /*
1291          * We really have absolutely no work.  Send out a fish
1292          * (there may be some out there already), and wait for
1293          * something to arrive.  We clearly can't run any threads
1294          * until a SCHEDULE or RESUME arrives, and so that's what
1295          * we're hoping to see.  (Of course, we still have to
1296          * respond to other types of messages.)
1297          */
1298         rtsTime now = msTime() /*CURRENT_TIME*/;
1299         IF_PAR_DEBUG(verbose, 
1300                      debugBelch("--  now=%ld\n", now));
1301         IF_PAR_DEBUG(fish, // verbose,
1302              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1303                  (last_fish_arrived_at!=0 &&
1304                   last_fish_arrived_at+delay > now)) {
1305                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1306                      now, last_fish_arrived_at+delay, 
1307                      last_fish_arrived_at,
1308                      delay);
1309              });
1310   
1311         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1312             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1313           if (last_fish_arrived_at==0 ||
1314               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1315             /* outstandingFishes is set in sendFish, processFish;
1316                avoid flooding system with fishes via delay */
1317     next_fish_to_send_at = 0;  
1318   } else {
1319     /* ToDo: this should be done in the main scheduling loop to avoid the
1320              busy wait here; not so bad if fish delay is very small  */
1321     int iq = 0; // DEBUGGING -- HWL
1322     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1323     /* send a fish when ready, but process messages that arrive in the meantime */
1324     do {
1325       if (PacketsWaiting()) {
1326         iq++; // DEBUGGING
1327         *receivedFinish = processMessages();
1328       }
1329       now = msTime();
1330     } while (!*receivedFinish || now<next_fish_to_send_at);
1331     // JB: This means the fish could become obsolete, if we receive
1332     // work. Better check for work again? 
1333     // last line: while (!receivedFinish || !haveWork || now<...)
1334     // next line: if (receivedFinish || haveWork )
1335
1336     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1337       return rtsFalse;  // NB: this will leave scheduler loop
1338                         // immediately after return!
1339                           
1340     IF_PAR_DEBUG(fish, // verbose,
1341                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1342
1343   }
1344
1345     // JB: IMHO, this should all be hidden inside sendFish(...)
1346     /* pe = choosePE(); 
1347        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1348                 NEW_FISH_HUNGER);
1349
1350     // Global statistics: count no. of fishes
1351     if (RtsFlags.ParFlags.ParStats.Global &&
1352          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1353            globalParStats.tot_fish_mess++;
1354            }
1355     */ 
1356
1357   /* delayed fishes must have been sent by now! */
1358   next_fish_to_send_at = 0;  
1359   }
1360       
1361   *receivedFinish = processMessages();
1362 # endif /* SPARKS */
1363
1364  return rtsFalse;
1365  /* NB: this function always returns rtsFalse, meaning the scheduler
1366     loop continues with the next iteration; 
1367     rationale: 
1368       return code means success in finding work; we enter this function
1369       if there is no local work, thus have to send a fish which takes
1370       time until it arrives with work; in the meantime we should process
1371       messages in the main loop;
1372  */
1373 }
1374 #endif // PARALLEL_HASKELL
1375
1376 /* ----------------------------------------------------------------------------
1377  * PAR/GRAN: Report stats & debugging info(?)
1378  * ------------------------------------------------------------------------- */
1379
1380 #if defined(PAR) || defined(GRAN)
1381 static void
1382 scheduleGranParReport(void)
1383 {
1384   ASSERT(run_queue_hd != END_TSO_QUEUE);
1385
1386   /* Take a thread from the run queue, if we have work */
1387   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1388
1389     /* If this TSO has got its outport closed in the meantime, 
1390      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1391      * It has to be marked as TH_DEAD for this purpose.
1392      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1393
1394 JB: TODO: investigate wether state change field could be nuked
1395      entirely and replaced by the normal tso state (whatnext
1396      field). All we want to do is to kill tsos from outside.
1397      */
1398
1399     /* ToDo: write something to the log-file
1400     if (RTSflags.ParFlags.granSimStats && !sameThread)
1401         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1402
1403     CurrentTSO = t;
1404     */
1405     /* the spark pool for the current PE */
1406     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1407
1408     IF_DEBUG(scheduler, 
1409              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1410                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1411
1412     IF_PAR_DEBUG(fish,
1413              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1414                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1415
1416     if (RtsFlags.ParFlags.ParStats.Full && 
1417         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1418         (emitSchedule || // forced emit
1419          (t && LastTSO && t->id != LastTSO->id))) {
1420       /* 
1421          we are running a different TSO, so write a schedule event to log file
1422          NB: If we use fair scheduling we also have to write  a deschedule 
1423              event for LastTSO; with unfair scheduling we know that the
1424              previous tso has blocked whenever we switch to another tso, so
1425              we don't need it in GUM for now
1426       */
1427       IF_PAR_DEBUG(fish, // schedule,
1428                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1429
1430       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1431                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1432       emitSchedule = rtsFalse;
1433     }
1434 }     
1435 #endif
1436
1437 /* ----------------------------------------------------------------------------
1438  * After running a thread...
1439  * ------------------------------------------------------------------------- */
1440
1441 static void
1442 schedulePostRunThread(void)
1443 {
1444 #if defined(PAR)
1445     /* HACK 675: if the last thread didn't yield, make sure to print a 
1446        SCHEDULE event to the log file when StgRunning the next thread, even
1447        if it is the same one as before */
1448     LastTSO = t; 
1449     TimeOfLastYield = CURRENT_TIME;
1450 #endif
1451
1452   /* some statistics gathering in the parallel case */
1453
1454 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1455   switch (ret) {
1456     case HeapOverflow:
1457 # if defined(GRAN)
1458       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1459       globalGranStats.tot_heapover++;
1460 # elif defined(PAR)
1461       globalParStats.tot_heapover++;
1462 # endif
1463       break;
1464
1465      case StackOverflow:
1466 # if defined(GRAN)
1467       IF_DEBUG(gran, 
1468                DumpGranEvent(GR_DESCHEDULE, t));
1469       globalGranStats.tot_stackover++;
1470 # elif defined(PAR)
1471       // IF_DEBUG(par, 
1472       // DumpGranEvent(GR_DESCHEDULE, t);
1473       globalParStats.tot_stackover++;
1474 # endif
1475       break;
1476
1477     case ThreadYielding:
1478 # if defined(GRAN)
1479       IF_DEBUG(gran, 
1480                DumpGranEvent(GR_DESCHEDULE, t));
1481       globalGranStats.tot_yields++;
1482 # elif defined(PAR)
1483       // IF_DEBUG(par, 
1484       // DumpGranEvent(GR_DESCHEDULE, t);
1485       globalParStats.tot_yields++;
1486 # endif
1487       break; 
1488
1489     case ThreadBlocked:
1490 # if defined(GRAN)
1491         debugTrace(DEBUG_sched, 
1492                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1493                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1494                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1495                if (t->block_info.closure!=(StgClosure*)NULL)
1496                  print_bq(t->block_info.closure);
1497                debugBelch("\n"));
1498
1499       // ??? needed; should emit block before
1500       IF_DEBUG(gran, 
1501                DumpGranEvent(GR_DESCHEDULE, t)); 
1502       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1503       /*
1504         ngoq Dogh!
1505       ASSERT(procStatus[CurrentProc]==Busy || 
1506               ((procStatus[CurrentProc]==Fetching) && 
1507               (t->block_info.closure!=(StgClosure*)NULL)));
1508       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1509           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1510             procStatus[CurrentProc]==Fetching)) 
1511         procStatus[CurrentProc] = Idle;
1512       */
1513 # elif defined(PAR)
1514 //++PAR++  blockThread() writes the event (change?)
1515 # endif
1516     break;
1517
1518   case ThreadFinished:
1519     break;
1520
1521   default:
1522     barf("parGlobalStats: unknown return code");
1523     break;
1524     }
1525 #endif
1526 }
1527
1528 /* -----------------------------------------------------------------------------
1529  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1530  * -------------------------------------------------------------------------- */
1531
1532 static rtsBool
1533 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1534 {
1535     // did the task ask for a large block?
1536     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1537         // if so, get one and push it on the front of the nursery.
1538         bdescr *bd;
1539         lnat blocks;
1540         
1541         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1542         
1543         debugTrace(DEBUG_sched,
1544                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1545                    (long)t->id, whatNext_strs[t->what_next], blocks);
1546     
1547         // don't do this if the nursery is (nearly) full, we'll GC first.
1548         if (cap->r.rCurrentNursery->link != NULL ||
1549             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1550                                                // if the nursery has only one block.
1551             
1552             ACQUIRE_SM_LOCK
1553             bd = allocGroup( blocks );
1554             RELEASE_SM_LOCK
1555             cap->r.rNursery->n_blocks += blocks;
1556             
1557             // link the new group into the list
1558             bd->link = cap->r.rCurrentNursery;
1559             bd->u.back = cap->r.rCurrentNursery->u.back;
1560             if (cap->r.rCurrentNursery->u.back != NULL) {
1561                 cap->r.rCurrentNursery->u.back->link = bd;
1562             } else {
1563 #if !defined(THREADED_RTS)
1564                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1565                        g0s0 == cap->r.rNursery);
1566 #endif
1567                 cap->r.rNursery->blocks = bd;
1568             }             
1569             cap->r.rCurrentNursery->u.back = bd;
1570             
1571             // initialise it as a nursery block.  We initialise the
1572             // step, gen_no, and flags field of *every* sub-block in
1573             // this large block, because this is easier than making
1574             // sure that we always find the block head of a large
1575             // block whenever we call Bdescr() (eg. evacuate() and
1576             // isAlive() in the GC would both have to do this, at
1577             // least).
1578             { 
1579                 bdescr *x;
1580                 for (x = bd; x < bd + blocks; x++) {
1581                     x->step = cap->r.rNursery;
1582                     x->gen_no = 0;
1583                     x->flags = 0;
1584                 }
1585             }
1586             
1587             // This assert can be a killer if the app is doing lots
1588             // of large block allocations.
1589             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1590             
1591             // now update the nursery to point to the new block
1592             cap->r.rCurrentNursery = bd;
1593             
1594             // we might be unlucky and have another thread get on the
1595             // run queue before us and steal the large block, but in that
1596             // case the thread will just end up requesting another large
1597             // block.
1598             pushOnRunQueue(cap,t);
1599             return rtsFalse;  /* not actually GC'ing */
1600         }
1601     }
1602     
1603     debugTrace(DEBUG_sched,
1604                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1605                (long)t->id, whatNext_strs[t->what_next]);
1606
1607 #if defined(GRAN)
1608     ASSERT(!is_on_queue(t,CurrentProc));
1609 #elif defined(PARALLEL_HASKELL)
1610     /* Currently we emit a DESCHEDULE event before GC in GUM.
1611        ToDo: either add separate event to distinguish SYSTEM time from rest
1612        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1613     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1614         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1615                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1616         emitSchedule = rtsTrue;
1617     }
1618 #endif
1619       
1620     pushOnRunQueue(cap,t);
1621     return rtsTrue;
1622     /* actual GC is done at the end of the while loop in schedule() */
1623 }
1624
1625 /* -----------------------------------------------------------------------------
1626  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1627  * -------------------------------------------------------------------------- */
1628
1629 static void
1630 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1631 {
1632     debugTrace (DEBUG_sched,
1633                 "--<< thread %ld (%s) stopped, StackOverflow", 
1634                 (long)t->id, whatNext_strs[t->what_next]);
1635
1636     /* just adjust the stack for this thread, then pop it back
1637      * on the run queue.
1638      */
1639     { 
1640         /* enlarge the stack */
1641         StgTSO *new_t = threadStackOverflow(cap, t);
1642         
1643         /* The TSO attached to this Task may have moved, so update the
1644          * pointer to it.
1645          */
1646         if (task->tso == t) {
1647             task->tso = new_t;
1648         }
1649         pushOnRunQueue(cap,new_t);
1650     }
1651 }
1652
1653 /* -----------------------------------------------------------------------------
1654  * Handle a thread that returned to the scheduler with ThreadYielding
1655  * -------------------------------------------------------------------------- */
1656
1657 static rtsBool
1658 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1659 {
1660     // Reset the context switch flag.  We don't do this just before
1661     // running the thread, because that would mean we would lose ticks
1662     // during GC, which can lead to unfair scheduling (a thread hogs
1663     // the CPU because the tick always arrives during GC).  This way
1664     // penalises threads that do a lot of allocation, but that seems
1665     // better than the alternative.
1666     context_switch = 0;
1667     
1668     /* put the thread back on the run queue.  Then, if we're ready to
1669      * GC, check whether this is the last task to stop.  If so, wake
1670      * up the GC thread.  getThread will block during a GC until the
1671      * GC is finished.
1672      */
1673 #ifdef DEBUG
1674     if (t->what_next != prev_what_next) {
1675         debugTrace(DEBUG_sched,
1676                    "--<< thread %ld (%s) stopped to switch evaluators", 
1677                    (long)t->id, whatNext_strs[t->what_next]);
1678     } else {
1679         debugTrace(DEBUG_sched,
1680                    "--<< thread %ld (%s) stopped, yielding",
1681                    (long)t->id, whatNext_strs[t->what_next]);
1682     }
1683 #endif
1684     
1685     IF_DEBUG(sanity,
1686              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1687              checkTSO(t));
1688     ASSERT(t->link == END_TSO_QUEUE);
1689     
1690     // Shortcut if we're just switching evaluators: don't bother
1691     // doing stack squeezing (which can be expensive), just run the
1692     // thread.
1693     if (t->what_next != prev_what_next) {
1694         return rtsTrue;
1695     }
1696     
1697 #if defined(GRAN)
1698     ASSERT(!is_on_queue(t,CurrentProc));
1699       
1700     IF_DEBUG(sanity,
1701              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1702              checkThreadQsSanity(rtsTrue));
1703
1704 #endif
1705
1706     addToRunQueue(cap,t);
1707
1708 #if defined(GRAN)
1709     /* add a ContinueThread event to actually process the thread */
1710     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1711               ContinueThread,
1712               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1713     IF_GRAN_DEBUG(bq, 
1714                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1715                   G_EVENTQ(0);
1716                   G_CURR_THREADQ(0));
1717 #endif
1718     return rtsFalse;
1719 }
1720
1721 /* -----------------------------------------------------------------------------
1722  * Handle a thread that returned to the scheduler with ThreadBlocked
1723  * -------------------------------------------------------------------------- */
1724
1725 static void
1726 scheduleHandleThreadBlocked( StgTSO *t
1727 #if !defined(GRAN) && !defined(DEBUG)
1728     STG_UNUSED
1729 #endif
1730     )
1731 {
1732 #if defined(GRAN)
1733     IF_DEBUG(scheduler,
1734              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1735                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1736              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1737     
1738     // ??? needed; should emit block before
1739     IF_DEBUG(gran, 
1740              DumpGranEvent(GR_DESCHEDULE, t)); 
1741     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1742     /*
1743       ngoq Dogh!
1744       ASSERT(procStatus[CurrentProc]==Busy || 
1745       ((procStatus[CurrentProc]==Fetching) && 
1746       (t->block_info.closure!=(StgClosure*)NULL)));
1747       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1748       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1749       procStatus[CurrentProc]==Fetching)) 
1750       procStatus[CurrentProc] = Idle;
1751     */
1752 #elif defined(PAR)
1753     IF_DEBUG(scheduler,
1754              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1755                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1756     IF_PAR_DEBUG(bq,
1757                  
1758                  if (t->block_info.closure!=(StgClosure*)NULL) 
1759                  print_bq(t->block_info.closure));
1760     
1761     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1762     blockThread(t);
1763     
1764     /* whatever we schedule next, we must log that schedule */
1765     emitSchedule = rtsTrue;
1766     
1767 #else /* !GRAN */
1768
1769       // We don't need to do anything.  The thread is blocked, and it
1770       // has tidied up its stack and placed itself on whatever queue
1771       // it needs to be on.
1772
1773 #if !defined(THREADED_RTS)
1774     ASSERT(t->why_blocked != NotBlocked);
1775              // This might not be true under THREADED_RTS: we don't have
1776              // exclusive access to this TSO, so someone might have
1777              // woken it up by now.  This actually happens: try
1778              // conc023 +RTS -N2.
1779 #endif
1780
1781 #ifdef DEBUG
1782     if (traceClass(DEBUG_sched)) {
1783         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1784                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1785         printThreadBlockage(t);
1786         debugTraceEnd();
1787     }
1788 #endif
1789     
1790     /* Only for dumping event to log file 
1791        ToDo: do I need this in GranSim, too?
1792        blockThread(t);
1793     */
1794 #endif
1795 }
1796
1797 /* -----------------------------------------------------------------------------
1798  * Handle a thread that returned to the scheduler with ThreadFinished
1799  * -------------------------------------------------------------------------- */
1800
1801 static rtsBool
1802 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1803 {
1804     /* Need to check whether this was a main thread, and if so,
1805      * return with the return value.
1806      *
1807      * We also end up here if the thread kills itself with an
1808      * uncaught exception, see Exception.cmm.
1809      */
1810     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1811                (unsigned long)t->id, whatNext_strs[t->what_next]);
1812
1813 #if defined(GRAN)
1814       endThread(t, CurrentProc); // clean-up the thread
1815 #elif defined(PARALLEL_HASKELL)
1816       /* For now all are advisory -- HWL */
1817       //if(t->priority==AdvisoryPriority) ??
1818       advisory_thread_count--; // JB: Caution with this counter, buggy!
1819       
1820 # if defined(DIST)
1821       if(t->dist.priority==RevalPriority)
1822         FinishReval(t);
1823 # endif
1824     
1825 # if defined(EDENOLD)
1826       // the thread could still have an outport... (BUG)
1827       if (t->eden.outport != -1) {
1828       // delete the outport for the tso which has finished...
1829         IF_PAR_DEBUG(eden_ports,
1830                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1831                               t->eden.outport, t->id));
1832         deleteOPT(t);
1833       }
1834       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1835       if (t->eden.epid != -1) {
1836         IF_PAR_DEBUG(eden_ports,
1837                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1838                            t->id, t->eden.epid));
1839         removeTSOfromProcess(t);
1840       }
1841 # endif 
1842
1843 # if defined(PAR)
1844       if (RtsFlags.ParFlags.ParStats.Full &&
1845           !RtsFlags.ParFlags.ParStats.Suppressed) 
1846         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1847
1848       //  t->par only contains statistics: left out for now...
1849       IF_PAR_DEBUG(fish,
1850                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1851                               t->id,t,t->par.sparkname));
1852 # endif
1853 #endif // PARALLEL_HASKELL
1854
1855       //
1856       // Check whether the thread that just completed was a bound
1857       // thread, and if so return with the result.  
1858       //
1859       // There is an assumption here that all thread completion goes
1860       // through this point; we need to make sure that if a thread
1861       // ends up in the ThreadKilled state, that it stays on the run
1862       // queue so it can be dealt with here.
1863       //
1864
1865       if (t->bound) {
1866
1867           if (t->bound != task) {
1868 #if !defined(THREADED_RTS)
1869               // Must be a bound thread that is not the topmost one.  Leave
1870               // it on the run queue until the stack has unwound to the
1871               // point where we can deal with this.  Leaving it on the run
1872               // queue also ensures that the garbage collector knows about
1873               // this thread and its return value (it gets dropped from the
1874               // all_threads list so there's no other way to find it).
1875               appendToRunQueue(cap,t);
1876               return rtsFalse;
1877 #else
1878               // this cannot happen in the threaded RTS, because a
1879               // bound thread can only be run by the appropriate Task.
1880               barf("finished bound thread that isn't mine");
1881 #endif
1882           }
1883
1884           ASSERT(task->tso == t);
1885
1886           if (t->what_next == ThreadComplete) {
1887               if (task->ret) {
1888                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1889                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1890               }
1891               task->stat = Success;
1892           } else {
1893               if (task->ret) {
1894                   *(task->ret) = NULL;
1895               }
1896               if (sched_state >= SCHED_INTERRUPTING) {
1897                   task->stat = Interrupted;
1898               } else {
1899                   task->stat = Killed;
1900               }
1901           }
1902 #ifdef DEBUG
1903           removeThreadLabel((StgWord)task->tso->id);
1904 #endif
1905           return rtsTrue; // tells schedule() to return
1906       }
1907
1908       return rtsFalse;
1909 }
1910
1911 /* -----------------------------------------------------------------------------
1912  * Perform a heap census, if PROFILING
1913  * -------------------------------------------------------------------------- */
1914
1915 static rtsBool
1916 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1917 {
1918 #if defined(PROFILING)
1919     // When we have +RTS -i0 and we're heap profiling, do a census at
1920     // every GC.  This lets us get repeatable runs for debugging.
1921     if (performHeapProfile ||
1922         (RtsFlags.ProfFlags.profileInterval==0 &&
1923          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1924
1925         // checking black holes is necessary before GC, otherwise
1926         // there may be threads that are unreachable except by the
1927         // blackhole queue, which the GC will consider to be
1928         // deadlocked.
1929         scheduleCheckBlackHoles(&MainCapability);
1930
1931         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1932         GarbageCollect(GetRoots, rtsTrue);
1933
1934         debugTrace(DEBUG_sched, "performing heap census");
1935         heapCensus();
1936
1937         performHeapProfile = rtsFalse;
1938         return rtsTrue;  // true <=> we already GC'd
1939     }
1940 #endif
1941     return rtsFalse;
1942 }
1943
1944 /* -----------------------------------------------------------------------------
1945  * Perform a garbage collection if necessary
1946  * -------------------------------------------------------------------------- */
1947
1948 static Capability *
1949 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1950               rtsBool force_major, void (*get_roots)(evac_fn))
1951 {
1952     StgTSO *t;
1953 #ifdef THREADED_RTS
1954     static volatile StgWord waiting_for_gc;
1955     rtsBool was_waiting;
1956     nat i;
1957 #endif
1958
1959 #ifdef THREADED_RTS
1960     // In order to GC, there must be no threads running Haskell code.
1961     // Therefore, the GC thread needs to hold *all* the capabilities,
1962     // and release them after the GC has completed.  
1963     //
1964     // This seems to be the simplest way: previous attempts involved
1965     // making all the threads with capabilities give up their
1966     // capabilities and sleep except for the *last* one, which
1967     // actually did the GC.  But it's quite hard to arrange for all
1968     // the other tasks to sleep and stay asleep.
1969     //
1970         
1971     was_waiting = cas(&waiting_for_gc, 0, 1);
1972     if (was_waiting) {
1973         do {
1974             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1975             if (cap) yieldCapability(&cap,task);
1976         } while (waiting_for_gc);
1977         return cap;  // NOTE: task->cap might have changed here
1978     }
1979
1980     for (i=0; i < n_capabilities; i++) {
1981         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1982         if (cap != &capabilities[i]) {
1983             Capability *pcap = &capabilities[i];
1984             // we better hope this task doesn't get migrated to
1985             // another Capability while we're waiting for this one.
1986             // It won't, because load balancing happens while we have
1987             // all the Capabilities, but even so it's a slightly
1988             // unsavoury invariant.
1989             task->cap = pcap;
1990             context_switch = 1;
1991             waitForReturnCapability(&pcap, task);
1992             if (pcap != &capabilities[i]) {
1993                 barf("scheduleDoGC: got the wrong capability");
1994             }
1995         }
1996     }
1997
1998     waiting_for_gc = rtsFalse;
1999 #endif
2000
2001     /* Kick any transactions which are invalid back to their
2002      * atomically frames.  When next scheduled they will try to
2003      * commit, this commit will fail and they will retry.
2004      */
2005     { 
2006         StgTSO *next;
2007
2008         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2009             if (t->what_next == ThreadRelocated) {
2010                 next = t->link;
2011             } else {
2012                 next = t->global_link;
2013                 
2014                 // This is a good place to check for blocked
2015                 // exceptions.  It might be the case that a thread is
2016                 // blocked on delivering an exception to a thread that
2017                 // is also blocked - we try to ensure that this
2018                 // doesn't happen in throwTo(), but it's too hard (or
2019                 // impossible) to close all the race holes, so we
2020                 // accept that some might get through and deal with
2021                 // them here.  A GC will always happen at some point,
2022                 // even if the system is otherwise deadlocked.
2023                 maybePerformBlockedException (&capabilities[0], t);
2024
2025                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2026                     if (!stmValidateNestOfTransactions (t -> trec)) {
2027                         debugTrace(DEBUG_sched | DEBUG_stm,
2028                                    "trec %p found wasting its time", t);
2029                         
2030                         // strip the stack back to the
2031                         // ATOMICALLY_FRAME, aborting the (nested)
2032                         // transaction, and saving the stack of any
2033                         // partially-evaluated thunks on the heap.
2034                         throwToSingleThreaded_(&capabilities[0], t, 
2035                                                NULL, rtsTrue, NULL);
2036                         
2037 #ifdef REG_R1
2038                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2039 #endif
2040                     }
2041                 }
2042             }
2043         }
2044     }
2045     
2046     // so this happens periodically:
2047     if (cap) scheduleCheckBlackHoles(cap);
2048     
2049     IF_DEBUG(scheduler, printAllThreads());
2050
2051     /*
2052      * We now have all the capabilities; if we're in an interrupting
2053      * state, then we should take the opportunity to delete all the
2054      * threads in the system.
2055      */
2056     if (sched_state >= SCHED_INTERRUPTING) {
2057         deleteAllThreads(&capabilities[0]);
2058         sched_state = SCHED_SHUTTING_DOWN;
2059     }
2060
2061     /* everybody back, start the GC.
2062      * Could do it in this thread, or signal a condition var
2063      * to do it in another thread.  Either way, we need to
2064      * broadcast on gc_pending_cond afterward.
2065      */
2066 #if defined(THREADED_RTS)
2067     debugTrace(DEBUG_sched, "doing GC");
2068 #endif
2069     GarbageCollect(get_roots, force_major);
2070     
2071 #if defined(THREADED_RTS)
2072     // release our stash of capabilities.
2073     for (i = 0; i < n_capabilities; i++) {
2074         if (cap != &capabilities[i]) {
2075             task->cap = &capabilities[i];
2076             releaseCapability(&capabilities[i]);
2077         }
2078     }
2079     if (cap) {
2080         task->cap = cap;
2081     } else {
2082         task->cap = NULL;
2083     }
2084 #endif
2085
2086 #if defined(GRAN)
2087     /* add a ContinueThread event to continue execution of current thread */
2088     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2089               ContinueThread,
2090               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2091     IF_GRAN_DEBUG(bq, 
2092                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2093                   G_EVENTQ(0);
2094                   G_CURR_THREADQ(0));
2095 #endif /* GRAN */
2096
2097     return cap;
2098 }
2099
2100 /* ---------------------------------------------------------------------------
2101  * Singleton fork(). Do not copy any running threads.
2102  * ------------------------------------------------------------------------- */
2103
2104 StgInt
2105 forkProcess(HsStablePtr *entry
2106 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2107             STG_UNUSED
2108 #endif
2109            )
2110 {
2111 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2112     Task *task;
2113     pid_t pid;
2114     StgTSO* t,*next;
2115     Capability *cap;
2116     
2117 #if defined(THREADED_RTS)
2118     if (RtsFlags.ParFlags.nNodes > 1) {
2119         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2120         stg_exit(EXIT_FAILURE);
2121     }
2122 #endif
2123
2124     debugTrace(DEBUG_sched, "forking!");
2125     
2126     // ToDo: for SMP, we should probably acquire *all* the capabilities
2127     cap = rts_lock();
2128     
2129     pid = fork();
2130     
2131     if (pid) { // parent
2132         
2133         // just return the pid
2134         rts_unlock(cap);
2135         return pid;
2136         
2137     } else { // child
2138         
2139         // Now, all OS threads except the thread that forked are
2140         // stopped.  We need to stop all Haskell threads, including
2141         // those involved in foreign calls.  Also we need to delete
2142         // all Tasks, because they correspond to OS threads that are
2143         // now gone.
2144
2145         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2146             if (t->what_next == ThreadRelocated) {
2147                 next = t->link;
2148             } else {
2149                 next = t->global_link;
2150                 // don't allow threads to catch the ThreadKilled
2151                 // exception, but we do want to raiseAsync() because these
2152                 // threads may be evaluating thunks that we need later.
2153                 deleteThread_(cap,t);
2154             }
2155         }
2156         
2157         // Empty the run queue.  It seems tempting to let all the
2158         // killed threads stay on the run queue as zombies to be
2159         // cleaned up later, but some of them correspond to bound
2160         // threads for which the corresponding Task does not exist.
2161         cap->run_queue_hd = END_TSO_QUEUE;
2162         cap->run_queue_tl = END_TSO_QUEUE;
2163
2164         // Any suspended C-calling Tasks are no more, their OS threads
2165         // don't exist now:
2166         cap->suspended_ccalling_tasks = NULL;
2167
2168         // Empty the all_threads list.  Otherwise, the garbage
2169         // collector may attempt to resurrect some of these threads.
2170         all_threads = END_TSO_QUEUE;
2171
2172         // Wipe the task list, except the current Task.
2173         ACQUIRE_LOCK(&sched_mutex);
2174         for (task = all_tasks; task != NULL; task=task->all_link) {
2175             if (task != cap->running_task) {
2176                 discardTask(task);
2177             }
2178         }
2179         RELEASE_LOCK(&sched_mutex);
2180
2181 #if defined(THREADED_RTS)
2182         // Wipe our spare workers list, they no longer exist.  New
2183         // workers will be created if necessary.
2184         cap->spare_workers = NULL;
2185         cap->returning_tasks_hd = NULL;
2186         cap->returning_tasks_tl = NULL;
2187 #endif
2188
2189         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2190         rts_checkSchedStatus("forkProcess",cap);
2191         
2192         rts_unlock(cap);
2193         hs_exit();                      // clean up and exit
2194         stg_exit(EXIT_SUCCESS);
2195     }
2196 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2197     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2198     return -1;
2199 #endif
2200 }
2201
2202 /* ---------------------------------------------------------------------------
2203  * Delete all the threads in the system
2204  * ------------------------------------------------------------------------- */
2205    
2206 static void
2207 deleteAllThreads ( Capability *cap )
2208 {
2209     // NOTE: only safe to call if we own all capabilities.
2210
2211     StgTSO* t, *next;
2212     debugTrace(DEBUG_sched,"deleting all threads");
2213     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2214         if (t->what_next == ThreadRelocated) {
2215             next = t->link;
2216         } else {
2217             next = t->global_link;
2218             deleteThread(cap,t);
2219         }
2220     }      
2221
2222     // The run queue now contains a bunch of ThreadKilled threads.  We
2223     // must not throw these away: the main thread(s) will be in there
2224     // somewhere, and the main scheduler loop has to deal with it.
2225     // Also, the run queue is the only thing keeping these threads from
2226     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2227
2228 #if !defined(THREADED_RTS)
2229     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2230     ASSERT(sleeping_queue == END_TSO_QUEUE);
2231 #endif
2232 }
2233
2234 /* -----------------------------------------------------------------------------
2235    Managing the suspended_ccalling_tasks list.
2236    Locks required: sched_mutex
2237    -------------------------------------------------------------------------- */
2238
2239 STATIC_INLINE void
2240 suspendTask (Capability *cap, Task *task)
2241 {
2242     ASSERT(task->next == NULL && task->prev == NULL);
2243     task->next = cap->suspended_ccalling_tasks;
2244     task->prev = NULL;
2245     if (cap->suspended_ccalling_tasks) {
2246         cap->suspended_ccalling_tasks->prev = task;
2247     }
2248     cap->suspended_ccalling_tasks = task;
2249 }
2250
2251 STATIC_INLINE void
2252 recoverSuspendedTask (Capability *cap, Task *task)
2253 {
2254     if (task->prev) {
2255         task->prev->next = task->next;
2256     } else {
2257         ASSERT(cap->suspended_ccalling_tasks == task);
2258         cap->suspended_ccalling_tasks = task->next;
2259     }
2260     if (task->next) {
2261         task->next->prev = task->prev;
2262     }
2263     task->next = task->prev = NULL;
2264 }
2265
2266 /* ---------------------------------------------------------------------------
2267  * Suspending & resuming Haskell threads.
2268  * 
2269  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2270  * its capability before calling the C function.  This allows another
2271  * task to pick up the capability and carry on running Haskell
2272  * threads.  It also means that if the C call blocks, it won't lock
2273  * the whole system.
2274  *
2275  * The Haskell thread making the C call is put to sleep for the
2276  * duration of the call, on the susepended_ccalling_threads queue.  We
2277  * give out a token to the task, which it can use to resume the thread
2278  * on return from the C function.
2279  * ------------------------------------------------------------------------- */
2280    
2281 void *
2282 suspendThread (StgRegTable *reg)
2283 {
2284   Capability *cap;
2285   int saved_errno = errno;
2286   StgTSO *tso;
2287   Task *task;
2288
2289   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2290    */
2291   cap = regTableToCapability(reg);
2292
2293   task = cap->running_task;
2294   tso = cap->r.rCurrentTSO;
2295
2296   debugTrace(DEBUG_sched, 
2297              "thread %lu did a safe foreign call", 
2298              (unsigned long)cap->r.rCurrentTSO->id);
2299
2300   // XXX this might not be necessary --SDM
2301   tso->what_next = ThreadRunGHC;
2302
2303   threadPaused(cap,tso);
2304
2305   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2306       tso->why_blocked = BlockedOnCCall;
2307       tso->flags |= TSO_BLOCKEX;
2308       tso->flags &= ~TSO_INTERRUPTIBLE;
2309   } else {
2310       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2311   }
2312
2313   // Hand back capability
2314   task->suspended_tso = tso;
2315
2316   ACQUIRE_LOCK(&cap->lock);
2317
2318   suspendTask(cap,task);
2319   cap->in_haskell = rtsFalse;
2320   releaseCapability_(cap);
2321   
2322   RELEASE_LOCK(&cap->lock);
2323
2324 #if defined(THREADED_RTS)
2325   /* Preparing to leave the RTS, so ensure there's a native thread/task
2326      waiting to take over.
2327   */
2328   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2329 #endif
2330
2331   errno = saved_errno;
2332   return task;
2333 }
2334
2335 StgRegTable *
2336 resumeThread (void *task_)
2337 {
2338     StgTSO *tso;
2339     Capability *cap;
2340     int saved_errno = errno;
2341     Task *task = task_;
2342
2343     cap = task->cap;
2344     // Wait for permission to re-enter the RTS with the result.
2345     waitForReturnCapability(&cap,task);
2346     // we might be on a different capability now... but if so, our
2347     // entry on the suspended_ccalling_tasks list will also have been
2348     // migrated.
2349
2350     // Remove the thread from the suspended list
2351     recoverSuspendedTask(cap,task);
2352
2353     tso = task->suspended_tso;
2354     task->suspended_tso = NULL;
2355     tso->link = END_TSO_QUEUE;
2356     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2357     
2358     if (tso->why_blocked == BlockedOnCCall) {
2359         awakenBlockedExceptionQueue(cap,tso);
2360         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2361     }
2362     
2363     /* Reset blocking status */
2364     tso->why_blocked  = NotBlocked;
2365     
2366     cap->r.rCurrentTSO = tso;
2367     cap->in_haskell = rtsTrue;
2368     errno = saved_errno;
2369
2370     /* We might have GC'd, mark the TSO dirty again */
2371     dirtyTSO(tso);
2372
2373     IF_DEBUG(sanity, checkTSO(tso));
2374
2375     return &cap->r;
2376 }
2377
2378 /* ---------------------------------------------------------------------------
2379  * scheduleThread()
2380  *
2381  * scheduleThread puts a thread on the end  of the runnable queue.
2382  * This will usually be done immediately after a thread is created.
2383  * The caller of scheduleThread must create the thread using e.g.
2384  * createThread and push an appropriate closure
2385  * on this thread's stack before the scheduler is invoked.
2386  * ------------------------------------------------------------------------ */
2387
2388 void
2389 scheduleThread(Capability *cap, StgTSO *tso)
2390 {
2391     // The thread goes at the *end* of the run-queue, to avoid possible
2392     // starvation of any threads already on the queue.
2393     appendToRunQueue(cap,tso);
2394 }
2395
2396 void
2397 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2398 {
2399 #if defined(THREADED_RTS)
2400     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2401                               // move this thread from now on.
2402     cpu %= RtsFlags.ParFlags.nNodes;
2403     if (cpu == cap->no) {
2404         appendToRunQueue(cap,tso);
2405     } else {
2406         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2407     }
2408 #else
2409     appendToRunQueue(cap,tso);
2410 #endif
2411 }
2412
2413 Capability *
2414 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2415 {
2416     Task *task;
2417
2418     // We already created/initialised the Task
2419     task = cap->running_task;
2420
2421     // This TSO is now a bound thread; make the Task and TSO
2422     // point to each other.
2423     tso->bound = task;
2424     tso->cap = cap;
2425
2426     task->tso = tso;
2427     task->ret = ret;
2428     task->stat = NoStatus;
2429
2430     appendToRunQueue(cap,tso);
2431
2432     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2433
2434 #if defined(GRAN)
2435     /* GranSim specific init */
2436     CurrentTSO = m->tso;                // the TSO to run
2437     procStatus[MainProc] = Busy;        // status of main PE
2438     CurrentProc = MainProc;             // PE to run it on
2439 #endif
2440
2441     cap = schedule(cap,task);
2442
2443     ASSERT(task->stat != NoStatus);
2444     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2445
2446     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2447     return cap;
2448 }
2449
2450 /* ----------------------------------------------------------------------------
2451  * Starting Tasks
2452  * ------------------------------------------------------------------------- */
2453
2454 #if defined(THREADED_RTS)
2455 void
2456 workerStart(Task *task)
2457 {
2458     Capability *cap;
2459
2460     // See startWorkerTask().
2461     ACQUIRE_LOCK(&task->lock);
2462     cap = task->cap;
2463     RELEASE_LOCK(&task->lock);
2464
2465     // set the thread-local pointer to the Task:
2466     taskEnter(task);
2467
2468     // schedule() runs without a lock.
2469     cap = schedule(cap,task);
2470
2471     // On exit from schedule(), we have a Capability.
2472     releaseCapability(cap);
2473     workerTaskStop(task);
2474 }
2475 #endif
2476
2477 /* ---------------------------------------------------------------------------
2478  * initScheduler()
2479  *
2480  * Initialise the scheduler.  This resets all the queues - if the
2481  * queues contained any threads, they'll be garbage collected at the
2482  * next pass.
2483  *
2484  * ------------------------------------------------------------------------ */
2485
2486 void 
2487 initScheduler(void)
2488 {
2489 #if defined(GRAN)
2490   nat i;
2491   for (i=0; i<=MAX_PROC; i++) {
2492     run_queue_hds[i]      = END_TSO_QUEUE;
2493     run_queue_tls[i]      = END_TSO_QUEUE;
2494     blocked_queue_hds[i]  = END_TSO_QUEUE;
2495     blocked_queue_tls[i]  = END_TSO_QUEUE;
2496     ccalling_threadss[i]  = END_TSO_QUEUE;
2497     blackhole_queue[i]    = END_TSO_QUEUE;
2498     sleeping_queue        = END_TSO_QUEUE;
2499   }
2500 #elif !defined(THREADED_RTS)
2501   blocked_queue_hd  = END_TSO_QUEUE;
2502   blocked_queue_tl  = END_TSO_QUEUE;
2503   sleeping_queue    = END_TSO_QUEUE;
2504 #endif
2505
2506   blackhole_queue   = END_TSO_QUEUE;
2507   all_threads       = END_TSO_QUEUE;
2508
2509   context_switch = 0;
2510   sched_state    = SCHED_RUNNING;
2511
2512 #if defined(THREADED_RTS)
2513   /* Initialise the mutex and condition variables used by
2514    * the scheduler. */
2515   initMutex(&sched_mutex);
2516 #endif
2517   
2518   ACQUIRE_LOCK(&sched_mutex);
2519
2520   /* A capability holds the state a native thread needs in
2521    * order to execute STG code. At least one capability is
2522    * floating around (only THREADED_RTS builds have more than one).
2523    */
2524   initCapabilities();
2525
2526   initTaskManager();
2527
2528 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2529   initSparkPools();
2530 #endif
2531
2532 #if defined(THREADED_RTS)
2533   /*
2534    * Eagerly start one worker to run each Capability, except for
2535    * Capability 0.  The idea is that we're probably going to start a
2536    * bound thread on Capability 0 pretty soon, so we don't want a
2537    * worker task hogging it.
2538    */
2539   { 
2540       nat i;
2541       Capability *cap;
2542       for (i = 1; i < n_capabilities; i++) {
2543           cap = &capabilities[i];
2544           ACQUIRE_LOCK(&cap->lock);
2545           startWorkerTask(cap, workerStart);
2546           RELEASE_LOCK(&cap->lock);
2547       }
2548   }
2549 #endif
2550
2551   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2552
2553   RELEASE_LOCK(&sched_mutex);
2554 }
2555
2556 void
2557 exitScheduler( void )
2558 {
2559     Task *task = NULL;
2560
2561 #if defined(THREADED_RTS)
2562     ACQUIRE_LOCK(&sched_mutex);
2563     task = newBoundTask();
2564     RELEASE_LOCK(&sched_mutex);
2565 #endif
2566
2567     // If we haven't killed all the threads yet, do it now.
2568     if (sched_state < SCHED_SHUTTING_DOWN) {
2569         sched_state = SCHED_INTERRUPTING;
2570         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2571     }
2572     sched_state = SCHED_SHUTTING_DOWN;
2573
2574 #if defined(THREADED_RTS)
2575     { 
2576         nat i;
2577         
2578         for (i = 0; i < n_capabilities; i++) {
2579             shutdownCapability(&capabilities[i], task);
2580         }
2581         boundTaskExiting(task);
2582         stopTaskManager();
2583     }
2584     closeMutex(&sched_mutex);
2585 #endif
2586 }
2587
2588 /* ---------------------------------------------------------------------------
2589    Where are the roots that we know about?
2590
2591         - all the threads on the runnable queue
2592         - all the threads on the blocked queue
2593         - all the threads on the sleeping queue
2594         - all the thread currently executing a _ccall_GC
2595         - all the "main threads"
2596      
2597    ------------------------------------------------------------------------ */
2598
2599 /* This has to be protected either by the scheduler monitor, or by the
2600         garbage collection monitor (probably the latter).
2601         KH @ 25/10/99
2602 */
2603
2604 void
2605 GetRoots( evac_fn evac )
2606 {
2607     nat i;
2608     Capability *cap;
2609     Task *task;
2610
2611 #if defined(GRAN)
2612     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2613         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2614             evac((StgClosure **)&run_queue_hds[i]);
2615         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2616             evac((StgClosure **)&run_queue_tls[i]);
2617         
2618         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2619             evac((StgClosure **)&blocked_queue_hds[i]);
2620         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2621             evac((StgClosure **)&blocked_queue_tls[i]);
2622         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2623             evac((StgClosure **)&ccalling_threads[i]);
2624     }
2625
2626     markEventQueue();
2627
2628 #else /* !GRAN */
2629
2630     for (i = 0; i < n_capabilities; i++) {
2631         cap = &capabilities[i];
2632         evac((StgClosure **)(void *)&cap->run_queue_hd);
2633         evac((StgClosure **)(void *)&cap->run_queue_tl);
2634 #if defined(THREADED_RTS)
2635         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2636         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2637 #endif
2638         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2639              task=task->next) {
2640             debugTrace(DEBUG_sched,
2641                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2642             evac((StgClosure **)(void *)&task->suspended_tso);
2643         }
2644
2645     }
2646     
2647
2648 #if !defined(THREADED_RTS)
2649     evac((StgClosure **)(void *)&blocked_queue_hd);
2650     evac((StgClosure **)(void *)&blocked_queue_tl);
2651     evac((StgClosure **)(void *)&sleeping_queue);
2652 #endif 
2653 #endif
2654
2655     // evac((StgClosure **)&blackhole_queue);
2656
2657 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2658     markSparkQueue(evac);
2659 #endif
2660     
2661 #if defined(RTS_USER_SIGNALS)
2662     // mark the signal handlers (signals should be already blocked)
2663     markSignalHandlers(evac);
2664 #endif
2665 }
2666
2667 /* -----------------------------------------------------------------------------
2668    performGC
2669
2670    This is the interface to the garbage collector from Haskell land.
2671    We provide this so that external C code can allocate and garbage
2672    collect when called from Haskell via _ccall_GC.
2673
2674    It might be useful to provide an interface whereby the programmer
2675    can specify more roots (ToDo).
2676    
2677    This needs to be protected by the GC condition variable above.  KH.
2678    -------------------------------------------------------------------------- */
2679
2680 static void (*extra_roots)(evac_fn);
2681
2682 static void
2683 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2684 {
2685     Task *task;
2686     // We must grab a new Task here, because the existing Task may be
2687     // associated with a particular Capability, and chained onto the 
2688     // suspended_ccalling_tasks queue.
2689     ACQUIRE_LOCK(&sched_mutex);
2690     task = newBoundTask();
2691     RELEASE_LOCK(&sched_mutex);
2692     scheduleDoGC(NULL,task,force_major, get_roots);
2693     boundTaskExiting(task);
2694 }
2695
2696 void
2697 performGC(void)
2698 {
2699     performGC_(rtsFalse, GetRoots);
2700 }
2701
2702 void
2703 performMajorGC(void)
2704 {
2705     performGC_(rtsTrue, GetRoots);
2706 }
2707
2708 static void
2709 AllRoots(evac_fn evac)
2710 {
2711     GetRoots(evac);             // the scheduler's roots
2712     extra_roots(evac);          // the user's roots
2713 }
2714
2715 void
2716 performGCWithRoots(void (*get_roots)(evac_fn))
2717 {
2718     extra_roots = get_roots;
2719     performGC_(rtsFalse, AllRoots);
2720 }
2721
2722 /* -----------------------------------------------------------------------------
2723    Stack overflow
2724
2725    If the thread has reached its maximum stack size, then raise the
2726    StackOverflow exception in the offending thread.  Otherwise
2727    relocate the TSO into a larger chunk of memory and adjust its stack
2728    size appropriately.
2729    -------------------------------------------------------------------------- */
2730
2731 static StgTSO *
2732 threadStackOverflow(Capability *cap, StgTSO *tso)
2733 {
2734   nat new_stack_size, stack_words;
2735   lnat new_tso_size;
2736   StgPtr new_sp;
2737   StgTSO *dest;
2738
2739   IF_DEBUG(sanity,checkTSO(tso));
2740
2741   // don't allow throwTo() to modify the blocked_exceptions queue
2742   // while we are moving the TSO:
2743   lockClosure((StgClosure *)tso);
2744
2745   if (tso->stack_size >= tso->max_stack_size) {
2746
2747       debugTrace(DEBUG_gc,
2748                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2749                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2750       IF_DEBUG(gc,
2751                /* If we're debugging, just print out the top of the stack */
2752                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2753                                                 tso->sp+64)));
2754
2755       // Send this thread the StackOverflow exception
2756       unlockTSO(tso);
2757       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2758       return tso;
2759   }
2760
2761   /* Try to double the current stack size.  If that takes us over the
2762    * maximum stack size for this thread, then use the maximum instead.
2763    * Finally round up so the TSO ends up as a whole number of blocks.
2764    */
2765   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2766   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2767                                        TSO_STRUCT_SIZE)/sizeof(W_);
2768   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2769   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2770
2771   debugTrace(DEBUG_sched, 
2772              "increasing stack size from %ld words to %d.",
2773              (long)tso->stack_size, new_stack_size);
2774
2775   dest = (StgTSO *)allocate(new_tso_size);
2776   TICK_ALLOC_TSO(new_stack_size,0);
2777
2778   /* copy the TSO block and the old stack into the new area */
2779   memcpy(dest,tso,TSO_STRUCT_SIZE);
2780   stack_words = tso->stack + tso->stack_size - tso->sp;
2781   new_sp = (P_)dest + new_tso_size - stack_words;
2782   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2783
2784   /* relocate the stack pointers... */
2785   dest->sp         = new_sp;
2786   dest->stack_size = new_stack_size;
2787         
2788   /* Mark the old TSO as relocated.  We have to check for relocated
2789    * TSOs in the garbage collector and any primops that deal with TSOs.
2790    *
2791    * It's important to set the sp value to just beyond the end
2792    * of the stack, so we don't attempt to scavenge any part of the
2793    * dead TSO's stack.
2794    */
2795   tso->what_next = ThreadRelocated;
2796   tso->link = dest;
2797   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2798   tso->why_blocked = NotBlocked;
2799
2800   IF_PAR_DEBUG(verbose,
2801                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2802                      tso->id, tso, tso->stack_size);
2803                /* If we're debugging, just print out the top of the stack */
2804                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2805                                                 tso->sp+64)));
2806   
2807   unlockTSO(dest);
2808   unlockTSO(tso);
2809
2810   IF_DEBUG(sanity,checkTSO(dest));
2811 #if 0
2812   IF_DEBUG(scheduler,printTSO(dest));
2813 #endif
2814
2815   return dest;
2816 }
2817
2818 /* ---------------------------------------------------------------------------
2819    Interrupt execution
2820    - usually called inside a signal handler so it mustn't do anything fancy.   
2821    ------------------------------------------------------------------------ */
2822
2823 void
2824 interruptStgRts(void)
2825 {
2826     sched_state = SCHED_INTERRUPTING;
2827     context_switch = 1;
2828     wakeUpRts();
2829 }
2830
2831 /* -----------------------------------------------------------------------------
2832    Wake up the RTS
2833    
2834    This function causes at least one OS thread to wake up and run the
2835    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2836    an external event has arrived that may need servicing (eg. a
2837    keyboard interrupt).
2838
2839    In the single-threaded RTS we don't do anything here; we only have
2840    one thread anyway, and the event that caused us to want to wake up
2841    will have interrupted any blocking system call in progress anyway.
2842    -------------------------------------------------------------------------- */
2843
2844 void
2845 wakeUpRts(void)
2846 {
2847 #if defined(THREADED_RTS)
2848 #if !defined(mingw32_HOST_OS)
2849     // This forces the IO Manager thread to wakeup, which will
2850     // in turn ensure that some OS thread wakes up and runs the
2851     // scheduler loop, which will cause a GC and deadlock check.
2852     ioManagerWakeup();
2853 #else
2854     // On Windows this might be safe enough, because we aren't
2855     // in a signal handler.  Later we should use the IO Manager,
2856     // though.
2857     prodOneCapability();
2858 #endif
2859 #endif
2860 }
2861
2862 /* -----------------------------------------------------------------------------
2863  * checkBlackHoles()
2864  *
2865  * Check the blackhole_queue for threads that can be woken up.  We do
2866  * this periodically: before every GC, and whenever the run queue is
2867  * empty.
2868  *
2869  * An elegant solution might be to just wake up all the blocked
2870  * threads with awakenBlockedQueue occasionally: they'll go back to
2871  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2872  * doesn't give us a way to tell whether we've actually managed to
2873  * wake up any threads, so we would be busy-waiting.
2874  *
2875  * -------------------------------------------------------------------------- */
2876
2877 static rtsBool
2878 checkBlackHoles (Capability *cap)
2879 {
2880     StgTSO **prev, *t;
2881     rtsBool any_woke_up = rtsFalse;
2882     StgHalfWord type;
2883
2884     // blackhole_queue is global:
2885     ASSERT_LOCK_HELD(&sched_mutex);
2886
2887     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2888
2889     // ASSUMES: sched_mutex
2890     prev = &blackhole_queue;
2891     t = blackhole_queue;
2892     while (t != END_TSO_QUEUE) {
2893         ASSERT(t->why_blocked == BlockedOnBlackHole);
2894         type = get_itbl(t->block_info.closure)->type;
2895         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2896             IF_DEBUG(sanity,checkTSO(t));
2897             t = unblockOne(cap, t);
2898             // urk, the threads migrate to the current capability
2899             // here, but we'd like to keep them on the original one.
2900             *prev = t;
2901             any_woke_up = rtsTrue;
2902         } else {
2903             prev = &t->link;
2904             t = t->link;
2905         }
2906     }
2907
2908     return any_woke_up;
2909 }
2910
2911 /* -----------------------------------------------------------------------------
2912    Deleting threads
2913
2914    This is used for interruption (^C) and forking, and corresponds to
2915    raising an exception but without letting the thread catch the
2916    exception.
2917    -------------------------------------------------------------------------- */
2918
2919 static void 
2920 deleteThread (Capability *cap, StgTSO *tso)
2921 {
2922     // NOTE: must only be called on a TSO that we have exclusive
2923     // access to, because we will call throwToSingleThreaded() below.
2924     // The TSO must be on the run queue of the Capability we own, or 
2925     // we must own all Capabilities.
2926
2927     if (tso->why_blocked != BlockedOnCCall &&
2928         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2929         throwToSingleThreaded(cap,tso,NULL);
2930     }
2931 }
2932
2933 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2934 static void 
2935 deleteThread_(Capability *cap, StgTSO *tso)
2936 { // for forkProcess only:
2937   // like deleteThread(), but we delete threads in foreign calls, too.
2938
2939     if (tso->why_blocked == BlockedOnCCall ||
2940         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2941         unblockOne(cap,tso);
2942         tso->what_next = ThreadKilled;
2943     } else {
2944         deleteThread(cap,tso);
2945     }
2946 }
2947 #endif
2948
2949 /* -----------------------------------------------------------------------------
2950    raiseExceptionHelper
2951    
2952    This function is called by the raise# primitve, just so that we can
2953    move some of the tricky bits of raising an exception from C-- into
2954    C.  Who knows, it might be a useful re-useable thing here too.
2955    -------------------------------------------------------------------------- */
2956
2957 StgWord
2958 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2959 {
2960     Capability *cap = regTableToCapability(reg);
2961     StgThunk *raise_closure = NULL;
2962     StgPtr p, next;
2963     StgRetInfoTable *info;
2964     //
2965     // This closure represents the expression 'raise# E' where E
2966     // is the exception raise.  It is used to overwrite all the
2967     // thunks which are currently under evaluataion.
2968     //
2969
2970     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2971     // LDV profiling: stg_raise_info has THUNK as its closure
2972     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2973     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2974     // 1 does not cause any problem unless profiling is performed.
2975     // However, when LDV profiling goes on, we need to linearly scan
2976     // small object pool, where raise_closure is stored, so we should
2977     // use MIN_UPD_SIZE.
2978     //
2979     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2980     //                                 sizeofW(StgClosure)+1);
2981     //
2982
2983     //
2984     // Walk up the stack, looking for the catch frame.  On the way,
2985     // we update any closures pointed to from update frames with the
2986     // raise closure that we just built.
2987     //
2988     p = tso->sp;
2989     while(1) {
2990         info = get_ret_itbl((StgClosure *)p);
2991         next = p + stack_frame_sizeW((StgClosure *)p);
2992         switch (info->i.type) {
2993             
2994         case UPDATE_FRAME:
2995             // Only create raise_closure if we need to.
2996             if (raise_closure == NULL) {
2997                 raise_closure = 
2998                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2999                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3000                 raise_closure->payload[0] = exception;
3001             }
3002             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3003             p = next;
3004             continue;
3005
3006         case ATOMICALLY_FRAME:
3007             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3008             tso->sp = p;
3009             return ATOMICALLY_FRAME;
3010             
3011         case CATCH_FRAME:
3012             tso->sp = p;
3013             return CATCH_FRAME;
3014
3015         case CATCH_STM_FRAME:
3016             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3017             tso->sp = p;
3018             return CATCH_STM_FRAME;
3019             
3020         case STOP_FRAME:
3021             tso->sp = p;
3022             return STOP_FRAME;
3023
3024         case CATCH_RETRY_FRAME:
3025         default:
3026             p = next; 
3027             continue;
3028         }
3029     }
3030 }
3031
3032
3033 /* -----------------------------------------------------------------------------
3034    findRetryFrameHelper
3035
3036    This function is called by the retry# primitive.  It traverses the stack
3037    leaving tso->sp referring to the frame which should handle the retry.  
3038
3039    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3040    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3041
3042    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3043    despite the similar implementation.
3044
3045    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3046    not be created within memory transactions.
3047    -------------------------------------------------------------------------- */
3048
3049 StgWord
3050 findRetryFrameHelper (StgTSO *tso)
3051 {
3052   StgPtr           p, next;
3053   StgRetInfoTable *info;
3054
3055   p = tso -> sp;
3056   while (1) {
3057     info = get_ret_itbl((StgClosure *)p);
3058     next = p + stack_frame_sizeW((StgClosure *)p);
3059     switch (info->i.type) {
3060       
3061     case ATOMICALLY_FRAME:
3062         debugTrace(DEBUG_stm,
3063                    "found ATOMICALLY_FRAME at %p during retrry", p);
3064         tso->sp = p;
3065         return ATOMICALLY_FRAME;
3066       
3067     case CATCH_RETRY_FRAME:
3068         debugTrace(DEBUG_stm,
3069                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3070         tso->sp = p;
3071         return CATCH_RETRY_FRAME;
3072       
3073     case CATCH_STM_FRAME:
3074     default:
3075       ASSERT(info->i.type != CATCH_FRAME);
3076       ASSERT(info->i.type != STOP_FRAME);
3077       p = next; 
3078       continue;
3079     }
3080   }
3081 }
3082
3083 /* -----------------------------------------------------------------------------
3084    resurrectThreads is called after garbage collection on the list of
3085    threads found to be garbage.  Each of these threads will be woken
3086    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3087    on an MVar, or NonTermination if the thread was blocked on a Black
3088    Hole.
3089
3090    Locks: assumes we hold *all* the capabilities.
3091    -------------------------------------------------------------------------- */
3092
3093 void
3094 resurrectThreads (StgTSO *threads)
3095 {
3096     StgTSO *tso, *next;
3097     Capability *cap;
3098
3099     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3100         next = tso->global_link;
3101         tso->global_link = all_threads;
3102         all_threads = tso;
3103         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3104         
3105         // Wake up the thread on the Capability it was last on
3106         cap = tso->cap;
3107         
3108         switch (tso->why_blocked) {
3109         case BlockedOnMVar:
3110         case BlockedOnException:
3111             /* Called by GC - sched_mutex lock is currently held. */
3112             throwToSingleThreaded(cap, tso,
3113                                   (StgClosure *)BlockedOnDeadMVar_closure);
3114             break;
3115         case BlockedOnBlackHole:
3116             throwToSingleThreaded(cap, tso,
3117                                   (StgClosure *)NonTermination_closure);
3118             break;
3119         case BlockedOnSTM:
3120             throwToSingleThreaded(cap, tso,
3121                                   (StgClosure *)BlockedIndefinitely_closure);
3122             break;
3123         case NotBlocked:
3124             /* This might happen if the thread was blocked on a black hole
3125              * belonging to a thread that we've just woken up (raiseAsync
3126              * can wake up threads, remember...).
3127              */
3128             continue;
3129         default:
3130             barf("resurrectThreads: thread blocked in a strange way");
3131         }
3132     }
3133 }