Fix some more shutdown races
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  *
96  * NB. must be StgWord, we do xchg() on it.
97  */
98 volatile StgWord recent_activity = ACTIVITY_YES;
99
100 /* if this flag is set as well, give up execution
101  * LOCK: none (changes monotonically)
102  */
103 volatile StgWord sched_state = SCHED_RUNNING;
104
105 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
106  *  exists - earlier gccs apparently didn't.
107  *  -= chak
108  */
109 StgTSO dummy_tso;
110
111 /*
112  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
113  * in an MT setting, needed to signal that a worker thread shouldn't hang around
114  * in the scheduler when it is out of work.
115  */
116 rtsBool shutting_down_scheduler = rtsFalse;
117
118 /*
119  * This mutex protects most of the global scheduler data in
120  * the THREADED_RTS runtime.
121  */
122 #if defined(THREADED_RTS)
123 Mutex sched_mutex;
124 #endif
125
126 #if !defined(mingw32_HOST_OS)
127 #define FORKPROCESS_PRIMOP_SUPPORTED
128 #endif
129
130 /* -----------------------------------------------------------------------------
131  * static function prototypes
132  * -------------------------------------------------------------------------- */
133
134 static Capability *schedule (Capability *initialCapability, Task *task);
135
136 //
137 // These function all encapsulate parts of the scheduler loop, and are
138 // abstracted only to make the structure and control flow of the
139 // scheduler clearer.
140 //
141 static void schedulePreLoop (void);
142 static void scheduleFindWork (Capability *cap);
143 #if defined(THREADED_RTS)
144 static void scheduleYield (Capability **pcap, Task *task);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
149 static void scheduleCheckBlackHoles (Capability *cap);
150 static void scheduleDetectDeadlock (Capability *cap, Task *task);
151 static void schedulePushWork(Capability *cap, Task *task);
152 #if defined(PARALLEL_HASKELL)
153 static rtsBool scheduleGetRemoteWork(Capability *cap);
154 static void scheduleSendPendingMessages(void);
155 #endif
156 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
157 static void scheduleActivateSpark(Capability *cap);
158 #endif
159 static void schedulePostRunThread(Capability *cap, StgTSO *t);
160 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
161 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
162                                          StgTSO *t);
163 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
164                                     nat prev_what_next );
165 static void scheduleHandleThreadBlocked( StgTSO *t );
166 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
167                                              StgTSO *t );
168 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
169 static Capability *scheduleDoGC(Capability *cap, Task *task,
170                                 rtsBool force_major);
171
172 static rtsBool checkBlackHoles(Capability *cap);
173
174 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
175 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
176
177 static void deleteThread (Capability *cap, StgTSO *tso);
178 static void deleteAllThreads (Capability *cap);
179
180 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
181 static void deleteThread_(Capability *cap, StgTSO *tso);
182 #endif
183
184 #ifdef DEBUG
185 static char *whatNext_strs[] = {
186   "(unknown)",
187   "ThreadRunGHC",
188   "ThreadInterpret",
189   "ThreadKilled",
190   "ThreadRelocated",
191   "ThreadComplete"
192 };
193 #endif
194
195 /* -----------------------------------------------------------------------------
196  * Putting a thread on the run queue: different scheduling policies
197  * -------------------------------------------------------------------------- */
198
199 STATIC_INLINE void
200 addToRunQueue( Capability *cap, StgTSO *t )
201 {
202 #if defined(PARALLEL_HASKELL)
203     if (RtsFlags.ParFlags.doFairScheduling) { 
204         // this does round-robin scheduling; good for concurrency
205         appendToRunQueue(cap,t);
206     } else {
207         // this does unfair scheduling; good for parallelism
208         pushOnRunQueue(cap,t);
209     }
210 #else
211     // this does round-robin scheduling; good for concurrency
212     appendToRunQueue(cap,t);
213 #endif
214 }
215
216 /* ---------------------------------------------------------------------------
217    Main scheduling loop.
218
219    We use round-robin scheduling, each thread returning to the
220    scheduler loop when one of these conditions is detected:
221
222       * out of heap space
223       * timer expires (thread yields)
224       * thread blocks
225       * thread ends
226       * stack overflow
227
228    GRAN version:
229      In a GranSim setup this loop iterates over the global event queue.
230      This revolves around the global event queue, which determines what 
231      to do next. Therefore, it's more complicated than either the 
232      concurrent or the parallel (GUM) setup.
233   This version has been entirely removed (JB 2008/08).
234
235    GUM version:
236      GUM iterates over incoming messages.
237      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
238      and sends out a fish whenever it has nothing to do; in-between
239      doing the actual reductions (shared code below) it processes the
240      incoming messages and deals with delayed operations 
241      (see PendingFetches).
242      This is not the ugliest code you could imagine, but it's bloody close.
243
244   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
245   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
246   as well as future GUM versions. This file has been refurbished to
247   only contain valid code, which is however incomplete, refers to
248   invalid includes etc.
249
250    ------------------------------------------------------------------------ */
251
252 static Capability *
253 schedule (Capability *initialCapability, Task *task)
254 {
255   StgTSO *t;
256   Capability *cap;
257   StgThreadReturnCode ret;
258 #if defined(PARALLEL_HASKELL)
259   rtsBool receivedFinish = rtsFalse;
260 #endif
261   nat prev_what_next;
262   rtsBool ready_to_gc;
263 #if defined(THREADED_RTS)
264   rtsBool first = rtsTrue;
265 #endif
266   
267   cap = initialCapability;
268
269   // Pre-condition: this task owns initialCapability.
270   // The sched_mutex is *NOT* held
271   // NB. on return, we still hold a capability.
272
273   debugTrace (DEBUG_sched, 
274               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
275               task, initialCapability);
276
277   schedulePreLoop();
278
279   // -----------------------------------------------------------
280   // Scheduler loop starts here:
281
282 #if defined(PARALLEL_HASKELL)
283 #define TERMINATION_CONDITION        (!receivedFinish)
284 #else
285 #define TERMINATION_CONDITION        rtsTrue
286 #endif
287
288   while (TERMINATION_CONDITION) {
289
290     // Check whether we have re-entered the RTS from Haskell without
291     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
292     // call).
293     if (cap->in_haskell) {
294           errorBelch("schedule: re-entered unsafely.\n"
295                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
296           stg_exit(EXIT_FAILURE);
297     }
298
299     // The interruption / shutdown sequence.
300     // 
301     // In order to cleanly shut down the runtime, we want to:
302     //   * make sure that all main threads return to their callers
303     //     with the state 'Interrupted'.
304     //   * clean up all OS threads assocated with the runtime
305     //   * free all memory etc.
306     //
307     // So the sequence for ^C goes like this:
308     //
309     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
310     //     arranges for some Capability to wake up
311     //
312     //   * all threads in the system are halted, and the zombies are
313     //     placed on the run queue for cleaning up.  We acquire all
314     //     the capabilities in order to delete the threads, this is
315     //     done by scheduleDoGC() for convenience (because GC already
316     //     needs to acquire all the capabilities).  We can't kill
317     //     threads involved in foreign calls.
318     // 
319     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
320     //
321     //   * sched_state := SCHED_SHUTTING_DOWN
322     //
323     //   * all workers exit when the run queue on their capability
324     //     drains.  All main threads will also exit when their TSO
325     //     reaches the head of the run queue and they can return.
326     //
327     //   * eventually all Capabilities will shut down, and the RTS can
328     //     exit.
329     //
330     //   * We might be left with threads blocked in foreign calls, 
331     //     we should really attempt to kill these somehow (TODO);
332     
333     switch (sched_state) {
334     case SCHED_RUNNING:
335         break;
336     case SCHED_INTERRUPTING:
337         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
338 #if defined(THREADED_RTS)
339         discardSparksCap(cap);
340 #endif
341         /* scheduleDoGC() deletes all the threads */
342         cap = scheduleDoGC(cap,task,rtsFalse);
343
344         // after scheduleDoGC(), we must be shutting down.  Either some
345         // other Capability did the final GC, or we did it above,
346         // either way we can fall through to the SCHED_SHUTTING_DOWN
347         // case now.
348         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
349         // fall through
350
351     case SCHED_SHUTTING_DOWN:
352         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
353         // If we are a worker, just exit.  If we're a bound thread
354         // then we will exit below when we've removed our TSO from
355         // the run queue.
356         if (task->tso == NULL && emptyRunQueue(cap)) {
357             return cap;
358         }
359         break;
360     default:
361         barf("sched_state: %d", sched_state);
362     }
363
364     scheduleFindWork(cap);
365
366     /* work pushing, currently relevant only for THREADED_RTS:
367        (pushes threads, wakes up idle capabilities for stealing) */
368     schedulePushWork(cap,task);
369
370 #if defined(PARALLEL_HASKELL)
371     /* since we perform a blocking receive and continue otherwise,
372        either we never reach here or we definitely have work! */
373     // from here: non-empty run queue
374     ASSERT(!emptyRunQueue(cap));
375
376     if (PacketsWaiting()) {  /* now process incoming messages, if any
377                                 pending...  
378
379                                 CAUTION: scheduleGetRemoteWork called
380                                 above, waits for messages as well! */
381       processMessages(cap, &receivedFinish);
382     }
383 #endif // PARALLEL_HASKELL: non-empty run queue!
384
385     scheduleDetectDeadlock(cap,task);
386
387 #if defined(THREADED_RTS)
388     cap = task->cap;    // reload cap, it might have changed
389 #endif
390
391     // Normally, the only way we can get here with no threads to
392     // run is if a keyboard interrupt received during 
393     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
394     // Additionally, it is not fatal for the
395     // threaded RTS to reach here with no threads to run.
396     //
397     // win32: might be here due to awaitEvent() being abandoned
398     // as a result of a console event having been delivered.
399     
400 #if defined(THREADED_RTS)
401     if (first) 
402     {
403     // XXX: ToDo
404     //     // don't yield the first time, we want a chance to run this
405     //     // thread for a bit, even if there are others banging at the
406     //     // door.
407     //     first = rtsFalse;
408     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
409     }
410
411   yield:
412     scheduleYield(&cap,task);
413     if (emptyRunQueue(cap)) continue; // look for work again
414 #endif
415
416 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
417     if ( emptyRunQueue(cap) ) {
418         ASSERT(sched_state >= SCHED_INTERRUPTING);
419     }
420 #endif
421
422     // 
423     // Get a thread to run
424     //
425     t = popRunQueue(cap);
426
427     // Sanity check the thread we're about to run.  This can be
428     // expensive if there is lots of thread switching going on...
429     IF_DEBUG(sanity,checkTSO(t));
430
431 #if defined(THREADED_RTS)
432     // Check whether we can run this thread in the current task.
433     // If not, we have to pass our capability to the right task.
434     {
435         Task *bound = t->bound;
436       
437         if (bound) {
438             if (bound == task) {
439                 debugTrace(DEBUG_sched,
440                            "### Running thread %lu in bound thread", (unsigned long)t->id);
441                 // yes, the Haskell thread is bound to the current native thread
442             } else {
443                 debugTrace(DEBUG_sched,
444                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
445                 // no, bound to a different Haskell thread: pass to that thread
446                 pushOnRunQueue(cap,t);
447                 continue;
448             }
449         } else {
450             // The thread we want to run is unbound.
451             if (task->tso) { 
452                 debugTrace(DEBUG_sched,
453                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
454                 // no, the current native thread is bound to a different
455                 // Haskell thread, so pass it to any worker thread
456                 pushOnRunQueue(cap,t);
457                 continue; 
458             }
459         }
460     }
461 #endif
462
463     // If we're shutting down, and this thread has not yet been
464     // killed, kill it now.  This sometimes happens when a finalizer
465     // thread is created by the final GC, or a thread previously
466     // in a foreign call returns.
467     if (sched_state >= SCHED_INTERRUPTING &&
468         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
469         deleteThread(cap,t);
470     }
471
472     /* context switches are initiated by the timer signal, unless
473      * the user specified "context switch as often as possible", with
474      * +RTS -C0
475      */
476     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
477         && !emptyThreadQueues(cap)) {
478         cap->context_switch = 1;
479     }
480          
481 run_thread:
482
483     // CurrentTSO is the thread to run.  t might be different if we
484     // loop back to run_thread, so make sure to set CurrentTSO after
485     // that.
486     cap->r.rCurrentTSO = t;
487
488     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
489                               (long)t->id, whatNext_strs[t->what_next]);
490
491     startHeapProfTimer();
492
493     // Check for exceptions blocked on this thread
494     maybePerformBlockedException (cap, t);
495
496     // ----------------------------------------------------------------------
497     // Run the current thread 
498
499     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
500     ASSERT(t->cap == cap);
501     ASSERT(t->bound ? t->bound->cap == cap : 1);
502
503     prev_what_next = t->what_next;
504
505     errno = t->saved_errno;
506 #if mingw32_HOST_OS
507     SetLastError(t->saved_winerror);
508 #endif
509
510     cap->in_haskell = rtsTrue;
511
512     dirty_TSO(cap,t);
513
514 #if defined(THREADED_RTS)
515     if (recent_activity == ACTIVITY_DONE_GC) {
516         // ACTIVITY_DONE_GC means we turned off the timer signal to
517         // conserve power (see #1623).  Re-enable it here.
518         nat prev;
519         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
520         if (prev == ACTIVITY_DONE_GC) {
521             startTimer();
522         }
523     } else {
524         recent_activity = ACTIVITY_YES;
525     }
526 #endif
527
528     switch (prev_what_next) {
529         
530     case ThreadKilled:
531     case ThreadComplete:
532         /* Thread already finished, return to scheduler. */
533         ret = ThreadFinished;
534         break;
535         
536     case ThreadRunGHC:
537     {
538         StgRegTable *r;
539         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
540         cap = regTableToCapability(r);
541         ret = r->rRet;
542         break;
543     }
544     
545     case ThreadInterpret:
546         cap = interpretBCO(cap);
547         ret = cap->r.rRet;
548         break;
549         
550     default:
551         barf("schedule: invalid what_next field");
552     }
553
554     cap->in_haskell = rtsFalse;
555
556     // The TSO might have moved, eg. if it re-entered the RTS and a GC
557     // happened.  So find the new location:
558     t = cap->r.rCurrentTSO;
559
560     // We have run some Haskell code: there might be blackhole-blocked
561     // threads to wake up now.
562     // Lock-free test here should be ok, we're just setting a flag.
563     if ( blackhole_queue != END_TSO_QUEUE ) {
564         blackholes_need_checking = rtsTrue;
565     }
566     
567     // And save the current errno in this thread.
568     // XXX: possibly bogus for SMP because this thread might already
569     // be running again, see code below.
570     t->saved_errno = errno;
571 #if mingw32_HOST_OS
572     // Similarly for Windows error code
573     t->saved_winerror = GetLastError();
574 #endif
575
576 #if defined(THREADED_RTS)
577     // If ret is ThreadBlocked, and this Task is bound to the TSO that
578     // blocked, we are in limbo - the TSO is now owned by whatever it
579     // is blocked on, and may in fact already have been woken up,
580     // perhaps even on a different Capability.  It may be the case
581     // that task->cap != cap.  We better yield this Capability
582     // immediately and return to normaility.
583     if (ret == ThreadBlocked) {
584         debugTrace(DEBUG_sched,
585                    "--<< thread %lu (%s) stopped: blocked",
586                    (unsigned long)t->id, whatNext_strs[t->what_next]);
587         goto yield;
588     }
589 #endif
590
591     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
592     ASSERT(t->cap == cap);
593
594     // ----------------------------------------------------------------------
595     
596     // Costs for the scheduler are assigned to CCS_SYSTEM
597     stopHeapProfTimer();
598 #if defined(PROFILING)
599     CCCS = CCS_SYSTEM;
600 #endif
601     
602     schedulePostRunThread(cap,t);
603
604     t = threadStackUnderflow(task,t);
605
606     ready_to_gc = rtsFalse;
607
608     switch (ret) {
609     case HeapOverflow:
610         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
611         break;
612
613     case StackOverflow:
614         scheduleHandleStackOverflow(cap,task,t);
615         break;
616
617     case ThreadYielding:
618         if (scheduleHandleYield(cap, t, prev_what_next)) {
619             // shortcut for switching between compiler/interpreter:
620             goto run_thread; 
621         }
622         break;
623
624     case ThreadBlocked:
625         scheduleHandleThreadBlocked(t);
626         break;
627
628     case ThreadFinished:
629         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
630         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
631         break;
632
633     default:
634       barf("schedule: invalid thread return code %d", (int)ret);
635     }
636
637     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
638       cap = scheduleDoGC(cap,task,rtsFalse);
639     }
640   } /* end of while() */
641 }
642
643 /* ----------------------------------------------------------------------------
644  * Setting up the scheduler loop
645  * ------------------------------------------------------------------------- */
646
647 static void
648 schedulePreLoop(void)
649 {
650   // initialisation for scheduler - what cannot go into initScheduler()  
651 }
652
653 /* -----------------------------------------------------------------------------
654  * scheduleFindWork()
655  *
656  * Search for work to do, and handle messages from elsewhere.
657  * -------------------------------------------------------------------------- */
658
659 static void
660 scheduleFindWork (Capability *cap)
661 {
662     scheduleStartSignalHandlers(cap);
663
664     // Only check the black holes here if we've nothing else to do.
665     // During normal execution, the black hole list only gets checked
666     // at GC time, to avoid repeatedly traversing this possibly long
667     // list each time around the scheduler.
668     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
669
670     scheduleCheckWakeupThreads(cap);
671
672     scheduleCheckBlockedThreads(cap);
673
674 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
675     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
676 #endif
677
678 #if defined(PARALLEL_HASKELL)
679     // if messages have been buffered...
680     scheduleSendPendingMessages();
681 #endif
682
683 #if defined(PARALLEL_HASKELL)
684     if (emptyRunQueue(cap)) {
685         receivedFinish = scheduleGetRemoteWork(cap);
686         continue; //  a new round, (hopefully) with new work
687         /* 
688            in GUM, this a) sends out a FISH and returns IF no fish is
689                            out already
690                         b) (blocking) awaits and receives messages
691            
692            in Eden, this is only the blocking receive, as b) in GUM.
693         */
694     }
695 #endif
696 }
697
698 #if defined(THREADED_RTS)
699 STATIC_INLINE rtsBool
700 shouldYieldCapability (Capability *cap, Task *task)
701 {
702     // we need to yield this capability to someone else if..
703     //   - another thread is initiating a GC
704     //   - another Task is returning from a foreign call
705     //   - the thread at the head of the run queue cannot be run
706     //     by this Task (it is bound to another Task, or it is unbound
707     //     and this task it bound).
708     return (waiting_for_gc || 
709             cap->returning_tasks_hd != NULL ||
710             (!emptyRunQueue(cap) && (task->tso == NULL
711                                      ? cap->run_queue_hd->bound != NULL
712                                      : cap->run_queue_hd->bound != task)));
713 }
714
715 // This is the single place where a Task goes to sleep.  There are
716 // two reasons it might need to sleep:
717 //    - there are no threads to run
718 //    - we need to yield this Capability to someone else 
719 //      (see shouldYieldCapability())
720 //
721 // Careful: the scheduler loop is quite delicate.  Make sure you run
722 // the tests in testsuite/concurrent (all ways) after modifying this,
723 // and also check the benchmarks in nofib/parallel for regressions.
724
725 static void
726 scheduleYield (Capability **pcap, Task *task)
727 {
728     Capability *cap = *pcap;
729
730     // if we have work, and we don't need to give up the Capability, continue.
731     if (!shouldYieldCapability(cap,task) && 
732         (!emptyRunQueue(cap) ||
733          blackholes_need_checking ||
734          sched_state >= SCHED_INTERRUPTING))
735         return;
736
737     // otherwise yield (sleep), and keep yielding if necessary.
738     do {
739         yieldCapability(&cap,task);
740     } 
741     while (shouldYieldCapability(cap,task));
742
743     // note there may still be no threads on the run queue at this
744     // point, the caller has to check.
745
746     *pcap = cap;
747     return;
748 }
749 #endif
750     
751 /* -----------------------------------------------------------------------------
752  * schedulePushWork()
753  *
754  * Push work to other Capabilities if we have some.
755  * -------------------------------------------------------------------------- */
756
757 static void
758 schedulePushWork(Capability *cap USED_IF_THREADS, 
759                  Task *task      USED_IF_THREADS)
760 {
761   /* following code not for PARALLEL_HASKELL. I kept the call general,
762      future GUM versions might use pushing in a distributed setup */
763 #if defined(THREADED_RTS)
764
765     Capability *free_caps[n_capabilities], *cap0;
766     nat i, n_free_caps;
767
768     // migration can be turned off with +RTS -qg
769     if (!RtsFlags.ParFlags.migrate) return;
770
771     // Check whether we have more threads on our run queue, or sparks
772     // in our pool, that we could hand to another Capability.
773     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
774         && sparkPoolSizeCap(cap) < 2) {
775         return;
776     }
777
778     // First grab as many free Capabilities as we can.
779     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
780         cap0 = &capabilities[i];
781         if (cap != cap0 && tryGrabCapability(cap0,task)) {
782             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
783                 // it already has some work, we just grabbed it at 
784                 // the wrong moment.  Or maybe it's deadlocked!
785                 releaseCapability(cap0);
786             } else {
787                 free_caps[n_free_caps++] = cap0;
788             }
789         }
790     }
791
792     // we now have n_free_caps free capabilities stashed in
793     // free_caps[].  Share our run queue equally with them.  This is
794     // probably the simplest thing we could do; improvements we might
795     // want to do include:
796     //
797     //   - giving high priority to moving relatively new threads, on 
798     //     the gournds that they haven't had time to build up a
799     //     working set in the cache on this CPU/Capability.
800     //
801     //   - giving low priority to moving long-lived threads
802
803     if (n_free_caps > 0) {
804         StgTSO *prev, *t, *next;
805         rtsBool pushed_to_all;
806
807         debugTrace(DEBUG_sched, 
808                    "cap %d: %s and %d free capabilities, sharing...", 
809                    cap->no, 
810                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
811                    "excess threads on run queue":"sparks to share (>=2)",
812                    n_free_caps);
813
814         i = 0;
815         pushed_to_all = rtsFalse;
816
817         if (cap->run_queue_hd != END_TSO_QUEUE) {
818             prev = cap->run_queue_hd;
819             t = prev->_link;
820             prev->_link = END_TSO_QUEUE;
821             for (; t != END_TSO_QUEUE; t = next) {
822                 next = t->_link;
823                 t->_link = END_TSO_QUEUE;
824                 if (t->what_next == ThreadRelocated
825                     || t->bound == task // don't move my bound thread
826                     || tsoLocked(t)) {  // don't move a locked thread
827                     setTSOLink(cap, prev, t);
828                     prev = t;
829                 } else if (i == n_free_caps) {
830                     pushed_to_all = rtsTrue;
831                     i = 0;
832                     // keep one for us
833                     setTSOLink(cap, prev, t);
834                     prev = t;
835                 } else {
836                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
837                     appendToRunQueue(free_caps[i],t);
838                     if (t->bound) { t->bound->cap = free_caps[i]; }
839                     t->cap = free_caps[i];
840                     i++;
841                 }
842             }
843             cap->run_queue_tl = prev;
844         }
845
846 #ifdef SPARK_PUSHING
847         /* JB I left this code in place, it would work but is not necessary */
848
849         // If there are some free capabilities that we didn't push any
850         // threads to, then try to push a spark to each one.
851         if (!pushed_to_all) {
852             StgClosure *spark;
853             // i is the next free capability to push to
854             for (; i < n_free_caps; i++) {
855                 if (emptySparkPoolCap(free_caps[i])) {
856                     spark = tryStealSpark(cap->sparks);
857                     if (spark != NULL) {
858                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
859                         newSpark(&(free_caps[i]->r), spark);
860                     }
861                 }
862             }
863         }
864 #endif /* SPARK_PUSHING */
865
866         // release the capabilities
867         for (i = 0; i < n_free_caps; i++) {
868             task->cap = free_caps[i];
869             releaseAndWakeupCapability(free_caps[i]);
870         }
871     }
872     task->cap = cap; // reset to point to our Capability.
873
874 #endif /* THREADED_RTS */
875
876 }
877
878 /* ----------------------------------------------------------------------------
879  * Start any pending signal handlers
880  * ------------------------------------------------------------------------- */
881
882 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
883 static void
884 scheduleStartSignalHandlers(Capability *cap)
885 {
886     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
887         // safe outside the lock
888         startSignalHandlers(cap);
889     }
890 }
891 #else
892 static void
893 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
894 {
895 }
896 #endif
897
898 /* ----------------------------------------------------------------------------
899  * Check for blocked threads that can be woken up.
900  * ------------------------------------------------------------------------- */
901
902 static void
903 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
904 {
905 #if !defined(THREADED_RTS)
906     //
907     // Check whether any waiting threads need to be woken up.  If the
908     // run queue is empty, and there are no other tasks running, we
909     // can wait indefinitely for something to happen.
910     //
911     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
912     {
913         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
914     }
915 #endif
916 }
917
918
919 /* ----------------------------------------------------------------------------
920  * Check for threads woken up by other Capabilities
921  * ------------------------------------------------------------------------- */
922
923 static void
924 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
925 {
926 #if defined(THREADED_RTS)
927     // Any threads that were woken up by other Capabilities get
928     // appended to our run queue.
929     if (!emptyWakeupQueue(cap)) {
930         ACQUIRE_LOCK(&cap->lock);
931         if (emptyRunQueue(cap)) {
932             cap->run_queue_hd = cap->wakeup_queue_hd;
933             cap->run_queue_tl = cap->wakeup_queue_tl;
934         } else {
935             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
936             cap->run_queue_tl = cap->wakeup_queue_tl;
937         }
938         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
939         RELEASE_LOCK(&cap->lock);
940     }
941 #endif
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Check for threads blocked on BLACKHOLEs that can be woken up
946  * ------------------------------------------------------------------------- */
947 static void
948 scheduleCheckBlackHoles (Capability *cap)
949 {
950     if ( blackholes_need_checking ) // check without the lock first
951     {
952         ACQUIRE_LOCK(&sched_mutex);
953         if ( blackholes_need_checking ) {
954             blackholes_need_checking = rtsFalse;
955             // important that we reset the flag *before* checking the
956             // blackhole queue, otherwise we could get deadlock.  This
957             // happens as follows: we wake up a thread that
958             // immediately runs on another Capability, blocks on a
959             // blackhole, and then we reset the blackholes_need_checking flag.
960             checkBlackHoles(cap);
961         }
962         RELEASE_LOCK(&sched_mutex);
963     }
964 }
965
966 /* ----------------------------------------------------------------------------
967  * Detect deadlock conditions and attempt to resolve them.
968  * ------------------------------------------------------------------------- */
969
970 static void
971 scheduleDetectDeadlock (Capability *cap, Task *task)
972 {
973
974 #if defined(PARALLEL_HASKELL)
975     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
976     return;
977 #endif
978
979     /* 
980      * Detect deadlock: when we have no threads to run, there are no
981      * threads blocked, waiting for I/O, or sleeping, and all the
982      * other tasks are waiting for work, we must have a deadlock of
983      * some description.
984      */
985     if ( emptyThreadQueues(cap) )
986     {
987 #if defined(THREADED_RTS)
988         /* 
989          * In the threaded RTS, we only check for deadlock if there
990          * has been no activity in a complete timeslice.  This means
991          * we won't eagerly start a full GC just because we don't have
992          * any threads to run currently.
993          */
994         if (recent_activity != ACTIVITY_INACTIVE) return;
995 #endif
996
997         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
998
999         // Garbage collection can release some new threads due to
1000         // either (a) finalizers or (b) threads resurrected because
1001         // they are unreachable and will therefore be sent an
1002         // exception.  Any threads thus released will be immediately
1003         // runnable.
1004         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
1005
1006         recent_activity = ACTIVITY_DONE_GC;
1007         // disable timer signals (see #1623)
1008         stopTimer();
1009         
1010         if ( !emptyRunQueue(cap) ) return;
1011
1012 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1013         /* If we have user-installed signal handlers, then wait
1014          * for signals to arrive rather then bombing out with a
1015          * deadlock.
1016          */
1017         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1018             debugTrace(DEBUG_sched,
1019                        "still deadlocked, waiting for signals...");
1020
1021             awaitUserSignals();
1022
1023             if (signals_pending()) {
1024                 startSignalHandlers(cap);
1025             }
1026
1027             // either we have threads to run, or we were interrupted:
1028             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1029
1030             return;
1031         }
1032 #endif
1033
1034 #if !defined(THREADED_RTS)
1035         /* Probably a real deadlock.  Send the current main thread the
1036          * Deadlock exception.
1037          */
1038         if (task->tso) {
1039             switch (task->tso->why_blocked) {
1040             case BlockedOnSTM:
1041             case BlockedOnBlackHole:
1042             case BlockedOnException:
1043             case BlockedOnMVar:
1044                 throwToSingleThreaded(cap, task->tso, 
1045                                       (StgClosure *)nonTermination_closure);
1046                 return;
1047             default:
1048                 barf("deadlock: main thread blocked in a strange way");
1049             }
1050         }
1051         return;
1052 #endif
1053     }
1054 }
1055
1056
1057 /* ----------------------------------------------------------------------------
1058  * Send pending messages (PARALLEL_HASKELL only)
1059  * ------------------------------------------------------------------------- */
1060
1061 #if defined(PARALLEL_HASKELL)
1062 static void
1063 scheduleSendPendingMessages(void)
1064 {
1065
1066 # if defined(PAR) // global Mem.Mgmt., omit for now
1067     if (PendingFetches != END_BF_QUEUE) {
1068         processFetches();
1069     }
1070 # endif
1071     
1072     if (RtsFlags.ParFlags.BufferTime) {
1073         // if we use message buffering, we must send away all message
1074         // packets which have become too old...
1075         sendOldBuffers(); 
1076     }
1077 }
1078 #endif
1079
1080 /* ----------------------------------------------------------------------------
1081  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1082  * ------------------------------------------------------------------------- */
1083
1084 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1085 static void
1086 scheduleActivateSpark(Capability *cap)
1087 {
1088     if (anySparks())
1089     {
1090         createSparkThread(cap);
1091         debugTrace(DEBUG_sched, "creating a spark thread");
1092     }
1093 }
1094 #endif // PARALLEL_HASKELL || THREADED_RTS
1095
1096 /* ----------------------------------------------------------------------------
1097  * Get work from a remote node (PARALLEL_HASKELL only)
1098  * ------------------------------------------------------------------------- */
1099     
1100 #if defined(PARALLEL_HASKELL)
1101 static rtsBool /* return value used in PARALLEL_HASKELL only */
1102 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1103 {
1104 #if defined(PARALLEL_HASKELL)
1105   rtsBool receivedFinish = rtsFalse;
1106
1107   // idle() , i.e. send all buffers, wait for work
1108   if (RtsFlags.ParFlags.BufferTime) {
1109         IF_PAR_DEBUG(verbose, 
1110                 debugBelch("...send all pending data,"));
1111         {
1112           nat i;
1113           for (i=1; i<=nPEs; i++)
1114             sendImmediately(i); // send all messages away immediately
1115         }
1116   }
1117
1118   /* this would be the place for fishing in GUM... 
1119
1120      if (no-earlier-fish-around) 
1121           sendFish(choosePe());
1122    */
1123
1124   // Eden:just look for incoming messages (blocking receive)
1125   IF_PAR_DEBUG(verbose, 
1126                debugBelch("...wait for incoming messages...\n"));
1127   processMessages(cap, &receivedFinish); // blocking receive...
1128
1129
1130   return receivedFinish;
1131   // reenter scheduling look after having received something
1132
1133 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1134
1135   return rtsFalse; /* return value unused in THREADED_RTS */
1136
1137 #endif /* PARALLEL_HASKELL */
1138 }
1139 #endif // PARALLEL_HASKELL || THREADED_RTS
1140
1141 /* ----------------------------------------------------------------------------
1142  * After running a thread...
1143  * ------------------------------------------------------------------------- */
1144
1145 static void
1146 schedulePostRunThread (Capability *cap, StgTSO *t)
1147 {
1148     // We have to be able to catch transactions that are in an
1149     // infinite loop as a result of seeing an inconsistent view of
1150     // memory, e.g. 
1151     //
1152     //   atomically $ do
1153     //       [a,b] <- mapM readTVar [ta,tb]
1154     //       when (a == b) loop
1155     //
1156     // and a is never equal to b given a consistent view of memory.
1157     //
1158     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1159         if (!stmValidateNestOfTransactions (t -> trec)) {
1160             debugTrace(DEBUG_sched | DEBUG_stm,
1161                        "trec %p found wasting its time", t);
1162             
1163             // strip the stack back to the
1164             // ATOMICALLY_FRAME, aborting the (nested)
1165             // transaction, and saving the stack of any
1166             // partially-evaluated thunks on the heap.
1167             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1168             
1169             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1170         }
1171     }
1172
1173   /* some statistics gathering in the parallel case */
1174 }
1175
1176 /* -----------------------------------------------------------------------------
1177  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1178  * -------------------------------------------------------------------------- */
1179
1180 static rtsBool
1181 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1182 {
1183     // did the task ask for a large block?
1184     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1185         // if so, get one and push it on the front of the nursery.
1186         bdescr *bd;
1187         lnat blocks;
1188         
1189         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1190         
1191         debugTrace(DEBUG_sched,
1192                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1193                    (long)t->id, whatNext_strs[t->what_next], blocks);
1194     
1195         // don't do this if the nursery is (nearly) full, we'll GC first.
1196         if (cap->r.rCurrentNursery->link != NULL ||
1197             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1198                                                // if the nursery has only one block.
1199             
1200             ACQUIRE_SM_LOCK
1201             bd = allocGroup( blocks );
1202             RELEASE_SM_LOCK
1203             cap->r.rNursery->n_blocks += blocks;
1204             
1205             // link the new group into the list
1206             bd->link = cap->r.rCurrentNursery;
1207             bd->u.back = cap->r.rCurrentNursery->u.back;
1208             if (cap->r.rCurrentNursery->u.back != NULL) {
1209                 cap->r.rCurrentNursery->u.back->link = bd;
1210             } else {
1211 #if !defined(THREADED_RTS)
1212                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1213                        g0s0 == cap->r.rNursery);
1214 #endif
1215                 cap->r.rNursery->blocks = bd;
1216             }             
1217             cap->r.rCurrentNursery->u.back = bd;
1218             
1219             // initialise it as a nursery block.  We initialise the
1220             // step, gen_no, and flags field of *every* sub-block in
1221             // this large block, because this is easier than making
1222             // sure that we always find the block head of a large
1223             // block whenever we call Bdescr() (eg. evacuate() and
1224             // isAlive() in the GC would both have to do this, at
1225             // least).
1226             { 
1227                 bdescr *x;
1228                 for (x = bd; x < bd + blocks; x++) {
1229                     x->step = cap->r.rNursery;
1230                     x->gen_no = 0;
1231                     x->flags = 0;
1232                 }
1233             }
1234             
1235             // This assert can be a killer if the app is doing lots
1236             // of large block allocations.
1237             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1238             
1239             // now update the nursery to point to the new block
1240             cap->r.rCurrentNursery = bd;
1241             
1242             // we might be unlucky and have another thread get on the
1243             // run queue before us and steal the large block, but in that
1244             // case the thread will just end up requesting another large
1245             // block.
1246             pushOnRunQueue(cap,t);
1247             return rtsFalse;  /* not actually GC'ing */
1248         }
1249     }
1250     
1251     debugTrace(DEBUG_sched,
1252                "--<< thread %ld (%s) stopped: HeapOverflow",
1253                (long)t->id, whatNext_strs[t->what_next]);
1254
1255     if (cap->context_switch) {
1256         // Sometimes we miss a context switch, e.g. when calling
1257         // primitives in a tight loop, MAYBE_GC() doesn't check the
1258         // context switch flag, and we end up waiting for a GC.
1259         // See #1984, and concurrent/should_run/1984
1260         cap->context_switch = 0;
1261         addToRunQueue(cap,t);
1262     } else {
1263         pushOnRunQueue(cap,t);
1264     }
1265     return rtsTrue;
1266     /* actual GC is done at the end of the while loop in schedule() */
1267 }
1268
1269 /* -----------------------------------------------------------------------------
1270  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1271  * -------------------------------------------------------------------------- */
1272
1273 static void
1274 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1275 {
1276     debugTrace (DEBUG_sched,
1277                 "--<< thread %ld (%s) stopped, StackOverflow", 
1278                 (long)t->id, whatNext_strs[t->what_next]);
1279
1280     /* just adjust the stack for this thread, then pop it back
1281      * on the run queue.
1282      */
1283     { 
1284         /* enlarge the stack */
1285         StgTSO *new_t = threadStackOverflow(cap, t);
1286         
1287         /* The TSO attached to this Task may have moved, so update the
1288          * pointer to it.
1289          */
1290         if (task->tso == t) {
1291             task->tso = new_t;
1292         }
1293         pushOnRunQueue(cap,new_t);
1294     }
1295 }
1296
1297 /* -----------------------------------------------------------------------------
1298  * Handle a thread that returned to the scheduler with ThreadYielding
1299  * -------------------------------------------------------------------------- */
1300
1301 static rtsBool
1302 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1303 {
1304     // Reset the context switch flag.  We don't do this just before
1305     // running the thread, because that would mean we would lose ticks
1306     // during GC, which can lead to unfair scheduling (a thread hogs
1307     // the CPU because the tick always arrives during GC).  This way
1308     // penalises threads that do a lot of allocation, but that seems
1309     // better than the alternative.
1310     cap->context_switch = 0;
1311     
1312     /* put the thread back on the run queue.  Then, if we're ready to
1313      * GC, check whether this is the last task to stop.  If so, wake
1314      * up the GC thread.  getThread will block during a GC until the
1315      * GC is finished.
1316      */
1317 #ifdef DEBUG
1318     if (t->what_next != prev_what_next) {
1319         debugTrace(DEBUG_sched,
1320                    "--<< thread %ld (%s) stopped to switch evaluators", 
1321                    (long)t->id, whatNext_strs[t->what_next]);
1322     } else {
1323         debugTrace(DEBUG_sched,
1324                    "--<< thread %ld (%s) stopped, yielding",
1325                    (long)t->id, whatNext_strs[t->what_next]);
1326     }
1327 #endif
1328     
1329     IF_DEBUG(sanity,
1330              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1331              checkTSO(t));
1332     ASSERT(t->_link == END_TSO_QUEUE);
1333     
1334     // Shortcut if we're just switching evaluators: don't bother
1335     // doing stack squeezing (which can be expensive), just run the
1336     // thread.
1337     if (t->what_next != prev_what_next) {
1338         return rtsTrue;
1339     }
1340
1341     addToRunQueue(cap,t);
1342
1343     return rtsFalse;
1344 }
1345
1346 /* -----------------------------------------------------------------------------
1347  * Handle a thread that returned to the scheduler with ThreadBlocked
1348  * -------------------------------------------------------------------------- */
1349
1350 static void
1351 scheduleHandleThreadBlocked( StgTSO *t
1352 #if !defined(GRAN) && !defined(DEBUG)
1353     STG_UNUSED
1354 #endif
1355     )
1356 {
1357
1358       // We don't need to do anything.  The thread is blocked, and it
1359       // has tidied up its stack and placed itself on whatever queue
1360       // it needs to be on.
1361
1362     // ASSERT(t->why_blocked != NotBlocked);
1363     // Not true: for example,
1364     //    - in THREADED_RTS, the thread may already have been woken
1365     //      up by another Capability.  This actually happens: try
1366     //      conc023 +RTS -N2.
1367     //    - the thread may have woken itself up already, because
1368     //      threadPaused() might have raised a blocked throwTo
1369     //      exception, see maybePerformBlockedException().
1370
1371 #ifdef DEBUG
1372     if (traceClass(DEBUG_sched)) {
1373         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1374                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1375         printThreadBlockage(t);
1376         debugTraceEnd();
1377     }
1378 #endif
1379 }
1380
1381 /* -----------------------------------------------------------------------------
1382  * Handle a thread that returned to the scheduler with ThreadFinished
1383  * -------------------------------------------------------------------------- */
1384
1385 static rtsBool
1386 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1387 {
1388     /* Need to check whether this was a main thread, and if so,
1389      * return with the return value.
1390      *
1391      * We also end up here if the thread kills itself with an
1392      * uncaught exception, see Exception.cmm.
1393      */
1394     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1395                (unsigned long)t->id, whatNext_strs[t->what_next]);
1396
1397       //
1398       // Check whether the thread that just completed was a bound
1399       // thread, and if so return with the result.  
1400       //
1401       // There is an assumption here that all thread completion goes
1402       // through this point; we need to make sure that if a thread
1403       // ends up in the ThreadKilled state, that it stays on the run
1404       // queue so it can be dealt with here.
1405       //
1406
1407       if (t->bound) {
1408
1409           if (t->bound != task) {
1410 #if !defined(THREADED_RTS)
1411               // Must be a bound thread that is not the topmost one.  Leave
1412               // it on the run queue until the stack has unwound to the
1413               // point where we can deal with this.  Leaving it on the run
1414               // queue also ensures that the garbage collector knows about
1415               // this thread and its return value (it gets dropped from the
1416               // step->threads list so there's no other way to find it).
1417               appendToRunQueue(cap,t);
1418               return rtsFalse;
1419 #else
1420               // this cannot happen in the threaded RTS, because a
1421               // bound thread can only be run by the appropriate Task.
1422               barf("finished bound thread that isn't mine");
1423 #endif
1424           }
1425
1426           ASSERT(task->tso == t);
1427
1428           if (t->what_next == ThreadComplete) {
1429               if (task->ret) {
1430                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1431                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1432               }
1433               task->stat = Success;
1434           } else {
1435               if (task->ret) {
1436                   *(task->ret) = NULL;
1437               }
1438               if (sched_state >= SCHED_INTERRUPTING) {
1439                   task->stat = Interrupted;
1440               } else {
1441                   task->stat = Killed;
1442               }
1443           }
1444 #ifdef DEBUG
1445           removeThreadLabel((StgWord)task->tso->id);
1446 #endif
1447           return rtsTrue; // tells schedule() to return
1448       }
1449
1450       return rtsFalse;
1451 }
1452
1453 /* -----------------------------------------------------------------------------
1454  * Perform a heap census
1455  * -------------------------------------------------------------------------- */
1456
1457 static rtsBool
1458 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1459 {
1460     // When we have +RTS -i0 and we're heap profiling, do a census at
1461     // every GC.  This lets us get repeatable runs for debugging.
1462     if (performHeapProfile ||
1463         (RtsFlags.ProfFlags.profileInterval==0 &&
1464          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1465         return rtsTrue;
1466     } else {
1467         return rtsFalse;
1468     }
1469 }
1470
1471 /* -----------------------------------------------------------------------------
1472  * Perform a garbage collection if necessary
1473  * -------------------------------------------------------------------------- */
1474
1475 static Capability *
1476 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1477 {
1478     rtsBool heap_census;
1479 #ifdef THREADED_RTS
1480     /* extern static volatile StgWord waiting_for_gc; 
1481        lives inside capability.c */
1482     rtsBool was_waiting;
1483     nat i;
1484 #endif
1485
1486     if (sched_state == SCHED_SHUTTING_DOWN) {
1487         // The final GC has already been done, and the system is
1488         // shutting down.  We'll probably deadlock if we try to GC
1489         // now.
1490         return cap;
1491     }
1492
1493 #ifdef THREADED_RTS
1494     // In order to GC, there must be no threads running Haskell code.
1495     // Therefore, the GC thread needs to hold *all* the capabilities,
1496     // and release them after the GC has completed.  
1497     //
1498     // This seems to be the simplest way: previous attempts involved
1499     // making all the threads with capabilities give up their
1500     // capabilities and sleep except for the *last* one, which
1501     // actually did the GC.  But it's quite hard to arrange for all
1502     // the other tasks to sleep and stay asleep.
1503     //
1504         
1505     /*  Other capabilities are prevented from running yet more Haskell
1506         threads if waiting_for_gc is set. Tested inside
1507         yieldCapability() and releaseCapability() in Capability.c */
1508
1509     was_waiting = cas(&waiting_for_gc, 0, 1);
1510     if (was_waiting) {
1511         do {
1512             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1513             if (cap) yieldCapability(&cap,task);
1514         } while (waiting_for_gc);
1515         return cap;  // NOTE: task->cap might have changed here
1516     }
1517
1518     setContextSwitches();
1519     for (i=0; i < n_capabilities; i++) {
1520         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1521         if (cap != &capabilities[i]) {
1522             Capability *pcap = &capabilities[i];
1523             // we better hope this task doesn't get migrated to
1524             // another Capability while we're waiting for this one.
1525             // It won't, because load balancing happens while we have
1526             // all the Capabilities, but even so it's a slightly
1527             // unsavoury invariant.
1528             task->cap = pcap;
1529             waitForReturnCapability(&pcap, task);
1530             if (pcap != &capabilities[i]) {
1531                 barf("scheduleDoGC: got the wrong capability");
1532             }
1533         }
1534     }
1535
1536     waiting_for_gc = rtsFalse;
1537 #endif
1538
1539     // so this happens periodically:
1540     if (cap) scheduleCheckBlackHoles(cap);
1541     
1542     IF_DEBUG(scheduler, printAllThreads());
1543
1544     /*
1545      * We now have all the capabilities; if we're in an interrupting
1546      * state, then we should take the opportunity to delete all the
1547      * threads in the system.
1548      */
1549     if (sched_state >= SCHED_INTERRUPTING) {
1550         deleteAllThreads(&capabilities[0]);
1551         sched_state = SCHED_SHUTTING_DOWN;
1552     }
1553     
1554     heap_census = scheduleNeedHeapProfile(rtsTrue);
1555
1556     /* everybody back, start the GC.
1557      * Could do it in this thread, or signal a condition var
1558      * to do it in another thread.  Either way, we need to
1559      * broadcast on gc_pending_cond afterward.
1560      */
1561 #if defined(THREADED_RTS)
1562     debugTrace(DEBUG_sched, "doing GC");
1563 #endif
1564     GarbageCollect(force_major || heap_census);
1565     
1566     if (heap_census) {
1567         debugTrace(DEBUG_sched, "performing heap census");
1568         heapCensus();
1569         performHeapProfile = rtsFalse;
1570     }
1571
1572 #ifdef SPARKBALANCE
1573     /* JB 
1574        Once we are all together... this would be the place to balance all
1575        spark pools. No concurrent stealing or adding of new sparks can
1576        occur. Should be defined in Sparks.c. */
1577     balanceSparkPoolsCaps(n_capabilities, capabilities);
1578 #endif
1579
1580 #if defined(THREADED_RTS)
1581     // release our stash of capabilities.
1582     for (i = 0; i < n_capabilities; i++) {
1583         if (cap != &capabilities[i]) {
1584             task->cap = &capabilities[i];
1585             releaseCapability(&capabilities[i]);
1586         }
1587     }
1588     if (cap) {
1589         task->cap = cap;
1590     } else {
1591         task->cap = NULL;
1592     }
1593 #endif
1594
1595     return cap;
1596 }
1597
1598 /* ---------------------------------------------------------------------------
1599  * Singleton fork(). Do not copy any running threads.
1600  * ------------------------------------------------------------------------- */
1601
1602 pid_t
1603 forkProcess(HsStablePtr *entry
1604 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1605             STG_UNUSED
1606 #endif
1607            )
1608 {
1609 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1610     Task *task;
1611     pid_t pid;
1612     StgTSO* t,*next;
1613     Capability *cap;
1614     nat s;
1615     
1616 #if defined(THREADED_RTS)
1617     if (RtsFlags.ParFlags.nNodes > 1) {
1618         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1619         stg_exit(EXIT_FAILURE);
1620     }
1621 #endif
1622
1623     debugTrace(DEBUG_sched, "forking!");
1624     
1625     // ToDo: for SMP, we should probably acquire *all* the capabilities
1626     cap = rts_lock();
1627     
1628     // no funny business: hold locks while we fork, otherwise if some
1629     // other thread is holding a lock when the fork happens, the data
1630     // structure protected by the lock will forever be in an
1631     // inconsistent state in the child.  See also #1391.
1632     ACQUIRE_LOCK(&sched_mutex);
1633     ACQUIRE_LOCK(&cap->lock);
1634     ACQUIRE_LOCK(&cap->running_task->lock);
1635
1636     pid = fork();
1637     
1638     if (pid) { // parent
1639         
1640         RELEASE_LOCK(&sched_mutex);
1641         RELEASE_LOCK(&cap->lock);
1642         RELEASE_LOCK(&cap->running_task->lock);
1643
1644         // just return the pid
1645         rts_unlock(cap);
1646         return pid;
1647         
1648     } else { // child
1649         
1650 #if defined(THREADED_RTS)
1651         initMutex(&sched_mutex);
1652         initMutex(&cap->lock);
1653         initMutex(&cap->running_task->lock);
1654 #endif
1655
1656         // Now, all OS threads except the thread that forked are
1657         // stopped.  We need to stop all Haskell threads, including
1658         // those involved in foreign calls.  Also we need to delete
1659         // all Tasks, because they correspond to OS threads that are
1660         // now gone.
1661
1662         for (s = 0; s < total_steps; s++) {
1663           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1664             if (t->what_next == ThreadRelocated) {
1665                 next = t->_link;
1666             } else {
1667                 next = t->global_link;
1668                 // don't allow threads to catch the ThreadKilled
1669                 // exception, but we do want to raiseAsync() because these
1670                 // threads may be evaluating thunks that we need later.
1671                 deleteThread_(cap,t);
1672             }
1673           }
1674         }
1675         
1676         // Empty the run queue.  It seems tempting to let all the
1677         // killed threads stay on the run queue as zombies to be
1678         // cleaned up later, but some of them correspond to bound
1679         // threads for which the corresponding Task does not exist.
1680         cap->run_queue_hd = END_TSO_QUEUE;
1681         cap->run_queue_tl = END_TSO_QUEUE;
1682
1683         // Any suspended C-calling Tasks are no more, their OS threads
1684         // don't exist now:
1685         cap->suspended_ccalling_tasks = NULL;
1686
1687         // Empty the threads lists.  Otherwise, the garbage
1688         // collector may attempt to resurrect some of these threads.
1689         for (s = 0; s < total_steps; s++) {
1690             all_steps[s].threads = END_TSO_QUEUE;
1691         }
1692
1693         // Wipe the task list, except the current Task.
1694         ACQUIRE_LOCK(&sched_mutex);
1695         for (task = all_tasks; task != NULL; task=task->all_link) {
1696             if (task != cap->running_task) {
1697 #if defined(THREADED_RTS)
1698                 initMutex(&task->lock); // see #1391
1699 #endif
1700                 discardTask(task);
1701             }
1702         }
1703         RELEASE_LOCK(&sched_mutex);
1704
1705 #if defined(THREADED_RTS)
1706         // Wipe our spare workers list, they no longer exist.  New
1707         // workers will be created if necessary.
1708         cap->spare_workers = NULL;
1709         cap->returning_tasks_hd = NULL;
1710         cap->returning_tasks_tl = NULL;
1711 #endif
1712
1713         // On Unix, all timers are reset in the child, so we need to start
1714         // the timer again.
1715         initTimer();
1716         startTimer();
1717
1718         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1719         rts_checkSchedStatus("forkProcess",cap);
1720         
1721         rts_unlock(cap);
1722         hs_exit();                      // clean up and exit
1723         stg_exit(EXIT_SUCCESS);
1724     }
1725 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1726     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1727     return -1;
1728 #endif
1729 }
1730
1731 /* ---------------------------------------------------------------------------
1732  * Delete all the threads in the system
1733  * ------------------------------------------------------------------------- */
1734    
1735 static void
1736 deleteAllThreads ( Capability *cap )
1737 {
1738     // NOTE: only safe to call if we own all capabilities.
1739
1740     StgTSO* t, *next;
1741     nat s;
1742
1743     debugTrace(DEBUG_sched,"deleting all threads");
1744     for (s = 0; s < total_steps; s++) {
1745       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1746         if (t->what_next == ThreadRelocated) {
1747             next = t->_link;
1748         } else {
1749             next = t->global_link;
1750             deleteThread(cap,t);
1751         }
1752       }
1753     }      
1754
1755     // The run queue now contains a bunch of ThreadKilled threads.  We
1756     // must not throw these away: the main thread(s) will be in there
1757     // somewhere, and the main scheduler loop has to deal with it.
1758     // Also, the run queue is the only thing keeping these threads from
1759     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1760
1761 #if !defined(THREADED_RTS)
1762     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1763     ASSERT(sleeping_queue == END_TSO_QUEUE);
1764 #endif
1765 }
1766
1767 /* -----------------------------------------------------------------------------
1768    Managing the suspended_ccalling_tasks list.
1769    Locks required: sched_mutex
1770    -------------------------------------------------------------------------- */
1771
1772 STATIC_INLINE void
1773 suspendTask (Capability *cap, Task *task)
1774 {
1775     ASSERT(task->next == NULL && task->prev == NULL);
1776     task->next = cap->suspended_ccalling_tasks;
1777     task->prev = NULL;
1778     if (cap->suspended_ccalling_tasks) {
1779         cap->suspended_ccalling_tasks->prev = task;
1780     }
1781     cap->suspended_ccalling_tasks = task;
1782 }
1783
1784 STATIC_INLINE void
1785 recoverSuspendedTask (Capability *cap, Task *task)
1786 {
1787     if (task->prev) {
1788         task->prev->next = task->next;
1789     } else {
1790         ASSERT(cap->suspended_ccalling_tasks == task);
1791         cap->suspended_ccalling_tasks = task->next;
1792     }
1793     if (task->next) {
1794         task->next->prev = task->prev;
1795     }
1796     task->next = task->prev = NULL;
1797 }
1798
1799 /* ---------------------------------------------------------------------------
1800  * Suspending & resuming Haskell threads.
1801  * 
1802  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1803  * its capability before calling the C function.  This allows another
1804  * task to pick up the capability and carry on running Haskell
1805  * threads.  It also means that if the C call blocks, it won't lock
1806  * the whole system.
1807  *
1808  * The Haskell thread making the C call is put to sleep for the
1809  * duration of the call, on the susepended_ccalling_threads queue.  We
1810  * give out a token to the task, which it can use to resume the thread
1811  * on return from the C function.
1812  * ------------------------------------------------------------------------- */
1813    
1814 void *
1815 suspendThread (StgRegTable *reg)
1816 {
1817   Capability *cap;
1818   int saved_errno;
1819   StgTSO *tso;
1820   Task *task;
1821 #if mingw32_HOST_OS
1822   StgWord32 saved_winerror;
1823 #endif
1824
1825   saved_errno = errno;
1826 #if mingw32_HOST_OS
1827   saved_winerror = GetLastError();
1828 #endif
1829
1830   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1831    */
1832   cap = regTableToCapability(reg);
1833
1834   task = cap->running_task;
1835   tso = cap->r.rCurrentTSO;
1836
1837   debugTrace(DEBUG_sched, 
1838              "thread %lu did a safe foreign call", 
1839              (unsigned long)cap->r.rCurrentTSO->id);
1840
1841   // XXX this might not be necessary --SDM
1842   tso->what_next = ThreadRunGHC;
1843
1844   threadPaused(cap,tso);
1845
1846   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1847       tso->why_blocked = BlockedOnCCall;
1848       tso->flags |= TSO_BLOCKEX;
1849       tso->flags &= ~TSO_INTERRUPTIBLE;
1850   } else {
1851       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1852   }
1853
1854   // Hand back capability
1855   task->suspended_tso = tso;
1856
1857   ACQUIRE_LOCK(&cap->lock);
1858
1859   suspendTask(cap,task);
1860   cap->in_haskell = rtsFalse;
1861   releaseCapability_(cap,rtsFalse);
1862   
1863   RELEASE_LOCK(&cap->lock);
1864
1865 #if defined(THREADED_RTS)
1866   /* Preparing to leave the RTS, so ensure there's a native thread/task
1867      waiting to take over.
1868   */
1869   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1870 #endif
1871
1872   errno = saved_errno;
1873 #if mingw32_HOST_OS
1874   SetLastError(saved_winerror);
1875 #endif
1876   return task;
1877 }
1878
1879 StgRegTable *
1880 resumeThread (void *task_)
1881 {
1882     StgTSO *tso;
1883     Capability *cap;
1884     Task *task = task_;
1885     int saved_errno;
1886 #if mingw32_HOST_OS
1887     StgWord32 saved_winerror;
1888 #endif
1889
1890     saved_errno = errno;
1891 #if mingw32_HOST_OS
1892     saved_winerror = GetLastError();
1893 #endif
1894
1895     cap = task->cap;
1896     // Wait for permission to re-enter the RTS with the result.
1897     waitForReturnCapability(&cap,task);
1898     // we might be on a different capability now... but if so, our
1899     // entry on the suspended_ccalling_tasks list will also have been
1900     // migrated.
1901
1902     // Remove the thread from the suspended list
1903     recoverSuspendedTask(cap,task);
1904
1905     tso = task->suspended_tso;
1906     task->suspended_tso = NULL;
1907     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1908     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1909     
1910     if (tso->why_blocked == BlockedOnCCall) {
1911         awakenBlockedExceptionQueue(cap,tso);
1912         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1913     }
1914     
1915     /* Reset blocking status */
1916     tso->why_blocked  = NotBlocked;
1917     
1918     cap->r.rCurrentTSO = tso;
1919     cap->in_haskell = rtsTrue;
1920     errno = saved_errno;
1921 #if mingw32_HOST_OS
1922     SetLastError(saved_winerror);
1923 #endif
1924
1925     /* We might have GC'd, mark the TSO dirty again */
1926     dirty_TSO(cap,tso);
1927
1928     IF_DEBUG(sanity, checkTSO(tso));
1929
1930     return &cap->r;
1931 }
1932
1933 /* ---------------------------------------------------------------------------
1934  * scheduleThread()
1935  *
1936  * scheduleThread puts a thread on the end  of the runnable queue.
1937  * This will usually be done immediately after a thread is created.
1938  * The caller of scheduleThread must create the thread using e.g.
1939  * createThread and push an appropriate closure
1940  * on this thread's stack before the scheduler is invoked.
1941  * ------------------------------------------------------------------------ */
1942
1943 void
1944 scheduleThread(Capability *cap, StgTSO *tso)
1945 {
1946     // The thread goes at the *end* of the run-queue, to avoid possible
1947     // starvation of any threads already on the queue.
1948     appendToRunQueue(cap,tso);
1949 }
1950
1951 void
1952 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1953 {
1954 #if defined(THREADED_RTS)
1955     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1956                               // move this thread from now on.
1957     cpu %= RtsFlags.ParFlags.nNodes;
1958     if (cpu == cap->no) {
1959         appendToRunQueue(cap,tso);
1960     } else {
1961         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1962     }
1963 #else
1964     appendToRunQueue(cap,tso);
1965 #endif
1966 }
1967
1968 Capability *
1969 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1970 {
1971     Task *task;
1972
1973     // We already created/initialised the Task
1974     task = cap->running_task;
1975
1976     // This TSO is now a bound thread; make the Task and TSO
1977     // point to each other.
1978     tso->bound = task;
1979     tso->cap = cap;
1980
1981     task->tso = tso;
1982     task->ret = ret;
1983     task->stat = NoStatus;
1984
1985     appendToRunQueue(cap,tso);
1986
1987     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1988
1989     cap = schedule(cap,task);
1990
1991     ASSERT(task->stat != NoStatus);
1992     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1993
1994     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1995     return cap;
1996 }
1997
1998 /* ----------------------------------------------------------------------------
1999  * Starting Tasks
2000  * ------------------------------------------------------------------------- */
2001
2002 #if defined(THREADED_RTS)
2003 void OSThreadProcAttr
2004 workerStart(Task *task)
2005 {
2006     Capability *cap;
2007
2008     // See startWorkerTask().
2009     ACQUIRE_LOCK(&task->lock);
2010     cap = task->cap;
2011     RELEASE_LOCK(&task->lock);
2012
2013     // set the thread-local pointer to the Task:
2014     taskEnter(task);
2015
2016     // schedule() runs without a lock.
2017     cap = schedule(cap,task);
2018
2019     // On exit from schedule(), we have a Capability, but possibly not
2020     // the same one we started with.
2021
2022     // During shutdown, the requirement is that after all the
2023     // Capabilities are shut down, all workers that are shutting down
2024     // have finished workerTaskStop().  This is why we hold on to
2025     // cap->lock until we've finished workerTaskStop() below.
2026     //
2027     // There may be workers still involved in foreign calls; those
2028     // will just block in waitForReturnCapability() because the
2029     // Capability has been shut down.
2030     //
2031     ACQUIRE_LOCK(&cap->lock);
2032     releaseCapability_(cap,rtsFalse);
2033     workerTaskStop(task);
2034     RELEASE_LOCK(&cap->lock);
2035 }
2036 #endif
2037
2038 /* ---------------------------------------------------------------------------
2039  * initScheduler()
2040  *
2041  * Initialise the scheduler.  This resets all the queues - if the
2042  * queues contained any threads, they'll be garbage collected at the
2043  * next pass.
2044  *
2045  * ------------------------------------------------------------------------ */
2046
2047 void 
2048 initScheduler(void)
2049 {
2050 #if !defined(THREADED_RTS)
2051   blocked_queue_hd  = END_TSO_QUEUE;
2052   blocked_queue_tl  = END_TSO_QUEUE;
2053   sleeping_queue    = END_TSO_QUEUE;
2054 #endif
2055
2056   blackhole_queue   = END_TSO_QUEUE;
2057
2058   sched_state    = SCHED_RUNNING;
2059   recent_activity = ACTIVITY_YES;
2060
2061 #if defined(THREADED_RTS)
2062   /* Initialise the mutex and condition variables used by
2063    * the scheduler. */
2064   initMutex(&sched_mutex);
2065 #endif
2066   
2067   ACQUIRE_LOCK(&sched_mutex);
2068
2069   /* A capability holds the state a native thread needs in
2070    * order to execute STG code. At least one capability is
2071    * floating around (only THREADED_RTS builds have more than one).
2072    */
2073   initCapabilities();
2074
2075   initTaskManager();
2076
2077 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2078   initSparkPools();
2079 #endif
2080
2081 #if defined(THREADED_RTS)
2082   /*
2083    * Eagerly start one worker to run each Capability, except for
2084    * Capability 0.  The idea is that we're probably going to start a
2085    * bound thread on Capability 0 pretty soon, so we don't want a
2086    * worker task hogging it.
2087    */
2088   { 
2089       nat i;
2090       Capability *cap;
2091       for (i = 1; i < n_capabilities; i++) {
2092           cap = &capabilities[i];
2093           ACQUIRE_LOCK(&cap->lock);
2094           startWorkerTask(cap, workerStart);
2095           RELEASE_LOCK(&cap->lock);
2096       }
2097   }
2098 #endif
2099
2100   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2101
2102   RELEASE_LOCK(&sched_mutex);
2103 }
2104
2105 void
2106 exitScheduler(
2107     rtsBool wait_foreign
2108 #if !defined(THREADED_RTS)
2109                          __attribute__((unused))
2110 #endif
2111 )
2112                /* see Capability.c, shutdownCapability() */
2113 {
2114     Task *task = NULL;
2115
2116 #if defined(THREADED_RTS)
2117     ACQUIRE_LOCK(&sched_mutex);
2118     task = newBoundTask();
2119     RELEASE_LOCK(&sched_mutex);
2120 #endif
2121
2122     // If we haven't killed all the threads yet, do it now.
2123     if (sched_state < SCHED_SHUTTING_DOWN) {
2124         sched_state = SCHED_INTERRUPTING;
2125         scheduleDoGC(NULL,task,rtsFalse);    
2126     }
2127     sched_state = SCHED_SHUTTING_DOWN;
2128
2129 #if defined(THREADED_RTS)
2130     { 
2131         nat i;
2132         
2133         for (i = 0; i < n_capabilities; i++) {
2134             shutdownCapability(&capabilities[i], task, wait_foreign);
2135         }
2136         boundTaskExiting(task);
2137     }
2138 #endif
2139 }
2140
2141 void
2142 freeScheduler( void )
2143 {
2144     nat still_running;
2145
2146     ACQUIRE_LOCK(&sched_mutex);
2147     still_running = freeTaskManager();
2148     // We can only free the Capabilities if there are no Tasks still
2149     // running.  We might have a Task about to return from a foreign
2150     // call into waitForReturnCapability(), for example (actually,
2151     // this should be the *only* thing that a still-running Task can
2152     // do at this point, and it will block waiting for the
2153     // Capability).
2154     if (still_running == 0) {
2155         freeCapabilities();
2156         if (n_capabilities != 1) {
2157             stgFree(capabilities);
2158         }
2159     }
2160     RELEASE_LOCK(&sched_mutex);
2161 #if defined(THREADED_RTS)
2162     closeMutex(&sched_mutex);
2163 #endif
2164 }
2165
2166 /* -----------------------------------------------------------------------------
2167    performGC
2168
2169    This is the interface to the garbage collector from Haskell land.
2170    We provide this so that external C code can allocate and garbage
2171    collect when called from Haskell via _ccall_GC.
2172    -------------------------------------------------------------------------- */
2173
2174 static void
2175 performGC_(rtsBool force_major)
2176 {
2177     Task *task;
2178     // We must grab a new Task here, because the existing Task may be
2179     // associated with a particular Capability, and chained onto the 
2180     // suspended_ccalling_tasks queue.
2181     ACQUIRE_LOCK(&sched_mutex);
2182     task = newBoundTask();
2183     RELEASE_LOCK(&sched_mutex);
2184     scheduleDoGC(NULL,task,force_major);
2185     boundTaskExiting(task);
2186 }
2187
2188 void
2189 performGC(void)
2190 {
2191     performGC_(rtsFalse);
2192 }
2193
2194 void
2195 performMajorGC(void)
2196 {
2197     performGC_(rtsTrue);
2198 }
2199
2200 /* -----------------------------------------------------------------------------
2201    Stack overflow
2202
2203    If the thread has reached its maximum stack size, then raise the
2204    StackOverflow exception in the offending thread.  Otherwise
2205    relocate the TSO into a larger chunk of memory and adjust its stack
2206    size appropriately.
2207    -------------------------------------------------------------------------- */
2208
2209 static StgTSO *
2210 threadStackOverflow(Capability *cap, StgTSO *tso)
2211 {
2212   nat new_stack_size, stack_words;
2213   lnat new_tso_size;
2214   StgPtr new_sp;
2215   StgTSO *dest;
2216
2217   IF_DEBUG(sanity,checkTSO(tso));
2218
2219   // don't allow throwTo() to modify the blocked_exceptions queue
2220   // while we are moving the TSO:
2221   lockClosure((StgClosure *)tso);
2222
2223   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2224       // NB. never raise a StackOverflow exception if the thread is
2225       // inside Control.Exceptino.block.  It is impractical to protect
2226       // against stack overflow exceptions, since virtually anything
2227       // can raise one (even 'catch'), so this is the only sensible
2228       // thing to do here.  See bug #767.
2229
2230       debugTrace(DEBUG_gc,
2231                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2232                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2233       IF_DEBUG(gc,
2234                /* If we're debugging, just print out the top of the stack */
2235                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2236                                                 tso->sp+64)));
2237
2238       // Send this thread the StackOverflow exception
2239       unlockTSO(tso);
2240       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2241       return tso;
2242   }
2243
2244   /* Try to double the current stack size.  If that takes us over the
2245    * maximum stack size for this thread, then use the maximum instead
2246    * (that is, unless we're already at or over the max size and we
2247    * can't raise the StackOverflow exception (see above), in which
2248    * case just double the size). Finally round up so the TSO ends up as
2249    * a whole number of blocks.
2250    */
2251   if (tso->stack_size >= tso->max_stack_size) {
2252       new_stack_size = tso->stack_size * 2;
2253   } else { 
2254       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2255   }
2256   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2257                                        TSO_STRUCT_SIZE)/sizeof(W_);
2258   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2259   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2260
2261   debugTrace(DEBUG_sched, 
2262              "increasing stack size from %ld words to %d.",
2263              (long)tso->stack_size, new_stack_size);
2264
2265   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2266   TICK_ALLOC_TSO(new_stack_size,0);
2267
2268   /* copy the TSO block and the old stack into the new area */
2269   memcpy(dest,tso,TSO_STRUCT_SIZE);
2270   stack_words = tso->stack + tso->stack_size - tso->sp;
2271   new_sp = (P_)dest + new_tso_size - stack_words;
2272   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2273
2274   /* relocate the stack pointers... */
2275   dest->sp         = new_sp;
2276   dest->stack_size = new_stack_size;
2277         
2278   /* Mark the old TSO as relocated.  We have to check for relocated
2279    * TSOs in the garbage collector and any primops that deal with TSOs.
2280    *
2281    * It's important to set the sp value to just beyond the end
2282    * of the stack, so we don't attempt to scavenge any part of the
2283    * dead TSO's stack.
2284    */
2285   tso->what_next = ThreadRelocated;
2286   setTSOLink(cap,tso,dest);
2287   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2288   tso->why_blocked = NotBlocked;
2289
2290   IF_PAR_DEBUG(verbose,
2291                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2292                      tso->id, tso, tso->stack_size);
2293                /* If we're debugging, just print out the top of the stack */
2294                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2295                                                 tso->sp+64)));
2296   
2297   unlockTSO(dest);
2298   unlockTSO(tso);
2299
2300   IF_DEBUG(sanity,checkTSO(dest));
2301 #if 0
2302   IF_DEBUG(scheduler,printTSO(dest));
2303 #endif
2304
2305   return dest;
2306 }
2307
2308 static StgTSO *
2309 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2310 {
2311     bdescr *bd, *new_bd;
2312     lnat free_w, tso_size_w;
2313     StgTSO *new_tso;
2314
2315     tso_size_w = tso_sizeW(tso);
2316
2317     if (tso_size_w < MBLOCK_SIZE_W || 
2318         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2319     {
2320         return tso;
2321     }
2322
2323     // don't allow throwTo() to modify the blocked_exceptions queue
2324     // while we are moving the TSO:
2325     lockClosure((StgClosure *)tso);
2326
2327     // this is the number of words we'll free
2328     free_w = round_to_mblocks(tso_size_w/2);
2329
2330     bd = Bdescr((StgPtr)tso);
2331     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2332     bd->free = bd->start + TSO_STRUCT_SIZEW;
2333
2334     new_tso = (StgTSO *)new_bd->start;
2335     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2336     new_tso->stack_size = new_bd->free - new_tso->stack;
2337
2338     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2339                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2340
2341     tso->what_next = ThreadRelocated;
2342     tso->_link = new_tso; // no write barrier reqd: same generation
2343
2344     // The TSO attached to this Task may have moved, so update the
2345     // pointer to it.
2346     if (task->tso == tso) {
2347         task->tso = new_tso;
2348     }
2349
2350     unlockTSO(new_tso);
2351     unlockTSO(tso);
2352
2353     IF_DEBUG(sanity,checkTSO(new_tso));
2354
2355     return new_tso;
2356 }
2357
2358 /* ---------------------------------------------------------------------------
2359    Interrupt execution
2360    - usually called inside a signal handler so it mustn't do anything fancy.   
2361    ------------------------------------------------------------------------ */
2362
2363 void
2364 interruptStgRts(void)
2365 {
2366     sched_state = SCHED_INTERRUPTING;
2367     setContextSwitches();
2368     wakeUpRts();
2369 }
2370
2371 /* -----------------------------------------------------------------------------
2372    Wake up the RTS
2373    
2374    This function causes at least one OS thread to wake up and run the
2375    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2376    an external event has arrived that may need servicing (eg. a
2377    keyboard interrupt).
2378
2379    In the single-threaded RTS we don't do anything here; we only have
2380    one thread anyway, and the event that caused us to want to wake up
2381    will have interrupted any blocking system call in progress anyway.
2382    -------------------------------------------------------------------------- */
2383
2384 void
2385 wakeUpRts(void)
2386 {
2387 #if defined(THREADED_RTS)
2388     // This forces the IO Manager thread to wakeup, which will
2389     // in turn ensure that some OS thread wakes up and runs the
2390     // scheduler loop, which will cause a GC and deadlock check.
2391     ioManagerWakeup();
2392 #endif
2393 }
2394
2395 /* -----------------------------------------------------------------------------
2396  * checkBlackHoles()
2397  *
2398  * Check the blackhole_queue for threads that can be woken up.  We do
2399  * this periodically: before every GC, and whenever the run queue is
2400  * empty.
2401  *
2402  * An elegant solution might be to just wake up all the blocked
2403  * threads with awakenBlockedQueue occasionally: they'll go back to
2404  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2405  * doesn't give us a way to tell whether we've actually managed to
2406  * wake up any threads, so we would be busy-waiting.
2407  *
2408  * -------------------------------------------------------------------------- */
2409
2410 static rtsBool
2411 checkBlackHoles (Capability *cap)
2412 {
2413     StgTSO **prev, *t;
2414     rtsBool any_woke_up = rtsFalse;
2415     StgHalfWord type;
2416
2417     // blackhole_queue is global:
2418     ASSERT_LOCK_HELD(&sched_mutex);
2419
2420     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2421
2422     // ASSUMES: sched_mutex
2423     prev = &blackhole_queue;
2424     t = blackhole_queue;
2425     while (t != END_TSO_QUEUE) {
2426         ASSERT(t->why_blocked == BlockedOnBlackHole);
2427         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2428         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2429             IF_DEBUG(sanity,checkTSO(t));
2430             t = unblockOne(cap, t);
2431             *prev = t;
2432             any_woke_up = rtsTrue;
2433         } else {
2434             prev = &t->_link;
2435             t = t->_link;
2436         }
2437     }
2438
2439     return any_woke_up;
2440 }
2441
2442 /* -----------------------------------------------------------------------------
2443    Deleting threads
2444
2445    This is used for interruption (^C) and forking, and corresponds to
2446    raising an exception but without letting the thread catch the
2447    exception.
2448    -------------------------------------------------------------------------- */
2449
2450 static void 
2451 deleteThread (Capability *cap, StgTSO *tso)
2452 {
2453     // NOTE: must only be called on a TSO that we have exclusive
2454     // access to, because we will call throwToSingleThreaded() below.
2455     // The TSO must be on the run queue of the Capability we own, or 
2456     // we must own all Capabilities.
2457
2458     if (tso->why_blocked != BlockedOnCCall &&
2459         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2460         throwToSingleThreaded(cap,tso,NULL);
2461     }
2462 }
2463
2464 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2465 static void 
2466 deleteThread_(Capability *cap, StgTSO *tso)
2467 { // for forkProcess only:
2468   // like deleteThread(), but we delete threads in foreign calls, too.
2469
2470     if (tso->why_blocked == BlockedOnCCall ||
2471         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2472         unblockOne(cap,tso);
2473         tso->what_next = ThreadKilled;
2474     } else {
2475         deleteThread(cap,tso);
2476     }
2477 }
2478 #endif
2479
2480 /* -----------------------------------------------------------------------------
2481    raiseExceptionHelper
2482    
2483    This function is called by the raise# primitve, just so that we can
2484    move some of the tricky bits of raising an exception from C-- into
2485    C.  Who knows, it might be a useful re-useable thing here too.
2486    -------------------------------------------------------------------------- */
2487
2488 StgWord
2489 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2490 {
2491     Capability *cap = regTableToCapability(reg);
2492     StgThunk *raise_closure = NULL;
2493     StgPtr p, next;
2494     StgRetInfoTable *info;
2495     //
2496     // This closure represents the expression 'raise# E' where E
2497     // is the exception raise.  It is used to overwrite all the
2498     // thunks which are currently under evaluataion.
2499     //
2500
2501     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2502     // LDV profiling: stg_raise_info has THUNK as its closure
2503     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2504     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2505     // 1 does not cause any problem unless profiling is performed.
2506     // However, when LDV profiling goes on, we need to linearly scan
2507     // small object pool, where raise_closure is stored, so we should
2508     // use MIN_UPD_SIZE.
2509     //
2510     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2511     //                                 sizeofW(StgClosure)+1);
2512     //
2513
2514     //
2515     // Walk up the stack, looking for the catch frame.  On the way,
2516     // we update any closures pointed to from update frames with the
2517     // raise closure that we just built.
2518     //
2519     p = tso->sp;
2520     while(1) {
2521         info = get_ret_itbl((StgClosure *)p);
2522         next = p + stack_frame_sizeW((StgClosure *)p);
2523         switch (info->i.type) {
2524             
2525         case UPDATE_FRAME:
2526             // Only create raise_closure if we need to.
2527             if (raise_closure == NULL) {
2528                 raise_closure = 
2529                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2530                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2531                 raise_closure->payload[0] = exception;
2532             }
2533             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2534             p = next;
2535             continue;
2536
2537         case ATOMICALLY_FRAME:
2538             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2539             tso->sp = p;
2540             return ATOMICALLY_FRAME;
2541             
2542         case CATCH_FRAME:
2543             tso->sp = p;
2544             return CATCH_FRAME;
2545
2546         case CATCH_STM_FRAME:
2547             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2548             tso->sp = p;
2549             return CATCH_STM_FRAME;
2550             
2551         case STOP_FRAME:
2552             tso->sp = p;
2553             return STOP_FRAME;
2554
2555         case CATCH_RETRY_FRAME:
2556         default:
2557             p = next; 
2558             continue;
2559         }
2560     }
2561 }
2562
2563
2564 /* -----------------------------------------------------------------------------
2565    findRetryFrameHelper
2566
2567    This function is called by the retry# primitive.  It traverses the stack
2568    leaving tso->sp referring to the frame which should handle the retry.  
2569
2570    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2571    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2572
2573    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2574    create) because retries are not considered to be exceptions, despite the
2575    similar implementation.
2576
2577    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2578    not be created within memory transactions.
2579    -------------------------------------------------------------------------- */
2580
2581 StgWord
2582 findRetryFrameHelper (StgTSO *tso)
2583 {
2584   StgPtr           p, next;
2585   StgRetInfoTable *info;
2586
2587   p = tso -> sp;
2588   while (1) {
2589     info = get_ret_itbl((StgClosure *)p);
2590     next = p + stack_frame_sizeW((StgClosure *)p);
2591     switch (info->i.type) {
2592       
2593     case ATOMICALLY_FRAME:
2594         debugTrace(DEBUG_stm,
2595                    "found ATOMICALLY_FRAME at %p during retry", p);
2596         tso->sp = p;
2597         return ATOMICALLY_FRAME;
2598       
2599     case CATCH_RETRY_FRAME:
2600         debugTrace(DEBUG_stm,
2601                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2602         tso->sp = p;
2603         return CATCH_RETRY_FRAME;
2604       
2605     case CATCH_STM_FRAME: {
2606         StgTRecHeader *trec = tso -> trec;
2607         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2608         debugTrace(DEBUG_stm,
2609                    "found CATCH_STM_FRAME at %p during retry", p);
2610         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2611         stmAbortTransaction(tso -> cap, trec);
2612         stmFreeAbortedTRec(tso -> cap, trec);
2613         tso -> trec = outer;
2614         p = next; 
2615         continue;
2616     }
2617       
2618
2619     default:
2620       ASSERT(info->i.type != CATCH_FRAME);
2621       ASSERT(info->i.type != STOP_FRAME);
2622       p = next; 
2623       continue;
2624     }
2625   }
2626 }
2627
2628 /* -----------------------------------------------------------------------------
2629    resurrectThreads is called after garbage collection on the list of
2630    threads found to be garbage.  Each of these threads will be woken
2631    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2632    on an MVar, or NonTermination if the thread was blocked on a Black
2633    Hole.
2634
2635    Locks: assumes we hold *all* the capabilities.
2636    -------------------------------------------------------------------------- */
2637
2638 void
2639 resurrectThreads (StgTSO *threads)
2640 {
2641     StgTSO *tso, *next;
2642     Capability *cap;
2643     step *step;
2644
2645     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2646         next = tso->global_link;
2647
2648         step = Bdescr((P_)tso)->step;
2649         tso->global_link = step->threads;
2650         step->threads = tso;
2651
2652         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2653         
2654         // Wake up the thread on the Capability it was last on
2655         cap = tso->cap;
2656         
2657         switch (tso->why_blocked) {
2658         case BlockedOnMVar:
2659         case BlockedOnException:
2660             /* Called by GC - sched_mutex lock is currently held. */
2661             throwToSingleThreaded(cap, tso,
2662                                   (StgClosure *)blockedOnDeadMVar_closure);
2663             break;
2664         case BlockedOnBlackHole:
2665             throwToSingleThreaded(cap, tso,
2666                                   (StgClosure *)nonTermination_closure);
2667             break;
2668         case BlockedOnSTM:
2669             throwToSingleThreaded(cap, tso,
2670                                   (StgClosure *)blockedIndefinitely_closure);
2671             break;
2672         case NotBlocked:
2673             /* This might happen if the thread was blocked on a black hole
2674              * belonging to a thread that we've just woken up (raiseAsync
2675              * can wake up threads, remember...).
2676              */
2677             continue;
2678         default:
2679             barf("resurrectThreads: thread blocked in a strange way");
2680         }
2681     }
2682 }
2683
2684 /* -----------------------------------------------------------------------------
2685    performPendingThrowTos is called after garbage collection, and
2686    passed a list of threads that were found to have pending throwTos
2687    (tso->blocked_exceptions was not empty), and were blocked.
2688    Normally this doesn't happen, because we would deliver the
2689    exception directly if the target thread is blocked, but there are
2690    small windows where it might occur on a multiprocessor (see
2691    throwTo()).
2692
2693    NB. we must be holding all the capabilities at this point, just
2694    like resurrectThreads().
2695    -------------------------------------------------------------------------- */
2696
2697 void
2698 performPendingThrowTos (StgTSO *threads)
2699 {
2700     StgTSO *tso, *next;
2701     Capability *cap;
2702     step *step;
2703
2704     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2705         next = tso->global_link;
2706
2707         step = Bdescr((P_)tso)->step;
2708         tso->global_link = step->threads;
2709         step->threads = tso;
2710
2711         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2712         
2713         cap = tso->cap;
2714         maybePerformBlockedException(cap, tso);
2715     }
2716 }