use C99-style array initialisers
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "eventlog/EventLog.h"
30 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <string.h>
52 #include <stdlib.h>
53 #include <stdarg.h>
54
55 #ifdef HAVE_ERRNO_H
56 #include <errno.h>
57 #endif
58
59 /* -----------------------------------------------------------------------------
60  * Global variables
61  * -------------------------------------------------------------------------- */
62
63 #if !defined(THREADED_RTS)
64 // Blocked/sleeping thrads
65 StgTSO *blocked_queue_hd = NULL;
66 StgTSO *blocked_queue_tl = NULL;
67 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
68 #endif
69
70 /* Threads blocked on blackholes.
71  * LOCK: sched_mutex+capability, or all capabilities
72  */
73 StgTSO *blackhole_queue = NULL;
74
75 /* The blackhole_queue should be checked for threads to wake up.  See
76  * Schedule.h for more thorough comment.
77  * LOCK: none (doesn't matter if we miss an update)
78  */
79 rtsBool blackholes_need_checking = rtsFalse;
80
81 /* Set to true when the latest garbage collection failed to reclaim
82  * enough space, and the runtime should proceed to shut itself down in
83  * an orderly fashion (emitting profiling info etc.)
84  */
85 rtsBool heap_overflow = rtsFalse;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88  * LOCK: currently none, perhaps we should lock (but needs to be
89  * updated in the fast path of the scheduler).
90  *
91  * NB. must be StgWord, we do xchg() on it.
92  */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96  * LOCK: none (changes monotonically)
97  */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
101  *  exists - earlier gccs apparently didn't.
102  *  -= chak
103  */
104 StgTSO dummy_tso;
105
106 /*
107  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
108  * in an MT setting, needed to signal that a worker thread shouldn't hang around
109  * in the scheduler when it is out of work.
110  */
111 rtsBool shutting_down_scheduler = rtsFalse;
112
113 /*
114  * This mutex protects most of the global scheduler data in
115  * the THREADED_RTS runtime.
116  */
117 #if defined(THREADED_RTS)
118 Mutex sched_mutex;
119 #endif
120
121 #if !defined(mingw32_HOST_OS)
122 #define FORKPROCESS_PRIMOP_SUPPORTED
123 #endif
124
125 /* -----------------------------------------------------------------------------
126  * static function prototypes
127  * -------------------------------------------------------------------------- */
128
129 static Capability *schedule (Capability *initialCapability, Task *task);
130
131 //
132 // These function all encapsulate parts of the scheduler loop, and are
133 // abstracted only to make the structure and control flow of the
134 // scheduler clearer.
135 //
136 static void schedulePreLoop (void);
137 static void scheduleFindWork (Capability *cap);
138 #if defined(THREADED_RTS)
139 static void scheduleYield (Capability **pcap, Task *task);
140 #endif
141 static void scheduleStartSignalHandlers (Capability *cap);
142 static void scheduleCheckBlockedThreads (Capability *cap);
143 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
144 static void scheduleCheckBlackHoles (Capability *cap);
145 static void scheduleDetectDeadlock (Capability *cap, Task *task);
146 static void schedulePushWork(Capability *cap, Task *task);
147 #if defined(THREADED_RTS)
148 static void scheduleActivateSpark(Capability *cap);
149 #endif
150 static void schedulePostRunThread(Capability *cap, StgTSO *t);
151 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
152 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
153                                          StgTSO *t);
154 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
155                                     nat prev_what_next );
156 static void scheduleHandleThreadBlocked( StgTSO *t );
157 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
158                                              StgTSO *t );
159 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
160 static Capability *scheduleDoGC(Capability *cap, Task *task,
161                                 rtsBool force_major);
162
163 static rtsBool checkBlackHoles(Capability *cap);
164
165 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
166 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
167
168 static void deleteThread (Capability *cap, StgTSO *tso);
169 static void deleteAllThreads (Capability *cap);
170
171 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
172 static void deleteThread_(Capability *cap, StgTSO *tso);
173 #endif
174
175 #ifdef DEBUG
176 static char *whatNext_strs[] = {
177   [0]               = "(unknown)",
178   [ThreadRunGHC]    = "ThreadRunGHC",
179   [ThreadInterpret] = "ThreadInterpret",
180   [ThreadKilled]    = "ThreadKilled",
181   [ThreadRelocated] = "ThreadRelocated",
182   [ThreadComplete]  = "ThreadComplete"
183 };
184 #endif
185
186 /* -----------------------------------------------------------------------------
187  * Putting a thread on the run queue: different scheduling policies
188  * -------------------------------------------------------------------------- */
189
190 STATIC_INLINE void
191 addToRunQueue( Capability *cap, StgTSO *t )
192 {
193     // this does round-robin scheduling; good for concurrency
194     appendToRunQueue(cap,t);
195 }
196
197 /* ---------------------------------------------------------------------------
198    Main scheduling loop.
199
200    We use round-robin scheduling, each thread returning to the
201    scheduler loop when one of these conditions is detected:
202
203       * out of heap space
204       * timer expires (thread yields)
205       * thread blocks
206       * thread ends
207       * stack overflow
208
209    GRAN version:
210      In a GranSim setup this loop iterates over the global event queue.
211      This revolves around the global event queue, which determines what 
212      to do next. Therefore, it's more complicated than either the 
213      concurrent or the parallel (GUM) setup.
214   This version has been entirely removed (JB 2008/08).
215
216    GUM version:
217      GUM iterates over incoming messages.
218      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
219      and sends out a fish whenever it has nothing to do; in-between
220      doing the actual reductions (shared code below) it processes the
221      incoming messages and deals with delayed operations 
222      (see PendingFetches).
223      This is not the ugliest code you could imagine, but it's bloody close.
224
225   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
226   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
227   as well as future GUM versions. This file has been refurbished to
228   only contain valid code, which is however incomplete, refers to
229   invalid includes etc.
230
231    ------------------------------------------------------------------------ */
232
233 static Capability *
234 schedule (Capability *initialCapability, Task *task)
235 {
236   StgTSO *t;
237   Capability *cap;
238   StgThreadReturnCode ret;
239   nat prev_what_next;
240   rtsBool ready_to_gc;
241 #if defined(THREADED_RTS)
242   rtsBool first = rtsTrue;
243 #endif
244   
245   cap = initialCapability;
246
247   // Pre-condition: this task owns initialCapability.
248   // The sched_mutex is *NOT* held
249   // NB. on return, we still hold a capability.
250
251   debugTrace (DEBUG_sched, 
252               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
253               task, initialCapability);
254
255   if (running_finalizers) {
256       errorBelch("error: a C finalizer called back into Haskell.\n"
257                  "   This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
258                  "   To create finalizers that may call back into Haskll, use\n"
259                  "   Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
260       stg_exit(EXIT_FAILURE);
261   }
262
263   schedulePreLoop();
264
265   // -----------------------------------------------------------
266   // Scheduler loop starts here:
267
268   while (1) {
269
270     // Check whether we have re-entered the RTS from Haskell without
271     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
272     // call).
273     if (cap->in_haskell) {
274           errorBelch("schedule: re-entered unsafely.\n"
275                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
276           stg_exit(EXIT_FAILURE);
277     }
278
279     // The interruption / shutdown sequence.
280     // 
281     // In order to cleanly shut down the runtime, we want to:
282     //   * make sure that all main threads return to their callers
283     //     with the state 'Interrupted'.
284     //   * clean up all OS threads assocated with the runtime
285     //   * free all memory etc.
286     //
287     // So the sequence for ^C goes like this:
288     //
289     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
290     //     arranges for some Capability to wake up
291     //
292     //   * all threads in the system are halted, and the zombies are
293     //     placed on the run queue for cleaning up.  We acquire all
294     //     the capabilities in order to delete the threads, this is
295     //     done by scheduleDoGC() for convenience (because GC already
296     //     needs to acquire all the capabilities).  We can't kill
297     //     threads involved in foreign calls.
298     // 
299     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
300     //
301     //   * sched_state := SCHED_SHUTTING_DOWN
302     //
303     //   * all workers exit when the run queue on their capability
304     //     drains.  All main threads will also exit when their TSO
305     //     reaches the head of the run queue and they can return.
306     //
307     //   * eventually all Capabilities will shut down, and the RTS can
308     //     exit.
309     //
310     //   * We might be left with threads blocked in foreign calls, 
311     //     we should really attempt to kill these somehow (TODO);
312     
313     switch (sched_state) {
314     case SCHED_RUNNING:
315         break;
316     case SCHED_INTERRUPTING:
317         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
318 #if defined(THREADED_RTS)
319         discardSparksCap(cap);
320 #endif
321         /* scheduleDoGC() deletes all the threads */
322         cap = scheduleDoGC(cap,task,rtsFalse);
323
324         // after scheduleDoGC(), we must be shutting down.  Either some
325         // other Capability did the final GC, or we did it above,
326         // either way we can fall through to the SCHED_SHUTTING_DOWN
327         // case now.
328         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
329         // fall through
330
331     case SCHED_SHUTTING_DOWN:
332         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
333         // If we are a worker, just exit.  If we're a bound thread
334         // then we will exit below when we've removed our TSO from
335         // the run queue.
336         if (task->tso == NULL && emptyRunQueue(cap)) {
337             return cap;
338         }
339         break;
340     default:
341         barf("sched_state: %d", sched_state);
342     }
343
344     scheduleFindWork(cap);
345
346     /* work pushing, currently relevant only for THREADED_RTS:
347        (pushes threads, wakes up idle capabilities for stealing) */
348     schedulePushWork(cap,task);
349
350     scheduleDetectDeadlock(cap,task);
351
352 #if defined(THREADED_RTS)
353     cap = task->cap;    // reload cap, it might have changed
354 #endif
355
356     // Normally, the only way we can get here with no threads to
357     // run is if a keyboard interrupt received during 
358     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
359     // Additionally, it is not fatal for the
360     // threaded RTS to reach here with no threads to run.
361     //
362     // win32: might be here due to awaitEvent() being abandoned
363     // as a result of a console event having been delivered.
364     
365 #if defined(THREADED_RTS)
366     if (first) 
367     {
368     // XXX: ToDo
369     //     // don't yield the first time, we want a chance to run this
370     //     // thread for a bit, even if there are others banging at the
371     //     // door.
372     //     first = rtsFalse;
373     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
374     }
375
376   yield:
377     scheduleYield(&cap,task);
378     if (emptyRunQueue(cap)) continue; // look for work again
379 #endif
380
381 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
382     if ( emptyRunQueue(cap) ) {
383         ASSERT(sched_state >= SCHED_INTERRUPTING);
384     }
385 #endif
386
387     // 
388     // Get a thread to run
389     //
390     t = popRunQueue(cap);
391
392     // Sanity check the thread we're about to run.  This can be
393     // expensive if there is lots of thread switching going on...
394     IF_DEBUG(sanity,checkTSO(t));
395
396 #if defined(THREADED_RTS)
397     // Check whether we can run this thread in the current task.
398     // If not, we have to pass our capability to the right task.
399     {
400         Task *bound = t->bound;
401       
402         if (bound) {
403             if (bound == task) {
404                 debugTrace(DEBUG_sched,
405                            "### Running thread %lu in bound thread", (unsigned long)t->id);
406                 // yes, the Haskell thread is bound to the current native thread
407             } else {
408                 debugTrace(DEBUG_sched,
409                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
410                 // no, bound to a different Haskell thread: pass to that thread
411                 pushOnRunQueue(cap,t);
412                 continue;
413             }
414         } else {
415             // The thread we want to run is unbound.
416             if (task->tso) { 
417                 debugTrace(DEBUG_sched,
418                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
419                 // no, the current native thread is bound to a different
420                 // Haskell thread, so pass it to any worker thread
421                 pushOnRunQueue(cap,t);
422                 continue; 
423             }
424         }
425     }
426 #endif
427
428     // If we're shutting down, and this thread has not yet been
429     // killed, kill it now.  This sometimes happens when a finalizer
430     // thread is created by the final GC, or a thread previously
431     // in a foreign call returns.
432     if (sched_state >= SCHED_INTERRUPTING &&
433         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
434         deleteThread(cap,t);
435     }
436
437     /* context switches are initiated by the timer signal, unless
438      * the user specified "context switch as often as possible", with
439      * +RTS -C0
440      */
441     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
442         && !emptyThreadQueues(cap)) {
443         cap->context_switch = 1;
444     }
445          
446 run_thread:
447
448     // CurrentTSO is the thread to run.  t might be different if we
449     // loop back to run_thread, so make sure to set CurrentTSO after
450     // that.
451     cap->r.rCurrentTSO = t;
452
453     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
454                               (long)t->id, whatNext_strs[t->what_next]);
455
456     startHeapProfTimer();
457
458     // Check for exceptions blocked on this thread
459     maybePerformBlockedException (cap, t);
460
461     // ----------------------------------------------------------------------
462     // Run the current thread 
463
464     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
465     ASSERT(t->cap == cap);
466     ASSERT(t->bound ? t->bound->cap == cap : 1);
467
468     prev_what_next = t->what_next;
469
470     errno = t->saved_errno;
471 #if mingw32_HOST_OS
472     SetLastError(t->saved_winerror);
473 #endif
474
475     cap->in_haskell = rtsTrue;
476
477     dirty_TSO(cap,t);
478
479 #if defined(THREADED_RTS)
480     if (recent_activity == ACTIVITY_DONE_GC) {
481         // ACTIVITY_DONE_GC means we turned off the timer signal to
482         // conserve power (see #1623).  Re-enable it here.
483         nat prev;
484         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
485         if (prev == ACTIVITY_DONE_GC) {
486             startTimer();
487         }
488     } else {
489         recent_activity = ACTIVITY_YES;
490     }
491 #endif
492
493     postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
494
495     switch (prev_what_next) {
496         
497     case ThreadKilled:
498     case ThreadComplete:
499         /* Thread already finished, return to scheduler. */
500         ret = ThreadFinished;
501         break;
502         
503     case ThreadRunGHC:
504     {
505         StgRegTable *r;
506         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
507         cap = regTableToCapability(r);
508         ret = r->rRet;
509         break;
510     }
511     
512     case ThreadInterpret:
513         cap = interpretBCO(cap);
514         ret = cap->r.rRet;
515         break;
516         
517     default:
518         barf("schedule: invalid what_next field");
519     }
520
521     cap->in_haskell = rtsFalse;
522
523     // The TSO might have moved, eg. if it re-entered the RTS and a GC
524     // happened.  So find the new location:
525     t = cap->r.rCurrentTSO;
526
527     // We have run some Haskell code: there might be blackhole-blocked
528     // threads to wake up now.
529     // Lock-free test here should be ok, we're just setting a flag.
530     if ( blackhole_queue != END_TSO_QUEUE ) {
531         blackholes_need_checking = rtsTrue;
532     }
533     
534     // And save the current errno in this thread.
535     // XXX: possibly bogus for SMP because this thread might already
536     // be running again, see code below.
537     t->saved_errno = errno;
538 #if mingw32_HOST_OS
539     // Similarly for Windows error code
540     t->saved_winerror = GetLastError();
541 #endif
542
543     postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
544
545 #if defined(THREADED_RTS)
546     // If ret is ThreadBlocked, and this Task is bound to the TSO that
547     // blocked, we are in limbo - the TSO is now owned by whatever it
548     // is blocked on, and may in fact already have been woken up,
549     // perhaps even on a different Capability.  It may be the case
550     // that task->cap != cap.  We better yield this Capability
551     // immediately and return to normaility.
552     if (ret == ThreadBlocked) {
553         debugTrace(DEBUG_sched,
554                    "--<< thread %lu (%s) stopped: blocked",
555                    (unsigned long)t->id, whatNext_strs[t->what_next]);
556         goto yield;
557     }
558 #endif
559
560     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
561     ASSERT(t->cap == cap);
562
563     // ----------------------------------------------------------------------
564     
565     // Costs for the scheduler are assigned to CCS_SYSTEM
566     stopHeapProfTimer();
567 #if defined(PROFILING)
568     CCCS = CCS_SYSTEM;
569 #endif
570     
571     schedulePostRunThread(cap,t);
572
573     if (ret != StackOverflow) {
574         t = threadStackUnderflow(task,t);
575     }
576
577     ready_to_gc = rtsFalse;
578
579     switch (ret) {
580     case HeapOverflow:
581         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
582         break;
583
584     case StackOverflow:
585         scheduleHandleStackOverflow(cap,task,t);
586         break;
587
588     case ThreadYielding:
589         if (scheduleHandleYield(cap, t, prev_what_next)) {
590             // shortcut for switching between compiler/interpreter:
591             goto run_thread; 
592         }
593         break;
594
595     case ThreadBlocked:
596         scheduleHandleThreadBlocked(t);
597         break;
598
599     case ThreadFinished:
600         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
601         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
602         break;
603
604     default:
605       barf("schedule: invalid thread return code %d", (int)ret);
606     }
607
608     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
609       cap = scheduleDoGC(cap,task,rtsFalse);
610     }
611   } /* end of while() */
612 }
613
614 /* ----------------------------------------------------------------------------
615  * Setting up the scheduler loop
616  * ------------------------------------------------------------------------- */
617
618 static void
619 schedulePreLoop(void)
620 {
621   // initialisation for scheduler - what cannot go into initScheduler()  
622 }
623
624 /* -----------------------------------------------------------------------------
625  * scheduleFindWork()
626  *
627  * Search for work to do, and handle messages from elsewhere.
628  * -------------------------------------------------------------------------- */
629
630 static void
631 scheduleFindWork (Capability *cap)
632 {
633     scheduleStartSignalHandlers(cap);
634
635     // Only check the black holes here if we've nothing else to do.
636     // During normal execution, the black hole list only gets checked
637     // at GC time, to avoid repeatedly traversing this possibly long
638     // list each time around the scheduler.
639     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
640
641     scheduleCheckWakeupThreads(cap);
642
643     scheduleCheckBlockedThreads(cap);
644
645 #if defined(THREADED_RTS)
646     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
647 #endif
648 }
649
650 #if defined(THREADED_RTS)
651 STATIC_INLINE rtsBool
652 shouldYieldCapability (Capability *cap, Task *task)
653 {
654     // we need to yield this capability to someone else if..
655     //   - another thread is initiating a GC
656     //   - another Task is returning from a foreign call
657     //   - the thread at the head of the run queue cannot be run
658     //     by this Task (it is bound to another Task, or it is unbound
659     //     and this task it bound).
660     return (waiting_for_gc || 
661             cap->returning_tasks_hd != NULL ||
662             (!emptyRunQueue(cap) && (task->tso == NULL
663                                      ? cap->run_queue_hd->bound != NULL
664                                      : cap->run_queue_hd->bound != task)));
665 }
666
667 // This is the single place where a Task goes to sleep.  There are
668 // two reasons it might need to sleep:
669 //    - there are no threads to run
670 //    - we need to yield this Capability to someone else 
671 //      (see shouldYieldCapability())
672 //
673 // Careful: the scheduler loop is quite delicate.  Make sure you run
674 // the tests in testsuite/concurrent (all ways) after modifying this,
675 // and also check the benchmarks in nofib/parallel for regressions.
676
677 static void
678 scheduleYield (Capability **pcap, Task *task)
679 {
680     Capability *cap = *pcap;
681
682     // if we have work, and we don't need to give up the Capability, continue.
683     if (!shouldYieldCapability(cap,task) && 
684         (!emptyRunQueue(cap) ||
685          !emptyWakeupQueue(cap) ||
686          blackholes_need_checking ||
687          sched_state >= SCHED_INTERRUPTING))
688         return;
689
690     // otherwise yield (sleep), and keep yielding if necessary.
691     do {
692         yieldCapability(&cap,task);
693     } 
694     while (shouldYieldCapability(cap,task));
695
696     // note there may still be no threads on the run queue at this
697     // point, the caller has to check.
698
699     *pcap = cap;
700     return;
701 }
702 #endif
703     
704 /* -----------------------------------------------------------------------------
705  * schedulePushWork()
706  *
707  * Push work to other Capabilities if we have some.
708  * -------------------------------------------------------------------------- */
709
710 static void
711 schedulePushWork(Capability *cap USED_IF_THREADS, 
712                  Task *task      USED_IF_THREADS)
713 {
714   /* following code not for PARALLEL_HASKELL. I kept the call general,
715      future GUM versions might use pushing in a distributed setup */
716 #if defined(THREADED_RTS)
717
718     Capability *free_caps[n_capabilities], *cap0;
719     nat i, n_free_caps;
720
721     // migration can be turned off with +RTS -qg
722     if (!RtsFlags.ParFlags.migrate) return;
723
724     // Check whether we have more threads on our run queue, or sparks
725     // in our pool, that we could hand to another Capability.
726     if (cap->run_queue_hd == END_TSO_QUEUE) {
727         if (sparkPoolSizeCap(cap) < 2) return;
728     } else {
729         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
730             sparkPoolSizeCap(cap) < 1) return;
731     }
732
733     // First grab as many free Capabilities as we can.
734     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
735         cap0 = &capabilities[i];
736         if (cap != cap0 && tryGrabCapability(cap0,task)) {
737             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
738                 // it already has some work, we just grabbed it at 
739                 // the wrong moment.  Or maybe it's deadlocked!
740                 releaseCapability(cap0);
741             } else {
742                 free_caps[n_free_caps++] = cap0;
743             }
744         }
745     }
746
747     // we now have n_free_caps free capabilities stashed in
748     // free_caps[].  Share our run queue equally with them.  This is
749     // probably the simplest thing we could do; improvements we might
750     // want to do include:
751     //
752     //   - giving high priority to moving relatively new threads, on 
753     //     the gournds that they haven't had time to build up a
754     //     working set in the cache on this CPU/Capability.
755     //
756     //   - giving low priority to moving long-lived threads
757
758     if (n_free_caps > 0) {
759         StgTSO *prev, *t, *next;
760         rtsBool pushed_to_all;
761
762         debugTrace(DEBUG_sched, 
763                    "cap %d: %s and %d free capabilities, sharing...", 
764                    cap->no, 
765                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
766                    "excess threads on run queue":"sparks to share (>=2)",
767                    n_free_caps);
768
769         i = 0;
770         pushed_to_all = rtsFalse;
771
772         if (cap->run_queue_hd != END_TSO_QUEUE) {
773             prev = cap->run_queue_hd;
774             t = prev->_link;
775             prev->_link = END_TSO_QUEUE;
776             for (; t != END_TSO_QUEUE; t = next) {
777                 next = t->_link;
778                 t->_link = END_TSO_QUEUE;
779                 if (t->what_next == ThreadRelocated
780                     || t->bound == task // don't move my bound thread
781                     || tsoLocked(t)) {  // don't move a locked thread
782                     setTSOLink(cap, prev, t);
783                     prev = t;
784                 } else if (i == n_free_caps) {
785                     pushed_to_all = rtsTrue;
786                     i = 0;
787                     // keep one for us
788                     setTSOLink(cap, prev, t);
789                     prev = t;
790                 } else {
791                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
792                     appendToRunQueue(free_caps[i],t);
793
794         postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
795
796                     if (t->bound) { t->bound->cap = free_caps[i]; }
797                     t->cap = free_caps[i];
798                     i++;
799                 }
800             }
801             cap->run_queue_tl = prev;
802         }
803
804 #ifdef SPARK_PUSHING
805         /* JB I left this code in place, it would work but is not necessary */
806
807         // If there are some free capabilities that we didn't push any
808         // threads to, then try to push a spark to each one.
809         if (!pushed_to_all) {
810             StgClosure *spark;
811             // i is the next free capability to push to
812             for (; i < n_free_caps; i++) {
813                 if (emptySparkPoolCap(free_caps[i])) {
814                     spark = tryStealSpark(cap->sparks);
815                     if (spark != NULL) {
816                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
817
818       postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
819
820                         newSpark(&(free_caps[i]->r), spark);
821                     }
822                 }
823             }
824         }
825 #endif /* SPARK_PUSHING */
826
827         // release the capabilities
828         for (i = 0; i < n_free_caps; i++) {
829             task->cap = free_caps[i];
830             releaseAndWakeupCapability(free_caps[i]);
831         }
832     }
833     task->cap = cap; // reset to point to our Capability.
834
835 #endif /* THREADED_RTS */
836
837 }
838
839 /* ----------------------------------------------------------------------------
840  * Start any pending signal handlers
841  * ------------------------------------------------------------------------- */
842
843 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
844 static void
845 scheduleStartSignalHandlers(Capability *cap)
846 {
847     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
848         // safe outside the lock
849         startSignalHandlers(cap);
850     }
851 }
852 #else
853 static void
854 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
855 {
856 }
857 #endif
858
859 /* ----------------------------------------------------------------------------
860  * Check for blocked threads that can be woken up.
861  * ------------------------------------------------------------------------- */
862
863 static void
864 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
865 {
866 #if !defined(THREADED_RTS)
867     //
868     // Check whether any waiting threads need to be woken up.  If the
869     // run queue is empty, and there are no other tasks running, we
870     // can wait indefinitely for something to happen.
871     //
872     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
873     {
874         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
875     }
876 #endif
877 }
878
879
880 /* ----------------------------------------------------------------------------
881  * Check for threads woken up by other Capabilities
882  * ------------------------------------------------------------------------- */
883
884 static void
885 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
886 {
887 #if defined(THREADED_RTS)
888     // Any threads that were woken up by other Capabilities get
889     // appended to our run queue.
890     if (!emptyWakeupQueue(cap)) {
891         ACQUIRE_LOCK(&cap->lock);
892         if (emptyRunQueue(cap)) {
893             cap->run_queue_hd = cap->wakeup_queue_hd;
894             cap->run_queue_tl = cap->wakeup_queue_tl;
895         } else {
896             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
897             cap->run_queue_tl = cap->wakeup_queue_tl;
898         }
899         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
900         RELEASE_LOCK(&cap->lock);
901     }
902 #endif
903 }
904
905 /* ----------------------------------------------------------------------------
906  * Check for threads blocked on BLACKHOLEs that can be woken up
907  * ------------------------------------------------------------------------- */
908 static void
909 scheduleCheckBlackHoles (Capability *cap)
910 {
911     if ( blackholes_need_checking ) // check without the lock first
912     {
913         ACQUIRE_LOCK(&sched_mutex);
914         if ( blackholes_need_checking ) {
915             blackholes_need_checking = rtsFalse;
916             // important that we reset the flag *before* checking the
917             // blackhole queue, otherwise we could get deadlock.  This
918             // happens as follows: we wake up a thread that
919             // immediately runs on another Capability, blocks on a
920             // blackhole, and then we reset the blackholes_need_checking flag.
921             checkBlackHoles(cap);
922         }
923         RELEASE_LOCK(&sched_mutex);
924     }
925 }
926
927 /* ----------------------------------------------------------------------------
928  * Detect deadlock conditions and attempt to resolve them.
929  * ------------------------------------------------------------------------- */
930
931 static void
932 scheduleDetectDeadlock (Capability *cap, Task *task)
933 {
934     /* 
935      * Detect deadlock: when we have no threads to run, there are no
936      * threads blocked, waiting for I/O, or sleeping, and all the
937      * other tasks are waiting for work, we must have a deadlock of
938      * some description.
939      */
940     if ( emptyThreadQueues(cap) )
941     {
942 #if defined(THREADED_RTS)
943         /* 
944          * In the threaded RTS, we only check for deadlock if there
945          * has been no activity in a complete timeslice.  This means
946          * we won't eagerly start a full GC just because we don't have
947          * any threads to run currently.
948          */
949         if (recent_activity != ACTIVITY_INACTIVE) return;
950 #endif
951
952         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
953
954         // Garbage collection can release some new threads due to
955         // either (a) finalizers or (b) threads resurrected because
956         // they are unreachable and will therefore be sent an
957         // exception.  Any threads thus released will be immediately
958         // runnable.
959         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
960         // when force_major == rtsTrue. scheduleDoGC sets
961         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
962         // signal.
963
964         if ( !emptyRunQueue(cap) ) return;
965
966 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
967         /* If we have user-installed signal handlers, then wait
968          * for signals to arrive rather then bombing out with a
969          * deadlock.
970          */
971         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
972             debugTrace(DEBUG_sched,
973                        "still deadlocked, waiting for signals...");
974
975             awaitUserSignals();
976
977             if (signals_pending()) {
978                 startSignalHandlers(cap);
979             }
980
981             // either we have threads to run, or we were interrupted:
982             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
983
984             return;
985         }
986 #endif
987
988 #if !defined(THREADED_RTS)
989         /* Probably a real deadlock.  Send the current main thread the
990          * Deadlock exception.
991          */
992         if (task->tso) {
993             switch (task->tso->why_blocked) {
994             case BlockedOnSTM:
995             case BlockedOnBlackHole:
996             case BlockedOnException:
997             case BlockedOnMVar:
998                 throwToSingleThreaded(cap, task->tso, 
999                                       (StgClosure *)nonTermination_closure);
1000                 return;
1001             default:
1002                 barf("deadlock: main thread blocked in a strange way");
1003             }
1004         }
1005         return;
1006 #endif
1007     }
1008 }
1009
1010
1011 /* ----------------------------------------------------------------------------
1012  * Send pending messages (PARALLEL_HASKELL only)
1013  * ------------------------------------------------------------------------- */
1014
1015 #if defined(PARALLEL_HASKELL)
1016 static void
1017 scheduleSendPendingMessages(void)
1018 {
1019
1020 # if defined(PAR) // global Mem.Mgmt., omit for now
1021     if (PendingFetches != END_BF_QUEUE) {
1022         processFetches();
1023     }
1024 # endif
1025     
1026     if (RtsFlags.ParFlags.BufferTime) {
1027         // if we use message buffering, we must send away all message
1028         // packets which have become too old...
1029         sendOldBuffers(); 
1030     }
1031 }
1032 #endif
1033
1034 /* ----------------------------------------------------------------------------
1035  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1036  * ------------------------------------------------------------------------- */
1037
1038 #if defined(THREADED_RTS)
1039 static void
1040 scheduleActivateSpark(Capability *cap)
1041 {
1042     if (anySparks())
1043     {
1044         createSparkThread(cap);
1045         debugTrace(DEBUG_sched, "creating a spark thread");
1046     }
1047 }
1048 #endif // PARALLEL_HASKELL || THREADED_RTS
1049
1050 /* ----------------------------------------------------------------------------
1051  * After running a thread...
1052  * ------------------------------------------------------------------------- */
1053
1054 static void
1055 schedulePostRunThread (Capability *cap, StgTSO *t)
1056 {
1057     // We have to be able to catch transactions that are in an
1058     // infinite loop as a result of seeing an inconsistent view of
1059     // memory, e.g. 
1060     //
1061     //   atomically $ do
1062     //       [a,b] <- mapM readTVar [ta,tb]
1063     //       when (a == b) loop
1064     //
1065     // and a is never equal to b given a consistent view of memory.
1066     //
1067     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1068         if (!stmValidateNestOfTransactions (t -> trec)) {
1069             debugTrace(DEBUG_sched | DEBUG_stm,
1070                        "trec %p found wasting its time", t);
1071             
1072             // strip the stack back to the
1073             // ATOMICALLY_FRAME, aborting the (nested)
1074             // transaction, and saving the stack of any
1075             // partially-evaluated thunks on the heap.
1076             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1077             
1078             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1079         }
1080     }
1081
1082   /* some statistics gathering in the parallel case */
1083 }
1084
1085 /* -----------------------------------------------------------------------------
1086  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1087  * -------------------------------------------------------------------------- */
1088
1089 static rtsBool
1090 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1091 {
1092     // did the task ask for a large block?
1093     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1094         // if so, get one and push it on the front of the nursery.
1095         bdescr *bd;
1096         lnat blocks;
1097         
1098         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1099         
1100         debugTrace(DEBUG_sched,
1101                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1102                    (long)t->id, whatNext_strs[t->what_next], blocks);
1103     
1104         // don't do this if the nursery is (nearly) full, we'll GC first.
1105         if (cap->r.rCurrentNursery->link != NULL ||
1106             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1107                                                // if the nursery has only one block.
1108             
1109             ACQUIRE_SM_LOCK
1110             bd = allocGroup( blocks );
1111             RELEASE_SM_LOCK
1112             cap->r.rNursery->n_blocks += blocks;
1113             
1114             // link the new group into the list
1115             bd->link = cap->r.rCurrentNursery;
1116             bd->u.back = cap->r.rCurrentNursery->u.back;
1117             if (cap->r.rCurrentNursery->u.back != NULL) {
1118                 cap->r.rCurrentNursery->u.back->link = bd;
1119             } else {
1120 #if !defined(THREADED_RTS)
1121                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1122                        g0s0 == cap->r.rNursery);
1123 #endif
1124                 cap->r.rNursery->blocks = bd;
1125             }             
1126             cap->r.rCurrentNursery->u.back = bd;
1127             
1128             // initialise it as a nursery block.  We initialise the
1129             // step, gen_no, and flags field of *every* sub-block in
1130             // this large block, because this is easier than making
1131             // sure that we always find the block head of a large
1132             // block whenever we call Bdescr() (eg. evacuate() and
1133             // isAlive() in the GC would both have to do this, at
1134             // least).
1135             { 
1136                 bdescr *x;
1137                 for (x = bd; x < bd + blocks; x++) {
1138                     x->step = cap->r.rNursery;
1139                     x->gen_no = 0;
1140                     x->flags = 0;
1141                 }
1142             }
1143             
1144             // This assert can be a killer if the app is doing lots
1145             // of large block allocations.
1146             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1147             
1148             // now update the nursery to point to the new block
1149             cap->r.rCurrentNursery = bd;
1150             
1151             // we might be unlucky and have another thread get on the
1152             // run queue before us and steal the large block, but in that
1153             // case the thread will just end up requesting another large
1154             // block.
1155             pushOnRunQueue(cap,t);
1156             return rtsFalse;  /* not actually GC'ing */
1157         }
1158     }
1159     
1160     debugTrace(DEBUG_sched,
1161                "--<< thread %ld (%s) stopped: HeapOverflow",
1162                (long)t->id, whatNext_strs[t->what_next]);
1163
1164     if (cap->r.rHpLim == NULL || cap->context_switch) {
1165         // Sometimes we miss a context switch, e.g. when calling
1166         // primitives in a tight loop, MAYBE_GC() doesn't check the
1167         // context switch flag, and we end up waiting for a GC.
1168         // See #1984, and concurrent/should_run/1984
1169         cap->context_switch = 0;
1170         addToRunQueue(cap,t);
1171     } else {
1172         pushOnRunQueue(cap,t);
1173     }
1174     return rtsTrue;
1175     /* actual GC is done at the end of the while loop in schedule() */
1176 }
1177
1178 /* -----------------------------------------------------------------------------
1179  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1180  * -------------------------------------------------------------------------- */
1181
1182 static void
1183 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1184 {
1185     debugTrace (DEBUG_sched,
1186                 "--<< thread %ld (%s) stopped, StackOverflow", 
1187                 (long)t->id, whatNext_strs[t->what_next]);
1188
1189     /* just adjust the stack for this thread, then pop it back
1190      * on the run queue.
1191      */
1192     { 
1193         /* enlarge the stack */
1194         StgTSO *new_t = threadStackOverflow(cap, t);
1195         
1196         /* The TSO attached to this Task may have moved, so update the
1197          * pointer to it.
1198          */
1199         if (task->tso == t) {
1200             task->tso = new_t;
1201         }
1202         pushOnRunQueue(cap,new_t);
1203     }
1204 }
1205
1206 /* -----------------------------------------------------------------------------
1207  * Handle a thread that returned to the scheduler with ThreadYielding
1208  * -------------------------------------------------------------------------- */
1209
1210 static rtsBool
1211 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1212 {
1213     // Reset the context switch flag.  We don't do this just before
1214     // running the thread, because that would mean we would lose ticks
1215     // during GC, which can lead to unfair scheduling (a thread hogs
1216     // the CPU because the tick always arrives during GC).  This way
1217     // penalises threads that do a lot of allocation, but that seems
1218     // better than the alternative.
1219     cap->context_switch = 0;
1220     
1221     /* put the thread back on the run queue.  Then, if we're ready to
1222      * GC, check whether this is the last task to stop.  If so, wake
1223      * up the GC thread.  getThread will block during a GC until the
1224      * GC is finished.
1225      */
1226 #ifdef DEBUG
1227     if (t->what_next != prev_what_next) {
1228         debugTrace(DEBUG_sched,
1229                    "--<< thread %ld (%s) stopped to switch evaluators", 
1230                    (long)t->id, whatNext_strs[t->what_next]);
1231     } else {
1232         debugTrace(DEBUG_sched,
1233                    "--<< thread %ld (%s) stopped, yielding",
1234                    (long)t->id, whatNext_strs[t->what_next]);
1235     }
1236 #endif
1237     
1238     IF_DEBUG(sanity,
1239              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1240              checkTSO(t));
1241     ASSERT(t->_link == END_TSO_QUEUE);
1242     
1243     // Shortcut if we're just switching evaluators: don't bother
1244     // doing stack squeezing (which can be expensive), just run the
1245     // thread.
1246     if (t->what_next != prev_what_next) {
1247         return rtsTrue;
1248     }
1249
1250     addToRunQueue(cap,t);
1251
1252     return rtsFalse;
1253 }
1254
1255 /* -----------------------------------------------------------------------------
1256  * Handle a thread that returned to the scheduler with ThreadBlocked
1257  * -------------------------------------------------------------------------- */
1258
1259 static void
1260 scheduleHandleThreadBlocked( StgTSO *t
1261 #if !defined(DEBUG)
1262     STG_UNUSED
1263 #endif
1264     )
1265 {
1266
1267       // We don't need to do anything.  The thread is blocked, and it
1268       // has tidied up its stack and placed itself on whatever queue
1269       // it needs to be on.
1270
1271     // ASSERT(t->why_blocked != NotBlocked);
1272     // Not true: for example,
1273     //    - in THREADED_RTS, the thread may already have been woken
1274     //      up by another Capability.  This actually happens: try
1275     //      conc023 +RTS -N2.
1276     //    - the thread may have woken itself up already, because
1277     //      threadPaused() might have raised a blocked throwTo
1278     //      exception, see maybePerformBlockedException().
1279
1280 #ifdef DEBUG
1281     if (traceClass(DEBUG_sched)) {
1282         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1283                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1284         printThreadBlockage(t);
1285         debugTraceEnd();
1286     }
1287 #endif
1288 }
1289
1290 /* -----------------------------------------------------------------------------
1291  * Handle a thread that returned to the scheduler with ThreadFinished
1292  * -------------------------------------------------------------------------- */
1293
1294 static rtsBool
1295 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1296 {
1297     /* Need to check whether this was a main thread, and if so,
1298      * return with the return value.
1299      *
1300      * We also end up here if the thread kills itself with an
1301      * uncaught exception, see Exception.cmm.
1302      */
1303     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1304                (unsigned long)t->id, whatNext_strs[t->what_next]);
1305
1306     // blocked exceptions can now complete, even if the thread was in
1307     // blocked mode (see #2910).  This unconditionally calls
1308     // lockTSO(), which ensures that we don't miss any threads that
1309     // are engaged in throwTo() with this thread as a target.
1310     awakenBlockedExceptionQueue (cap, t);
1311
1312       //
1313       // Check whether the thread that just completed was a bound
1314       // thread, and if so return with the result.  
1315       //
1316       // There is an assumption here that all thread completion goes
1317       // through this point; we need to make sure that if a thread
1318       // ends up in the ThreadKilled state, that it stays on the run
1319       // queue so it can be dealt with here.
1320       //
1321
1322       if (t->bound) {
1323
1324           if (t->bound != task) {
1325 #if !defined(THREADED_RTS)
1326               // Must be a bound thread that is not the topmost one.  Leave
1327               // it on the run queue until the stack has unwound to the
1328               // point where we can deal with this.  Leaving it on the run
1329               // queue also ensures that the garbage collector knows about
1330               // this thread and its return value (it gets dropped from the
1331               // step->threads list so there's no other way to find it).
1332               appendToRunQueue(cap,t);
1333               return rtsFalse;
1334 #else
1335               // this cannot happen in the threaded RTS, because a
1336               // bound thread can only be run by the appropriate Task.
1337               barf("finished bound thread that isn't mine");
1338 #endif
1339           }
1340
1341           ASSERT(task->tso == t);
1342
1343           if (t->what_next == ThreadComplete) {
1344               if (task->ret) {
1345                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1346                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1347               }
1348               task->stat = Success;
1349           } else {
1350               if (task->ret) {
1351                   *(task->ret) = NULL;
1352               }
1353               if (sched_state >= SCHED_INTERRUPTING) {
1354                   if (heap_overflow) {
1355                       task->stat = HeapExhausted;
1356                   } else {
1357                       task->stat = Interrupted;
1358                   }
1359               } else {
1360                   task->stat = Killed;
1361               }
1362           }
1363 #ifdef DEBUG
1364           removeThreadLabel((StgWord)task->tso->id);
1365 #endif
1366           return rtsTrue; // tells schedule() to return
1367       }
1368
1369       return rtsFalse;
1370 }
1371
1372 /* -----------------------------------------------------------------------------
1373  * Perform a heap census
1374  * -------------------------------------------------------------------------- */
1375
1376 static rtsBool
1377 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1378 {
1379     // When we have +RTS -i0 and we're heap profiling, do a census at
1380     // every GC.  This lets us get repeatable runs for debugging.
1381     if (performHeapProfile ||
1382         (RtsFlags.ProfFlags.profileInterval==0 &&
1383          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1384         return rtsTrue;
1385     } else {
1386         return rtsFalse;
1387     }
1388 }
1389
1390 /* -----------------------------------------------------------------------------
1391  * Perform a garbage collection if necessary
1392  * -------------------------------------------------------------------------- */
1393
1394 static Capability *
1395 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1396 {
1397     rtsBool heap_census;
1398 #ifdef THREADED_RTS
1399     /* extern static volatile StgWord waiting_for_gc; 
1400        lives inside capability.c */
1401     rtsBool gc_type, prev_pending_gc;
1402     nat i;
1403 #endif
1404
1405     if (sched_state == SCHED_SHUTTING_DOWN) {
1406         // The final GC has already been done, and the system is
1407         // shutting down.  We'll probably deadlock if we try to GC
1408         // now.
1409         return cap;
1410     }
1411
1412 #ifdef THREADED_RTS
1413     if (sched_state < SCHED_INTERRUPTING
1414         && RtsFlags.ParFlags.parGcEnabled
1415         && N >= RtsFlags.ParFlags.parGcGen
1416         && ! oldest_gen->steps[0].mark)
1417     {
1418         gc_type = PENDING_GC_PAR;
1419     } else {
1420         gc_type = PENDING_GC_SEQ;
1421     }
1422
1423     // In order to GC, there must be no threads running Haskell code.
1424     // Therefore, the GC thread needs to hold *all* the capabilities,
1425     // and release them after the GC has completed.  
1426     //
1427     // This seems to be the simplest way: previous attempts involved
1428     // making all the threads with capabilities give up their
1429     // capabilities and sleep except for the *last* one, which
1430     // actually did the GC.  But it's quite hard to arrange for all
1431     // the other tasks to sleep and stay asleep.
1432     //
1433
1434     /*  Other capabilities are prevented from running yet more Haskell
1435         threads if waiting_for_gc is set. Tested inside
1436         yieldCapability() and releaseCapability() in Capability.c */
1437
1438     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1439     if (prev_pending_gc) {
1440         do {
1441             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1442                        prev_pending_gc);
1443             ASSERT(cap);
1444             yieldCapability(&cap,task);
1445         } while (waiting_for_gc);
1446         return cap;  // NOTE: task->cap might have changed here
1447     }
1448
1449     setContextSwitches();
1450
1451     // The final shutdown GC is always single-threaded, because it's
1452     // possible that some of the Capabilities have no worker threads.
1453     
1454     if (gc_type == PENDING_GC_SEQ)
1455     {
1456         postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
1457         // single-threaded GC: grab all the capabilities
1458         for (i=0; i < n_capabilities; i++) {
1459             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1460             if (cap != &capabilities[i]) {
1461                 Capability *pcap = &capabilities[i];
1462                 // we better hope this task doesn't get migrated to
1463                 // another Capability while we're waiting for this one.
1464                 // It won't, because load balancing happens while we have
1465                 // all the Capabilities, but even so it's a slightly
1466                 // unsavoury invariant.
1467                 task->cap = pcap;
1468                 waitForReturnCapability(&pcap, task);
1469                 if (pcap != &capabilities[i]) {
1470                     barf("scheduleDoGC: got the wrong capability");
1471                 }
1472             }
1473         }
1474     }
1475     else
1476     {
1477         // multi-threaded GC: make sure all the Capabilities donate one
1478         // GC thread each.
1479         postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
1480         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1481
1482         waitForGcThreads(cap);
1483     }
1484 #endif
1485
1486     // so this happens periodically:
1487     if (cap) scheduleCheckBlackHoles(cap);
1488     
1489     IF_DEBUG(scheduler, printAllThreads());
1490
1491 delete_threads_and_gc:
1492     /*
1493      * We now have all the capabilities; if we're in an interrupting
1494      * state, then we should take the opportunity to delete all the
1495      * threads in the system.
1496      */
1497     if (sched_state == SCHED_INTERRUPTING) {
1498         deleteAllThreads(cap);
1499         sched_state = SCHED_SHUTTING_DOWN;
1500     }
1501     
1502     heap_census = scheduleNeedHeapProfile(rtsTrue);
1503
1504 #if defined(THREADED_RTS)
1505     postEvent(cap, EVENT_GC_START, 0, 0);
1506     debugTrace(DEBUG_sched, "doing GC");
1507     // reset waiting_for_gc *before* GC, so that when the GC threads
1508     // emerge they don't immediately re-enter the GC.
1509     waiting_for_gc = 0;
1510     GarbageCollect(force_major || heap_census, gc_type, cap);
1511 #else
1512     GarbageCollect(force_major || heap_census, 0, cap);
1513 #endif
1514     postEvent(cap, EVENT_GC_END, 0, 0);
1515
1516     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1517     {
1518         // We are doing a GC because the system has been idle for a
1519         // timeslice and we need to check for deadlock.  Record the
1520         // fact that we've done a GC and turn off the timer signal;
1521         // it will get re-enabled if we run any threads after the GC.
1522         recent_activity = ACTIVITY_DONE_GC;
1523         stopTimer();
1524     }
1525     else
1526     {
1527         // the GC might have taken long enough for the timer to set
1528         // recent_activity = ACTIVITY_INACTIVE, but we aren't
1529         // necessarily deadlocked:
1530         recent_activity = ACTIVITY_YES;
1531     }
1532
1533 #if defined(THREADED_RTS)
1534     if (gc_type == PENDING_GC_PAR)
1535     {
1536         releaseGCThreads(cap);
1537     }
1538 #endif
1539
1540     if (heap_census) {
1541         debugTrace(DEBUG_sched, "performing heap census");
1542         heapCensus();
1543         performHeapProfile = rtsFalse;
1544     }
1545
1546     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1547         // GC set the heap_overflow flag, so we should proceed with
1548         // an orderly shutdown now.  Ultimately we want the main
1549         // thread to return to its caller with HeapExhausted, at which
1550         // point the caller should call hs_exit().  The first step is
1551         // to delete all the threads.
1552         //
1553         // Another way to do this would be to raise an exception in
1554         // the main thread, which we really should do because it gives
1555         // the program a chance to clean up.  But how do we find the
1556         // main thread?  It should presumably be the same one that
1557         // gets ^C exceptions, but that's all done on the Haskell side
1558         // (GHC.TopHandler).
1559         sched_state = SCHED_INTERRUPTING;
1560         goto delete_threads_and_gc;
1561     }
1562
1563 #ifdef SPARKBALANCE
1564     /* JB 
1565        Once we are all together... this would be the place to balance all
1566        spark pools. No concurrent stealing or adding of new sparks can
1567        occur. Should be defined in Sparks.c. */
1568     balanceSparkPoolsCaps(n_capabilities, capabilities);
1569 #endif
1570
1571 #if defined(THREADED_RTS)
1572     if (gc_type == PENDING_GC_SEQ) {
1573         // release our stash of capabilities.
1574         for (i = 0; i < n_capabilities; i++) {
1575             if (cap != &capabilities[i]) {
1576                 task->cap = &capabilities[i];
1577                 releaseCapability(&capabilities[i]);
1578             }
1579         }
1580     }
1581     if (cap) {
1582         task->cap = cap;
1583     } else {
1584         task->cap = NULL;
1585     }
1586 #endif
1587
1588     return cap;
1589 }
1590
1591 /* ---------------------------------------------------------------------------
1592  * Singleton fork(). Do not copy any running threads.
1593  * ------------------------------------------------------------------------- */
1594
1595 pid_t
1596 forkProcess(HsStablePtr *entry
1597 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1598             STG_UNUSED
1599 #endif
1600            )
1601 {
1602 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1603     Task *task;
1604     pid_t pid;
1605     StgTSO* t,*next;
1606     Capability *cap;
1607     nat s;
1608     
1609 #if defined(THREADED_RTS)
1610     if (RtsFlags.ParFlags.nNodes > 1) {
1611         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1612         stg_exit(EXIT_FAILURE);
1613     }
1614 #endif
1615
1616     debugTrace(DEBUG_sched, "forking!");
1617     
1618     // ToDo: for SMP, we should probably acquire *all* the capabilities
1619     cap = rts_lock();
1620     
1621     // no funny business: hold locks while we fork, otherwise if some
1622     // other thread is holding a lock when the fork happens, the data
1623     // structure protected by the lock will forever be in an
1624     // inconsistent state in the child.  See also #1391.
1625     ACQUIRE_LOCK(&sched_mutex);
1626     ACQUIRE_LOCK(&cap->lock);
1627     ACQUIRE_LOCK(&cap->running_task->lock);
1628
1629     pid = fork();
1630     
1631     if (pid) { // parent
1632         
1633         RELEASE_LOCK(&sched_mutex);
1634         RELEASE_LOCK(&cap->lock);
1635         RELEASE_LOCK(&cap->running_task->lock);
1636
1637         // just return the pid
1638         rts_unlock(cap);
1639         return pid;
1640         
1641     } else { // child
1642         
1643 #if defined(THREADED_RTS)
1644         initMutex(&sched_mutex);
1645         initMutex(&cap->lock);
1646         initMutex(&cap->running_task->lock);
1647 #endif
1648
1649         // Now, all OS threads except the thread that forked are
1650         // stopped.  We need to stop all Haskell threads, including
1651         // those involved in foreign calls.  Also we need to delete
1652         // all Tasks, because they correspond to OS threads that are
1653         // now gone.
1654
1655         for (s = 0; s < total_steps; s++) {
1656           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1657             if (t->what_next == ThreadRelocated) {
1658                 next = t->_link;
1659             } else {
1660                 next = t->global_link;
1661                 // don't allow threads to catch the ThreadKilled
1662                 // exception, but we do want to raiseAsync() because these
1663                 // threads may be evaluating thunks that we need later.
1664                 deleteThread_(cap,t);
1665             }
1666           }
1667         }
1668         
1669         // Empty the run queue.  It seems tempting to let all the
1670         // killed threads stay on the run queue as zombies to be
1671         // cleaned up later, but some of them correspond to bound
1672         // threads for which the corresponding Task does not exist.
1673         cap->run_queue_hd = END_TSO_QUEUE;
1674         cap->run_queue_tl = END_TSO_QUEUE;
1675
1676         // Any suspended C-calling Tasks are no more, their OS threads
1677         // don't exist now:
1678         cap->suspended_ccalling_tasks = NULL;
1679
1680         // Empty the threads lists.  Otherwise, the garbage
1681         // collector may attempt to resurrect some of these threads.
1682         for (s = 0; s < total_steps; s++) {
1683             all_steps[s].threads = END_TSO_QUEUE;
1684         }
1685
1686         // Wipe the task list, except the current Task.
1687         ACQUIRE_LOCK(&sched_mutex);
1688         for (task = all_tasks; task != NULL; task=task->all_link) {
1689             if (task != cap->running_task) {
1690 #if defined(THREADED_RTS)
1691                 initMutex(&task->lock); // see #1391
1692 #endif
1693                 discardTask(task);
1694             }
1695         }
1696         RELEASE_LOCK(&sched_mutex);
1697
1698 #if defined(THREADED_RTS)
1699         // Wipe our spare workers list, they no longer exist.  New
1700         // workers will be created if necessary.
1701         cap->spare_workers = NULL;
1702         cap->returning_tasks_hd = NULL;
1703         cap->returning_tasks_tl = NULL;
1704 #endif
1705
1706         // On Unix, all timers are reset in the child, so we need to start
1707         // the timer again.
1708         initTimer();
1709         startTimer();
1710
1711         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1712         rts_checkSchedStatus("forkProcess",cap);
1713         
1714         rts_unlock(cap);
1715         hs_exit();                      // clean up and exit
1716         stg_exit(EXIT_SUCCESS);
1717     }
1718 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1719     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1720 #endif
1721 }
1722
1723 /* ---------------------------------------------------------------------------
1724  * Delete all the threads in the system
1725  * ------------------------------------------------------------------------- */
1726    
1727 static void
1728 deleteAllThreads ( Capability *cap )
1729 {
1730     // NOTE: only safe to call if we own all capabilities.
1731
1732     StgTSO* t, *next;
1733     nat s;
1734
1735     debugTrace(DEBUG_sched,"deleting all threads");
1736     for (s = 0; s < total_steps; s++) {
1737       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1738         if (t->what_next == ThreadRelocated) {
1739             next = t->_link;
1740         } else {
1741             next = t->global_link;
1742             deleteThread(cap,t);
1743         }
1744       }
1745     }      
1746
1747     // The run queue now contains a bunch of ThreadKilled threads.  We
1748     // must not throw these away: the main thread(s) will be in there
1749     // somewhere, and the main scheduler loop has to deal with it.
1750     // Also, the run queue is the only thing keeping these threads from
1751     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1752
1753 #if !defined(THREADED_RTS)
1754     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1755     ASSERT(sleeping_queue == END_TSO_QUEUE);
1756 #endif
1757 }
1758
1759 /* -----------------------------------------------------------------------------
1760    Managing the suspended_ccalling_tasks list.
1761    Locks required: sched_mutex
1762    -------------------------------------------------------------------------- */
1763
1764 STATIC_INLINE void
1765 suspendTask (Capability *cap, Task *task)
1766 {
1767     ASSERT(task->next == NULL && task->prev == NULL);
1768     task->next = cap->suspended_ccalling_tasks;
1769     task->prev = NULL;
1770     if (cap->suspended_ccalling_tasks) {
1771         cap->suspended_ccalling_tasks->prev = task;
1772     }
1773     cap->suspended_ccalling_tasks = task;
1774 }
1775
1776 STATIC_INLINE void
1777 recoverSuspendedTask (Capability *cap, Task *task)
1778 {
1779     if (task->prev) {
1780         task->prev->next = task->next;
1781     } else {
1782         ASSERT(cap->suspended_ccalling_tasks == task);
1783         cap->suspended_ccalling_tasks = task->next;
1784     }
1785     if (task->next) {
1786         task->next->prev = task->prev;
1787     }
1788     task->next = task->prev = NULL;
1789 }
1790
1791 /* ---------------------------------------------------------------------------
1792  * Suspending & resuming Haskell threads.
1793  * 
1794  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1795  * its capability before calling the C function.  This allows another
1796  * task to pick up the capability and carry on running Haskell
1797  * threads.  It also means that if the C call blocks, it won't lock
1798  * the whole system.
1799  *
1800  * The Haskell thread making the C call is put to sleep for the
1801  * duration of the call, on the susepended_ccalling_threads queue.  We
1802  * give out a token to the task, which it can use to resume the thread
1803  * on return from the C function.
1804  * ------------------------------------------------------------------------- */
1805    
1806 void *
1807 suspendThread (StgRegTable *reg)
1808 {
1809   Capability *cap;
1810   int saved_errno;
1811   StgTSO *tso;
1812   Task *task;
1813 #if mingw32_HOST_OS
1814   StgWord32 saved_winerror;
1815 #endif
1816
1817   saved_errno = errno;
1818 #if mingw32_HOST_OS
1819   saved_winerror = GetLastError();
1820 #endif
1821
1822   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1823    */
1824   cap = regTableToCapability(reg);
1825
1826   task = cap->running_task;
1827   tso = cap->r.rCurrentTSO;
1828
1829   postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
1830   debugTrace(DEBUG_sched, 
1831              "thread %lu did a safe foreign call", 
1832              (unsigned long)cap->r.rCurrentTSO->id);
1833
1834   // XXX this might not be necessary --SDM
1835   tso->what_next = ThreadRunGHC;
1836
1837   threadPaused(cap,tso);
1838
1839   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1840       tso->why_blocked = BlockedOnCCall;
1841       tso->flags |= TSO_BLOCKEX;
1842       tso->flags &= ~TSO_INTERRUPTIBLE;
1843   } else {
1844       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1845   }
1846
1847   // Hand back capability
1848   task->suspended_tso = tso;
1849
1850   ACQUIRE_LOCK(&cap->lock);
1851
1852   suspendTask(cap,task);
1853   cap->in_haskell = rtsFalse;
1854   releaseCapability_(cap,rtsFalse);
1855   
1856   RELEASE_LOCK(&cap->lock);
1857
1858 #if defined(THREADED_RTS)
1859   /* Preparing to leave the RTS, so ensure there's a native thread/task
1860      waiting to take over.
1861   */
1862   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1863 #endif
1864
1865   errno = saved_errno;
1866 #if mingw32_HOST_OS
1867   SetLastError(saved_winerror);
1868 #endif
1869   return task;
1870 }
1871
1872 StgRegTable *
1873 resumeThread (void *task_)
1874 {
1875     StgTSO *tso;
1876     Capability *cap;
1877     Task *task = task_;
1878     int saved_errno;
1879 #if mingw32_HOST_OS
1880     StgWord32 saved_winerror;
1881 #endif
1882
1883     saved_errno = errno;
1884 #if mingw32_HOST_OS
1885     saved_winerror = GetLastError();
1886 #endif
1887
1888     cap = task->cap;
1889     // Wait for permission to re-enter the RTS with the result.
1890     waitForReturnCapability(&cap,task);
1891     // we might be on a different capability now... but if so, our
1892     // entry on the suspended_ccalling_tasks list will also have been
1893     // migrated.
1894
1895     // Remove the thread from the suspended list
1896     recoverSuspendedTask(cap,task);
1897
1898     tso = task->suspended_tso;
1899     task->suspended_tso = NULL;
1900     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1901
1902     postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
1903     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1904     
1905     if (tso->why_blocked == BlockedOnCCall) {
1906         // avoid locking the TSO if we don't have to
1907         if (tso->blocked_exceptions != END_TSO_QUEUE) {
1908             awakenBlockedExceptionQueue(cap,tso);
1909         }
1910         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1911     }
1912     
1913     /* Reset blocking status */
1914     tso->why_blocked  = NotBlocked;
1915     
1916     cap->r.rCurrentTSO = tso;
1917     cap->in_haskell = rtsTrue;
1918     errno = saved_errno;
1919 #if mingw32_HOST_OS
1920     SetLastError(saved_winerror);
1921 #endif
1922
1923     /* We might have GC'd, mark the TSO dirty again */
1924     dirty_TSO(cap,tso);
1925
1926     IF_DEBUG(sanity, checkTSO(tso));
1927
1928     return &cap->r;
1929 }
1930
1931 /* ---------------------------------------------------------------------------
1932  * scheduleThread()
1933  *
1934  * scheduleThread puts a thread on the end  of the runnable queue.
1935  * This will usually be done immediately after a thread is created.
1936  * The caller of scheduleThread must create the thread using e.g.
1937  * createThread and push an appropriate closure
1938  * on this thread's stack before the scheduler is invoked.
1939  * ------------------------------------------------------------------------ */
1940
1941 void
1942 scheduleThread(Capability *cap, StgTSO *tso)
1943 {
1944     // The thread goes at the *end* of the run-queue, to avoid possible
1945     // starvation of any threads already on the queue.
1946     appendToRunQueue(cap,tso);
1947 }
1948
1949 void
1950 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1951 {
1952 #if defined(THREADED_RTS)
1953     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1954                               // move this thread from now on.
1955     cpu %= RtsFlags.ParFlags.nNodes;
1956     if (cpu == cap->no) {
1957         appendToRunQueue(cap,tso);
1958     } else {
1959         postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
1960         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1961     }
1962 #else
1963     appendToRunQueue(cap,tso);
1964 #endif
1965 }
1966
1967 Capability *
1968 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1969 {
1970     Task *task;
1971
1972     // We already created/initialised the Task
1973     task = cap->running_task;
1974
1975     // This TSO is now a bound thread; make the Task and TSO
1976     // point to each other.
1977     tso->bound = task;
1978     tso->cap = cap;
1979
1980     task->tso = tso;
1981     task->ret = ret;
1982     task->stat = NoStatus;
1983
1984     appendToRunQueue(cap,tso);
1985
1986     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1987
1988     cap = schedule(cap,task);
1989
1990     ASSERT(task->stat != NoStatus);
1991     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1992
1993     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1994     return cap;
1995 }
1996
1997 /* ----------------------------------------------------------------------------
1998  * Starting Tasks
1999  * ------------------------------------------------------------------------- */
2000
2001 #if defined(THREADED_RTS)
2002 void OSThreadProcAttr
2003 workerStart(Task *task)
2004 {
2005     Capability *cap;
2006
2007     // See startWorkerTask().
2008     ACQUIRE_LOCK(&task->lock);
2009     cap = task->cap;
2010     RELEASE_LOCK(&task->lock);
2011
2012     if (RtsFlags.ParFlags.setAffinity) {
2013         setThreadAffinity(cap->no, n_capabilities);
2014     }
2015
2016     // set the thread-local pointer to the Task:
2017     taskEnter(task);
2018
2019     // schedule() runs without a lock.
2020     cap = schedule(cap,task);
2021
2022     // On exit from schedule(), we have a Capability, but possibly not
2023     // the same one we started with.
2024
2025     // During shutdown, the requirement is that after all the
2026     // Capabilities are shut down, all workers that are shutting down
2027     // have finished workerTaskStop().  This is why we hold on to
2028     // cap->lock until we've finished workerTaskStop() below.
2029     //
2030     // There may be workers still involved in foreign calls; those
2031     // will just block in waitForReturnCapability() because the
2032     // Capability has been shut down.
2033     //
2034     ACQUIRE_LOCK(&cap->lock);
2035     releaseCapability_(cap,rtsFalse);
2036     workerTaskStop(task);
2037     RELEASE_LOCK(&cap->lock);
2038 }
2039 #endif
2040
2041 /* ---------------------------------------------------------------------------
2042  * initScheduler()
2043  *
2044  * Initialise the scheduler.  This resets all the queues - if the
2045  * queues contained any threads, they'll be garbage collected at the
2046  * next pass.
2047  *
2048  * ------------------------------------------------------------------------ */
2049
2050 void 
2051 initScheduler(void)
2052 {
2053 #if !defined(THREADED_RTS)
2054   blocked_queue_hd  = END_TSO_QUEUE;
2055   blocked_queue_tl  = END_TSO_QUEUE;
2056   sleeping_queue    = END_TSO_QUEUE;
2057 #endif
2058
2059   blackhole_queue   = END_TSO_QUEUE;
2060
2061   sched_state    = SCHED_RUNNING;
2062   recent_activity = ACTIVITY_YES;
2063
2064 #if defined(THREADED_RTS)
2065   /* Initialise the mutex and condition variables used by
2066    * the scheduler. */
2067   initMutex(&sched_mutex);
2068 #endif
2069   
2070   ACQUIRE_LOCK(&sched_mutex);
2071
2072   /* A capability holds the state a native thread needs in
2073    * order to execute STG code. At least one capability is
2074    * floating around (only THREADED_RTS builds have more than one).
2075    */
2076   initCapabilities();
2077
2078   initTaskManager();
2079
2080 #if defined(THREADED_RTS)
2081   initSparkPools();
2082 #endif
2083
2084 #if defined(THREADED_RTS)
2085   /*
2086    * Eagerly start one worker to run each Capability, except for
2087    * Capability 0.  The idea is that we're probably going to start a
2088    * bound thread on Capability 0 pretty soon, so we don't want a
2089    * worker task hogging it.
2090    */
2091   { 
2092       nat i;
2093       Capability *cap;
2094       for (i = 1; i < n_capabilities; i++) {
2095           cap = &capabilities[i];
2096           ACQUIRE_LOCK(&cap->lock);
2097           startWorkerTask(cap, workerStart);
2098           RELEASE_LOCK(&cap->lock);
2099       }
2100   }
2101 #endif
2102
2103   RELEASE_LOCK(&sched_mutex);
2104 }
2105
2106 void
2107 exitScheduler(
2108     rtsBool wait_foreign
2109 #if !defined(THREADED_RTS)
2110                          __attribute__((unused))
2111 #endif
2112 )
2113                /* see Capability.c, shutdownCapability() */
2114 {
2115     Task *task = NULL;
2116
2117     task = newBoundTask();
2118
2119     // If we haven't killed all the threads yet, do it now.
2120     if (sched_state < SCHED_SHUTTING_DOWN) {
2121         sched_state = SCHED_INTERRUPTING;
2122         waitForReturnCapability(&task->cap,task);
2123         scheduleDoGC(task->cap,task,rtsFalse);    
2124         releaseCapability(task->cap);
2125     }
2126     sched_state = SCHED_SHUTTING_DOWN;
2127
2128 #if defined(THREADED_RTS)
2129     { 
2130         nat i;
2131         
2132         for (i = 0; i < n_capabilities; i++) {
2133             shutdownCapability(&capabilities[i], task, wait_foreign);
2134         }
2135         boundTaskExiting(task);
2136     }
2137 #endif
2138 }
2139
2140 void
2141 freeScheduler( void )
2142 {
2143     nat still_running;
2144
2145     ACQUIRE_LOCK(&sched_mutex);
2146     still_running = freeTaskManager();
2147     // We can only free the Capabilities if there are no Tasks still
2148     // running.  We might have a Task about to return from a foreign
2149     // call into waitForReturnCapability(), for example (actually,
2150     // this should be the *only* thing that a still-running Task can
2151     // do at this point, and it will block waiting for the
2152     // Capability).
2153     if (still_running == 0) {
2154         freeCapabilities();
2155         if (n_capabilities != 1) {
2156             stgFree(capabilities);
2157         }
2158     }
2159     RELEASE_LOCK(&sched_mutex);
2160 #if defined(THREADED_RTS)
2161     closeMutex(&sched_mutex);
2162 #endif
2163 }
2164
2165 /* -----------------------------------------------------------------------------
2166    performGC
2167
2168    This is the interface to the garbage collector from Haskell land.
2169    We provide this so that external C code can allocate and garbage
2170    collect when called from Haskell via _ccall_GC.
2171    -------------------------------------------------------------------------- */
2172
2173 static void
2174 performGC_(rtsBool force_major)
2175 {
2176     Task *task;
2177
2178     // We must grab a new Task here, because the existing Task may be
2179     // associated with a particular Capability, and chained onto the 
2180     // suspended_ccalling_tasks queue.
2181     task = newBoundTask();
2182
2183     waitForReturnCapability(&task->cap,task);
2184     scheduleDoGC(task->cap,task,force_major);
2185     releaseCapability(task->cap);
2186     boundTaskExiting(task);
2187 }
2188
2189 void
2190 performGC(void)
2191 {
2192     performGC_(rtsFalse);
2193 }
2194
2195 void
2196 performMajorGC(void)
2197 {
2198     performGC_(rtsTrue);
2199 }
2200
2201 /* -----------------------------------------------------------------------------
2202    Stack overflow
2203
2204    If the thread has reached its maximum stack size, then raise the
2205    StackOverflow exception in the offending thread.  Otherwise
2206    relocate the TSO into a larger chunk of memory and adjust its stack
2207    size appropriately.
2208    -------------------------------------------------------------------------- */
2209
2210 static StgTSO *
2211 threadStackOverflow(Capability *cap, StgTSO *tso)
2212 {
2213   nat new_stack_size, stack_words;
2214   lnat new_tso_size;
2215   StgPtr new_sp;
2216   StgTSO *dest;
2217
2218   IF_DEBUG(sanity,checkTSO(tso));
2219
2220   // don't allow throwTo() to modify the blocked_exceptions queue
2221   // while we are moving the TSO:
2222   lockClosure((StgClosure *)tso);
2223
2224   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2225       // NB. never raise a StackOverflow exception if the thread is
2226       // inside Control.Exceptino.block.  It is impractical to protect
2227       // against stack overflow exceptions, since virtually anything
2228       // can raise one (even 'catch'), so this is the only sensible
2229       // thing to do here.  See bug #767.
2230
2231       debugTrace(DEBUG_gc,
2232                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2233                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2234       IF_DEBUG(gc,
2235                /* If we're debugging, just print out the top of the stack */
2236                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2237                                                 tso->sp+64)));
2238
2239       // Send this thread the StackOverflow exception
2240       unlockTSO(tso);
2241       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2242       return tso;
2243   }
2244
2245   /* Try to double the current stack size.  If that takes us over the
2246    * maximum stack size for this thread, then use the maximum instead
2247    * (that is, unless we're already at or over the max size and we
2248    * can't raise the StackOverflow exception (see above), in which
2249    * case just double the size). Finally round up so the TSO ends up as
2250    * a whole number of blocks.
2251    */
2252   if (tso->stack_size >= tso->max_stack_size) {
2253       new_stack_size = tso->stack_size * 2;
2254   } else { 
2255       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2256   }
2257   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2258                                        TSO_STRUCT_SIZE)/sizeof(W_);
2259   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2260   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2261
2262   debugTrace(DEBUG_sched, 
2263              "increasing stack size from %ld words to %d.",
2264              (long)tso->stack_size, new_stack_size);
2265
2266   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2267   TICK_ALLOC_TSO(new_stack_size,0);
2268
2269   /* copy the TSO block and the old stack into the new area */
2270   memcpy(dest,tso,TSO_STRUCT_SIZE);
2271   stack_words = tso->stack + tso->stack_size - tso->sp;
2272   new_sp = (P_)dest + new_tso_size - stack_words;
2273   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2274
2275   /* relocate the stack pointers... */
2276   dest->sp         = new_sp;
2277   dest->stack_size = new_stack_size;
2278         
2279   /* Mark the old TSO as relocated.  We have to check for relocated
2280    * TSOs in the garbage collector and any primops that deal with TSOs.
2281    *
2282    * It's important to set the sp value to just beyond the end
2283    * of the stack, so we don't attempt to scavenge any part of the
2284    * dead TSO's stack.
2285    */
2286   tso->what_next = ThreadRelocated;
2287   setTSOLink(cap,tso,dest);
2288   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2289   tso->why_blocked = NotBlocked;
2290
2291   unlockTSO(dest);
2292   unlockTSO(tso);
2293
2294   IF_DEBUG(sanity,checkTSO(dest));
2295 #if 0
2296   IF_DEBUG(scheduler,printTSO(dest));
2297 #endif
2298
2299   return dest;
2300 }
2301
2302 static StgTSO *
2303 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2304 {
2305     bdescr *bd, *new_bd;
2306     lnat free_w, tso_size_w;
2307     StgTSO *new_tso;
2308
2309     tso_size_w = tso_sizeW(tso);
2310
2311     if (tso_size_w < MBLOCK_SIZE_W ||
2312           // TSO is less than 2 mblocks (since the first mblock is
2313           // shorter than MBLOCK_SIZE_W)
2314         (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
2315           // or TSO is not a whole number of megablocks (ensuring
2316           // precondition of splitLargeBlock() below)
2317         (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
2318           // or TSO is smaller than the minimum stack size (rounded up)
2319         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2320           // or stack is using more than 1/4 of the available space
2321     {
2322         // then do nothing
2323         return tso;
2324     }
2325
2326     // don't allow throwTo() to modify the blocked_exceptions queue
2327     // while we are moving the TSO:
2328     lockClosure((StgClosure *)tso);
2329
2330     // this is the number of words we'll free
2331     free_w = round_to_mblocks(tso_size_w/2);
2332
2333     bd = Bdescr((StgPtr)tso);
2334     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2335     bd->free = bd->start + TSO_STRUCT_SIZEW;
2336
2337     new_tso = (StgTSO *)new_bd->start;
2338     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2339     new_tso->stack_size = new_bd->free - new_tso->stack;
2340
2341     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2342                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2343
2344     tso->what_next = ThreadRelocated;
2345     tso->_link = new_tso; // no write barrier reqd: same generation
2346
2347     // The TSO attached to this Task may have moved, so update the
2348     // pointer to it.
2349     if (task->tso == tso) {
2350         task->tso = new_tso;
2351     }
2352
2353     unlockTSO(new_tso);
2354     unlockTSO(tso);
2355
2356     IF_DEBUG(sanity,checkTSO(new_tso));
2357
2358     return new_tso;
2359 }
2360
2361 /* ---------------------------------------------------------------------------
2362    Interrupt execution
2363    - usually called inside a signal handler so it mustn't do anything fancy.   
2364    ------------------------------------------------------------------------ */
2365
2366 void
2367 interruptStgRts(void)
2368 {
2369     sched_state = SCHED_INTERRUPTING;
2370     setContextSwitches();
2371 #if defined(THREADED_RTS)
2372     wakeUpRts();
2373 #endif
2374 }
2375
2376 /* -----------------------------------------------------------------------------
2377    Wake up the RTS
2378    
2379    This function causes at least one OS thread to wake up and run the
2380    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2381    an external event has arrived that may need servicing (eg. a
2382    keyboard interrupt).
2383
2384    In the single-threaded RTS we don't do anything here; we only have
2385    one thread anyway, and the event that caused us to want to wake up
2386    will have interrupted any blocking system call in progress anyway.
2387    -------------------------------------------------------------------------- */
2388
2389 #if defined(THREADED_RTS)
2390 void wakeUpRts(void)
2391 {
2392     // This forces the IO Manager thread to wakeup, which will
2393     // in turn ensure that some OS thread wakes up and runs the
2394     // scheduler loop, which will cause a GC and deadlock check.
2395     ioManagerWakeup();
2396 }
2397 #endif
2398
2399 /* -----------------------------------------------------------------------------
2400  * checkBlackHoles()
2401  *
2402  * Check the blackhole_queue for threads that can be woken up.  We do
2403  * this periodically: before every GC, and whenever the run queue is
2404  * empty.
2405  *
2406  * An elegant solution might be to just wake up all the blocked
2407  * threads with awakenBlockedQueue occasionally: they'll go back to
2408  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2409  * doesn't give us a way to tell whether we've actually managed to
2410  * wake up any threads, so we would be busy-waiting.
2411  *
2412  * -------------------------------------------------------------------------- */
2413
2414 static rtsBool
2415 checkBlackHoles (Capability *cap)
2416 {
2417     StgTSO **prev, *t;
2418     rtsBool any_woke_up = rtsFalse;
2419     StgHalfWord type;
2420
2421     // blackhole_queue is global:
2422     ASSERT_LOCK_HELD(&sched_mutex);
2423
2424     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2425
2426     // ASSUMES: sched_mutex
2427     prev = &blackhole_queue;
2428     t = blackhole_queue;
2429     while (t != END_TSO_QUEUE) {
2430         if (t->what_next == ThreadRelocated) {
2431             t = t->_link;
2432             continue;
2433         }
2434         ASSERT(t->why_blocked == BlockedOnBlackHole);
2435         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2436         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2437             IF_DEBUG(sanity,checkTSO(t));
2438             t = unblockOne(cap, t);
2439             *prev = t;
2440             any_woke_up = rtsTrue;
2441         } else {
2442             prev = &t->_link;
2443             t = t->_link;
2444         }
2445     }
2446
2447     return any_woke_up;
2448 }
2449
2450 /* -----------------------------------------------------------------------------
2451    Deleting threads
2452
2453    This is used for interruption (^C) and forking, and corresponds to
2454    raising an exception but without letting the thread catch the
2455    exception.
2456    -------------------------------------------------------------------------- */
2457
2458 static void 
2459 deleteThread (Capability *cap, StgTSO *tso)
2460 {
2461     // NOTE: must only be called on a TSO that we have exclusive
2462     // access to, because we will call throwToSingleThreaded() below.
2463     // The TSO must be on the run queue of the Capability we own, or 
2464     // we must own all Capabilities.
2465
2466     if (tso->why_blocked != BlockedOnCCall &&
2467         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2468         throwToSingleThreaded(cap,tso,NULL);
2469     }
2470 }
2471
2472 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2473 static void 
2474 deleteThread_(Capability *cap, StgTSO *tso)
2475 { // for forkProcess only:
2476   // like deleteThread(), but we delete threads in foreign calls, too.
2477
2478     if (tso->why_blocked == BlockedOnCCall ||
2479         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2480         unblockOne(cap,tso);
2481         tso->what_next = ThreadKilled;
2482     } else {
2483         deleteThread(cap,tso);
2484     }
2485 }
2486 #endif
2487
2488 /* -----------------------------------------------------------------------------
2489    raiseExceptionHelper
2490    
2491    This function is called by the raise# primitve, just so that we can
2492    move some of the tricky bits of raising an exception from C-- into
2493    C.  Who knows, it might be a useful re-useable thing here too.
2494    -------------------------------------------------------------------------- */
2495
2496 StgWord
2497 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2498 {
2499     Capability *cap = regTableToCapability(reg);
2500     StgThunk *raise_closure = NULL;
2501     StgPtr p, next;
2502     StgRetInfoTable *info;
2503     //
2504     // This closure represents the expression 'raise# E' where E
2505     // is the exception raise.  It is used to overwrite all the
2506     // thunks which are currently under evaluataion.
2507     //
2508
2509     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2510     // LDV profiling: stg_raise_info has THUNK as its closure
2511     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2512     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2513     // 1 does not cause any problem unless profiling is performed.
2514     // However, when LDV profiling goes on, we need to linearly scan
2515     // small object pool, where raise_closure is stored, so we should
2516     // use MIN_UPD_SIZE.
2517     //
2518     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2519     //                                 sizeofW(StgClosure)+1);
2520     //
2521
2522     //
2523     // Walk up the stack, looking for the catch frame.  On the way,
2524     // we update any closures pointed to from update frames with the
2525     // raise closure that we just built.
2526     //
2527     p = tso->sp;
2528     while(1) {
2529         info = get_ret_itbl((StgClosure *)p);
2530         next = p + stack_frame_sizeW((StgClosure *)p);
2531         switch (info->i.type) {
2532             
2533         case UPDATE_FRAME:
2534             // Only create raise_closure if we need to.
2535             if (raise_closure == NULL) {
2536                 raise_closure = 
2537                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2538                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2539                 raise_closure->payload[0] = exception;
2540             }
2541             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2542             p = next;
2543             continue;
2544
2545         case ATOMICALLY_FRAME:
2546             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2547             tso->sp = p;
2548             return ATOMICALLY_FRAME;
2549             
2550         case CATCH_FRAME:
2551             tso->sp = p;
2552             return CATCH_FRAME;
2553
2554         case CATCH_STM_FRAME:
2555             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2556             tso->sp = p;
2557             return CATCH_STM_FRAME;
2558             
2559         case STOP_FRAME:
2560             tso->sp = p;
2561             return STOP_FRAME;
2562
2563         case CATCH_RETRY_FRAME:
2564         default:
2565             p = next; 
2566             continue;
2567         }
2568     }
2569 }
2570
2571
2572 /* -----------------------------------------------------------------------------
2573    findRetryFrameHelper
2574
2575    This function is called by the retry# primitive.  It traverses the stack
2576    leaving tso->sp referring to the frame which should handle the retry.  
2577
2578    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2579    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2580
2581    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2582    create) because retries are not considered to be exceptions, despite the
2583    similar implementation.
2584
2585    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2586    not be created within memory transactions.
2587    -------------------------------------------------------------------------- */
2588
2589 StgWord
2590 findRetryFrameHelper (StgTSO *tso)
2591 {
2592   StgPtr           p, next;
2593   StgRetInfoTable *info;
2594
2595   p = tso -> sp;
2596   while (1) {
2597     info = get_ret_itbl((StgClosure *)p);
2598     next = p + stack_frame_sizeW((StgClosure *)p);
2599     switch (info->i.type) {
2600       
2601     case ATOMICALLY_FRAME:
2602         debugTrace(DEBUG_stm,
2603                    "found ATOMICALLY_FRAME at %p during retry", p);
2604         tso->sp = p;
2605         return ATOMICALLY_FRAME;
2606       
2607     case CATCH_RETRY_FRAME:
2608         debugTrace(DEBUG_stm,
2609                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2610         tso->sp = p;
2611         return CATCH_RETRY_FRAME;
2612       
2613     case CATCH_STM_FRAME: {
2614         StgTRecHeader *trec = tso -> trec;
2615         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2616         debugTrace(DEBUG_stm,
2617                    "found CATCH_STM_FRAME at %p during retry", p);
2618         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2619         stmAbortTransaction(tso -> cap, trec);
2620         stmFreeAbortedTRec(tso -> cap, trec);
2621         tso -> trec = outer;
2622         p = next; 
2623         continue;
2624     }
2625       
2626
2627     default:
2628       ASSERT(info->i.type != CATCH_FRAME);
2629       ASSERT(info->i.type != STOP_FRAME);
2630       p = next; 
2631       continue;
2632     }
2633   }
2634 }
2635
2636 /* -----------------------------------------------------------------------------
2637    resurrectThreads is called after garbage collection on the list of
2638    threads found to be garbage.  Each of these threads will be woken
2639    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2640    on an MVar, or NonTermination if the thread was blocked on a Black
2641    Hole.
2642
2643    Locks: assumes we hold *all* the capabilities.
2644    -------------------------------------------------------------------------- */
2645
2646 void
2647 resurrectThreads (StgTSO *threads)
2648 {
2649     StgTSO *tso, *next;
2650     Capability *cap;
2651     step *step;
2652
2653     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2654         next = tso->global_link;
2655
2656         step = Bdescr((P_)tso)->step;
2657         tso->global_link = step->threads;
2658         step->threads = tso;
2659
2660         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2661         
2662         // Wake up the thread on the Capability it was last on
2663         cap = tso->cap;
2664         
2665         switch (tso->why_blocked) {
2666         case BlockedOnMVar:
2667         case BlockedOnException:
2668             /* Called by GC - sched_mutex lock is currently held. */
2669             throwToSingleThreaded(cap, tso,
2670                                   (StgClosure *)blockedOnDeadMVar_closure);
2671             break;
2672         case BlockedOnBlackHole:
2673             throwToSingleThreaded(cap, tso,
2674                                   (StgClosure *)nonTermination_closure);
2675             break;
2676         case BlockedOnSTM:
2677             throwToSingleThreaded(cap, tso,
2678                                   (StgClosure *)blockedIndefinitely_closure);
2679             break;
2680         case NotBlocked:
2681             /* This might happen if the thread was blocked on a black hole
2682              * belonging to a thread that we've just woken up (raiseAsync
2683              * can wake up threads, remember...).
2684              */
2685             continue;
2686         default:
2687             barf("resurrectThreads: thread blocked in a strange way");
2688         }
2689     }
2690 }
2691
2692 /* -----------------------------------------------------------------------------
2693    performPendingThrowTos is called after garbage collection, and
2694    passed a list of threads that were found to have pending throwTos
2695    (tso->blocked_exceptions was not empty), and were blocked.
2696    Normally this doesn't happen, because we would deliver the
2697    exception directly if the target thread is blocked, but there are
2698    small windows where it might occur on a multiprocessor (see
2699    throwTo()).
2700
2701    NB. we must be holding all the capabilities at this point, just
2702    like resurrectThreads().
2703    -------------------------------------------------------------------------- */
2704
2705 void
2706 performPendingThrowTos (StgTSO *threads)
2707 {
2708     StgTSO *tso, *next;
2709     Capability *cap;
2710     step *step;
2711
2712     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2713         next = tso->global_link;
2714
2715         step = Bdescr((P_)tso)->step;
2716         tso->global_link = step->threads;
2717         step->threads = tso;
2718
2719         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2720         
2721         cap = tso->cap;
2722         maybePerformBlockedException(cap, tso);
2723     }
2724 }