Fix #2592: do an orderly shutdown when the heap is exhausted
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #include "GC.h"
35
36 /* PARALLEL_HASKELL includes go here */
37
38 #include "Sparks.h"
39 #include "Capability.h"
40 #include "Task.h"
41 #include "AwaitEvent.h"
42 #if defined(mingw32_HOST_OS)
43 #include "win32/IOManager.h"
44 #endif
45 #include "Trace.h"
46 #include "RaiseAsync.h"
47 #include "Threads.h"
48 #include "ThrIOManager.h"
49
50 #ifdef HAVE_SYS_TYPES_H
51 #include <sys/types.h>
52 #endif
53 #ifdef HAVE_UNISTD_H
54 #include <unistd.h>
55 #endif
56
57 #include <string.h>
58 #include <stdlib.h>
59 #include <stdarg.h>
60
61 #ifdef HAVE_ERRNO_H
62 #include <errno.h>
63 #endif
64
65 // Turn off inlining when debugging - it obfuscates things
66 #ifdef DEBUG
67 # undef  STATIC_INLINE
68 # define STATIC_INLINE static
69 #endif
70
71 /* -----------------------------------------------------------------------------
72  * Global variables
73  * -------------------------------------------------------------------------- */
74
75 #if !defined(THREADED_RTS)
76 // Blocked/sleeping thrads
77 StgTSO *blocked_queue_hd = NULL;
78 StgTSO *blocked_queue_tl = NULL;
79 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
80 #endif
81
82 /* Threads blocked on blackholes.
83  * LOCK: sched_mutex+capability, or all capabilities
84  */
85 StgTSO *blackhole_queue = NULL;
86
87 /* The blackhole_queue should be checked for threads to wake up.  See
88  * Schedule.h for more thorough comment.
89  * LOCK: none (doesn't matter if we miss an update)
90  */
91 rtsBool blackholes_need_checking = rtsFalse;
92
93 /* Set to true when the latest garbage collection failed to reclaim
94  * enough space, and the runtime should proceed to shut itself down in
95  * an orderly fashion (emitting profiling info etc.)
96  */
97 rtsBool heap_overflow = rtsFalse;
98
99 /* flag that tracks whether we have done any execution in this time slice.
100  * LOCK: currently none, perhaps we should lock (but needs to be
101  * updated in the fast path of the scheduler).
102  *
103  * NB. must be StgWord, we do xchg() on it.
104  */
105 volatile StgWord recent_activity = ACTIVITY_YES;
106
107 /* if this flag is set as well, give up execution
108  * LOCK: none (changes monotonically)
109  */
110 volatile StgWord sched_state = SCHED_RUNNING;
111
112 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
113  *  exists - earlier gccs apparently didn't.
114  *  -= chak
115  */
116 StgTSO dummy_tso;
117
118 /*
119  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
120  * in an MT setting, needed to signal that a worker thread shouldn't hang around
121  * in the scheduler when it is out of work.
122  */
123 rtsBool shutting_down_scheduler = rtsFalse;
124
125 /*
126  * This mutex protects most of the global scheduler data in
127  * the THREADED_RTS runtime.
128  */
129 #if defined(THREADED_RTS)
130 Mutex sched_mutex;
131 #endif
132
133 #if !defined(mingw32_HOST_OS)
134 #define FORKPROCESS_PRIMOP_SUPPORTED
135 #endif
136
137 /* -----------------------------------------------------------------------------
138  * static function prototypes
139  * -------------------------------------------------------------------------- */
140
141 static Capability *schedule (Capability *initialCapability, Task *task);
142
143 //
144 // These function all encapsulate parts of the scheduler loop, and are
145 // abstracted only to make the structure and control flow of the
146 // scheduler clearer.
147 //
148 static void schedulePreLoop (void);
149 static void scheduleFindWork (Capability *cap);
150 #if defined(THREADED_RTS)
151 static void scheduleYield (Capability **pcap, Task *task);
152 #endif
153 static void scheduleStartSignalHandlers (Capability *cap);
154 static void scheduleCheckBlockedThreads (Capability *cap);
155 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
156 static void scheduleCheckBlackHoles (Capability *cap);
157 static void scheduleDetectDeadlock (Capability *cap, Task *task);
158 static void schedulePushWork(Capability *cap, Task *task);
159 #if defined(PARALLEL_HASKELL)
160 static rtsBool scheduleGetRemoteWork(Capability *cap);
161 static void scheduleSendPendingMessages(void);
162 #endif
163 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
164 static void scheduleActivateSpark(Capability *cap);
165 #endif
166 static void schedulePostRunThread(Capability *cap, StgTSO *t);
167 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
168 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
169                                          StgTSO *t);
170 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
171                                     nat prev_what_next );
172 static void scheduleHandleThreadBlocked( StgTSO *t );
173 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
174                                              StgTSO *t );
175 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
176 static Capability *scheduleDoGC(Capability *cap, Task *task,
177                                 rtsBool force_major);
178
179 static rtsBool checkBlackHoles(Capability *cap);
180
181 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
182 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
183
184 static void deleteThread (Capability *cap, StgTSO *tso);
185 static void deleteAllThreads (Capability *cap);
186
187 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
188 static void deleteThread_(Capability *cap, StgTSO *tso);
189 #endif
190
191 #ifdef DEBUG
192 static char *whatNext_strs[] = {
193   "(unknown)",
194   "ThreadRunGHC",
195   "ThreadInterpret",
196   "ThreadKilled",
197   "ThreadRelocated",
198   "ThreadComplete"
199 };
200 #endif
201
202 /* -----------------------------------------------------------------------------
203  * Putting a thread on the run queue: different scheduling policies
204  * -------------------------------------------------------------------------- */
205
206 STATIC_INLINE void
207 addToRunQueue( Capability *cap, StgTSO *t )
208 {
209 #if defined(PARALLEL_HASKELL)
210     if (RtsFlags.ParFlags.doFairScheduling) { 
211         // this does round-robin scheduling; good for concurrency
212         appendToRunQueue(cap,t);
213     } else {
214         // this does unfair scheduling; good for parallelism
215         pushOnRunQueue(cap,t);
216     }
217 #else
218     // this does round-robin scheduling; good for concurrency
219     appendToRunQueue(cap,t);
220 #endif
221 }
222
223 /* ---------------------------------------------------------------------------
224    Main scheduling loop.
225
226    We use round-robin scheduling, each thread returning to the
227    scheduler loop when one of these conditions is detected:
228
229       * out of heap space
230       * timer expires (thread yields)
231       * thread blocks
232       * thread ends
233       * stack overflow
234
235    GRAN version:
236      In a GranSim setup this loop iterates over the global event queue.
237      This revolves around the global event queue, which determines what 
238      to do next. Therefore, it's more complicated than either the 
239      concurrent or the parallel (GUM) setup.
240   This version has been entirely removed (JB 2008/08).
241
242    GUM version:
243      GUM iterates over incoming messages.
244      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
245      and sends out a fish whenever it has nothing to do; in-between
246      doing the actual reductions (shared code below) it processes the
247      incoming messages and deals with delayed operations 
248      (see PendingFetches).
249      This is not the ugliest code you could imagine, but it's bloody close.
250
251   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
252   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
253   as well as future GUM versions. This file has been refurbished to
254   only contain valid code, which is however incomplete, refers to
255   invalid includes etc.
256
257    ------------------------------------------------------------------------ */
258
259 static Capability *
260 schedule (Capability *initialCapability, Task *task)
261 {
262   StgTSO *t;
263   Capability *cap;
264   StgThreadReturnCode ret;
265 #if defined(PARALLEL_HASKELL)
266   rtsBool receivedFinish = rtsFalse;
267 #endif
268   nat prev_what_next;
269   rtsBool ready_to_gc;
270 #if defined(THREADED_RTS)
271   rtsBool first = rtsTrue;
272 #endif
273   
274   cap = initialCapability;
275
276   // Pre-condition: this task owns initialCapability.
277   // The sched_mutex is *NOT* held
278   // NB. on return, we still hold a capability.
279
280   debugTrace (DEBUG_sched, 
281               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
282               task, initialCapability);
283
284   schedulePreLoop();
285
286   // -----------------------------------------------------------
287   // Scheduler loop starts here:
288
289 #if defined(PARALLEL_HASKELL)
290 #define TERMINATION_CONDITION        (!receivedFinish)
291 #else
292 #define TERMINATION_CONDITION        rtsTrue
293 #endif
294
295   while (TERMINATION_CONDITION) {
296
297     // Check whether we have re-entered the RTS from Haskell without
298     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
299     // call).
300     if (cap->in_haskell) {
301           errorBelch("schedule: re-entered unsafely.\n"
302                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
303           stg_exit(EXIT_FAILURE);
304     }
305
306     // The interruption / shutdown sequence.
307     // 
308     // In order to cleanly shut down the runtime, we want to:
309     //   * make sure that all main threads return to their callers
310     //     with the state 'Interrupted'.
311     //   * clean up all OS threads assocated with the runtime
312     //   * free all memory etc.
313     //
314     // So the sequence for ^C goes like this:
315     //
316     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
317     //     arranges for some Capability to wake up
318     //
319     //   * all threads in the system are halted, and the zombies are
320     //     placed on the run queue for cleaning up.  We acquire all
321     //     the capabilities in order to delete the threads, this is
322     //     done by scheduleDoGC() for convenience (because GC already
323     //     needs to acquire all the capabilities).  We can't kill
324     //     threads involved in foreign calls.
325     // 
326     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
327     //
328     //   * sched_state := SCHED_SHUTTING_DOWN
329     //
330     //   * all workers exit when the run queue on their capability
331     //     drains.  All main threads will also exit when their TSO
332     //     reaches the head of the run queue and they can return.
333     //
334     //   * eventually all Capabilities will shut down, and the RTS can
335     //     exit.
336     //
337     //   * We might be left with threads blocked in foreign calls, 
338     //     we should really attempt to kill these somehow (TODO);
339     
340     switch (sched_state) {
341     case SCHED_RUNNING:
342         break;
343     case SCHED_INTERRUPTING:
344         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
345 #if defined(THREADED_RTS)
346         discardSparksCap(cap);
347 #endif
348         /* scheduleDoGC() deletes all the threads */
349         cap = scheduleDoGC(cap,task,rtsFalse);
350
351         // after scheduleDoGC(), we must be shutting down.  Either some
352         // other Capability did the final GC, or we did it above,
353         // either way we can fall through to the SCHED_SHUTTING_DOWN
354         // case now.
355         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
356         // fall through
357
358     case SCHED_SHUTTING_DOWN:
359         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
360         // If we are a worker, just exit.  If we're a bound thread
361         // then we will exit below when we've removed our TSO from
362         // the run queue.
363         if (task->tso == NULL && emptyRunQueue(cap)) {
364             return cap;
365         }
366         break;
367     default:
368         barf("sched_state: %d", sched_state);
369     }
370
371     scheduleFindWork(cap);
372
373     /* work pushing, currently relevant only for THREADED_RTS:
374        (pushes threads, wakes up idle capabilities for stealing) */
375     schedulePushWork(cap,task);
376
377 #if defined(PARALLEL_HASKELL)
378     /* since we perform a blocking receive and continue otherwise,
379        either we never reach here or we definitely have work! */
380     // from here: non-empty run queue
381     ASSERT(!emptyRunQueue(cap));
382
383     if (PacketsWaiting()) {  /* now process incoming messages, if any
384                                 pending...  
385
386                                 CAUTION: scheduleGetRemoteWork called
387                                 above, waits for messages as well! */
388       processMessages(cap, &receivedFinish);
389     }
390 #endif // PARALLEL_HASKELL: non-empty run queue!
391
392     scheduleDetectDeadlock(cap,task);
393
394 #if defined(THREADED_RTS)
395     cap = task->cap;    // reload cap, it might have changed
396 #endif
397
398     // Normally, the only way we can get here with no threads to
399     // run is if a keyboard interrupt received during 
400     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
401     // Additionally, it is not fatal for the
402     // threaded RTS to reach here with no threads to run.
403     //
404     // win32: might be here due to awaitEvent() being abandoned
405     // as a result of a console event having been delivered.
406     
407 #if defined(THREADED_RTS)
408     if (first) 
409     {
410     // XXX: ToDo
411     //     // don't yield the first time, we want a chance to run this
412     //     // thread for a bit, even if there are others banging at the
413     //     // door.
414     //     first = rtsFalse;
415     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
416     }
417
418   yield:
419     scheduleYield(&cap,task);
420     if (emptyRunQueue(cap)) continue; // look for work again
421 #endif
422
423 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
424     if ( emptyRunQueue(cap) ) {
425         ASSERT(sched_state >= SCHED_INTERRUPTING);
426     }
427 #endif
428
429     // 
430     // Get a thread to run
431     //
432     t = popRunQueue(cap);
433
434     // Sanity check the thread we're about to run.  This can be
435     // expensive if there is lots of thread switching going on...
436     IF_DEBUG(sanity,checkTSO(t));
437
438 #if defined(THREADED_RTS)
439     // Check whether we can run this thread in the current task.
440     // If not, we have to pass our capability to the right task.
441     {
442         Task *bound = t->bound;
443       
444         if (bound) {
445             if (bound == task) {
446                 debugTrace(DEBUG_sched,
447                            "### Running thread %lu in bound thread", (unsigned long)t->id);
448                 // yes, the Haskell thread is bound to the current native thread
449             } else {
450                 debugTrace(DEBUG_sched,
451                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
452                 // no, bound to a different Haskell thread: pass to that thread
453                 pushOnRunQueue(cap,t);
454                 continue;
455             }
456         } else {
457             // The thread we want to run is unbound.
458             if (task->tso) { 
459                 debugTrace(DEBUG_sched,
460                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
461                 // no, the current native thread is bound to a different
462                 // Haskell thread, so pass it to any worker thread
463                 pushOnRunQueue(cap,t);
464                 continue; 
465             }
466         }
467     }
468 #endif
469
470     // If we're shutting down, and this thread has not yet been
471     // killed, kill it now.  This sometimes happens when a finalizer
472     // thread is created by the final GC, or a thread previously
473     // in a foreign call returns.
474     if (sched_state >= SCHED_INTERRUPTING &&
475         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
476         deleteThread(cap,t);
477     }
478
479     /* context switches are initiated by the timer signal, unless
480      * the user specified "context switch as often as possible", with
481      * +RTS -C0
482      */
483     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
484         && !emptyThreadQueues(cap)) {
485         cap->context_switch = 1;
486     }
487          
488 run_thread:
489
490     // CurrentTSO is the thread to run.  t might be different if we
491     // loop back to run_thread, so make sure to set CurrentTSO after
492     // that.
493     cap->r.rCurrentTSO = t;
494
495     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
496                               (long)t->id, whatNext_strs[t->what_next]);
497
498     startHeapProfTimer();
499
500     // Check for exceptions blocked on this thread
501     maybePerformBlockedException (cap, t);
502
503     // ----------------------------------------------------------------------
504     // Run the current thread 
505
506     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
507     ASSERT(t->cap == cap);
508     ASSERT(t->bound ? t->bound->cap == cap : 1);
509
510     prev_what_next = t->what_next;
511
512     errno = t->saved_errno;
513 #if mingw32_HOST_OS
514     SetLastError(t->saved_winerror);
515 #endif
516
517     cap->in_haskell = rtsTrue;
518
519     dirty_TSO(cap,t);
520
521 #if defined(THREADED_RTS)
522     if (recent_activity == ACTIVITY_DONE_GC) {
523         // ACTIVITY_DONE_GC means we turned off the timer signal to
524         // conserve power (see #1623).  Re-enable it here.
525         nat prev;
526         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
527         if (prev == ACTIVITY_DONE_GC) {
528             startTimer();
529         }
530     } else {
531         recent_activity = ACTIVITY_YES;
532     }
533 #endif
534
535     switch (prev_what_next) {
536         
537     case ThreadKilled:
538     case ThreadComplete:
539         /* Thread already finished, return to scheduler. */
540         ret = ThreadFinished;
541         break;
542         
543     case ThreadRunGHC:
544     {
545         StgRegTable *r;
546         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
547         cap = regTableToCapability(r);
548         ret = r->rRet;
549         break;
550     }
551     
552     case ThreadInterpret:
553         cap = interpretBCO(cap);
554         ret = cap->r.rRet;
555         break;
556         
557     default:
558         barf("schedule: invalid what_next field");
559     }
560
561     cap->in_haskell = rtsFalse;
562
563     // The TSO might have moved, eg. if it re-entered the RTS and a GC
564     // happened.  So find the new location:
565     t = cap->r.rCurrentTSO;
566
567     // We have run some Haskell code: there might be blackhole-blocked
568     // threads to wake up now.
569     // Lock-free test here should be ok, we're just setting a flag.
570     if ( blackhole_queue != END_TSO_QUEUE ) {
571         blackholes_need_checking = rtsTrue;
572     }
573     
574     // And save the current errno in this thread.
575     // XXX: possibly bogus for SMP because this thread might already
576     // be running again, see code below.
577     t->saved_errno = errno;
578 #if mingw32_HOST_OS
579     // Similarly for Windows error code
580     t->saved_winerror = GetLastError();
581 #endif
582
583 #if defined(THREADED_RTS)
584     // If ret is ThreadBlocked, and this Task is bound to the TSO that
585     // blocked, we are in limbo - the TSO is now owned by whatever it
586     // is blocked on, and may in fact already have been woken up,
587     // perhaps even on a different Capability.  It may be the case
588     // that task->cap != cap.  We better yield this Capability
589     // immediately and return to normaility.
590     if (ret == ThreadBlocked) {
591         debugTrace(DEBUG_sched,
592                    "--<< thread %lu (%s) stopped: blocked",
593                    (unsigned long)t->id, whatNext_strs[t->what_next]);
594         goto yield;
595     }
596 #endif
597
598     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
599     ASSERT(t->cap == cap);
600
601     // ----------------------------------------------------------------------
602     
603     // Costs for the scheduler are assigned to CCS_SYSTEM
604     stopHeapProfTimer();
605 #if defined(PROFILING)
606     CCCS = CCS_SYSTEM;
607 #endif
608     
609     schedulePostRunThread(cap,t);
610
611     t = threadStackUnderflow(task,t);
612
613     ready_to_gc = rtsFalse;
614
615     switch (ret) {
616     case HeapOverflow:
617         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
618         break;
619
620     case StackOverflow:
621         scheduleHandleStackOverflow(cap,task,t);
622         break;
623
624     case ThreadYielding:
625         if (scheduleHandleYield(cap, t, prev_what_next)) {
626             // shortcut for switching between compiler/interpreter:
627             goto run_thread; 
628         }
629         break;
630
631     case ThreadBlocked:
632         scheduleHandleThreadBlocked(t);
633         break;
634
635     case ThreadFinished:
636         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
637         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
638         break;
639
640     default:
641       barf("schedule: invalid thread return code %d", (int)ret);
642     }
643
644     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
645       cap = scheduleDoGC(cap,task,rtsFalse);
646     }
647   } /* end of while() */
648 }
649
650 /* ----------------------------------------------------------------------------
651  * Setting up the scheduler loop
652  * ------------------------------------------------------------------------- */
653
654 static void
655 schedulePreLoop(void)
656 {
657   // initialisation for scheduler - what cannot go into initScheduler()  
658 }
659
660 /* -----------------------------------------------------------------------------
661  * scheduleFindWork()
662  *
663  * Search for work to do, and handle messages from elsewhere.
664  * -------------------------------------------------------------------------- */
665
666 static void
667 scheduleFindWork (Capability *cap)
668 {
669     scheduleStartSignalHandlers(cap);
670
671     // Only check the black holes here if we've nothing else to do.
672     // During normal execution, the black hole list only gets checked
673     // at GC time, to avoid repeatedly traversing this possibly long
674     // list each time around the scheduler.
675     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
676
677     scheduleCheckWakeupThreads(cap);
678
679     scheduleCheckBlockedThreads(cap);
680
681 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
682     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
683 #endif
684
685 #if defined(PARALLEL_HASKELL)
686     // if messages have been buffered...
687     scheduleSendPendingMessages();
688 #endif
689
690 #if defined(PARALLEL_HASKELL)
691     if (emptyRunQueue(cap)) {
692         receivedFinish = scheduleGetRemoteWork(cap);
693         continue; //  a new round, (hopefully) with new work
694         /* 
695            in GUM, this a) sends out a FISH and returns IF no fish is
696                            out already
697                         b) (blocking) awaits and receives messages
698            
699            in Eden, this is only the blocking receive, as b) in GUM.
700         */
701     }
702 #endif
703 }
704
705 #if defined(THREADED_RTS)
706 STATIC_INLINE rtsBool
707 shouldYieldCapability (Capability *cap, Task *task)
708 {
709     // we need to yield this capability to someone else if..
710     //   - another thread is initiating a GC
711     //   - another Task is returning from a foreign call
712     //   - the thread at the head of the run queue cannot be run
713     //     by this Task (it is bound to another Task, or it is unbound
714     //     and this task it bound).
715     return (waiting_for_gc || 
716             cap->returning_tasks_hd != NULL ||
717             (!emptyRunQueue(cap) && (task->tso == NULL
718                                      ? cap->run_queue_hd->bound != NULL
719                                      : cap->run_queue_hd->bound != task)));
720 }
721
722 // This is the single place where a Task goes to sleep.  There are
723 // two reasons it might need to sleep:
724 //    - there are no threads to run
725 //    - we need to yield this Capability to someone else 
726 //      (see shouldYieldCapability())
727 //
728 // Careful: the scheduler loop is quite delicate.  Make sure you run
729 // the tests in testsuite/concurrent (all ways) after modifying this,
730 // and also check the benchmarks in nofib/parallel for regressions.
731
732 static void
733 scheduleYield (Capability **pcap, Task *task)
734 {
735     Capability *cap = *pcap;
736
737     // if we have work, and we don't need to give up the Capability, continue.
738     if (!shouldYieldCapability(cap,task) && 
739         (!emptyRunQueue(cap) ||
740          blackholes_need_checking ||
741          sched_state >= SCHED_INTERRUPTING))
742         return;
743
744     // otherwise yield (sleep), and keep yielding if necessary.
745     do {
746         yieldCapability(&cap,task);
747     } 
748     while (shouldYieldCapability(cap,task));
749
750     // note there may still be no threads on the run queue at this
751     // point, the caller has to check.
752
753     *pcap = cap;
754     return;
755 }
756 #endif
757     
758 /* -----------------------------------------------------------------------------
759  * schedulePushWork()
760  *
761  * Push work to other Capabilities if we have some.
762  * -------------------------------------------------------------------------- */
763
764 static void
765 schedulePushWork(Capability *cap USED_IF_THREADS, 
766                  Task *task      USED_IF_THREADS)
767 {
768   /* following code not for PARALLEL_HASKELL. I kept the call general,
769      future GUM versions might use pushing in a distributed setup */
770 #if defined(THREADED_RTS)
771
772     Capability *free_caps[n_capabilities], *cap0;
773     nat i, n_free_caps;
774
775     // migration can be turned off with +RTS -qg
776     if (!RtsFlags.ParFlags.migrate) return;
777
778     // Check whether we have more threads on our run queue, or sparks
779     // in our pool, that we could hand to another Capability.
780     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
781         && sparkPoolSizeCap(cap) < 2) {
782         return;
783     }
784
785     // First grab as many free Capabilities as we can.
786     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
787         cap0 = &capabilities[i];
788         if (cap != cap0 && tryGrabCapability(cap0,task)) {
789             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
790                 // it already has some work, we just grabbed it at 
791                 // the wrong moment.  Or maybe it's deadlocked!
792                 releaseCapability(cap0);
793             } else {
794                 free_caps[n_free_caps++] = cap0;
795             }
796         }
797     }
798
799     // we now have n_free_caps free capabilities stashed in
800     // free_caps[].  Share our run queue equally with them.  This is
801     // probably the simplest thing we could do; improvements we might
802     // want to do include:
803     //
804     //   - giving high priority to moving relatively new threads, on 
805     //     the gournds that they haven't had time to build up a
806     //     working set in the cache on this CPU/Capability.
807     //
808     //   - giving low priority to moving long-lived threads
809
810     if (n_free_caps > 0) {
811         StgTSO *prev, *t, *next;
812         rtsBool pushed_to_all;
813
814         debugTrace(DEBUG_sched, 
815                    "cap %d: %s and %d free capabilities, sharing...", 
816                    cap->no, 
817                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
818                    "excess threads on run queue":"sparks to share (>=2)",
819                    n_free_caps);
820
821         i = 0;
822         pushed_to_all = rtsFalse;
823
824         if (cap->run_queue_hd != END_TSO_QUEUE) {
825             prev = cap->run_queue_hd;
826             t = prev->_link;
827             prev->_link = END_TSO_QUEUE;
828             for (; t != END_TSO_QUEUE; t = next) {
829                 next = t->_link;
830                 t->_link = END_TSO_QUEUE;
831                 if (t->what_next == ThreadRelocated
832                     || t->bound == task // don't move my bound thread
833                     || tsoLocked(t)) {  // don't move a locked thread
834                     setTSOLink(cap, prev, t);
835                     prev = t;
836                 } else if (i == n_free_caps) {
837                     pushed_to_all = rtsTrue;
838                     i = 0;
839                     // keep one for us
840                     setTSOLink(cap, prev, t);
841                     prev = t;
842                 } else {
843                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
844                     appendToRunQueue(free_caps[i],t);
845                     if (t->bound) { t->bound->cap = free_caps[i]; }
846                     t->cap = free_caps[i];
847                     i++;
848                 }
849             }
850             cap->run_queue_tl = prev;
851         }
852
853 #ifdef SPARK_PUSHING
854         /* JB I left this code in place, it would work but is not necessary */
855
856         // If there are some free capabilities that we didn't push any
857         // threads to, then try to push a spark to each one.
858         if (!pushed_to_all) {
859             StgClosure *spark;
860             // i is the next free capability to push to
861             for (; i < n_free_caps; i++) {
862                 if (emptySparkPoolCap(free_caps[i])) {
863                     spark = tryStealSpark(cap->sparks);
864                     if (spark != NULL) {
865                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
866                         newSpark(&(free_caps[i]->r), spark);
867                     }
868                 }
869             }
870         }
871 #endif /* SPARK_PUSHING */
872
873         // release the capabilities
874         for (i = 0; i < n_free_caps; i++) {
875             task->cap = free_caps[i];
876             releaseAndWakeupCapability(free_caps[i]);
877         }
878     }
879     task->cap = cap; // reset to point to our Capability.
880
881 #endif /* THREADED_RTS */
882
883 }
884
885 /* ----------------------------------------------------------------------------
886  * Start any pending signal handlers
887  * ------------------------------------------------------------------------- */
888
889 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
890 static void
891 scheduleStartSignalHandlers(Capability *cap)
892 {
893     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
894         // safe outside the lock
895         startSignalHandlers(cap);
896     }
897 }
898 #else
899 static void
900 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
901 {
902 }
903 #endif
904
905 /* ----------------------------------------------------------------------------
906  * Check for blocked threads that can be woken up.
907  * ------------------------------------------------------------------------- */
908
909 static void
910 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
911 {
912 #if !defined(THREADED_RTS)
913     //
914     // Check whether any waiting threads need to be woken up.  If the
915     // run queue is empty, and there are no other tasks running, we
916     // can wait indefinitely for something to happen.
917     //
918     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
919     {
920         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
921     }
922 #endif
923 }
924
925
926 /* ----------------------------------------------------------------------------
927  * Check for threads woken up by other Capabilities
928  * ------------------------------------------------------------------------- */
929
930 static void
931 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
932 {
933 #if defined(THREADED_RTS)
934     // Any threads that were woken up by other Capabilities get
935     // appended to our run queue.
936     if (!emptyWakeupQueue(cap)) {
937         ACQUIRE_LOCK(&cap->lock);
938         if (emptyRunQueue(cap)) {
939             cap->run_queue_hd = cap->wakeup_queue_hd;
940             cap->run_queue_tl = cap->wakeup_queue_tl;
941         } else {
942             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
943             cap->run_queue_tl = cap->wakeup_queue_tl;
944         }
945         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
946         RELEASE_LOCK(&cap->lock);
947     }
948 #endif
949 }
950
951 /* ----------------------------------------------------------------------------
952  * Check for threads blocked on BLACKHOLEs that can be woken up
953  * ------------------------------------------------------------------------- */
954 static void
955 scheduleCheckBlackHoles (Capability *cap)
956 {
957     if ( blackholes_need_checking ) // check without the lock first
958     {
959         ACQUIRE_LOCK(&sched_mutex);
960         if ( blackholes_need_checking ) {
961             blackholes_need_checking = rtsFalse;
962             // important that we reset the flag *before* checking the
963             // blackhole queue, otherwise we could get deadlock.  This
964             // happens as follows: we wake up a thread that
965             // immediately runs on another Capability, blocks on a
966             // blackhole, and then we reset the blackholes_need_checking flag.
967             checkBlackHoles(cap);
968         }
969         RELEASE_LOCK(&sched_mutex);
970     }
971 }
972
973 /* ----------------------------------------------------------------------------
974  * Detect deadlock conditions and attempt to resolve them.
975  * ------------------------------------------------------------------------- */
976
977 static void
978 scheduleDetectDeadlock (Capability *cap, Task *task)
979 {
980
981 #if defined(PARALLEL_HASKELL)
982     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
983     return;
984 #endif
985
986     /* 
987      * Detect deadlock: when we have no threads to run, there are no
988      * threads blocked, waiting for I/O, or sleeping, and all the
989      * other tasks are waiting for work, we must have a deadlock of
990      * some description.
991      */
992     if ( emptyThreadQueues(cap) )
993     {
994 #if defined(THREADED_RTS)
995         /* 
996          * In the threaded RTS, we only check for deadlock if there
997          * has been no activity in a complete timeslice.  This means
998          * we won't eagerly start a full GC just because we don't have
999          * any threads to run currently.
1000          */
1001         if (recent_activity != ACTIVITY_INACTIVE) return;
1002 #endif
1003
1004         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1005
1006         // Garbage collection can release some new threads due to
1007         // either (a) finalizers or (b) threads resurrected because
1008         // they are unreachable and will therefore be sent an
1009         // exception.  Any threads thus released will be immediately
1010         // runnable.
1011         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1012         // when force_major == rtsTrue. scheduleDoGC sets
1013         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1014         // signal.
1015
1016         if ( !emptyRunQueue(cap) ) return;
1017
1018 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1019         /* If we have user-installed signal handlers, then wait
1020          * for signals to arrive rather then bombing out with a
1021          * deadlock.
1022          */
1023         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1024             debugTrace(DEBUG_sched,
1025                        "still deadlocked, waiting for signals...");
1026
1027             awaitUserSignals();
1028
1029             if (signals_pending()) {
1030                 startSignalHandlers(cap);
1031             }
1032
1033             // either we have threads to run, or we were interrupted:
1034             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1035
1036             return;
1037         }
1038 #endif
1039
1040 #if !defined(THREADED_RTS)
1041         /* Probably a real deadlock.  Send the current main thread the
1042          * Deadlock exception.
1043          */
1044         if (task->tso) {
1045             switch (task->tso->why_blocked) {
1046             case BlockedOnSTM:
1047             case BlockedOnBlackHole:
1048             case BlockedOnException:
1049             case BlockedOnMVar:
1050                 throwToSingleThreaded(cap, task->tso, 
1051                                       (StgClosure *)nonTermination_closure);
1052                 return;
1053             default:
1054                 barf("deadlock: main thread blocked in a strange way");
1055             }
1056         }
1057         return;
1058 #endif
1059     }
1060 }
1061
1062
1063 /* ----------------------------------------------------------------------------
1064  * Send pending messages (PARALLEL_HASKELL only)
1065  * ------------------------------------------------------------------------- */
1066
1067 #if defined(PARALLEL_HASKELL)
1068 static void
1069 scheduleSendPendingMessages(void)
1070 {
1071
1072 # if defined(PAR) // global Mem.Mgmt., omit for now
1073     if (PendingFetches != END_BF_QUEUE) {
1074         processFetches();
1075     }
1076 # endif
1077     
1078     if (RtsFlags.ParFlags.BufferTime) {
1079         // if we use message buffering, we must send away all message
1080         // packets which have become too old...
1081         sendOldBuffers(); 
1082     }
1083 }
1084 #endif
1085
1086 /* ----------------------------------------------------------------------------
1087  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1088  * ------------------------------------------------------------------------- */
1089
1090 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1091 static void
1092 scheduleActivateSpark(Capability *cap)
1093 {
1094     if (anySparks())
1095     {
1096         createSparkThread(cap);
1097         debugTrace(DEBUG_sched, "creating a spark thread");
1098     }
1099 }
1100 #endif // PARALLEL_HASKELL || THREADED_RTS
1101
1102 /* ----------------------------------------------------------------------------
1103  * Get work from a remote node (PARALLEL_HASKELL only)
1104  * ------------------------------------------------------------------------- */
1105     
1106 #if defined(PARALLEL_HASKELL)
1107 static rtsBool /* return value used in PARALLEL_HASKELL only */
1108 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1109 {
1110 #if defined(PARALLEL_HASKELL)
1111   rtsBool receivedFinish = rtsFalse;
1112
1113   // idle() , i.e. send all buffers, wait for work
1114   if (RtsFlags.ParFlags.BufferTime) {
1115         IF_PAR_DEBUG(verbose, 
1116                 debugBelch("...send all pending data,"));
1117         {
1118           nat i;
1119           for (i=1; i<=nPEs; i++)
1120             sendImmediately(i); // send all messages away immediately
1121         }
1122   }
1123
1124   /* this would be the place for fishing in GUM... 
1125
1126      if (no-earlier-fish-around) 
1127           sendFish(choosePe());
1128    */
1129
1130   // Eden:just look for incoming messages (blocking receive)
1131   IF_PAR_DEBUG(verbose, 
1132                debugBelch("...wait for incoming messages...\n"));
1133   processMessages(cap, &receivedFinish); // blocking receive...
1134
1135
1136   return receivedFinish;
1137   // reenter scheduling look after having received something
1138
1139 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1140
1141   return rtsFalse; /* return value unused in THREADED_RTS */
1142
1143 #endif /* PARALLEL_HASKELL */
1144 }
1145 #endif // PARALLEL_HASKELL || THREADED_RTS
1146
1147 /* ----------------------------------------------------------------------------
1148  * After running a thread...
1149  * ------------------------------------------------------------------------- */
1150
1151 static void
1152 schedulePostRunThread (Capability *cap, StgTSO *t)
1153 {
1154     // We have to be able to catch transactions that are in an
1155     // infinite loop as a result of seeing an inconsistent view of
1156     // memory, e.g. 
1157     //
1158     //   atomically $ do
1159     //       [a,b] <- mapM readTVar [ta,tb]
1160     //       when (a == b) loop
1161     //
1162     // and a is never equal to b given a consistent view of memory.
1163     //
1164     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1165         if (!stmValidateNestOfTransactions (t -> trec)) {
1166             debugTrace(DEBUG_sched | DEBUG_stm,
1167                        "trec %p found wasting its time", t);
1168             
1169             // strip the stack back to the
1170             // ATOMICALLY_FRAME, aborting the (nested)
1171             // transaction, and saving the stack of any
1172             // partially-evaluated thunks on the heap.
1173             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1174             
1175             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1176         }
1177     }
1178
1179   /* some statistics gathering in the parallel case */
1180 }
1181
1182 /* -----------------------------------------------------------------------------
1183  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1184  * -------------------------------------------------------------------------- */
1185
1186 static rtsBool
1187 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1188 {
1189     // did the task ask for a large block?
1190     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1191         // if so, get one and push it on the front of the nursery.
1192         bdescr *bd;
1193         lnat blocks;
1194         
1195         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1196         
1197         debugTrace(DEBUG_sched,
1198                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1199                    (long)t->id, whatNext_strs[t->what_next], blocks);
1200     
1201         // don't do this if the nursery is (nearly) full, we'll GC first.
1202         if (cap->r.rCurrentNursery->link != NULL ||
1203             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1204                                                // if the nursery has only one block.
1205             
1206             ACQUIRE_SM_LOCK
1207             bd = allocGroup( blocks );
1208             RELEASE_SM_LOCK
1209             cap->r.rNursery->n_blocks += blocks;
1210             
1211             // link the new group into the list
1212             bd->link = cap->r.rCurrentNursery;
1213             bd->u.back = cap->r.rCurrentNursery->u.back;
1214             if (cap->r.rCurrentNursery->u.back != NULL) {
1215                 cap->r.rCurrentNursery->u.back->link = bd;
1216             } else {
1217 #if !defined(THREADED_RTS)
1218                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1219                        g0s0 == cap->r.rNursery);
1220 #endif
1221                 cap->r.rNursery->blocks = bd;
1222             }             
1223             cap->r.rCurrentNursery->u.back = bd;
1224             
1225             // initialise it as a nursery block.  We initialise the
1226             // step, gen_no, and flags field of *every* sub-block in
1227             // this large block, because this is easier than making
1228             // sure that we always find the block head of a large
1229             // block whenever we call Bdescr() (eg. evacuate() and
1230             // isAlive() in the GC would both have to do this, at
1231             // least).
1232             { 
1233                 bdescr *x;
1234                 for (x = bd; x < bd + blocks; x++) {
1235                     x->step = cap->r.rNursery;
1236                     x->gen_no = 0;
1237                     x->flags = 0;
1238                 }
1239             }
1240             
1241             // This assert can be a killer if the app is doing lots
1242             // of large block allocations.
1243             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1244             
1245             // now update the nursery to point to the new block
1246             cap->r.rCurrentNursery = bd;
1247             
1248             // we might be unlucky and have another thread get on the
1249             // run queue before us and steal the large block, but in that
1250             // case the thread will just end up requesting another large
1251             // block.
1252             pushOnRunQueue(cap,t);
1253             return rtsFalse;  /* not actually GC'ing */
1254         }
1255     }
1256     
1257     debugTrace(DEBUG_sched,
1258                "--<< thread %ld (%s) stopped: HeapOverflow",
1259                (long)t->id, whatNext_strs[t->what_next]);
1260
1261     if (cap->context_switch) {
1262         // Sometimes we miss a context switch, e.g. when calling
1263         // primitives in a tight loop, MAYBE_GC() doesn't check the
1264         // context switch flag, and we end up waiting for a GC.
1265         // See #1984, and concurrent/should_run/1984
1266         cap->context_switch = 0;
1267         addToRunQueue(cap,t);
1268     } else {
1269         pushOnRunQueue(cap,t);
1270     }
1271     return rtsTrue;
1272     /* actual GC is done at the end of the while loop in schedule() */
1273 }
1274
1275 /* -----------------------------------------------------------------------------
1276  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1277  * -------------------------------------------------------------------------- */
1278
1279 static void
1280 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1281 {
1282     debugTrace (DEBUG_sched,
1283                 "--<< thread %ld (%s) stopped, StackOverflow", 
1284                 (long)t->id, whatNext_strs[t->what_next]);
1285
1286     /* just adjust the stack for this thread, then pop it back
1287      * on the run queue.
1288      */
1289     { 
1290         /* enlarge the stack */
1291         StgTSO *new_t = threadStackOverflow(cap, t);
1292         
1293         /* The TSO attached to this Task may have moved, so update the
1294          * pointer to it.
1295          */
1296         if (task->tso == t) {
1297             task->tso = new_t;
1298         }
1299         pushOnRunQueue(cap,new_t);
1300     }
1301 }
1302
1303 /* -----------------------------------------------------------------------------
1304  * Handle a thread that returned to the scheduler with ThreadYielding
1305  * -------------------------------------------------------------------------- */
1306
1307 static rtsBool
1308 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1309 {
1310     // Reset the context switch flag.  We don't do this just before
1311     // running the thread, because that would mean we would lose ticks
1312     // during GC, which can lead to unfair scheduling (a thread hogs
1313     // the CPU because the tick always arrives during GC).  This way
1314     // penalises threads that do a lot of allocation, but that seems
1315     // better than the alternative.
1316     cap->context_switch = 0;
1317     
1318     /* put the thread back on the run queue.  Then, if we're ready to
1319      * GC, check whether this is the last task to stop.  If so, wake
1320      * up the GC thread.  getThread will block during a GC until the
1321      * GC is finished.
1322      */
1323 #ifdef DEBUG
1324     if (t->what_next != prev_what_next) {
1325         debugTrace(DEBUG_sched,
1326                    "--<< thread %ld (%s) stopped to switch evaluators", 
1327                    (long)t->id, whatNext_strs[t->what_next]);
1328     } else {
1329         debugTrace(DEBUG_sched,
1330                    "--<< thread %ld (%s) stopped, yielding",
1331                    (long)t->id, whatNext_strs[t->what_next]);
1332     }
1333 #endif
1334     
1335     IF_DEBUG(sanity,
1336              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1337              checkTSO(t));
1338     ASSERT(t->_link == END_TSO_QUEUE);
1339     
1340     // Shortcut if we're just switching evaluators: don't bother
1341     // doing stack squeezing (which can be expensive), just run the
1342     // thread.
1343     if (t->what_next != prev_what_next) {
1344         return rtsTrue;
1345     }
1346
1347     addToRunQueue(cap,t);
1348
1349     return rtsFalse;
1350 }
1351
1352 /* -----------------------------------------------------------------------------
1353  * Handle a thread that returned to the scheduler with ThreadBlocked
1354  * -------------------------------------------------------------------------- */
1355
1356 static void
1357 scheduleHandleThreadBlocked( StgTSO *t
1358 #if !defined(GRAN) && !defined(DEBUG)
1359     STG_UNUSED
1360 #endif
1361     )
1362 {
1363
1364       // We don't need to do anything.  The thread is blocked, and it
1365       // has tidied up its stack and placed itself on whatever queue
1366       // it needs to be on.
1367
1368     // ASSERT(t->why_blocked != NotBlocked);
1369     // Not true: for example,
1370     //    - in THREADED_RTS, the thread may already have been woken
1371     //      up by another Capability.  This actually happens: try
1372     //      conc023 +RTS -N2.
1373     //    - the thread may have woken itself up already, because
1374     //      threadPaused() might have raised a blocked throwTo
1375     //      exception, see maybePerformBlockedException().
1376
1377 #ifdef DEBUG
1378     if (traceClass(DEBUG_sched)) {
1379         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1380                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1381         printThreadBlockage(t);
1382         debugTraceEnd();
1383     }
1384 #endif
1385 }
1386
1387 /* -----------------------------------------------------------------------------
1388  * Handle a thread that returned to the scheduler with ThreadFinished
1389  * -------------------------------------------------------------------------- */
1390
1391 static rtsBool
1392 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1393 {
1394     /* Need to check whether this was a main thread, and if so,
1395      * return with the return value.
1396      *
1397      * We also end up here if the thread kills itself with an
1398      * uncaught exception, see Exception.cmm.
1399      */
1400     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1401                (unsigned long)t->id, whatNext_strs[t->what_next]);
1402
1403       //
1404       // Check whether the thread that just completed was a bound
1405       // thread, and if so return with the result.  
1406       //
1407       // There is an assumption here that all thread completion goes
1408       // through this point; we need to make sure that if a thread
1409       // ends up in the ThreadKilled state, that it stays on the run
1410       // queue so it can be dealt with here.
1411       //
1412
1413       if (t->bound) {
1414
1415           if (t->bound != task) {
1416 #if !defined(THREADED_RTS)
1417               // Must be a bound thread that is not the topmost one.  Leave
1418               // it on the run queue until the stack has unwound to the
1419               // point where we can deal with this.  Leaving it on the run
1420               // queue also ensures that the garbage collector knows about
1421               // this thread and its return value (it gets dropped from the
1422               // step->threads list so there's no other way to find it).
1423               appendToRunQueue(cap,t);
1424               return rtsFalse;
1425 #else
1426               // this cannot happen in the threaded RTS, because a
1427               // bound thread can only be run by the appropriate Task.
1428               barf("finished bound thread that isn't mine");
1429 #endif
1430           }
1431
1432           ASSERT(task->tso == t);
1433
1434           if (t->what_next == ThreadComplete) {
1435               if (task->ret) {
1436                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1437                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1438               }
1439               task->stat = Success;
1440           } else {
1441               if (task->ret) {
1442                   *(task->ret) = NULL;
1443               }
1444               if (sched_state >= SCHED_INTERRUPTING) {
1445                   if (heap_overflow) {
1446                       task->stat = HeapExhausted;
1447                   } else {
1448                       task->stat = Interrupted;
1449                   }
1450               } else {
1451                   task->stat = Killed;
1452               }
1453           }
1454 #ifdef DEBUG
1455           removeThreadLabel((StgWord)task->tso->id);
1456 #endif
1457           return rtsTrue; // tells schedule() to return
1458       }
1459
1460       return rtsFalse;
1461 }
1462
1463 /* -----------------------------------------------------------------------------
1464  * Perform a heap census
1465  * -------------------------------------------------------------------------- */
1466
1467 static rtsBool
1468 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1469 {
1470     // When we have +RTS -i0 and we're heap profiling, do a census at
1471     // every GC.  This lets us get repeatable runs for debugging.
1472     if (performHeapProfile ||
1473         (RtsFlags.ProfFlags.profileInterval==0 &&
1474          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1475         return rtsTrue;
1476     } else {
1477         return rtsFalse;
1478     }
1479 }
1480
1481 /* -----------------------------------------------------------------------------
1482  * Perform a garbage collection if necessary
1483  * -------------------------------------------------------------------------- */
1484
1485 static Capability *
1486 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1487 {
1488     rtsBool heap_census;
1489 #ifdef THREADED_RTS
1490     /* extern static volatile StgWord waiting_for_gc; 
1491        lives inside capability.c */
1492     rtsBool gc_type, prev_pending_gc;
1493     nat i;
1494 #endif
1495
1496     if (sched_state == SCHED_SHUTTING_DOWN) {
1497         // The final GC has already been done, and the system is
1498         // shutting down.  We'll probably deadlock if we try to GC
1499         // now.
1500         return cap;
1501     }
1502
1503 #ifdef THREADED_RTS
1504     if (sched_state < SCHED_INTERRUPTING
1505         && RtsFlags.ParFlags.parGcEnabled
1506         && N >= RtsFlags.ParFlags.parGcGen
1507         && ! oldest_gen->steps[0].mark)
1508     {
1509         gc_type = PENDING_GC_PAR;
1510     } else {
1511         gc_type = PENDING_GC_SEQ;
1512     }
1513
1514     // In order to GC, there must be no threads running Haskell code.
1515     // Therefore, the GC thread needs to hold *all* the capabilities,
1516     // and release them after the GC has completed.  
1517     //
1518     // This seems to be the simplest way: previous attempts involved
1519     // making all the threads with capabilities give up their
1520     // capabilities and sleep except for the *last* one, which
1521     // actually did the GC.  But it's quite hard to arrange for all
1522     // the other tasks to sleep and stay asleep.
1523     //
1524
1525     /*  Other capabilities are prevented from running yet more Haskell
1526         threads if waiting_for_gc is set. Tested inside
1527         yieldCapability() and releaseCapability() in Capability.c */
1528
1529     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1530     if (prev_pending_gc) {
1531         do {
1532             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1533                        prev_pending_gc);
1534             ASSERT(cap);
1535             yieldCapability(&cap,task);
1536         } while (waiting_for_gc);
1537         return cap;  // NOTE: task->cap might have changed here
1538     }
1539
1540     setContextSwitches();
1541
1542     // The final shutdown GC is always single-threaded, because it's
1543     // possible that some of the Capabilities have no worker threads.
1544     
1545     if (gc_type == PENDING_GC_SEQ)
1546     {
1547         // single-threaded GC: grab all the capabilities
1548         for (i=0; i < n_capabilities; i++) {
1549             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1550             if (cap != &capabilities[i]) {
1551                 Capability *pcap = &capabilities[i];
1552                 // we better hope this task doesn't get migrated to
1553                 // another Capability while we're waiting for this one.
1554                 // It won't, because load balancing happens while we have
1555                 // all the Capabilities, but even so it's a slightly
1556                 // unsavoury invariant.
1557                 task->cap = pcap;
1558                 waitForReturnCapability(&pcap, task);
1559                 if (pcap != &capabilities[i]) {
1560                     barf("scheduleDoGC: got the wrong capability");
1561                 }
1562             }
1563         }
1564     }
1565     else
1566     {
1567         // multi-threaded GC: make sure all the Capabilities donate one
1568         // GC thread each.
1569         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1570
1571         waitForGcThreads(cap);
1572     }
1573 #endif
1574
1575     // so this happens periodically:
1576     if (cap) scheduleCheckBlackHoles(cap);
1577     
1578     IF_DEBUG(scheduler, printAllThreads());
1579
1580 delete_threads_and_gc:
1581     /*
1582      * We now have all the capabilities; if we're in an interrupting
1583      * state, then we should take the opportunity to delete all the
1584      * threads in the system.
1585      */
1586     if (sched_state == SCHED_INTERRUPTING) {
1587         deleteAllThreads(cap);
1588         sched_state = SCHED_SHUTTING_DOWN;
1589     }
1590     
1591     heap_census = scheduleNeedHeapProfile(rtsTrue);
1592
1593 #if defined(THREADED_RTS)
1594     debugTrace(DEBUG_sched, "doing GC");
1595     // reset waiting_for_gc *before* GC, so that when the GC threads
1596     // emerge they don't immediately re-enter the GC.
1597     waiting_for_gc = 0;
1598     GarbageCollect(force_major || heap_census, gc_type, cap);
1599 #else
1600     GarbageCollect(force_major || heap_census, 0, cap);
1601 #endif
1602
1603     if (heap_census) {
1604         debugTrace(DEBUG_sched, "performing heap census");
1605         heapCensus();
1606         performHeapProfile = rtsFalse;
1607     }
1608
1609     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1610         // GC set the heap_overflow flag, so we should proceed with
1611         // an orderly shutdown now.  Ultimately we want the main
1612         // thread to return to its caller with HeapExhausted, at which
1613         // point the caller should call hs_exit().  The first step is
1614         // to delete all the threads.
1615         //
1616         // Another way to do this would be to raise an exception in
1617         // the main thread, which we really should do because it gives
1618         // the program a chance to clean up.  But how do we find the
1619         // main thread?  It should presumably be the same one that
1620         // gets ^C exceptions, but that's all done on the Haskell side
1621         // (GHC.TopHandler).
1622         sched_state = SCHED_INTERRUPTING;
1623         goto delete_threads_and_gc;
1624     }
1625
1626 #ifdef SPARKBALANCE
1627     /* JB 
1628        Once we are all together... this would be the place to balance all
1629        spark pools. No concurrent stealing or adding of new sparks can
1630        occur. Should be defined in Sparks.c. */
1631     balanceSparkPoolsCaps(n_capabilities, capabilities);
1632 #endif
1633
1634     if (force_major)
1635     {
1636         // We've just done a major GC and we don't need the timer
1637         // signal turned on any more (#1623).
1638         // NB. do this *before* releasing the Capabilities, to avoid
1639         // deadlocks!
1640         recent_activity = ACTIVITY_DONE_GC;
1641         stopTimer();
1642     }
1643
1644 #if defined(THREADED_RTS)
1645     if (gc_type == PENDING_GC_SEQ) {
1646         // release our stash of capabilities.
1647         for (i = 0; i < n_capabilities; i++) {
1648             if (cap != &capabilities[i]) {
1649                 task->cap = &capabilities[i];
1650                 releaseCapability(&capabilities[i]);
1651             }
1652         }
1653     }
1654     if (cap) {
1655         task->cap = cap;
1656     } else {
1657         task->cap = NULL;
1658     }
1659 #endif
1660
1661     return cap;
1662 }
1663
1664 /* ---------------------------------------------------------------------------
1665  * Singleton fork(). Do not copy any running threads.
1666  * ------------------------------------------------------------------------- */
1667
1668 pid_t
1669 forkProcess(HsStablePtr *entry
1670 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1671             STG_UNUSED
1672 #endif
1673            )
1674 {
1675 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1676     Task *task;
1677     pid_t pid;
1678     StgTSO* t,*next;
1679     Capability *cap;
1680     nat s;
1681     
1682 #if defined(THREADED_RTS)
1683     if (RtsFlags.ParFlags.nNodes > 1) {
1684         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1685         stg_exit(EXIT_FAILURE);
1686     }
1687 #endif
1688
1689     debugTrace(DEBUG_sched, "forking!");
1690     
1691     // ToDo: for SMP, we should probably acquire *all* the capabilities
1692     cap = rts_lock();
1693     
1694     // no funny business: hold locks while we fork, otherwise if some
1695     // other thread is holding a lock when the fork happens, the data
1696     // structure protected by the lock will forever be in an
1697     // inconsistent state in the child.  See also #1391.
1698     ACQUIRE_LOCK(&sched_mutex);
1699     ACQUIRE_LOCK(&cap->lock);
1700     ACQUIRE_LOCK(&cap->running_task->lock);
1701
1702     pid = fork();
1703     
1704     if (pid) { // parent
1705         
1706         RELEASE_LOCK(&sched_mutex);
1707         RELEASE_LOCK(&cap->lock);
1708         RELEASE_LOCK(&cap->running_task->lock);
1709
1710         // just return the pid
1711         rts_unlock(cap);
1712         return pid;
1713         
1714     } else { // child
1715         
1716 #if defined(THREADED_RTS)
1717         initMutex(&sched_mutex);
1718         initMutex(&cap->lock);
1719         initMutex(&cap->running_task->lock);
1720 #endif
1721
1722         // Now, all OS threads except the thread that forked are
1723         // stopped.  We need to stop all Haskell threads, including
1724         // those involved in foreign calls.  Also we need to delete
1725         // all Tasks, because they correspond to OS threads that are
1726         // now gone.
1727
1728         for (s = 0; s < total_steps; s++) {
1729           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1730             if (t->what_next == ThreadRelocated) {
1731                 next = t->_link;
1732             } else {
1733                 next = t->global_link;
1734                 // don't allow threads to catch the ThreadKilled
1735                 // exception, but we do want to raiseAsync() because these
1736                 // threads may be evaluating thunks that we need later.
1737                 deleteThread_(cap,t);
1738             }
1739           }
1740         }
1741         
1742         // Empty the run queue.  It seems tempting to let all the
1743         // killed threads stay on the run queue as zombies to be
1744         // cleaned up later, but some of them correspond to bound
1745         // threads for which the corresponding Task does not exist.
1746         cap->run_queue_hd = END_TSO_QUEUE;
1747         cap->run_queue_tl = END_TSO_QUEUE;
1748
1749         // Any suspended C-calling Tasks are no more, their OS threads
1750         // don't exist now:
1751         cap->suspended_ccalling_tasks = NULL;
1752
1753         // Empty the threads lists.  Otherwise, the garbage
1754         // collector may attempt to resurrect some of these threads.
1755         for (s = 0; s < total_steps; s++) {
1756             all_steps[s].threads = END_TSO_QUEUE;
1757         }
1758
1759         // Wipe the task list, except the current Task.
1760         ACQUIRE_LOCK(&sched_mutex);
1761         for (task = all_tasks; task != NULL; task=task->all_link) {
1762             if (task != cap->running_task) {
1763 #if defined(THREADED_RTS)
1764                 initMutex(&task->lock); // see #1391
1765 #endif
1766                 discardTask(task);
1767             }
1768         }
1769         RELEASE_LOCK(&sched_mutex);
1770
1771 #if defined(THREADED_RTS)
1772         // Wipe our spare workers list, they no longer exist.  New
1773         // workers will be created if necessary.
1774         cap->spare_workers = NULL;
1775         cap->returning_tasks_hd = NULL;
1776         cap->returning_tasks_tl = NULL;
1777 #endif
1778
1779         // On Unix, all timers are reset in the child, so we need to start
1780         // the timer again.
1781         initTimer();
1782         startTimer();
1783
1784         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1785         rts_checkSchedStatus("forkProcess",cap);
1786         
1787         rts_unlock(cap);
1788         hs_exit();                      // clean up and exit
1789         stg_exit(EXIT_SUCCESS);
1790     }
1791 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1792     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1793     return -1;
1794 #endif
1795 }
1796
1797 /* ---------------------------------------------------------------------------
1798  * Delete all the threads in the system
1799  * ------------------------------------------------------------------------- */
1800    
1801 static void
1802 deleteAllThreads ( Capability *cap )
1803 {
1804     // NOTE: only safe to call if we own all capabilities.
1805
1806     StgTSO* t, *next;
1807     nat s;
1808
1809     debugTrace(DEBUG_sched,"deleting all threads");
1810     for (s = 0; s < total_steps; s++) {
1811       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1812         if (t->what_next == ThreadRelocated) {
1813             next = t->_link;
1814         } else {
1815             next = t->global_link;
1816             deleteThread(cap,t);
1817         }
1818       }
1819     }      
1820
1821     // The run queue now contains a bunch of ThreadKilled threads.  We
1822     // must not throw these away: the main thread(s) will be in there
1823     // somewhere, and the main scheduler loop has to deal with it.
1824     // Also, the run queue is the only thing keeping these threads from
1825     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1826
1827 #if !defined(THREADED_RTS)
1828     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1829     ASSERT(sleeping_queue == END_TSO_QUEUE);
1830 #endif
1831 }
1832
1833 /* -----------------------------------------------------------------------------
1834    Managing the suspended_ccalling_tasks list.
1835    Locks required: sched_mutex
1836    -------------------------------------------------------------------------- */
1837
1838 STATIC_INLINE void
1839 suspendTask (Capability *cap, Task *task)
1840 {
1841     ASSERT(task->next == NULL && task->prev == NULL);
1842     task->next = cap->suspended_ccalling_tasks;
1843     task->prev = NULL;
1844     if (cap->suspended_ccalling_tasks) {
1845         cap->suspended_ccalling_tasks->prev = task;
1846     }
1847     cap->suspended_ccalling_tasks = task;
1848 }
1849
1850 STATIC_INLINE void
1851 recoverSuspendedTask (Capability *cap, Task *task)
1852 {
1853     if (task->prev) {
1854         task->prev->next = task->next;
1855     } else {
1856         ASSERT(cap->suspended_ccalling_tasks == task);
1857         cap->suspended_ccalling_tasks = task->next;
1858     }
1859     if (task->next) {
1860         task->next->prev = task->prev;
1861     }
1862     task->next = task->prev = NULL;
1863 }
1864
1865 /* ---------------------------------------------------------------------------
1866  * Suspending & resuming Haskell threads.
1867  * 
1868  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1869  * its capability before calling the C function.  This allows another
1870  * task to pick up the capability and carry on running Haskell
1871  * threads.  It also means that if the C call blocks, it won't lock
1872  * the whole system.
1873  *
1874  * The Haskell thread making the C call is put to sleep for the
1875  * duration of the call, on the susepended_ccalling_threads queue.  We
1876  * give out a token to the task, which it can use to resume the thread
1877  * on return from the C function.
1878  * ------------------------------------------------------------------------- */
1879    
1880 void *
1881 suspendThread (StgRegTable *reg)
1882 {
1883   Capability *cap;
1884   int saved_errno;
1885   StgTSO *tso;
1886   Task *task;
1887 #if mingw32_HOST_OS
1888   StgWord32 saved_winerror;
1889 #endif
1890
1891   saved_errno = errno;
1892 #if mingw32_HOST_OS
1893   saved_winerror = GetLastError();
1894 #endif
1895
1896   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1897    */
1898   cap = regTableToCapability(reg);
1899
1900   task = cap->running_task;
1901   tso = cap->r.rCurrentTSO;
1902
1903   debugTrace(DEBUG_sched, 
1904              "thread %lu did a safe foreign call", 
1905              (unsigned long)cap->r.rCurrentTSO->id);
1906
1907   // XXX this might not be necessary --SDM
1908   tso->what_next = ThreadRunGHC;
1909
1910   threadPaused(cap,tso);
1911
1912   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1913       tso->why_blocked = BlockedOnCCall;
1914       tso->flags |= TSO_BLOCKEX;
1915       tso->flags &= ~TSO_INTERRUPTIBLE;
1916   } else {
1917       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1918   }
1919
1920   // Hand back capability
1921   task->suspended_tso = tso;
1922
1923   ACQUIRE_LOCK(&cap->lock);
1924
1925   suspendTask(cap,task);
1926   cap->in_haskell = rtsFalse;
1927   releaseCapability_(cap,rtsFalse);
1928   
1929   RELEASE_LOCK(&cap->lock);
1930
1931 #if defined(THREADED_RTS)
1932   /* Preparing to leave the RTS, so ensure there's a native thread/task
1933      waiting to take over.
1934   */
1935   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1936 #endif
1937
1938   errno = saved_errno;
1939 #if mingw32_HOST_OS
1940   SetLastError(saved_winerror);
1941 #endif
1942   return task;
1943 }
1944
1945 StgRegTable *
1946 resumeThread (void *task_)
1947 {
1948     StgTSO *tso;
1949     Capability *cap;
1950     Task *task = task_;
1951     int saved_errno;
1952 #if mingw32_HOST_OS
1953     StgWord32 saved_winerror;
1954 #endif
1955
1956     saved_errno = errno;
1957 #if mingw32_HOST_OS
1958     saved_winerror = GetLastError();
1959 #endif
1960
1961     cap = task->cap;
1962     // Wait for permission to re-enter the RTS with the result.
1963     waitForReturnCapability(&cap,task);
1964     // we might be on a different capability now... but if so, our
1965     // entry on the suspended_ccalling_tasks list will also have been
1966     // migrated.
1967
1968     // Remove the thread from the suspended list
1969     recoverSuspendedTask(cap,task);
1970
1971     tso = task->suspended_tso;
1972     task->suspended_tso = NULL;
1973     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1974     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1975     
1976     if (tso->why_blocked == BlockedOnCCall) {
1977         awakenBlockedExceptionQueue(cap,tso);
1978         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1979     }
1980     
1981     /* Reset blocking status */
1982     tso->why_blocked  = NotBlocked;
1983     
1984     cap->r.rCurrentTSO = tso;
1985     cap->in_haskell = rtsTrue;
1986     errno = saved_errno;
1987 #if mingw32_HOST_OS
1988     SetLastError(saved_winerror);
1989 #endif
1990
1991     /* We might have GC'd, mark the TSO dirty again */
1992     dirty_TSO(cap,tso);
1993
1994     IF_DEBUG(sanity, checkTSO(tso));
1995
1996     return &cap->r;
1997 }
1998
1999 /* ---------------------------------------------------------------------------
2000  * scheduleThread()
2001  *
2002  * scheduleThread puts a thread on the end  of the runnable queue.
2003  * This will usually be done immediately after a thread is created.
2004  * The caller of scheduleThread must create the thread using e.g.
2005  * createThread and push an appropriate closure
2006  * on this thread's stack before the scheduler is invoked.
2007  * ------------------------------------------------------------------------ */
2008
2009 void
2010 scheduleThread(Capability *cap, StgTSO *tso)
2011 {
2012     // The thread goes at the *end* of the run-queue, to avoid possible
2013     // starvation of any threads already on the queue.
2014     appendToRunQueue(cap,tso);
2015 }
2016
2017 void
2018 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2019 {
2020 #if defined(THREADED_RTS)
2021     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2022                               // move this thread from now on.
2023     cpu %= RtsFlags.ParFlags.nNodes;
2024     if (cpu == cap->no) {
2025         appendToRunQueue(cap,tso);
2026     } else {
2027         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2028     }
2029 #else
2030     appendToRunQueue(cap,tso);
2031 #endif
2032 }
2033
2034 Capability *
2035 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2036 {
2037     Task *task;
2038
2039     // We already created/initialised the Task
2040     task = cap->running_task;
2041
2042     // This TSO is now a bound thread; make the Task and TSO
2043     // point to each other.
2044     tso->bound = task;
2045     tso->cap = cap;
2046
2047     task->tso = tso;
2048     task->ret = ret;
2049     task->stat = NoStatus;
2050
2051     appendToRunQueue(cap,tso);
2052
2053     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2054
2055     cap = schedule(cap,task);
2056
2057     ASSERT(task->stat != NoStatus);
2058     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2059
2060     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2061     return cap;
2062 }
2063
2064 /* ----------------------------------------------------------------------------
2065  * Starting Tasks
2066  * ------------------------------------------------------------------------- */
2067
2068 #if defined(THREADED_RTS)
2069 void OSThreadProcAttr
2070 workerStart(Task *task)
2071 {
2072     Capability *cap;
2073
2074     // See startWorkerTask().
2075     ACQUIRE_LOCK(&task->lock);
2076     cap = task->cap;
2077     RELEASE_LOCK(&task->lock);
2078
2079     // set the thread-local pointer to the Task:
2080     taskEnter(task);
2081
2082     // schedule() runs without a lock.
2083     cap = schedule(cap,task);
2084
2085     // On exit from schedule(), we have a Capability, but possibly not
2086     // the same one we started with.
2087
2088     // During shutdown, the requirement is that after all the
2089     // Capabilities are shut down, all workers that are shutting down
2090     // have finished workerTaskStop().  This is why we hold on to
2091     // cap->lock until we've finished workerTaskStop() below.
2092     //
2093     // There may be workers still involved in foreign calls; those
2094     // will just block in waitForReturnCapability() because the
2095     // Capability has been shut down.
2096     //
2097     ACQUIRE_LOCK(&cap->lock);
2098     releaseCapability_(cap,rtsFalse);
2099     workerTaskStop(task);
2100     RELEASE_LOCK(&cap->lock);
2101 }
2102 #endif
2103
2104 /* ---------------------------------------------------------------------------
2105  * initScheduler()
2106  *
2107  * Initialise the scheduler.  This resets all the queues - if the
2108  * queues contained any threads, they'll be garbage collected at the
2109  * next pass.
2110  *
2111  * ------------------------------------------------------------------------ */
2112
2113 void 
2114 initScheduler(void)
2115 {
2116 #if !defined(THREADED_RTS)
2117   blocked_queue_hd  = END_TSO_QUEUE;
2118   blocked_queue_tl  = END_TSO_QUEUE;
2119   sleeping_queue    = END_TSO_QUEUE;
2120 #endif
2121
2122   blackhole_queue   = END_TSO_QUEUE;
2123
2124   sched_state    = SCHED_RUNNING;
2125   recent_activity = ACTIVITY_YES;
2126
2127 #if defined(THREADED_RTS)
2128   /* Initialise the mutex and condition variables used by
2129    * the scheduler. */
2130   initMutex(&sched_mutex);
2131 #endif
2132   
2133   ACQUIRE_LOCK(&sched_mutex);
2134
2135   /* A capability holds the state a native thread needs in
2136    * order to execute STG code. At least one capability is
2137    * floating around (only THREADED_RTS builds have more than one).
2138    */
2139   initCapabilities();
2140
2141   initTaskManager();
2142
2143 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2144   initSparkPools();
2145 #endif
2146
2147 #if defined(THREADED_RTS)
2148   /*
2149    * Eagerly start one worker to run each Capability, except for
2150    * Capability 0.  The idea is that we're probably going to start a
2151    * bound thread on Capability 0 pretty soon, so we don't want a
2152    * worker task hogging it.
2153    */
2154   { 
2155       nat i;
2156       Capability *cap;
2157       for (i = 1; i < n_capabilities; i++) {
2158           cap = &capabilities[i];
2159           ACQUIRE_LOCK(&cap->lock);
2160           startWorkerTask(cap, workerStart);
2161           RELEASE_LOCK(&cap->lock);
2162       }
2163   }
2164 #endif
2165
2166   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2167
2168   RELEASE_LOCK(&sched_mutex);
2169 }
2170
2171 void
2172 exitScheduler(
2173     rtsBool wait_foreign
2174 #if !defined(THREADED_RTS)
2175                          __attribute__((unused))
2176 #endif
2177 )
2178                /* see Capability.c, shutdownCapability() */
2179 {
2180     Task *task = NULL;
2181
2182 #if defined(THREADED_RTS)
2183     ACQUIRE_LOCK(&sched_mutex);
2184     task = newBoundTask();
2185     RELEASE_LOCK(&sched_mutex);
2186 #endif
2187
2188     // If we haven't killed all the threads yet, do it now.
2189     if (sched_state < SCHED_SHUTTING_DOWN) {
2190         sched_state = SCHED_INTERRUPTING;
2191 #if defined(THREADED_RTS)
2192         waitForReturnCapability(&task->cap,task);
2193         scheduleDoGC(task->cap,task,rtsFalse);    
2194         releaseCapability(task->cap);
2195 #else
2196         scheduleDoGC(&MainCapability,task,rtsFalse);    
2197 #endif
2198     }
2199     sched_state = SCHED_SHUTTING_DOWN;
2200
2201 #if defined(THREADED_RTS)
2202     { 
2203         nat i;
2204         
2205         for (i = 0; i < n_capabilities; i++) {
2206             shutdownCapability(&capabilities[i], task, wait_foreign);
2207         }
2208         boundTaskExiting(task);
2209     }
2210 #endif
2211 }
2212
2213 void
2214 freeScheduler( void )
2215 {
2216     nat still_running;
2217
2218     ACQUIRE_LOCK(&sched_mutex);
2219     still_running = freeTaskManager();
2220     // We can only free the Capabilities if there are no Tasks still
2221     // running.  We might have a Task about to return from a foreign
2222     // call into waitForReturnCapability(), for example (actually,
2223     // this should be the *only* thing that a still-running Task can
2224     // do at this point, and it will block waiting for the
2225     // Capability).
2226     if (still_running == 0) {
2227         freeCapabilities();
2228         if (n_capabilities != 1) {
2229             stgFree(capabilities);
2230         }
2231     }
2232     RELEASE_LOCK(&sched_mutex);
2233 #if defined(THREADED_RTS)
2234     closeMutex(&sched_mutex);
2235 #endif
2236 }
2237
2238 /* -----------------------------------------------------------------------------
2239    performGC
2240
2241    This is the interface to the garbage collector from Haskell land.
2242    We provide this so that external C code can allocate and garbage
2243    collect when called from Haskell via _ccall_GC.
2244    -------------------------------------------------------------------------- */
2245
2246 static void
2247 performGC_(rtsBool force_major)
2248 {
2249     Task *task;
2250
2251     // We must grab a new Task here, because the existing Task may be
2252     // associated with a particular Capability, and chained onto the 
2253     // suspended_ccalling_tasks queue.
2254     ACQUIRE_LOCK(&sched_mutex);
2255     task = newBoundTask();
2256     RELEASE_LOCK(&sched_mutex);
2257
2258     waitForReturnCapability(&task->cap,task);
2259     scheduleDoGC(task->cap,task,force_major);
2260     releaseCapability(task->cap);
2261     boundTaskExiting(task);
2262 }
2263
2264 void
2265 performGC(void)
2266 {
2267     performGC_(rtsFalse);
2268 }
2269
2270 void
2271 performMajorGC(void)
2272 {
2273     performGC_(rtsTrue);
2274 }
2275
2276 /* -----------------------------------------------------------------------------
2277    Stack overflow
2278
2279    If the thread has reached its maximum stack size, then raise the
2280    StackOverflow exception in the offending thread.  Otherwise
2281    relocate the TSO into a larger chunk of memory and adjust its stack
2282    size appropriately.
2283    -------------------------------------------------------------------------- */
2284
2285 static StgTSO *
2286 threadStackOverflow(Capability *cap, StgTSO *tso)
2287 {
2288   nat new_stack_size, stack_words;
2289   lnat new_tso_size;
2290   StgPtr new_sp;
2291   StgTSO *dest;
2292
2293   IF_DEBUG(sanity,checkTSO(tso));
2294
2295   // don't allow throwTo() to modify the blocked_exceptions queue
2296   // while we are moving the TSO:
2297   lockClosure((StgClosure *)tso);
2298
2299   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2300       // NB. never raise a StackOverflow exception if the thread is
2301       // inside Control.Exceptino.block.  It is impractical to protect
2302       // against stack overflow exceptions, since virtually anything
2303       // can raise one (even 'catch'), so this is the only sensible
2304       // thing to do here.  See bug #767.
2305
2306       debugTrace(DEBUG_gc,
2307                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2308                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2309       IF_DEBUG(gc,
2310                /* If we're debugging, just print out the top of the stack */
2311                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2312                                                 tso->sp+64)));
2313
2314       // Send this thread the StackOverflow exception
2315       unlockTSO(tso);
2316       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2317       return tso;
2318   }
2319
2320   /* Try to double the current stack size.  If that takes us over the
2321    * maximum stack size for this thread, then use the maximum instead
2322    * (that is, unless we're already at or over the max size and we
2323    * can't raise the StackOverflow exception (see above), in which
2324    * case just double the size). Finally round up so the TSO ends up as
2325    * a whole number of blocks.
2326    */
2327   if (tso->stack_size >= tso->max_stack_size) {
2328       new_stack_size = tso->stack_size * 2;
2329   } else { 
2330       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2331   }
2332   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2333                                        TSO_STRUCT_SIZE)/sizeof(W_);
2334   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2335   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2336
2337   debugTrace(DEBUG_sched, 
2338              "increasing stack size from %ld words to %d.",
2339              (long)tso->stack_size, new_stack_size);
2340
2341   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2342   TICK_ALLOC_TSO(new_stack_size,0);
2343
2344   /* copy the TSO block and the old stack into the new area */
2345   memcpy(dest,tso,TSO_STRUCT_SIZE);
2346   stack_words = tso->stack + tso->stack_size - tso->sp;
2347   new_sp = (P_)dest + new_tso_size - stack_words;
2348   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2349
2350   /* relocate the stack pointers... */
2351   dest->sp         = new_sp;
2352   dest->stack_size = new_stack_size;
2353         
2354   /* Mark the old TSO as relocated.  We have to check for relocated
2355    * TSOs in the garbage collector and any primops that deal with TSOs.
2356    *
2357    * It's important to set the sp value to just beyond the end
2358    * of the stack, so we don't attempt to scavenge any part of the
2359    * dead TSO's stack.
2360    */
2361   tso->what_next = ThreadRelocated;
2362   setTSOLink(cap,tso,dest);
2363   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2364   tso->why_blocked = NotBlocked;
2365
2366   IF_PAR_DEBUG(verbose,
2367                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2368                      tso->id, tso, tso->stack_size);
2369                /* If we're debugging, just print out the top of the stack */
2370                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2371                                                 tso->sp+64)));
2372   
2373   unlockTSO(dest);
2374   unlockTSO(tso);
2375
2376   IF_DEBUG(sanity,checkTSO(dest));
2377 #if 0
2378   IF_DEBUG(scheduler,printTSO(dest));
2379 #endif
2380
2381   return dest;
2382 }
2383
2384 static StgTSO *
2385 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2386 {
2387     bdescr *bd, *new_bd;
2388     lnat free_w, tso_size_w;
2389     StgTSO *new_tso;
2390
2391     tso_size_w = tso_sizeW(tso);
2392
2393     if (tso_size_w < MBLOCK_SIZE_W || 
2394         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2395     {
2396         return tso;
2397     }
2398
2399     // don't allow throwTo() to modify the blocked_exceptions queue
2400     // while we are moving the TSO:
2401     lockClosure((StgClosure *)tso);
2402
2403     // this is the number of words we'll free
2404     free_w = round_to_mblocks(tso_size_w/2);
2405
2406     bd = Bdescr((StgPtr)tso);
2407     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2408     bd->free = bd->start + TSO_STRUCT_SIZEW;
2409
2410     new_tso = (StgTSO *)new_bd->start;
2411     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2412     new_tso->stack_size = new_bd->free - new_tso->stack;
2413
2414     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2415                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2416
2417     tso->what_next = ThreadRelocated;
2418     tso->_link = new_tso; // no write barrier reqd: same generation
2419
2420     // The TSO attached to this Task may have moved, so update the
2421     // pointer to it.
2422     if (task->tso == tso) {
2423         task->tso = new_tso;
2424     }
2425
2426     unlockTSO(new_tso);
2427     unlockTSO(tso);
2428
2429     IF_DEBUG(sanity,checkTSO(new_tso));
2430
2431     return new_tso;
2432 }
2433
2434 /* ---------------------------------------------------------------------------
2435    Interrupt execution
2436    - usually called inside a signal handler so it mustn't do anything fancy.   
2437    ------------------------------------------------------------------------ */
2438
2439 void
2440 interruptStgRts(void)
2441 {
2442     sched_state = SCHED_INTERRUPTING;
2443     setContextSwitches();
2444     wakeUpRts();
2445 }
2446
2447 /* -----------------------------------------------------------------------------
2448    Wake up the RTS
2449    
2450    This function causes at least one OS thread to wake up and run the
2451    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2452    an external event has arrived that may need servicing (eg. a
2453    keyboard interrupt).
2454
2455    In the single-threaded RTS we don't do anything here; we only have
2456    one thread anyway, and the event that caused us to want to wake up
2457    will have interrupted any blocking system call in progress anyway.
2458    -------------------------------------------------------------------------- */
2459
2460 void
2461 wakeUpRts(void)
2462 {
2463 #if defined(THREADED_RTS)
2464     // This forces the IO Manager thread to wakeup, which will
2465     // in turn ensure that some OS thread wakes up and runs the
2466     // scheduler loop, which will cause a GC and deadlock check.
2467     ioManagerWakeup();
2468 #endif
2469 }
2470
2471 /* -----------------------------------------------------------------------------
2472  * checkBlackHoles()
2473  *
2474  * Check the blackhole_queue for threads that can be woken up.  We do
2475  * this periodically: before every GC, and whenever the run queue is
2476  * empty.
2477  *
2478  * An elegant solution might be to just wake up all the blocked
2479  * threads with awakenBlockedQueue occasionally: they'll go back to
2480  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2481  * doesn't give us a way to tell whether we've actually managed to
2482  * wake up any threads, so we would be busy-waiting.
2483  *
2484  * -------------------------------------------------------------------------- */
2485
2486 static rtsBool
2487 checkBlackHoles (Capability *cap)
2488 {
2489     StgTSO **prev, *t;
2490     rtsBool any_woke_up = rtsFalse;
2491     StgHalfWord type;
2492
2493     // blackhole_queue is global:
2494     ASSERT_LOCK_HELD(&sched_mutex);
2495
2496     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2497
2498     // ASSUMES: sched_mutex
2499     prev = &blackhole_queue;
2500     t = blackhole_queue;
2501     while (t != END_TSO_QUEUE) {
2502         ASSERT(t->why_blocked == BlockedOnBlackHole);
2503         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2504         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2505             IF_DEBUG(sanity,checkTSO(t));
2506             t = unblockOne(cap, t);
2507             *prev = t;
2508             any_woke_up = rtsTrue;
2509         } else {
2510             prev = &t->_link;
2511             t = t->_link;
2512         }
2513     }
2514
2515     return any_woke_up;
2516 }
2517
2518 /* -----------------------------------------------------------------------------
2519    Deleting threads
2520
2521    This is used for interruption (^C) and forking, and corresponds to
2522    raising an exception but without letting the thread catch the
2523    exception.
2524    -------------------------------------------------------------------------- */
2525
2526 static void 
2527 deleteThread (Capability *cap, StgTSO *tso)
2528 {
2529     // NOTE: must only be called on a TSO that we have exclusive
2530     // access to, because we will call throwToSingleThreaded() below.
2531     // The TSO must be on the run queue of the Capability we own, or 
2532     // we must own all Capabilities.
2533
2534     if (tso->why_blocked != BlockedOnCCall &&
2535         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2536         throwToSingleThreaded(cap,tso,NULL);
2537     }
2538 }
2539
2540 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2541 static void 
2542 deleteThread_(Capability *cap, StgTSO *tso)
2543 { // for forkProcess only:
2544   // like deleteThread(), but we delete threads in foreign calls, too.
2545
2546     if (tso->why_blocked == BlockedOnCCall ||
2547         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2548         unblockOne(cap,tso);
2549         tso->what_next = ThreadKilled;
2550     } else {
2551         deleteThread(cap,tso);
2552     }
2553 }
2554 #endif
2555
2556 /* -----------------------------------------------------------------------------
2557    raiseExceptionHelper
2558    
2559    This function is called by the raise# primitve, just so that we can
2560    move some of the tricky bits of raising an exception from C-- into
2561    C.  Who knows, it might be a useful re-useable thing here too.
2562    -------------------------------------------------------------------------- */
2563
2564 StgWord
2565 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2566 {
2567     Capability *cap = regTableToCapability(reg);
2568     StgThunk *raise_closure = NULL;
2569     StgPtr p, next;
2570     StgRetInfoTable *info;
2571     //
2572     // This closure represents the expression 'raise# E' where E
2573     // is the exception raise.  It is used to overwrite all the
2574     // thunks which are currently under evaluataion.
2575     //
2576
2577     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2578     // LDV profiling: stg_raise_info has THUNK as its closure
2579     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2580     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2581     // 1 does not cause any problem unless profiling is performed.
2582     // However, when LDV profiling goes on, we need to linearly scan
2583     // small object pool, where raise_closure is stored, so we should
2584     // use MIN_UPD_SIZE.
2585     //
2586     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2587     //                                 sizeofW(StgClosure)+1);
2588     //
2589
2590     //
2591     // Walk up the stack, looking for the catch frame.  On the way,
2592     // we update any closures pointed to from update frames with the
2593     // raise closure that we just built.
2594     //
2595     p = tso->sp;
2596     while(1) {
2597         info = get_ret_itbl((StgClosure *)p);
2598         next = p + stack_frame_sizeW((StgClosure *)p);
2599         switch (info->i.type) {
2600             
2601         case UPDATE_FRAME:
2602             // Only create raise_closure if we need to.
2603             if (raise_closure == NULL) {
2604                 raise_closure = 
2605                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2606                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2607                 raise_closure->payload[0] = exception;
2608             }
2609             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2610             p = next;
2611             continue;
2612
2613         case ATOMICALLY_FRAME:
2614             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2615             tso->sp = p;
2616             return ATOMICALLY_FRAME;
2617             
2618         case CATCH_FRAME:
2619             tso->sp = p;
2620             return CATCH_FRAME;
2621
2622         case CATCH_STM_FRAME:
2623             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2624             tso->sp = p;
2625             return CATCH_STM_FRAME;
2626             
2627         case STOP_FRAME:
2628             tso->sp = p;
2629             return STOP_FRAME;
2630
2631         case CATCH_RETRY_FRAME:
2632         default:
2633             p = next; 
2634             continue;
2635         }
2636     }
2637 }
2638
2639
2640 /* -----------------------------------------------------------------------------
2641    findRetryFrameHelper
2642
2643    This function is called by the retry# primitive.  It traverses the stack
2644    leaving tso->sp referring to the frame which should handle the retry.  
2645
2646    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2647    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2648
2649    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2650    create) because retries are not considered to be exceptions, despite the
2651    similar implementation.
2652
2653    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2654    not be created within memory transactions.
2655    -------------------------------------------------------------------------- */
2656
2657 StgWord
2658 findRetryFrameHelper (StgTSO *tso)
2659 {
2660   StgPtr           p, next;
2661   StgRetInfoTable *info;
2662
2663   p = tso -> sp;
2664   while (1) {
2665     info = get_ret_itbl((StgClosure *)p);
2666     next = p + stack_frame_sizeW((StgClosure *)p);
2667     switch (info->i.type) {
2668       
2669     case ATOMICALLY_FRAME:
2670         debugTrace(DEBUG_stm,
2671                    "found ATOMICALLY_FRAME at %p during retry", p);
2672         tso->sp = p;
2673         return ATOMICALLY_FRAME;
2674       
2675     case CATCH_RETRY_FRAME:
2676         debugTrace(DEBUG_stm,
2677                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2678         tso->sp = p;
2679         return CATCH_RETRY_FRAME;
2680       
2681     case CATCH_STM_FRAME: {
2682         StgTRecHeader *trec = tso -> trec;
2683         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2684         debugTrace(DEBUG_stm,
2685                    "found CATCH_STM_FRAME at %p during retry", p);
2686         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2687         stmAbortTransaction(tso -> cap, trec);
2688         stmFreeAbortedTRec(tso -> cap, trec);
2689         tso -> trec = outer;
2690         p = next; 
2691         continue;
2692     }
2693       
2694
2695     default:
2696       ASSERT(info->i.type != CATCH_FRAME);
2697       ASSERT(info->i.type != STOP_FRAME);
2698       p = next; 
2699       continue;
2700     }
2701   }
2702 }
2703
2704 /* -----------------------------------------------------------------------------
2705    resurrectThreads is called after garbage collection on the list of
2706    threads found to be garbage.  Each of these threads will be woken
2707    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2708    on an MVar, or NonTermination if the thread was blocked on a Black
2709    Hole.
2710
2711    Locks: assumes we hold *all* the capabilities.
2712    -------------------------------------------------------------------------- */
2713
2714 void
2715 resurrectThreads (StgTSO *threads)
2716 {
2717     StgTSO *tso, *next;
2718     Capability *cap;
2719     step *step;
2720
2721     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2722         next = tso->global_link;
2723
2724         step = Bdescr((P_)tso)->step;
2725         tso->global_link = step->threads;
2726         step->threads = tso;
2727
2728         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2729         
2730         // Wake up the thread on the Capability it was last on
2731         cap = tso->cap;
2732         
2733         switch (tso->why_blocked) {
2734         case BlockedOnMVar:
2735         case BlockedOnException:
2736             /* Called by GC - sched_mutex lock is currently held. */
2737             throwToSingleThreaded(cap, tso,
2738                                   (StgClosure *)blockedOnDeadMVar_closure);
2739             break;
2740         case BlockedOnBlackHole:
2741             throwToSingleThreaded(cap, tso,
2742                                   (StgClosure *)nonTermination_closure);
2743             break;
2744         case BlockedOnSTM:
2745             throwToSingleThreaded(cap, tso,
2746                                   (StgClosure *)blockedIndefinitely_closure);
2747             break;
2748         case NotBlocked:
2749             /* This might happen if the thread was blocked on a black hole
2750              * belonging to a thread that we've just woken up (raiseAsync
2751              * can wake up threads, remember...).
2752              */
2753             continue;
2754         default:
2755             barf("resurrectThreads: thread blocked in a strange way");
2756         }
2757     }
2758 }
2759
2760 /* -----------------------------------------------------------------------------
2761    performPendingThrowTos is called after garbage collection, and
2762    passed a list of threads that were found to have pending throwTos
2763    (tso->blocked_exceptions was not empty), and were blocked.
2764    Normally this doesn't happen, because we would deliver the
2765    exception directly if the target thread is blocked, but there are
2766    small windows where it might occur on a multiprocessor (see
2767    throwTo()).
2768
2769    NB. we must be holding all the capabilities at this point, just
2770    like resurrectThreads().
2771    -------------------------------------------------------------------------- */
2772
2773 void
2774 performPendingThrowTos (StgTSO *threads)
2775 {
2776     StgTSO *tso, *next;
2777     Capability *cap;
2778     step *step;
2779
2780     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2781         next = tso->global_link;
2782
2783         step = Bdescr((P_)tso)->step;
2784         tso->global_link = step->threads;
2785         step->threads = tso;
2786
2787         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2788         
2789         cap = tso->cap;
2790         maybePerformBlockedException(cap, tso);
2791     }
2792 }