RTS tidyup sweep, first phase
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "eventlog/EventLog.h"
30 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <string.h>
52 #include <stdlib.h>
53 #include <stdarg.h>
54
55 #ifdef HAVE_ERRNO_H
56 #include <errno.h>
57 #endif
58
59 /* -----------------------------------------------------------------------------
60  * Global variables
61  * -------------------------------------------------------------------------- */
62
63 #if !defined(THREADED_RTS)
64 // Blocked/sleeping thrads
65 StgTSO *blocked_queue_hd = NULL;
66 StgTSO *blocked_queue_tl = NULL;
67 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
68 #endif
69
70 /* Threads blocked on blackholes.
71  * LOCK: sched_mutex+capability, or all capabilities
72  */
73 StgTSO *blackhole_queue = NULL;
74
75 /* The blackhole_queue should be checked for threads to wake up.  See
76  * Schedule.h for more thorough comment.
77  * LOCK: none (doesn't matter if we miss an update)
78  */
79 rtsBool blackholes_need_checking = rtsFalse;
80
81 /* Set to true when the latest garbage collection failed to reclaim
82  * enough space, and the runtime should proceed to shut itself down in
83  * an orderly fashion (emitting profiling info etc.)
84  */
85 rtsBool heap_overflow = rtsFalse;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88  * LOCK: currently none, perhaps we should lock (but needs to be
89  * updated in the fast path of the scheduler).
90  *
91  * NB. must be StgWord, we do xchg() on it.
92  */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96  * LOCK: none (changes monotonically)
97  */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
101  *  exists - earlier gccs apparently didn't.
102  *  -= chak
103  */
104 StgTSO dummy_tso;
105
106 /*
107  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
108  * in an MT setting, needed to signal that a worker thread shouldn't hang around
109  * in the scheduler when it is out of work.
110  */
111 rtsBool shutting_down_scheduler = rtsFalse;
112
113 /*
114  * This mutex protects most of the global scheduler data in
115  * the THREADED_RTS runtime.
116  */
117 #if defined(THREADED_RTS)
118 Mutex sched_mutex;
119 #endif
120
121 #if !defined(mingw32_HOST_OS)
122 #define FORKPROCESS_PRIMOP_SUPPORTED
123 #endif
124
125 /* -----------------------------------------------------------------------------
126  * static function prototypes
127  * -------------------------------------------------------------------------- */
128
129 static Capability *schedule (Capability *initialCapability, Task *task);
130
131 //
132 // These function all encapsulate parts of the scheduler loop, and are
133 // abstracted only to make the structure and control flow of the
134 // scheduler clearer.
135 //
136 static void schedulePreLoop (void);
137 static void scheduleFindWork (Capability *cap);
138 #if defined(THREADED_RTS)
139 static void scheduleYield (Capability **pcap, Task *task);
140 #endif
141 static void scheduleStartSignalHandlers (Capability *cap);
142 static void scheduleCheckBlockedThreads (Capability *cap);
143 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
144 static void scheduleCheckBlackHoles (Capability *cap);
145 static void scheduleDetectDeadlock (Capability *cap, Task *task);
146 static void schedulePushWork(Capability *cap, Task *task);
147 #if defined(THREADED_RTS)
148 static void scheduleActivateSpark(Capability *cap);
149 #endif
150 static void schedulePostRunThread(Capability *cap, StgTSO *t);
151 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
152 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
153                                          StgTSO *t);
154 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
155                                     nat prev_what_next );
156 static void scheduleHandleThreadBlocked( StgTSO *t );
157 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
158                                              StgTSO *t );
159 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
160 static Capability *scheduleDoGC(Capability *cap, Task *task,
161                                 rtsBool force_major);
162
163 static rtsBool checkBlackHoles(Capability *cap);
164
165 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
166 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
167
168 static void deleteThread (Capability *cap, StgTSO *tso);
169 static void deleteAllThreads (Capability *cap);
170
171 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
172 static void deleteThread_(Capability *cap, StgTSO *tso);
173 #endif
174
175 #ifdef DEBUG
176 static char *whatNext_strs[] = {
177   "(unknown)",
178   "ThreadRunGHC",
179   "ThreadInterpret",
180   "ThreadKilled",
181   "ThreadRelocated",
182   "ThreadComplete"
183 };
184 #endif
185
186 /* -----------------------------------------------------------------------------
187  * Putting a thread on the run queue: different scheduling policies
188  * -------------------------------------------------------------------------- */
189
190 STATIC_INLINE void
191 addToRunQueue( Capability *cap, StgTSO *t )
192 {
193     // this does round-robin scheduling; good for concurrency
194     appendToRunQueue(cap,t);
195 }
196
197 /* ---------------------------------------------------------------------------
198    Main scheduling loop.
199
200    We use round-robin scheduling, each thread returning to the
201    scheduler loop when one of these conditions is detected:
202
203       * out of heap space
204       * timer expires (thread yields)
205       * thread blocks
206       * thread ends
207       * stack overflow
208
209    GRAN version:
210      In a GranSim setup this loop iterates over the global event queue.
211      This revolves around the global event queue, which determines what 
212      to do next. Therefore, it's more complicated than either the 
213      concurrent or the parallel (GUM) setup.
214   This version has been entirely removed (JB 2008/08).
215
216    GUM version:
217      GUM iterates over incoming messages.
218      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
219      and sends out a fish whenever it has nothing to do; in-between
220      doing the actual reductions (shared code below) it processes the
221      incoming messages and deals with delayed operations 
222      (see PendingFetches).
223      This is not the ugliest code you could imagine, but it's bloody close.
224
225   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
226   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
227   as well as future GUM versions. This file has been refurbished to
228   only contain valid code, which is however incomplete, refers to
229   invalid includes etc.
230
231    ------------------------------------------------------------------------ */
232
233 static Capability *
234 schedule (Capability *initialCapability, Task *task)
235 {
236   StgTSO *t;
237   Capability *cap;
238   StgThreadReturnCode ret;
239   nat prev_what_next;
240   rtsBool ready_to_gc;
241 #if defined(THREADED_RTS)
242   rtsBool first = rtsTrue;
243 #endif
244   
245   cap = initialCapability;
246
247   // Pre-condition: this task owns initialCapability.
248   // The sched_mutex is *NOT* held
249   // NB. on return, we still hold a capability.
250
251   debugTrace (DEBUG_sched, 
252               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
253               task, initialCapability);
254
255   if (running_finalizers) {
256       errorBelch("error: a C finalizer called back into Haskell.\n"
257                  "   This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
258                  "   To create finalizers that may call back into Haskll, use\n"
259                  "   Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
260       stg_exit(EXIT_FAILURE);
261   }
262
263   schedulePreLoop();
264
265   // -----------------------------------------------------------
266   // Scheduler loop starts here:
267
268   while (1) {
269
270     // Check whether we have re-entered the RTS from Haskell without
271     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
272     // call).
273     if (cap->in_haskell) {
274           errorBelch("schedule: re-entered unsafely.\n"
275                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
276           stg_exit(EXIT_FAILURE);
277     }
278
279     // The interruption / shutdown sequence.
280     // 
281     // In order to cleanly shut down the runtime, we want to:
282     //   * make sure that all main threads return to their callers
283     //     with the state 'Interrupted'.
284     //   * clean up all OS threads assocated with the runtime
285     //   * free all memory etc.
286     //
287     // So the sequence for ^C goes like this:
288     //
289     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
290     //     arranges for some Capability to wake up
291     //
292     //   * all threads in the system are halted, and the zombies are
293     //     placed on the run queue for cleaning up.  We acquire all
294     //     the capabilities in order to delete the threads, this is
295     //     done by scheduleDoGC() for convenience (because GC already
296     //     needs to acquire all the capabilities).  We can't kill
297     //     threads involved in foreign calls.
298     // 
299     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
300     //
301     //   * sched_state := SCHED_SHUTTING_DOWN
302     //
303     //   * all workers exit when the run queue on their capability
304     //     drains.  All main threads will also exit when their TSO
305     //     reaches the head of the run queue and they can return.
306     //
307     //   * eventually all Capabilities will shut down, and the RTS can
308     //     exit.
309     //
310     //   * We might be left with threads blocked in foreign calls, 
311     //     we should really attempt to kill these somehow (TODO);
312     
313     switch (sched_state) {
314     case SCHED_RUNNING:
315         break;
316     case SCHED_INTERRUPTING:
317         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
318 #if defined(THREADED_RTS)
319         discardSparksCap(cap);
320 #endif
321         /* scheduleDoGC() deletes all the threads */
322         cap = scheduleDoGC(cap,task,rtsFalse);
323
324         // after scheduleDoGC(), we must be shutting down.  Either some
325         // other Capability did the final GC, or we did it above,
326         // either way we can fall through to the SCHED_SHUTTING_DOWN
327         // case now.
328         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
329         // fall through
330
331     case SCHED_SHUTTING_DOWN:
332         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
333         // If we are a worker, just exit.  If we're a bound thread
334         // then we will exit below when we've removed our TSO from
335         // the run queue.
336         if (task->tso == NULL && emptyRunQueue(cap)) {
337             return cap;
338         }
339         break;
340     default:
341         barf("sched_state: %d", sched_state);
342     }
343
344     scheduleFindWork(cap);
345
346     /* work pushing, currently relevant only for THREADED_RTS:
347        (pushes threads, wakes up idle capabilities for stealing) */
348     schedulePushWork(cap,task);
349
350     scheduleDetectDeadlock(cap,task);
351
352 #if defined(THREADED_RTS)
353     cap = task->cap;    // reload cap, it might have changed
354 #endif
355
356     // Normally, the only way we can get here with no threads to
357     // run is if a keyboard interrupt received during 
358     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
359     // Additionally, it is not fatal for the
360     // threaded RTS to reach here with no threads to run.
361     //
362     // win32: might be here due to awaitEvent() being abandoned
363     // as a result of a console event having been delivered.
364     
365 #if defined(THREADED_RTS)
366     if (first) 
367     {
368     // XXX: ToDo
369     //     // don't yield the first time, we want a chance to run this
370     //     // thread for a bit, even if there are others banging at the
371     //     // door.
372     //     first = rtsFalse;
373     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
374     }
375
376   yield:
377     scheduleYield(&cap,task);
378     if (emptyRunQueue(cap)) continue; // look for work again
379 #endif
380
381 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
382     if ( emptyRunQueue(cap) ) {
383         ASSERT(sched_state >= SCHED_INTERRUPTING);
384     }
385 #endif
386
387     // 
388     // Get a thread to run
389     //
390     t = popRunQueue(cap);
391
392     // Sanity check the thread we're about to run.  This can be
393     // expensive if there is lots of thread switching going on...
394     IF_DEBUG(sanity,checkTSO(t));
395
396 #if defined(THREADED_RTS)
397     // Check whether we can run this thread in the current task.
398     // If not, we have to pass our capability to the right task.
399     {
400         Task *bound = t->bound;
401       
402         if (bound) {
403             if (bound == task) {
404                 debugTrace(DEBUG_sched,
405                            "### Running thread %lu in bound thread", (unsigned long)t->id);
406                 // yes, the Haskell thread is bound to the current native thread
407             } else {
408                 debugTrace(DEBUG_sched,
409                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
410                 // no, bound to a different Haskell thread: pass to that thread
411                 pushOnRunQueue(cap,t);
412                 continue;
413             }
414         } else {
415             // The thread we want to run is unbound.
416             if (task->tso) { 
417                 debugTrace(DEBUG_sched,
418                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
419                 // no, the current native thread is bound to a different
420                 // Haskell thread, so pass it to any worker thread
421                 pushOnRunQueue(cap,t);
422                 continue; 
423             }
424         }
425     }
426 #endif
427
428     // If we're shutting down, and this thread has not yet been
429     // killed, kill it now.  This sometimes happens when a finalizer
430     // thread is created by the final GC, or a thread previously
431     // in a foreign call returns.
432     if (sched_state >= SCHED_INTERRUPTING &&
433         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
434         deleteThread(cap,t);
435     }
436
437     /* context switches are initiated by the timer signal, unless
438      * the user specified "context switch as often as possible", with
439      * +RTS -C0
440      */
441     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
442         && !emptyThreadQueues(cap)) {
443         cap->context_switch = 1;
444     }
445          
446 run_thread:
447
448     // CurrentTSO is the thread to run.  t might be different if we
449     // loop back to run_thread, so make sure to set CurrentTSO after
450     // that.
451     cap->r.rCurrentTSO = t;
452
453     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
454                               (long)t->id, whatNext_strs[t->what_next]);
455
456     startHeapProfTimer();
457
458     // Check for exceptions blocked on this thread
459     maybePerformBlockedException (cap, t);
460
461     // ----------------------------------------------------------------------
462     // Run the current thread 
463
464     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
465     ASSERT(t->cap == cap);
466     ASSERT(t->bound ? t->bound->cap == cap : 1);
467
468     prev_what_next = t->what_next;
469
470     errno = t->saved_errno;
471 #if mingw32_HOST_OS
472     SetLastError(t->saved_winerror);
473 #endif
474
475     cap->in_haskell = rtsTrue;
476
477     dirty_TSO(cap,t);
478
479 #if defined(THREADED_RTS)
480     if (recent_activity == ACTIVITY_DONE_GC) {
481         // ACTIVITY_DONE_GC means we turned off the timer signal to
482         // conserve power (see #1623).  Re-enable it here.
483         nat prev;
484         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
485         if (prev == ACTIVITY_DONE_GC) {
486             startTimer();
487         }
488     } else {
489         recent_activity = ACTIVITY_YES;
490     }
491 #endif
492
493     postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
494
495     switch (prev_what_next) {
496         
497     case ThreadKilled:
498     case ThreadComplete:
499         /* Thread already finished, return to scheduler. */
500         ret = ThreadFinished;
501         break;
502         
503     case ThreadRunGHC:
504     {
505         StgRegTable *r;
506         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
507         cap = regTableToCapability(r);
508         ret = r->rRet;
509         break;
510     }
511     
512     case ThreadInterpret:
513         cap = interpretBCO(cap);
514         ret = cap->r.rRet;
515         break;
516         
517     default:
518         barf("schedule: invalid what_next field");
519     }
520
521     cap->in_haskell = rtsFalse;
522
523     // The TSO might have moved, eg. if it re-entered the RTS and a GC
524     // happened.  So find the new location:
525     t = cap->r.rCurrentTSO;
526
527     // We have run some Haskell code: there might be blackhole-blocked
528     // threads to wake up now.
529     // Lock-free test here should be ok, we're just setting a flag.
530     if ( blackhole_queue != END_TSO_QUEUE ) {
531         blackholes_need_checking = rtsTrue;
532     }
533     
534     // And save the current errno in this thread.
535     // XXX: possibly bogus for SMP because this thread might already
536     // be running again, see code below.
537     t->saved_errno = errno;
538 #if mingw32_HOST_OS
539     // Similarly for Windows error code
540     t->saved_winerror = GetLastError();
541 #endif
542
543     postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
544
545 #if defined(THREADED_RTS)
546     // If ret is ThreadBlocked, and this Task is bound to the TSO that
547     // blocked, we are in limbo - the TSO is now owned by whatever it
548     // is blocked on, and may in fact already have been woken up,
549     // perhaps even on a different Capability.  It may be the case
550     // that task->cap != cap.  We better yield this Capability
551     // immediately and return to normaility.
552     if (ret == ThreadBlocked) {
553         debugTrace(DEBUG_sched,
554                    "--<< thread %lu (%s) stopped: blocked",
555                    (unsigned long)t->id, whatNext_strs[t->what_next]);
556         goto yield;
557     }
558 #endif
559
560     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
561     ASSERT(t->cap == cap);
562
563     // ----------------------------------------------------------------------
564     
565     // Costs for the scheduler are assigned to CCS_SYSTEM
566     stopHeapProfTimer();
567 #if defined(PROFILING)
568     CCCS = CCS_SYSTEM;
569 #endif
570     
571     schedulePostRunThread(cap,t);
572
573     if (ret != StackOverflow) {
574         t = threadStackUnderflow(task,t);
575     }
576
577     ready_to_gc = rtsFalse;
578
579     switch (ret) {
580     case HeapOverflow:
581         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
582         break;
583
584     case StackOverflow:
585         scheduleHandleStackOverflow(cap,task,t);
586         break;
587
588     case ThreadYielding:
589         if (scheduleHandleYield(cap, t, prev_what_next)) {
590             // shortcut for switching between compiler/interpreter:
591             goto run_thread; 
592         }
593         break;
594
595     case ThreadBlocked:
596         scheduleHandleThreadBlocked(t);
597         break;
598
599     case ThreadFinished:
600         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
601         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
602         break;
603
604     default:
605       barf("schedule: invalid thread return code %d", (int)ret);
606     }
607
608     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
609       cap = scheduleDoGC(cap,task,rtsFalse);
610     }
611   } /* end of while() */
612 }
613
614 /* ----------------------------------------------------------------------------
615  * Setting up the scheduler loop
616  * ------------------------------------------------------------------------- */
617
618 static void
619 schedulePreLoop(void)
620 {
621   // initialisation for scheduler - what cannot go into initScheduler()  
622 }
623
624 /* -----------------------------------------------------------------------------
625  * scheduleFindWork()
626  *
627  * Search for work to do, and handle messages from elsewhere.
628  * -------------------------------------------------------------------------- */
629
630 static void
631 scheduleFindWork (Capability *cap)
632 {
633     scheduleStartSignalHandlers(cap);
634
635     // Only check the black holes here if we've nothing else to do.
636     // During normal execution, the black hole list only gets checked
637     // at GC time, to avoid repeatedly traversing this possibly long
638     // list each time around the scheduler.
639     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
640
641     scheduleCheckWakeupThreads(cap);
642
643     scheduleCheckBlockedThreads(cap);
644
645 #if defined(THREADED_RTS)
646     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
647 #endif
648 }
649
650 #if defined(THREADED_RTS)
651 STATIC_INLINE rtsBool
652 shouldYieldCapability (Capability *cap, Task *task)
653 {
654     // we need to yield this capability to someone else if..
655     //   - another thread is initiating a GC
656     //   - another Task is returning from a foreign call
657     //   - the thread at the head of the run queue cannot be run
658     //     by this Task (it is bound to another Task, or it is unbound
659     //     and this task it bound).
660     return (waiting_for_gc || 
661             cap->returning_tasks_hd != NULL ||
662             (!emptyRunQueue(cap) && (task->tso == NULL
663                                      ? cap->run_queue_hd->bound != NULL
664                                      : cap->run_queue_hd->bound != task)));
665 }
666
667 // This is the single place where a Task goes to sleep.  There are
668 // two reasons it might need to sleep:
669 //    - there are no threads to run
670 //    - we need to yield this Capability to someone else 
671 //      (see shouldYieldCapability())
672 //
673 // Careful: the scheduler loop is quite delicate.  Make sure you run
674 // the tests in testsuite/concurrent (all ways) after modifying this,
675 // and also check the benchmarks in nofib/parallel for regressions.
676
677 static void
678 scheduleYield (Capability **pcap, Task *task)
679 {
680     Capability *cap = *pcap;
681
682     // if we have work, and we don't need to give up the Capability, continue.
683     if (!shouldYieldCapability(cap,task) && 
684         (!emptyRunQueue(cap) ||
685          !emptyWakeupQueue(cap) ||
686          blackholes_need_checking ||
687          sched_state >= SCHED_INTERRUPTING))
688         return;
689
690     // otherwise yield (sleep), and keep yielding if necessary.
691     do {
692         yieldCapability(&cap,task);
693     } 
694     while (shouldYieldCapability(cap,task));
695
696     // note there may still be no threads on the run queue at this
697     // point, the caller has to check.
698
699     *pcap = cap;
700     return;
701 }
702 #endif
703     
704 /* -----------------------------------------------------------------------------
705  * schedulePushWork()
706  *
707  * Push work to other Capabilities if we have some.
708  * -------------------------------------------------------------------------- */
709
710 static void
711 schedulePushWork(Capability *cap USED_IF_THREADS, 
712                  Task *task      USED_IF_THREADS)
713 {
714   /* following code not for PARALLEL_HASKELL. I kept the call general,
715      future GUM versions might use pushing in a distributed setup */
716 #if defined(THREADED_RTS)
717
718     Capability *free_caps[n_capabilities], *cap0;
719     nat i, n_free_caps;
720
721     // migration can be turned off with +RTS -qg
722     if (!RtsFlags.ParFlags.migrate) return;
723
724     // Check whether we have more threads on our run queue, or sparks
725     // in our pool, that we could hand to another Capability.
726     if (cap->run_queue_hd == END_TSO_QUEUE) {
727         if (sparkPoolSizeCap(cap) < 2) return;
728     } else {
729         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
730             sparkPoolSizeCap(cap) < 1) return;
731     }
732
733     // First grab as many free Capabilities as we can.
734     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
735         cap0 = &capabilities[i];
736         if (cap != cap0 && tryGrabCapability(cap0,task)) {
737             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
738                 // it already has some work, we just grabbed it at 
739                 // the wrong moment.  Or maybe it's deadlocked!
740                 releaseCapability(cap0);
741             } else {
742                 free_caps[n_free_caps++] = cap0;
743             }
744         }
745     }
746
747     // we now have n_free_caps free capabilities stashed in
748     // free_caps[].  Share our run queue equally with them.  This is
749     // probably the simplest thing we could do; improvements we might
750     // want to do include:
751     //
752     //   - giving high priority to moving relatively new threads, on 
753     //     the gournds that they haven't had time to build up a
754     //     working set in the cache on this CPU/Capability.
755     //
756     //   - giving low priority to moving long-lived threads
757
758     if (n_free_caps > 0) {
759         StgTSO *prev, *t, *next;
760         rtsBool pushed_to_all;
761
762         debugTrace(DEBUG_sched, 
763                    "cap %d: %s and %d free capabilities, sharing...", 
764                    cap->no, 
765                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
766                    "excess threads on run queue":"sparks to share (>=2)",
767                    n_free_caps);
768
769         i = 0;
770         pushed_to_all = rtsFalse;
771
772         if (cap->run_queue_hd != END_TSO_QUEUE) {
773             prev = cap->run_queue_hd;
774             t = prev->_link;
775             prev->_link = END_TSO_QUEUE;
776             for (; t != END_TSO_QUEUE; t = next) {
777                 next = t->_link;
778                 t->_link = END_TSO_QUEUE;
779                 if (t->what_next == ThreadRelocated
780                     || t->bound == task // don't move my bound thread
781                     || tsoLocked(t)) {  // don't move a locked thread
782                     setTSOLink(cap, prev, t);
783                     prev = t;
784                 } else if (i == n_free_caps) {
785                     pushed_to_all = rtsTrue;
786                     i = 0;
787                     // keep one for us
788                     setTSOLink(cap, prev, t);
789                     prev = t;
790                 } else {
791                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
792                     appendToRunQueue(free_caps[i],t);
793
794         postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
795
796                     if (t->bound) { t->bound->cap = free_caps[i]; }
797                     t->cap = free_caps[i];
798                     i++;
799                 }
800             }
801             cap->run_queue_tl = prev;
802         }
803
804 #ifdef SPARK_PUSHING
805         /* JB I left this code in place, it would work but is not necessary */
806
807         // If there are some free capabilities that we didn't push any
808         // threads to, then try to push a spark to each one.
809         if (!pushed_to_all) {
810             StgClosure *spark;
811             // i is the next free capability to push to
812             for (; i < n_free_caps; i++) {
813                 if (emptySparkPoolCap(free_caps[i])) {
814                     spark = tryStealSpark(cap->sparks);
815                     if (spark != NULL) {
816                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
817
818       postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
819
820                         newSpark(&(free_caps[i]->r), spark);
821                     }
822                 }
823             }
824         }
825 #endif /* SPARK_PUSHING */
826
827         // release the capabilities
828         for (i = 0; i < n_free_caps; i++) {
829             task->cap = free_caps[i];
830             releaseAndWakeupCapability(free_caps[i]);
831         }
832     }
833     task->cap = cap; // reset to point to our Capability.
834
835 #endif /* THREADED_RTS */
836
837 }
838
839 /* ----------------------------------------------------------------------------
840  * Start any pending signal handlers
841  * ------------------------------------------------------------------------- */
842
843 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
844 static void
845 scheduleStartSignalHandlers(Capability *cap)
846 {
847     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
848         // safe outside the lock
849         startSignalHandlers(cap);
850     }
851 }
852 #else
853 static void
854 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
855 {
856 }
857 #endif
858
859 /* ----------------------------------------------------------------------------
860  * Check for blocked threads that can be woken up.
861  * ------------------------------------------------------------------------- */
862
863 static void
864 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
865 {
866 #if !defined(THREADED_RTS)
867     //
868     // Check whether any waiting threads need to be woken up.  If the
869     // run queue is empty, and there are no other tasks running, we
870     // can wait indefinitely for something to happen.
871     //
872     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
873     {
874         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
875     }
876 #endif
877 }
878
879
880 /* ----------------------------------------------------------------------------
881  * Check for threads woken up by other Capabilities
882  * ------------------------------------------------------------------------- */
883
884 static void
885 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
886 {
887 #if defined(THREADED_RTS)
888     // Any threads that were woken up by other Capabilities get
889     // appended to our run queue.
890     if (!emptyWakeupQueue(cap)) {
891         ACQUIRE_LOCK(&cap->lock);
892         if (emptyRunQueue(cap)) {
893             cap->run_queue_hd = cap->wakeup_queue_hd;
894             cap->run_queue_tl = cap->wakeup_queue_tl;
895         } else {
896             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
897             cap->run_queue_tl = cap->wakeup_queue_tl;
898         }
899         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
900         RELEASE_LOCK(&cap->lock);
901     }
902 #endif
903 }
904
905 /* ----------------------------------------------------------------------------
906  * Check for threads blocked on BLACKHOLEs that can be woken up
907  * ------------------------------------------------------------------------- */
908 static void
909 scheduleCheckBlackHoles (Capability *cap)
910 {
911     if ( blackholes_need_checking ) // check without the lock first
912     {
913         ACQUIRE_LOCK(&sched_mutex);
914         if ( blackholes_need_checking ) {
915             blackholes_need_checking = rtsFalse;
916             // important that we reset the flag *before* checking the
917             // blackhole queue, otherwise we could get deadlock.  This
918             // happens as follows: we wake up a thread that
919             // immediately runs on another Capability, blocks on a
920             // blackhole, and then we reset the blackholes_need_checking flag.
921             checkBlackHoles(cap);
922         }
923         RELEASE_LOCK(&sched_mutex);
924     }
925 }
926
927 /* ----------------------------------------------------------------------------
928  * Detect deadlock conditions and attempt to resolve them.
929  * ------------------------------------------------------------------------- */
930
931 static void
932 scheduleDetectDeadlock (Capability *cap, Task *task)
933 {
934     /* 
935      * Detect deadlock: when we have no threads to run, there are no
936      * threads blocked, waiting for I/O, or sleeping, and all the
937      * other tasks are waiting for work, we must have a deadlock of
938      * some description.
939      */
940     if ( emptyThreadQueues(cap) )
941     {
942 #if defined(THREADED_RTS)
943         /* 
944          * In the threaded RTS, we only check for deadlock if there
945          * has been no activity in a complete timeslice.  This means
946          * we won't eagerly start a full GC just because we don't have
947          * any threads to run currently.
948          */
949         if (recent_activity != ACTIVITY_INACTIVE) return;
950 #endif
951
952         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
953
954         // Garbage collection can release some new threads due to
955         // either (a) finalizers or (b) threads resurrected because
956         // they are unreachable and will therefore be sent an
957         // exception.  Any threads thus released will be immediately
958         // runnable.
959         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
960         // when force_major == rtsTrue. scheduleDoGC sets
961         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
962         // signal.
963
964         if ( !emptyRunQueue(cap) ) return;
965
966 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
967         /* If we have user-installed signal handlers, then wait
968          * for signals to arrive rather then bombing out with a
969          * deadlock.
970          */
971         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
972             debugTrace(DEBUG_sched,
973                        "still deadlocked, waiting for signals...");
974
975             awaitUserSignals();
976
977             if (signals_pending()) {
978                 startSignalHandlers(cap);
979             }
980
981             // either we have threads to run, or we were interrupted:
982             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
983
984             return;
985         }
986 #endif
987
988 #if !defined(THREADED_RTS)
989         /* Probably a real deadlock.  Send the current main thread the
990          * Deadlock exception.
991          */
992         if (task->tso) {
993             switch (task->tso->why_blocked) {
994             case BlockedOnSTM:
995             case BlockedOnBlackHole:
996             case BlockedOnException:
997             case BlockedOnMVar:
998                 throwToSingleThreaded(cap, task->tso, 
999                                       (StgClosure *)nonTermination_closure);
1000                 return;
1001             default:
1002                 barf("deadlock: main thread blocked in a strange way");
1003             }
1004         }
1005         return;
1006 #endif
1007     }
1008 }
1009
1010
1011 /* ----------------------------------------------------------------------------
1012  * Send pending messages (PARALLEL_HASKELL only)
1013  * ------------------------------------------------------------------------- */
1014
1015 #if defined(PARALLEL_HASKELL)
1016 static void
1017 scheduleSendPendingMessages(void)
1018 {
1019
1020 # if defined(PAR) // global Mem.Mgmt., omit for now
1021     if (PendingFetches != END_BF_QUEUE) {
1022         processFetches();
1023     }
1024 # endif
1025     
1026     if (RtsFlags.ParFlags.BufferTime) {
1027         // if we use message buffering, we must send away all message
1028         // packets which have become too old...
1029         sendOldBuffers(); 
1030     }
1031 }
1032 #endif
1033
1034 /* ----------------------------------------------------------------------------
1035  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1036  * ------------------------------------------------------------------------- */
1037
1038 #if defined(THREADED_RTS)
1039 static void
1040 scheduleActivateSpark(Capability *cap)
1041 {
1042     if (anySparks())
1043     {
1044         createSparkThread(cap);
1045         debugTrace(DEBUG_sched, "creating a spark thread");
1046     }
1047 }
1048 #endif // PARALLEL_HASKELL || THREADED_RTS
1049
1050 /* ----------------------------------------------------------------------------
1051  * After running a thread...
1052  * ------------------------------------------------------------------------- */
1053
1054 static void
1055 schedulePostRunThread (Capability *cap, StgTSO *t)
1056 {
1057     // We have to be able to catch transactions that are in an
1058     // infinite loop as a result of seeing an inconsistent view of
1059     // memory, e.g. 
1060     //
1061     //   atomically $ do
1062     //       [a,b] <- mapM readTVar [ta,tb]
1063     //       when (a == b) loop
1064     //
1065     // and a is never equal to b given a consistent view of memory.
1066     //
1067     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1068         if (!stmValidateNestOfTransactions (t -> trec)) {
1069             debugTrace(DEBUG_sched | DEBUG_stm,
1070                        "trec %p found wasting its time", t);
1071             
1072             // strip the stack back to the
1073             // ATOMICALLY_FRAME, aborting the (nested)
1074             // transaction, and saving the stack of any
1075             // partially-evaluated thunks on the heap.
1076             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1077             
1078             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1079         }
1080     }
1081
1082   /* some statistics gathering in the parallel case */
1083 }
1084
1085 /* -----------------------------------------------------------------------------
1086  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1087  * -------------------------------------------------------------------------- */
1088
1089 static rtsBool
1090 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1091 {
1092     // did the task ask for a large block?
1093     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1094         // if so, get one and push it on the front of the nursery.
1095         bdescr *bd;
1096         lnat blocks;
1097         
1098         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1099         
1100         debugTrace(DEBUG_sched,
1101                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1102                    (long)t->id, whatNext_strs[t->what_next], blocks);
1103     
1104         // don't do this if the nursery is (nearly) full, we'll GC first.
1105         if (cap->r.rCurrentNursery->link != NULL ||
1106             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1107                                                // if the nursery has only one block.
1108             
1109             ACQUIRE_SM_LOCK
1110             bd = allocGroup( blocks );
1111             RELEASE_SM_LOCK
1112             cap->r.rNursery->n_blocks += blocks;
1113             
1114             // link the new group into the list
1115             bd->link = cap->r.rCurrentNursery;
1116             bd->u.back = cap->r.rCurrentNursery->u.back;
1117             if (cap->r.rCurrentNursery->u.back != NULL) {
1118                 cap->r.rCurrentNursery->u.back->link = bd;
1119             } else {
1120 #if !defined(THREADED_RTS)
1121                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1122                        g0s0 == cap->r.rNursery);
1123 #endif
1124                 cap->r.rNursery->blocks = bd;
1125             }             
1126             cap->r.rCurrentNursery->u.back = bd;
1127             
1128             // initialise it as a nursery block.  We initialise the
1129             // step, gen_no, and flags field of *every* sub-block in
1130             // this large block, because this is easier than making
1131             // sure that we always find the block head of a large
1132             // block whenever we call Bdescr() (eg. evacuate() and
1133             // isAlive() in the GC would both have to do this, at
1134             // least).
1135             { 
1136                 bdescr *x;
1137                 for (x = bd; x < bd + blocks; x++) {
1138                     x->step = cap->r.rNursery;
1139                     x->gen_no = 0;
1140                     x->flags = 0;
1141                 }
1142             }
1143             
1144             // This assert can be a killer if the app is doing lots
1145             // of large block allocations.
1146             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1147             
1148             // now update the nursery to point to the new block
1149             cap->r.rCurrentNursery = bd;
1150             
1151             // we might be unlucky and have another thread get on the
1152             // run queue before us and steal the large block, but in that
1153             // case the thread will just end up requesting another large
1154             // block.
1155             pushOnRunQueue(cap,t);
1156             return rtsFalse;  /* not actually GC'ing */
1157         }
1158     }
1159     
1160     debugTrace(DEBUG_sched,
1161                "--<< thread %ld (%s) stopped: HeapOverflow",
1162                (long)t->id, whatNext_strs[t->what_next]);
1163
1164     if (cap->r.rHpLim == NULL || cap->context_switch) {
1165         // Sometimes we miss a context switch, e.g. when calling
1166         // primitives in a tight loop, MAYBE_GC() doesn't check the
1167         // context switch flag, and we end up waiting for a GC.
1168         // See #1984, and concurrent/should_run/1984
1169         cap->context_switch = 0;
1170         addToRunQueue(cap,t);
1171     } else {
1172         pushOnRunQueue(cap,t);
1173     }
1174     return rtsTrue;
1175     /* actual GC is done at the end of the while loop in schedule() */
1176 }
1177
1178 /* -----------------------------------------------------------------------------
1179  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1180  * -------------------------------------------------------------------------- */
1181
1182 static void
1183 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1184 {
1185     debugTrace (DEBUG_sched,
1186                 "--<< thread %ld (%s) stopped, StackOverflow", 
1187                 (long)t->id, whatNext_strs[t->what_next]);
1188
1189     /* just adjust the stack for this thread, then pop it back
1190      * on the run queue.
1191      */
1192     { 
1193         /* enlarge the stack */
1194         StgTSO *new_t = threadStackOverflow(cap, t);
1195         
1196         /* The TSO attached to this Task may have moved, so update the
1197          * pointer to it.
1198          */
1199         if (task->tso == t) {
1200             task->tso = new_t;
1201         }
1202         pushOnRunQueue(cap,new_t);
1203     }
1204 }
1205
1206 /* -----------------------------------------------------------------------------
1207  * Handle a thread that returned to the scheduler with ThreadYielding
1208  * -------------------------------------------------------------------------- */
1209
1210 static rtsBool
1211 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1212 {
1213     // Reset the context switch flag.  We don't do this just before
1214     // running the thread, because that would mean we would lose ticks
1215     // during GC, which can lead to unfair scheduling (a thread hogs
1216     // the CPU because the tick always arrives during GC).  This way
1217     // penalises threads that do a lot of allocation, but that seems
1218     // better than the alternative.
1219     cap->context_switch = 0;
1220     
1221     /* put the thread back on the run queue.  Then, if we're ready to
1222      * GC, check whether this is the last task to stop.  If so, wake
1223      * up the GC thread.  getThread will block during a GC until the
1224      * GC is finished.
1225      */
1226 #ifdef DEBUG
1227     if (t->what_next != prev_what_next) {
1228         debugTrace(DEBUG_sched,
1229                    "--<< thread %ld (%s) stopped to switch evaluators", 
1230                    (long)t->id, whatNext_strs[t->what_next]);
1231     } else {
1232         debugTrace(DEBUG_sched,
1233                    "--<< thread %ld (%s) stopped, yielding",
1234                    (long)t->id, whatNext_strs[t->what_next]);
1235     }
1236 #endif
1237     
1238     IF_DEBUG(sanity,
1239              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1240              checkTSO(t));
1241     ASSERT(t->_link == END_TSO_QUEUE);
1242     
1243     // Shortcut if we're just switching evaluators: don't bother
1244     // doing stack squeezing (which can be expensive), just run the
1245     // thread.
1246     if (t->what_next != prev_what_next) {
1247         return rtsTrue;
1248     }
1249
1250     addToRunQueue(cap,t);
1251
1252     return rtsFalse;
1253 }
1254
1255 /* -----------------------------------------------------------------------------
1256  * Handle a thread that returned to the scheduler with ThreadBlocked
1257  * -------------------------------------------------------------------------- */
1258
1259 static void
1260 scheduleHandleThreadBlocked( StgTSO *t
1261 #if !defined(DEBUG)
1262     STG_UNUSED
1263 #endif
1264     )
1265 {
1266
1267       // We don't need to do anything.  The thread is blocked, and it
1268       // has tidied up its stack and placed itself on whatever queue
1269       // it needs to be on.
1270
1271     // ASSERT(t->why_blocked != NotBlocked);
1272     // Not true: for example,
1273     //    - in THREADED_RTS, the thread may already have been woken
1274     //      up by another Capability.  This actually happens: try
1275     //      conc023 +RTS -N2.
1276     //    - the thread may have woken itself up already, because
1277     //      threadPaused() might have raised a blocked throwTo
1278     //      exception, see maybePerformBlockedException().
1279
1280 #ifdef DEBUG
1281     if (traceClass(DEBUG_sched)) {
1282         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1283                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1284         printThreadBlockage(t);
1285         debugTraceEnd();
1286     }
1287 #endif
1288 }
1289
1290 /* -----------------------------------------------------------------------------
1291  * Handle a thread that returned to the scheduler with ThreadFinished
1292  * -------------------------------------------------------------------------- */
1293
1294 static rtsBool
1295 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1296 {
1297     /* Need to check whether this was a main thread, and if so,
1298      * return with the return value.
1299      *
1300      * We also end up here if the thread kills itself with an
1301      * uncaught exception, see Exception.cmm.
1302      */
1303     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1304                (unsigned long)t->id, whatNext_strs[t->what_next]);
1305
1306     // blocked exceptions can now complete, even if the thread was in
1307     // blocked mode (see #2910).  This unconditionally calls
1308     // lockTSO(), which ensures that we don't miss any threads that
1309     // are engaged in throwTo() with this thread as a target.
1310     awakenBlockedExceptionQueue (cap, t);
1311
1312       //
1313       // Check whether the thread that just completed was a bound
1314       // thread, and if so return with the result.  
1315       //
1316       // There is an assumption here that all thread completion goes
1317       // through this point; we need to make sure that if a thread
1318       // ends up in the ThreadKilled state, that it stays on the run
1319       // queue so it can be dealt with here.
1320       //
1321
1322       if (t->bound) {
1323
1324           if (t->bound != task) {
1325 #if !defined(THREADED_RTS)
1326               // Must be a bound thread that is not the topmost one.  Leave
1327               // it on the run queue until the stack has unwound to the
1328               // point where we can deal with this.  Leaving it on the run
1329               // queue also ensures that the garbage collector knows about
1330               // this thread and its return value (it gets dropped from the
1331               // step->threads list so there's no other way to find it).
1332               appendToRunQueue(cap,t);
1333               return rtsFalse;
1334 #else
1335               // this cannot happen in the threaded RTS, because a
1336               // bound thread can only be run by the appropriate Task.
1337               barf("finished bound thread that isn't mine");
1338 #endif
1339           }
1340
1341           ASSERT(task->tso == t);
1342
1343           if (t->what_next == ThreadComplete) {
1344               if (task->ret) {
1345                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1346                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1347               }
1348               task->stat = Success;
1349           } else {
1350               if (task->ret) {
1351                   *(task->ret) = NULL;
1352               }
1353               if (sched_state >= SCHED_INTERRUPTING) {
1354                   if (heap_overflow) {
1355                       task->stat = HeapExhausted;
1356                   } else {
1357                       task->stat = Interrupted;
1358                   }
1359               } else {
1360                   task->stat = Killed;
1361               }
1362           }
1363 #ifdef DEBUG
1364           removeThreadLabel((StgWord)task->tso->id);
1365 #endif
1366           return rtsTrue; // tells schedule() to return
1367       }
1368
1369       return rtsFalse;
1370 }
1371
1372 /* -----------------------------------------------------------------------------
1373  * Perform a heap census
1374  * -------------------------------------------------------------------------- */
1375
1376 static rtsBool
1377 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1378 {
1379     // When we have +RTS -i0 and we're heap profiling, do a census at
1380     // every GC.  This lets us get repeatable runs for debugging.
1381     if (performHeapProfile ||
1382         (RtsFlags.ProfFlags.profileInterval==0 &&
1383          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1384         return rtsTrue;
1385     } else {
1386         return rtsFalse;
1387     }
1388 }
1389
1390 /* -----------------------------------------------------------------------------
1391  * Perform a garbage collection if necessary
1392  * -------------------------------------------------------------------------- */
1393
1394 static Capability *
1395 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1396 {
1397     rtsBool heap_census;
1398 #ifdef THREADED_RTS
1399     /* extern static volatile StgWord waiting_for_gc; 
1400        lives inside capability.c */
1401     rtsBool gc_type, prev_pending_gc;
1402     nat i;
1403 #endif
1404
1405     if (sched_state == SCHED_SHUTTING_DOWN) {
1406         // The final GC has already been done, and the system is
1407         // shutting down.  We'll probably deadlock if we try to GC
1408         // now.
1409         return cap;
1410     }
1411
1412 #ifdef THREADED_RTS
1413     if (sched_state < SCHED_INTERRUPTING
1414         && RtsFlags.ParFlags.parGcEnabled
1415         && N >= RtsFlags.ParFlags.parGcGen
1416         && ! oldest_gen->steps[0].mark)
1417     {
1418         gc_type = PENDING_GC_PAR;
1419     } else {
1420         gc_type = PENDING_GC_SEQ;
1421     }
1422
1423     // In order to GC, there must be no threads running Haskell code.
1424     // Therefore, the GC thread needs to hold *all* the capabilities,
1425     // and release them after the GC has completed.  
1426     //
1427     // This seems to be the simplest way: previous attempts involved
1428     // making all the threads with capabilities give up their
1429     // capabilities and sleep except for the *last* one, which
1430     // actually did the GC.  But it's quite hard to arrange for all
1431     // the other tasks to sleep and stay asleep.
1432     //
1433
1434     /*  Other capabilities are prevented from running yet more Haskell
1435         threads if waiting_for_gc is set. Tested inside
1436         yieldCapability() and releaseCapability() in Capability.c */
1437
1438     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1439     if (prev_pending_gc) {
1440         do {
1441             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1442                        prev_pending_gc);
1443             ASSERT(cap);
1444             yieldCapability(&cap,task);
1445         } while (waiting_for_gc);
1446         return cap;  // NOTE: task->cap might have changed here
1447     }
1448
1449     setContextSwitches();
1450
1451     // The final shutdown GC is always single-threaded, because it's
1452     // possible that some of the Capabilities have no worker threads.
1453     
1454     if (gc_type == PENDING_GC_SEQ)
1455     {
1456         postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
1457         // single-threaded GC: grab all the capabilities
1458         for (i=0; i < n_capabilities; i++) {
1459             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1460             if (cap != &capabilities[i]) {
1461                 Capability *pcap = &capabilities[i];
1462                 // we better hope this task doesn't get migrated to
1463                 // another Capability while we're waiting for this one.
1464                 // It won't, because load balancing happens while we have
1465                 // all the Capabilities, but even so it's a slightly
1466                 // unsavoury invariant.
1467                 task->cap = pcap;
1468                 waitForReturnCapability(&pcap, task);
1469                 if (pcap != &capabilities[i]) {
1470                     barf("scheduleDoGC: got the wrong capability");
1471                 }
1472             }
1473         }
1474     }
1475     else
1476     {
1477         // multi-threaded GC: make sure all the Capabilities donate one
1478         // GC thread each.
1479         postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
1480         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1481
1482         waitForGcThreads(cap);
1483     }
1484 #endif
1485
1486     // so this happens periodically:
1487     if (cap) scheduleCheckBlackHoles(cap);
1488     
1489     IF_DEBUG(scheduler, printAllThreads());
1490
1491 delete_threads_and_gc:
1492     /*
1493      * We now have all the capabilities; if we're in an interrupting
1494      * state, then we should take the opportunity to delete all the
1495      * threads in the system.
1496      */
1497     if (sched_state == SCHED_INTERRUPTING) {
1498         deleteAllThreads(cap);
1499         sched_state = SCHED_SHUTTING_DOWN;
1500     }
1501     
1502     heap_census = scheduleNeedHeapProfile(rtsTrue);
1503
1504 #if defined(THREADED_RTS)
1505     postEvent(cap, EVENT_GC_START, 0, 0);
1506     debugTrace(DEBUG_sched, "doing GC");
1507     // reset waiting_for_gc *before* GC, so that when the GC threads
1508     // emerge they don't immediately re-enter the GC.
1509     waiting_for_gc = 0;
1510     GarbageCollect(force_major || heap_census, gc_type, cap);
1511 #else
1512     GarbageCollect(force_major || heap_census, 0, cap);
1513 #endif
1514     postEvent(cap, EVENT_GC_END, 0, 0);
1515
1516     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1517     {
1518         // We are doing a GC because the system has been idle for a
1519         // timeslice and we need to check for deadlock.  Record the
1520         // fact that we've done a GC and turn off the timer signal;
1521         // it will get re-enabled if we run any threads after the GC.
1522         recent_activity = ACTIVITY_DONE_GC;
1523         stopTimer();
1524     }
1525     else
1526     {
1527         // the GC might have taken long enough for the timer to set
1528         // recent_activity = ACTIVITY_INACTIVE, but we aren't
1529         // necessarily deadlocked:
1530         recent_activity = ACTIVITY_YES;
1531     }
1532
1533 #if defined(THREADED_RTS)
1534     if (gc_type == PENDING_GC_PAR)
1535     {
1536         releaseGCThreads(cap);
1537     }
1538 #endif
1539
1540     if (heap_census) {
1541         debugTrace(DEBUG_sched, "performing heap census");
1542         heapCensus();
1543         performHeapProfile = rtsFalse;
1544     }
1545
1546     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1547         // GC set the heap_overflow flag, so we should proceed with
1548         // an orderly shutdown now.  Ultimately we want the main
1549         // thread to return to its caller with HeapExhausted, at which
1550         // point the caller should call hs_exit().  The first step is
1551         // to delete all the threads.
1552         //
1553         // Another way to do this would be to raise an exception in
1554         // the main thread, which we really should do because it gives
1555         // the program a chance to clean up.  But how do we find the
1556         // main thread?  It should presumably be the same one that
1557         // gets ^C exceptions, but that's all done on the Haskell side
1558         // (GHC.TopHandler).
1559         sched_state = SCHED_INTERRUPTING;
1560         goto delete_threads_and_gc;
1561     }
1562
1563 #ifdef SPARKBALANCE
1564     /* JB 
1565        Once we are all together... this would be the place to balance all
1566        spark pools. No concurrent stealing or adding of new sparks can
1567        occur. Should be defined in Sparks.c. */
1568     balanceSparkPoolsCaps(n_capabilities, capabilities);
1569 #endif
1570
1571 #if defined(THREADED_RTS)
1572     if (gc_type == PENDING_GC_SEQ) {
1573         // release our stash of capabilities.
1574         for (i = 0; i < n_capabilities; i++) {
1575             if (cap != &capabilities[i]) {
1576                 task->cap = &capabilities[i];
1577                 releaseCapability(&capabilities[i]);
1578             }
1579         }
1580     }
1581     if (cap) {
1582         task->cap = cap;
1583     } else {
1584         task->cap = NULL;
1585     }
1586 #endif
1587
1588     return cap;
1589 }
1590
1591 /* ---------------------------------------------------------------------------
1592  * Singleton fork(). Do not copy any running threads.
1593  * ------------------------------------------------------------------------- */
1594
1595 pid_t
1596 forkProcess(HsStablePtr *entry
1597 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1598             STG_UNUSED
1599 #endif
1600            )
1601 {
1602 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1603     Task *task;
1604     pid_t pid;
1605     StgTSO* t,*next;
1606     Capability *cap;
1607     nat s;
1608     
1609 #if defined(THREADED_RTS)
1610     if (RtsFlags.ParFlags.nNodes > 1) {
1611         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1612         stg_exit(EXIT_FAILURE);
1613     }
1614 #endif
1615
1616     debugTrace(DEBUG_sched, "forking!");
1617     
1618     // ToDo: for SMP, we should probably acquire *all* the capabilities
1619     cap = rts_lock();
1620     
1621     // no funny business: hold locks while we fork, otherwise if some
1622     // other thread is holding a lock when the fork happens, the data
1623     // structure protected by the lock will forever be in an
1624     // inconsistent state in the child.  See also #1391.
1625     ACQUIRE_LOCK(&sched_mutex);
1626     ACQUIRE_LOCK(&cap->lock);
1627     ACQUIRE_LOCK(&cap->running_task->lock);
1628
1629     pid = fork();
1630     
1631     if (pid) { // parent
1632         
1633         RELEASE_LOCK(&sched_mutex);
1634         RELEASE_LOCK(&cap->lock);
1635         RELEASE_LOCK(&cap->running_task->lock);
1636
1637         // just return the pid
1638         rts_unlock(cap);
1639         return pid;
1640         
1641     } else { // child
1642         
1643 #if defined(THREADED_RTS)
1644         initMutex(&sched_mutex);
1645         initMutex(&cap->lock);
1646         initMutex(&cap->running_task->lock);
1647 #endif
1648
1649         // Now, all OS threads except the thread that forked are
1650         // stopped.  We need to stop all Haskell threads, including
1651         // those involved in foreign calls.  Also we need to delete
1652         // all Tasks, because they correspond to OS threads that are
1653         // now gone.
1654
1655         for (s = 0; s < total_steps; s++) {
1656           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1657             if (t->what_next == ThreadRelocated) {
1658                 next = t->_link;
1659             } else {
1660                 next = t->global_link;
1661                 // don't allow threads to catch the ThreadKilled
1662                 // exception, but we do want to raiseAsync() because these
1663                 // threads may be evaluating thunks that we need later.
1664                 deleteThread_(cap,t);
1665             }
1666           }
1667         }
1668         
1669         // Empty the run queue.  It seems tempting to let all the
1670         // killed threads stay on the run queue as zombies to be
1671         // cleaned up later, but some of them correspond to bound
1672         // threads for which the corresponding Task does not exist.
1673         cap->run_queue_hd = END_TSO_QUEUE;
1674         cap->run_queue_tl = END_TSO_QUEUE;
1675
1676         // Any suspended C-calling Tasks are no more, their OS threads
1677         // don't exist now:
1678         cap->suspended_ccalling_tasks = NULL;
1679
1680         // Empty the threads lists.  Otherwise, the garbage
1681         // collector may attempt to resurrect some of these threads.
1682         for (s = 0; s < total_steps; s++) {
1683             all_steps[s].threads = END_TSO_QUEUE;
1684         }
1685
1686         // Wipe the task list, except the current Task.
1687         ACQUIRE_LOCK(&sched_mutex);
1688         for (task = all_tasks; task != NULL; task=task->all_link) {
1689             if (task != cap->running_task) {
1690 #if defined(THREADED_RTS)
1691                 initMutex(&task->lock); // see #1391
1692 #endif
1693                 discardTask(task);
1694             }
1695         }
1696         RELEASE_LOCK(&sched_mutex);
1697
1698 #if defined(THREADED_RTS)
1699         // Wipe our spare workers list, they no longer exist.  New
1700         // workers will be created if necessary.
1701         cap->spare_workers = NULL;
1702         cap->returning_tasks_hd = NULL;
1703         cap->returning_tasks_tl = NULL;
1704 #endif
1705
1706         // On Unix, all timers are reset in the child, so we need to start
1707         // the timer again.
1708         initTimer();
1709         startTimer();
1710
1711         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1712         rts_checkSchedStatus("forkProcess",cap);
1713         
1714         rts_unlock(cap);
1715         hs_exit();                      // clean up and exit
1716         stg_exit(EXIT_SUCCESS);
1717     }
1718 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1719     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1720     return -1;
1721 #endif
1722 }
1723
1724 /* ---------------------------------------------------------------------------
1725  * Delete all the threads in the system
1726  * ------------------------------------------------------------------------- */
1727    
1728 static void
1729 deleteAllThreads ( Capability *cap )
1730 {
1731     // NOTE: only safe to call if we own all capabilities.
1732
1733     StgTSO* t, *next;
1734     nat s;
1735
1736     debugTrace(DEBUG_sched,"deleting all threads");
1737     for (s = 0; s < total_steps; s++) {
1738       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1739         if (t->what_next == ThreadRelocated) {
1740             next = t->_link;
1741         } else {
1742             next = t->global_link;
1743             deleteThread(cap,t);
1744         }
1745       }
1746     }      
1747
1748     // The run queue now contains a bunch of ThreadKilled threads.  We
1749     // must not throw these away: the main thread(s) will be in there
1750     // somewhere, and the main scheduler loop has to deal with it.
1751     // Also, the run queue is the only thing keeping these threads from
1752     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1753
1754 #if !defined(THREADED_RTS)
1755     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1756     ASSERT(sleeping_queue == END_TSO_QUEUE);
1757 #endif
1758 }
1759
1760 /* -----------------------------------------------------------------------------
1761    Managing the suspended_ccalling_tasks list.
1762    Locks required: sched_mutex
1763    -------------------------------------------------------------------------- */
1764
1765 STATIC_INLINE void
1766 suspendTask (Capability *cap, Task *task)
1767 {
1768     ASSERT(task->next == NULL && task->prev == NULL);
1769     task->next = cap->suspended_ccalling_tasks;
1770     task->prev = NULL;
1771     if (cap->suspended_ccalling_tasks) {
1772         cap->suspended_ccalling_tasks->prev = task;
1773     }
1774     cap->suspended_ccalling_tasks = task;
1775 }
1776
1777 STATIC_INLINE void
1778 recoverSuspendedTask (Capability *cap, Task *task)
1779 {
1780     if (task->prev) {
1781         task->prev->next = task->next;
1782     } else {
1783         ASSERT(cap->suspended_ccalling_tasks == task);
1784         cap->suspended_ccalling_tasks = task->next;
1785     }
1786     if (task->next) {
1787         task->next->prev = task->prev;
1788     }
1789     task->next = task->prev = NULL;
1790 }
1791
1792 /* ---------------------------------------------------------------------------
1793  * Suspending & resuming Haskell threads.
1794  * 
1795  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1796  * its capability before calling the C function.  This allows another
1797  * task to pick up the capability and carry on running Haskell
1798  * threads.  It also means that if the C call blocks, it won't lock
1799  * the whole system.
1800  *
1801  * The Haskell thread making the C call is put to sleep for the
1802  * duration of the call, on the susepended_ccalling_threads queue.  We
1803  * give out a token to the task, which it can use to resume the thread
1804  * on return from the C function.
1805  * ------------------------------------------------------------------------- */
1806    
1807 void *
1808 suspendThread (StgRegTable *reg)
1809 {
1810   Capability *cap;
1811   int saved_errno;
1812   StgTSO *tso;
1813   Task *task;
1814 #if mingw32_HOST_OS
1815   StgWord32 saved_winerror;
1816 #endif
1817
1818   saved_errno = errno;
1819 #if mingw32_HOST_OS
1820   saved_winerror = GetLastError();
1821 #endif
1822
1823   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1824    */
1825   cap = regTableToCapability(reg);
1826
1827   task = cap->running_task;
1828   tso = cap->r.rCurrentTSO;
1829
1830   postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
1831   debugTrace(DEBUG_sched, 
1832              "thread %lu did a safe foreign call", 
1833              (unsigned long)cap->r.rCurrentTSO->id);
1834
1835   // XXX this might not be necessary --SDM
1836   tso->what_next = ThreadRunGHC;
1837
1838   threadPaused(cap,tso);
1839
1840   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1841       tso->why_blocked = BlockedOnCCall;
1842       tso->flags |= TSO_BLOCKEX;
1843       tso->flags &= ~TSO_INTERRUPTIBLE;
1844   } else {
1845       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1846   }
1847
1848   // Hand back capability
1849   task->suspended_tso = tso;
1850
1851   ACQUIRE_LOCK(&cap->lock);
1852
1853   suspendTask(cap,task);
1854   cap->in_haskell = rtsFalse;
1855   releaseCapability_(cap,rtsFalse);
1856   
1857   RELEASE_LOCK(&cap->lock);
1858
1859 #if defined(THREADED_RTS)
1860   /* Preparing to leave the RTS, so ensure there's a native thread/task
1861      waiting to take over.
1862   */
1863   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1864 #endif
1865
1866   errno = saved_errno;
1867 #if mingw32_HOST_OS
1868   SetLastError(saved_winerror);
1869 #endif
1870   return task;
1871 }
1872
1873 StgRegTable *
1874 resumeThread (void *task_)
1875 {
1876     StgTSO *tso;
1877     Capability *cap;
1878     Task *task = task_;
1879     int saved_errno;
1880 #if mingw32_HOST_OS
1881     StgWord32 saved_winerror;
1882 #endif
1883
1884     saved_errno = errno;
1885 #if mingw32_HOST_OS
1886     saved_winerror = GetLastError();
1887 #endif
1888
1889     cap = task->cap;
1890     // Wait for permission to re-enter the RTS with the result.
1891     waitForReturnCapability(&cap,task);
1892     // we might be on a different capability now... but if so, our
1893     // entry on the suspended_ccalling_tasks list will also have been
1894     // migrated.
1895
1896     // Remove the thread from the suspended list
1897     recoverSuspendedTask(cap,task);
1898
1899     tso = task->suspended_tso;
1900     task->suspended_tso = NULL;
1901     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1902
1903     postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
1904     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1905     
1906     if (tso->why_blocked == BlockedOnCCall) {
1907         // avoid locking the TSO if we don't have to
1908         if (tso->blocked_exceptions != END_TSO_QUEUE) {
1909             awakenBlockedExceptionQueue(cap,tso);
1910         }
1911         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1912     }
1913     
1914     /* Reset blocking status */
1915     tso->why_blocked  = NotBlocked;
1916     
1917     cap->r.rCurrentTSO = tso;
1918     cap->in_haskell = rtsTrue;
1919     errno = saved_errno;
1920 #if mingw32_HOST_OS
1921     SetLastError(saved_winerror);
1922 #endif
1923
1924     /* We might have GC'd, mark the TSO dirty again */
1925     dirty_TSO(cap,tso);
1926
1927     IF_DEBUG(sanity, checkTSO(tso));
1928
1929     return &cap->r;
1930 }
1931
1932 /* ---------------------------------------------------------------------------
1933  * scheduleThread()
1934  *
1935  * scheduleThread puts a thread on the end  of the runnable queue.
1936  * This will usually be done immediately after a thread is created.
1937  * The caller of scheduleThread must create the thread using e.g.
1938  * createThread and push an appropriate closure
1939  * on this thread's stack before the scheduler is invoked.
1940  * ------------------------------------------------------------------------ */
1941
1942 void
1943 scheduleThread(Capability *cap, StgTSO *tso)
1944 {
1945     // The thread goes at the *end* of the run-queue, to avoid possible
1946     // starvation of any threads already on the queue.
1947     appendToRunQueue(cap,tso);
1948 }
1949
1950 void
1951 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1952 {
1953 #if defined(THREADED_RTS)
1954     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1955                               // move this thread from now on.
1956     cpu %= RtsFlags.ParFlags.nNodes;
1957     if (cpu == cap->no) {
1958         appendToRunQueue(cap,tso);
1959     } else {
1960         postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
1961         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1962     }
1963 #else
1964     appendToRunQueue(cap,tso);
1965 #endif
1966 }
1967
1968 Capability *
1969 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1970 {
1971     Task *task;
1972
1973     // We already created/initialised the Task
1974     task = cap->running_task;
1975
1976     // This TSO is now a bound thread; make the Task and TSO
1977     // point to each other.
1978     tso->bound = task;
1979     tso->cap = cap;
1980
1981     task->tso = tso;
1982     task->ret = ret;
1983     task->stat = NoStatus;
1984
1985     appendToRunQueue(cap,tso);
1986
1987     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1988
1989     cap = schedule(cap,task);
1990
1991     ASSERT(task->stat != NoStatus);
1992     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1993
1994     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1995     return cap;
1996 }
1997
1998 /* ----------------------------------------------------------------------------
1999  * Starting Tasks
2000  * ------------------------------------------------------------------------- */
2001
2002 #if defined(THREADED_RTS)
2003 void OSThreadProcAttr
2004 workerStart(Task *task)
2005 {
2006     Capability *cap;
2007
2008     // See startWorkerTask().
2009     ACQUIRE_LOCK(&task->lock);
2010     cap = task->cap;
2011     RELEASE_LOCK(&task->lock);
2012
2013     if (RtsFlags.ParFlags.setAffinity) {
2014         setThreadAffinity(cap->no, n_capabilities);
2015     }
2016
2017     // set the thread-local pointer to the Task:
2018     taskEnter(task);
2019
2020     // schedule() runs without a lock.
2021     cap = schedule(cap,task);
2022
2023     // On exit from schedule(), we have a Capability, but possibly not
2024     // the same one we started with.
2025
2026     // During shutdown, the requirement is that after all the
2027     // Capabilities are shut down, all workers that are shutting down
2028     // have finished workerTaskStop().  This is why we hold on to
2029     // cap->lock until we've finished workerTaskStop() below.
2030     //
2031     // There may be workers still involved in foreign calls; those
2032     // will just block in waitForReturnCapability() because the
2033     // Capability has been shut down.
2034     //
2035     ACQUIRE_LOCK(&cap->lock);
2036     releaseCapability_(cap,rtsFalse);
2037     workerTaskStop(task);
2038     RELEASE_LOCK(&cap->lock);
2039 }
2040 #endif
2041
2042 /* ---------------------------------------------------------------------------
2043  * initScheduler()
2044  *
2045  * Initialise the scheduler.  This resets all the queues - if the
2046  * queues contained any threads, they'll be garbage collected at the
2047  * next pass.
2048  *
2049  * ------------------------------------------------------------------------ */
2050
2051 void 
2052 initScheduler(void)
2053 {
2054 #if !defined(THREADED_RTS)
2055   blocked_queue_hd  = END_TSO_QUEUE;
2056   blocked_queue_tl  = END_TSO_QUEUE;
2057   sleeping_queue    = END_TSO_QUEUE;
2058 #endif
2059
2060   blackhole_queue   = END_TSO_QUEUE;
2061
2062   sched_state    = SCHED_RUNNING;
2063   recent_activity = ACTIVITY_YES;
2064
2065 #if defined(THREADED_RTS)
2066   /* Initialise the mutex and condition variables used by
2067    * the scheduler. */
2068   initMutex(&sched_mutex);
2069 #endif
2070   
2071   ACQUIRE_LOCK(&sched_mutex);
2072
2073   /* A capability holds the state a native thread needs in
2074    * order to execute STG code. At least one capability is
2075    * floating around (only THREADED_RTS builds have more than one).
2076    */
2077   initCapabilities();
2078
2079   initTaskManager();
2080
2081 #if defined(THREADED_RTS)
2082   initSparkPools();
2083 #endif
2084
2085 #if defined(THREADED_RTS)
2086   /*
2087    * Eagerly start one worker to run each Capability, except for
2088    * Capability 0.  The idea is that we're probably going to start a
2089    * bound thread on Capability 0 pretty soon, so we don't want a
2090    * worker task hogging it.
2091    */
2092   { 
2093       nat i;
2094       Capability *cap;
2095       for (i = 1; i < n_capabilities; i++) {
2096           cap = &capabilities[i];
2097           ACQUIRE_LOCK(&cap->lock);
2098           startWorkerTask(cap, workerStart);
2099           RELEASE_LOCK(&cap->lock);
2100       }
2101   }
2102 #endif
2103
2104   RELEASE_LOCK(&sched_mutex);
2105 }
2106
2107 void
2108 exitScheduler(
2109     rtsBool wait_foreign
2110 #if !defined(THREADED_RTS)
2111                          __attribute__((unused))
2112 #endif
2113 )
2114                /* see Capability.c, shutdownCapability() */
2115 {
2116     Task *task = NULL;
2117
2118     task = newBoundTask();
2119
2120     // If we haven't killed all the threads yet, do it now.
2121     if (sched_state < SCHED_SHUTTING_DOWN) {
2122         sched_state = SCHED_INTERRUPTING;
2123         waitForReturnCapability(&task->cap,task);
2124         scheduleDoGC(task->cap,task,rtsFalse);    
2125         releaseCapability(task->cap);
2126     }
2127     sched_state = SCHED_SHUTTING_DOWN;
2128
2129 #if defined(THREADED_RTS)
2130     { 
2131         nat i;
2132         
2133         for (i = 0; i < n_capabilities; i++) {
2134             shutdownCapability(&capabilities[i], task, wait_foreign);
2135         }
2136         boundTaskExiting(task);
2137     }
2138 #endif
2139 }
2140
2141 void
2142 freeScheduler( void )
2143 {
2144     nat still_running;
2145
2146     ACQUIRE_LOCK(&sched_mutex);
2147     still_running = freeTaskManager();
2148     // We can only free the Capabilities if there are no Tasks still
2149     // running.  We might have a Task about to return from a foreign
2150     // call into waitForReturnCapability(), for example (actually,
2151     // this should be the *only* thing that a still-running Task can
2152     // do at this point, and it will block waiting for the
2153     // Capability).
2154     if (still_running == 0) {
2155         freeCapabilities();
2156         if (n_capabilities != 1) {
2157             stgFree(capabilities);
2158         }
2159     }
2160     RELEASE_LOCK(&sched_mutex);
2161 #if defined(THREADED_RTS)
2162     closeMutex(&sched_mutex);
2163 #endif
2164 }
2165
2166 /* -----------------------------------------------------------------------------
2167    performGC
2168
2169    This is the interface to the garbage collector from Haskell land.
2170    We provide this so that external C code can allocate and garbage
2171    collect when called from Haskell via _ccall_GC.
2172    -------------------------------------------------------------------------- */
2173
2174 static void
2175 performGC_(rtsBool force_major)
2176 {
2177     Task *task;
2178
2179     // We must grab a new Task here, because the existing Task may be
2180     // associated with a particular Capability, and chained onto the 
2181     // suspended_ccalling_tasks queue.
2182     task = newBoundTask();
2183
2184     waitForReturnCapability(&task->cap,task);
2185     scheduleDoGC(task->cap,task,force_major);
2186     releaseCapability(task->cap);
2187     boundTaskExiting(task);
2188 }
2189
2190 void
2191 performGC(void)
2192 {
2193     performGC_(rtsFalse);
2194 }
2195
2196 void
2197 performMajorGC(void)
2198 {
2199     performGC_(rtsTrue);
2200 }
2201
2202 /* -----------------------------------------------------------------------------
2203    Stack overflow
2204
2205    If the thread has reached its maximum stack size, then raise the
2206    StackOverflow exception in the offending thread.  Otherwise
2207    relocate the TSO into a larger chunk of memory and adjust its stack
2208    size appropriately.
2209    -------------------------------------------------------------------------- */
2210
2211 static StgTSO *
2212 threadStackOverflow(Capability *cap, StgTSO *tso)
2213 {
2214   nat new_stack_size, stack_words;
2215   lnat new_tso_size;
2216   StgPtr new_sp;
2217   StgTSO *dest;
2218
2219   IF_DEBUG(sanity,checkTSO(tso));
2220
2221   // don't allow throwTo() to modify the blocked_exceptions queue
2222   // while we are moving the TSO:
2223   lockClosure((StgClosure *)tso);
2224
2225   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2226       // NB. never raise a StackOverflow exception if the thread is
2227       // inside Control.Exceptino.block.  It is impractical to protect
2228       // against stack overflow exceptions, since virtually anything
2229       // can raise one (even 'catch'), so this is the only sensible
2230       // thing to do here.  See bug #767.
2231
2232       debugTrace(DEBUG_gc,
2233                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2234                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2235       IF_DEBUG(gc,
2236                /* If we're debugging, just print out the top of the stack */
2237                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2238                                                 tso->sp+64)));
2239
2240       // Send this thread the StackOverflow exception
2241       unlockTSO(tso);
2242       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2243       return tso;
2244   }
2245
2246   /* Try to double the current stack size.  If that takes us over the
2247    * maximum stack size for this thread, then use the maximum instead
2248    * (that is, unless we're already at or over the max size and we
2249    * can't raise the StackOverflow exception (see above), in which
2250    * case just double the size). Finally round up so the TSO ends up as
2251    * a whole number of blocks.
2252    */
2253   if (tso->stack_size >= tso->max_stack_size) {
2254       new_stack_size = tso->stack_size * 2;
2255   } else { 
2256       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2257   }
2258   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2259                                        TSO_STRUCT_SIZE)/sizeof(W_);
2260   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2261   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2262
2263   debugTrace(DEBUG_sched, 
2264              "increasing stack size from %ld words to %d.",
2265              (long)tso->stack_size, new_stack_size);
2266
2267   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2268   TICK_ALLOC_TSO(new_stack_size,0);
2269
2270   /* copy the TSO block and the old stack into the new area */
2271   memcpy(dest,tso,TSO_STRUCT_SIZE);
2272   stack_words = tso->stack + tso->stack_size - tso->sp;
2273   new_sp = (P_)dest + new_tso_size - stack_words;
2274   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2275
2276   /* relocate the stack pointers... */
2277   dest->sp         = new_sp;
2278   dest->stack_size = new_stack_size;
2279         
2280   /* Mark the old TSO as relocated.  We have to check for relocated
2281    * TSOs in the garbage collector and any primops that deal with TSOs.
2282    *
2283    * It's important to set the sp value to just beyond the end
2284    * of the stack, so we don't attempt to scavenge any part of the
2285    * dead TSO's stack.
2286    */
2287   tso->what_next = ThreadRelocated;
2288   setTSOLink(cap,tso,dest);
2289   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2290   tso->why_blocked = NotBlocked;
2291
2292   unlockTSO(dest);
2293   unlockTSO(tso);
2294
2295   IF_DEBUG(sanity,checkTSO(dest));
2296 #if 0
2297   IF_DEBUG(scheduler,printTSO(dest));
2298 #endif
2299
2300   return dest;
2301 }
2302
2303 static StgTSO *
2304 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2305 {
2306     bdescr *bd, *new_bd;
2307     lnat free_w, tso_size_w;
2308     StgTSO *new_tso;
2309
2310     tso_size_w = tso_sizeW(tso);
2311
2312     if (tso_size_w < MBLOCK_SIZE_W ||
2313           // TSO is less than 2 mblocks (since the first mblock is
2314           // shorter than MBLOCK_SIZE_W)
2315         (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
2316           // or TSO is not a whole number of megablocks (ensuring
2317           // precondition of splitLargeBlock() below)
2318         (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
2319           // or TSO is smaller than the minimum stack size (rounded up)
2320         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2321           // or stack is using more than 1/4 of the available space
2322     {
2323         // then do nothing
2324         return tso;
2325     }
2326
2327     // don't allow throwTo() to modify the blocked_exceptions queue
2328     // while we are moving the TSO:
2329     lockClosure((StgClosure *)tso);
2330
2331     // this is the number of words we'll free
2332     free_w = round_to_mblocks(tso_size_w/2);
2333
2334     bd = Bdescr((StgPtr)tso);
2335     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2336     bd->free = bd->start + TSO_STRUCT_SIZEW;
2337
2338     new_tso = (StgTSO *)new_bd->start;
2339     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2340     new_tso->stack_size = new_bd->free - new_tso->stack;
2341
2342     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2343                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2344
2345     tso->what_next = ThreadRelocated;
2346     tso->_link = new_tso; // no write barrier reqd: same generation
2347
2348     // The TSO attached to this Task may have moved, so update the
2349     // pointer to it.
2350     if (task->tso == tso) {
2351         task->tso = new_tso;
2352     }
2353
2354     unlockTSO(new_tso);
2355     unlockTSO(tso);
2356
2357     IF_DEBUG(sanity,checkTSO(new_tso));
2358
2359     return new_tso;
2360 }
2361
2362 /* ---------------------------------------------------------------------------
2363    Interrupt execution
2364    - usually called inside a signal handler so it mustn't do anything fancy.   
2365    ------------------------------------------------------------------------ */
2366
2367 void
2368 interruptStgRts(void)
2369 {
2370     sched_state = SCHED_INTERRUPTING;
2371     setContextSwitches();
2372 #if defined(THREADED_RTS)
2373     wakeUpRts();
2374 #endif
2375 }
2376
2377 /* -----------------------------------------------------------------------------
2378    Wake up the RTS
2379    
2380    This function causes at least one OS thread to wake up and run the
2381    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2382    an external event has arrived that may need servicing (eg. a
2383    keyboard interrupt).
2384
2385    In the single-threaded RTS we don't do anything here; we only have
2386    one thread anyway, and the event that caused us to want to wake up
2387    will have interrupted any blocking system call in progress anyway.
2388    -------------------------------------------------------------------------- */
2389
2390 #if defined(THREADED_RTS)
2391 void wakeUpRts(void)
2392 {
2393     // This forces the IO Manager thread to wakeup, which will
2394     // in turn ensure that some OS thread wakes up and runs the
2395     // scheduler loop, which will cause a GC and deadlock check.
2396     ioManagerWakeup();
2397 }
2398 #endif
2399
2400 /* -----------------------------------------------------------------------------
2401  * checkBlackHoles()
2402  *
2403  * Check the blackhole_queue for threads that can be woken up.  We do
2404  * this periodically: before every GC, and whenever the run queue is
2405  * empty.
2406  *
2407  * An elegant solution might be to just wake up all the blocked
2408  * threads with awakenBlockedQueue occasionally: they'll go back to
2409  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2410  * doesn't give us a way to tell whether we've actually managed to
2411  * wake up any threads, so we would be busy-waiting.
2412  *
2413  * -------------------------------------------------------------------------- */
2414
2415 static rtsBool
2416 checkBlackHoles (Capability *cap)
2417 {
2418     StgTSO **prev, *t;
2419     rtsBool any_woke_up = rtsFalse;
2420     StgHalfWord type;
2421
2422     // blackhole_queue is global:
2423     ASSERT_LOCK_HELD(&sched_mutex);
2424
2425     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2426
2427     // ASSUMES: sched_mutex
2428     prev = &blackhole_queue;
2429     t = blackhole_queue;
2430     while (t != END_TSO_QUEUE) {
2431         if (t->what_next == ThreadRelocated) {
2432             t = t->_link;
2433             continue;
2434         }
2435         ASSERT(t->why_blocked == BlockedOnBlackHole);
2436         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2437         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2438             IF_DEBUG(sanity,checkTSO(t));
2439             t = unblockOne(cap, t);
2440             *prev = t;
2441             any_woke_up = rtsTrue;
2442         } else {
2443             prev = &t->_link;
2444             t = t->_link;
2445         }
2446     }
2447
2448     return any_woke_up;
2449 }
2450
2451 /* -----------------------------------------------------------------------------
2452    Deleting threads
2453
2454    This is used for interruption (^C) and forking, and corresponds to
2455    raising an exception but without letting the thread catch the
2456    exception.
2457    -------------------------------------------------------------------------- */
2458
2459 static void 
2460 deleteThread (Capability *cap, StgTSO *tso)
2461 {
2462     // NOTE: must only be called on a TSO that we have exclusive
2463     // access to, because we will call throwToSingleThreaded() below.
2464     // The TSO must be on the run queue of the Capability we own, or 
2465     // we must own all Capabilities.
2466
2467     if (tso->why_blocked != BlockedOnCCall &&
2468         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2469         throwToSingleThreaded(cap,tso,NULL);
2470     }
2471 }
2472
2473 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2474 static void 
2475 deleteThread_(Capability *cap, StgTSO *tso)
2476 { // for forkProcess only:
2477   // like deleteThread(), but we delete threads in foreign calls, too.
2478
2479     if (tso->why_blocked == BlockedOnCCall ||
2480         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2481         unblockOne(cap,tso);
2482         tso->what_next = ThreadKilled;
2483     } else {
2484         deleteThread(cap,tso);
2485     }
2486 }
2487 #endif
2488
2489 /* -----------------------------------------------------------------------------
2490    raiseExceptionHelper
2491    
2492    This function is called by the raise# primitve, just so that we can
2493    move some of the tricky bits of raising an exception from C-- into
2494    C.  Who knows, it might be a useful re-useable thing here too.
2495    -------------------------------------------------------------------------- */
2496
2497 StgWord
2498 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2499 {
2500     Capability *cap = regTableToCapability(reg);
2501     StgThunk *raise_closure = NULL;
2502     StgPtr p, next;
2503     StgRetInfoTable *info;
2504     //
2505     // This closure represents the expression 'raise# E' where E
2506     // is the exception raise.  It is used to overwrite all the
2507     // thunks which are currently under evaluataion.
2508     //
2509
2510     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2511     // LDV profiling: stg_raise_info has THUNK as its closure
2512     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2513     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2514     // 1 does not cause any problem unless profiling is performed.
2515     // However, when LDV profiling goes on, we need to linearly scan
2516     // small object pool, where raise_closure is stored, so we should
2517     // use MIN_UPD_SIZE.
2518     //
2519     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2520     //                                 sizeofW(StgClosure)+1);
2521     //
2522
2523     //
2524     // Walk up the stack, looking for the catch frame.  On the way,
2525     // we update any closures pointed to from update frames with the
2526     // raise closure that we just built.
2527     //
2528     p = tso->sp;
2529     while(1) {
2530         info = get_ret_itbl((StgClosure *)p);
2531         next = p + stack_frame_sizeW((StgClosure *)p);
2532         switch (info->i.type) {
2533             
2534         case UPDATE_FRAME:
2535             // Only create raise_closure if we need to.
2536             if (raise_closure == NULL) {
2537                 raise_closure = 
2538                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2539                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2540                 raise_closure->payload[0] = exception;
2541             }
2542             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2543             p = next;
2544             continue;
2545
2546         case ATOMICALLY_FRAME:
2547             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2548             tso->sp = p;
2549             return ATOMICALLY_FRAME;
2550             
2551         case CATCH_FRAME:
2552             tso->sp = p;
2553             return CATCH_FRAME;
2554
2555         case CATCH_STM_FRAME:
2556             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2557             tso->sp = p;
2558             return CATCH_STM_FRAME;
2559             
2560         case STOP_FRAME:
2561             tso->sp = p;
2562             return STOP_FRAME;
2563
2564         case CATCH_RETRY_FRAME:
2565         default:
2566             p = next; 
2567             continue;
2568         }
2569     }
2570 }
2571
2572
2573 /* -----------------------------------------------------------------------------
2574    findRetryFrameHelper
2575
2576    This function is called by the retry# primitive.  It traverses the stack
2577    leaving tso->sp referring to the frame which should handle the retry.  
2578
2579    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2580    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2581
2582    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2583    create) because retries are not considered to be exceptions, despite the
2584    similar implementation.
2585
2586    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2587    not be created within memory transactions.
2588    -------------------------------------------------------------------------- */
2589
2590 StgWord
2591 findRetryFrameHelper (StgTSO *tso)
2592 {
2593   StgPtr           p, next;
2594   StgRetInfoTable *info;
2595
2596   p = tso -> sp;
2597   while (1) {
2598     info = get_ret_itbl((StgClosure *)p);
2599     next = p + stack_frame_sizeW((StgClosure *)p);
2600     switch (info->i.type) {
2601       
2602     case ATOMICALLY_FRAME:
2603         debugTrace(DEBUG_stm,
2604                    "found ATOMICALLY_FRAME at %p during retry", p);
2605         tso->sp = p;
2606         return ATOMICALLY_FRAME;
2607       
2608     case CATCH_RETRY_FRAME:
2609         debugTrace(DEBUG_stm,
2610                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2611         tso->sp = p;
2612         return CATCH_RETRY_FRAME;
2613       
2614     case CATCH_STM_FRAME: {
2615         StgTRecHeader *trec = tso -> trec;
2616         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2617         debugTrace(DEBUG_stm,
2618                    "found CATCH_STM_FRAME at %p during retry", p);
2619         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2620         stmAbortTransaction(tso -> cap, trec);
2621         stmFreeAbortedTRec(tso -> cap, trec);
2622         tso -> trec = outer;
2623         p = next; 
2624         continue;
2625     }
2626       
2627
2628     default:
2629       ASSERT(info->i.type != CATCH_FRAME);
2630       ASSERT(info->i.type != STOP_FRAME);
2631       p = next; 
2632       continue;
2633     }
2634   }
2635 }
2636
2637 /* -----------------------------------------------------------------------------
2638    resurrectThreads is called after garbage collection on the list of
2639    threads found to be garbage.  Each of these threads will be woken
2640    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2641    on an MVar, or NonTermination if the thread was blocked on a Black
2642    Hole.
2643
2644    Locks: assumes we hold *all* the capabilities.
2645    -------------------------------------------------------------------------- */
2646
2647 void
2648 resurrectThreads (StgTSO *threads)
2649 {
2650     StgTSO *tso, *next;
2651     Capability *cap;
2652     step *step;
2653
2654     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2655         next = tso->global_link;
2656
2657         step = Bdescr((P_)tso)->step;
2658         tso->global_link = step->threads;
2659         step->threads = tso;
2660
2661         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2662         
2663         // Wake up the thread on the Capability it was last on
2664         cap = tso->cap;
2665         
2666         switch (tso->why_blocked) {
2667         case BlockedOnMVar:
2668         case BlockedOnException:
2669             /* Called by GC - sched_mutex lock is currently held. */
2670             throwToSingleThreaded(cap, tso,
2671                                   (StgClosure *)blockedOnDeadMVar_closure);
2672             break;
2673         case BlockedOnBlackHole:
2674             throwToSingleThreaded(cap, tso,
2675                                   (StgClosure *)nonTermination_closure);
2676             break;
2677         case BlockedOnSTM:
2678             throwToSingleThreaded(cap, tso,
2679                                   (StgClosure *)blockedIndefinitely_closure);
2680             break;
2681         case NotBlocked:
2682             /* This might happen if the thread was blocked on a black hole
2683              * belonging to a thread that we've just woken up (raiseAsync
2684              * can wake up threads, remember...).
2685              */
2686             continue;
2687         default:
2688             barf("resurrectThreads: thread blocked in a strange way");
2689         }
2690     }
2691 }
2692
2693 /* -----------------------------------------------------------------------------
2694    performPendingThrowTos is called after garbage collection, and
2695    passed a list of threads that were found to have pending throwTos
2696    (tso->blocked_exceptions was not empty), and were blocked.
2697    Normally this doesn't happen, because we would deliver the
2698    exception directly if the target thread is blocked, but there are
2699    small windows where it might occur on a multiprocessor (see
2700    throwTo()).
2701
2702    NB. we must be holding all the capabilities at this point, just
2703    like resurrectThreads().
2704    -------------------------------------------------------------------------- */
2705
2706 void
2707 performPendingThrowTos (StgTSO *threads)
2708 {
2709     StgTSO *tso, *next;
2710     Capability *cap;
2711     step *step;
2712
2713     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2714         next = tso->global_link;
2715
2716         step = Bdescr((P_)tso)->step;
2717         tso->global_link = step->threads;
2718         step->threads = tso;
2719
2720         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2721         
2722         cap = tso->cap;
2723         maybePerformBlockedException(cap, tso);
2724     }
2725 }