Detect C finalizer callbacks in rts_lock() instead of schedule()
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "eventlog/EventLog.h"
30 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
31 #include "Sparks.h"
32 #include "Capability.h"
33 #include "Task.h"
34 #include "AwaitEvent.h"
35 #if defined(mingw32_HOST_OS)
36 #include "win32/IOManager.h"
37 #endif
38 #include "Trace.h"
39 #include "RaiseAsync.h"
40 #include "Threads.h"
41 #include "Timer.h"
42 #include "ThreadPaused.h"
43
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <string.h>
52 #include <stdlib.h>
53 #include <stdarg.h>
54
55 #ifdef HAVE_ERRNO_H
56 #include <errno.h>
57 #endif
58
59 /* -----------------------------------------------------------------------------
60  * Global variables
61  * -------------------------------------------------------------------------- */
62
63 #if !defined(THREADED_RTS)
64 // Blocked/sleeping thrads
65 StgTSO *blocked_queue_hd = NULL;
66 StgTSO *blocked_queue_tl = NULL;
67 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
68 #endif
69
70 /* Threads blocked on blackholes.
71  * LOCK: sched_mutex+capability, or all capabilities
72  */
73 StgTSO *blackhole_queue = NULL;
74
75 /* The blackhole_queue should be checked for threads to wake up.  See
76  * Schedule.h for more thorough comment.
77  * LOCK: none (doesn't matter if we miss an update)
78  */
79 rtsBool blackholes_need_checking = rtsFalse;
80
81 /* Set to true when the latest garbage collection failed to reclaim
82  * enough space, and the runtime should proceed to shut itself down in
83  * an orderly fashion (emitting profiling info etc.)
84  */
85 rtsBool heap_overflow = rtsFalse;
86
87 /* flag that tracks whether we have done any execution in this time slice.
88  * LOCK: currently none, perhaps we should lock (but needs to be
89  * updated in the fast path of the scheduler).
90  *
91  * NB. must be StgWord, we do xchg() on it.
92  */
93 volatile StgWord recent_activity = ACTIVITY_YES;
94
95 /* if this flag is set as well, give up execution
96  * LOCK: none (changes monotonically)
97  */
98 volatile StgWord sched_state = SCHED_RUNNING;
99
100 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
101  *  exists - earlier gccs apparently didn't.
102  *  -= chak
103  */
104 StgTSO dummy_tso;
105
106 /*
107  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
108  * in an MT setting, needed to signal that a worker thread shouldn't hang around
109  * in the scheduler when it is out of work.
110  */
111 rtsBool shutting_down_scheduler = rtsFalse;
112
113 /*
114  * This mutex protects most of the global scheduler data in
115  * the THREADED_RTS runtime.
116  */
117 #if defined(THREADED_RTS)
118 Mutex sched_mutex;
119 #endif
120
121 #if !defined(mingw32_HOST_OS)
122 #define FORKPROCESS_PRIMOP_SUPPORTED
123 #endif
124
125 /* -----------------------------------------------------------------------------
126  * static function prototypes
127  * -------------------------------------------------------------------------- */
128
129 static Capability *schedule (Capability *initialCapability, Task *task);
130
131 //
132 // These function all encapsulate parts of the scheduler loop, and are
133 // abstracted only to make the structure and control flow of the
134 // scheduler clearer.
135 //
136 static void schedulePreLoop (void);
137 static void scheduleFindWork (Capability *cap);
138 #if defined(THREADED_RTS)
139 static void scheduleYield (Capability **pcap, Task *task);
140 #endif
141 static void scheduleStartSignalHandlers (Capability *cap);
142 static void scheduleCheckBlockedThreads (Capability *cap);
143 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
144 static void scheduleCheckBlackHoles (Capability *cap);
145 static void scheduleDetectDeadlock (Capability *cap, Task *task);
146 static void schedulePushWork(Capability *cap, Task *task);
147 #if defined(THREADED_RTS)
148 static void scheduleActivateSpark(Capability *cap);
149 #endif
150 static void schedulePostRunThread(Capability *cap, StgTSO *t);
151 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
152 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
153                                          StgTSO *t);
154 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
155                                     nat prev_what_next );
156 static void scheduleHandleThreadBlocked( StgTSO *t );
157 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
158                                              StgTSO *t );
159 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
160 static Capability *scheduleDoGC(Capability *cap, Task *task,
161                                 rtsBool force_major);
162
163 static rtsBool checkBlackHoles(Capability *cap);
164
165 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
166 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
167
168 static void deleteThread (Capability *cap, StgTSO *tso);
169 static void deleteAllThreads (Capability *cap);
170
171 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
172 static void deleteThread_(Capability *cap, StgTSO *tso);
173 #endif
174
175 #ifdef DEBUG
176 static char *whatNext_strs[] = {
177   [0]               = "(unknown)",
178   [ThreadRunGHC]    = "ThreadRunGHC",
179   [ThreadInterpret] = "ThreadInterpret",
180   [ThreadKilled]    = "ThreadKilled",
181   [ThreadRelocated] = "ThreadRelocated",
182   [ThreadComplete]  = "ThreadComplete"
183 };
184 #endif
185
186 /* -----------------------------------------------------------------------------
187  * Putting a thread on the run queue: different scheduling policies
188  * -------------------------------------------------------------------------- */
189
190 STATIC_INLINE void
191 addToRunQueue( Capability *cap, StgTSO *t )
192 {
193     // this does round-robin scheduling; good for concurrency
194     appendToRunQueue(cap,t);
195 }
196
197 /* ---------------------------------------------------------------------------
198    Main scheduling loop.
199
200    We use round-robin scheduling, each thread returning to the
201    scheduler loop when one of these conditions is detected:
202
203       * out of heap space
204       * timer expires (thread yields)
205       * thread blocks
206       * thread ends
207       * stack overflow
208
209    GRAN version:
210      In a GranSim setup this loop iterates over the global event queue.
211      This revolves around the global event queue, which determines what 
212      to do next. Therefore, it's more complicated than either the 
213      concurrent or the parallel (GUM) setup.
214   This version has been entirely removed (JB 2008/08).
215
216    GUM version:
217      GUM iterates over incoming messages.
218      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
219      and sends out a fish whenever it has nothing to do; in-between
220      doing the actual reductions (shared code below) it processes the
221      incoming messages and deals with delayed operations 
222      (see PendingFetches).
223      This is not the ugliest code you could imagine, but it's bloody close.
224
225   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
226   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
227   as well as future GUM versions. This file has been refurbished to
228   only contain valid code, which is however incomplete, refers to
229   invalid includes etc.
230
231    ------------------------------------------------------------------------ */
232
233 static Capability *
234 schedule (Capability *initialCapability, Task *task)
235 {
236   StgTSO *t;
237   Capability *cap;
238   StgThreadReturnCode ret;
239   nat prev_what_next;
240   rtsBool ready_to_gc;
241 #if defined(THREADED_RTS)
242   rtsBool first = rtsTrue;
243 #endif
244   
245   cap = initialCapability;
246
247   // Pre-condition: this task owns initialCapability.
248   // The sched_mutex is *NOT* held
249   // NB. on return, we still hold a capability.
250
251   debugTrace (DEBUG_sched, 
252               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
253               task, initialCapability);
254
255   schedulePreLoop();
256
257   // -----------------------------------------------------------
258   // Scheduler loop starts here:
259
260   while (1) {
261
262     // Check whether we have re-entered the RTS from Haskell without
263     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
264     // call).
265     if (cap->in_haskell) {
266           errorBelch("schedule: re-entered unsafely.\n"
267                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
268           stg_exit(EXIT_FAILURE);
269     }
270
271     // The interruption / shutdown sequence.
272     // 
273     // In order to cleanly shut down the runtime, we want to:
274     //   * make sure that all main threads return to their callers
275     //     with the state 'Interrupted'.
276     //   * clean up all OS threads assocated with the runtime
277     //   * free all memory etc.
278     //
279     // So the sequence for ^C goes like this:
280     //
281     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
282     //     arranges for some Capability to wake up
283     //
284     //   * all threads in the system are halted, and the zombies are
285     //     placed on the run queue for cleaning up.  We acquire all
286     //     the capabilities in order to delete the threads, this is
287     //     done by scheduleDoGC() for convenience (because GC already
288     //     needs to acquire all the capabilities).  We can't kill
289     //     threads involved in foreign calls.
290     // 
291     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
292     //
293     //   * sched_state := SCHED_SHUTTING_DOWN
294     //
295     //   * all workers exit when the run queue on their capability
296     //     drains.  All main threads will also exit when their TSO
297     //     reaches the head of the run queue and they can return.
298     //
299     //   * eventually all Capabilities will shut down, and the RTS can
300     //     exit.
301     //
302     //   * We might be left with threads blocked in foreign calls, 
303     //     we should really attempt to kill these somehow (TODO);
304     
305     switch (sched_state) {
306     case SCHED_RUNNING:
307         break;
308     case SCHED_INTERRUPTING:
309         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
310 #if defined(THREADED_RTS)
311         discardSparksCap(cap);
312 #endif
313         /* scheduleDoGC() deletes all the threads */
314         cap = scheduleDoGC(cap,task,rtsFalse);
315
316         // after scheduleDoGC(), we must be shutting down.  Either some
317         // other Capability did the final GC, or we did it above,
318         // either way we can fall through to the SCHED_SHUTTING_DOWN
319         // case now.
320         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
321         // fall through
322
323     case SCHED_SHUTTING_DOWN:
324         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
325         // If we are a worker, just exit.  If we're a bound thread
326         // then we will exit below when we've removed our TSO from
327         // the run queue.
328         if (task->tso == NULL && emptyRunQueue(cap)) {
329             return cap;
330         }
331         break;
332     default:
333         barf("sched_state: %d", sched_state);
334     }
335
336     scheduleFindWork(cap);
337
338     /* work pushing, currently relevant only for THREADED_RTS:
339        (pushes threads, wakes up idle capabilities for stealing) */
340     schedulePushWork(cap,task);
341
342     scheduleDetectDeadlock(cap,task);
343
344 #if defined(THREADED_RTS)
345     cap = task->cap;    // reload cap, it might have changed
346 #endif
347
348     // Normally, the only way we can get here with no threads to
349     // run is if a keyboard interrupt received during 
350     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
351     // Additionally, it is not fatal for the
352     // threaded RTS to reach here with no threads to run.
353     //
354     // win32: might be here due to awaitEvent() being abandoned
355     // as a result of a console event having been delivered.
356     
357 #if defined(THREADED_RTS)
358     if (first) 
359     {
360     // XXX: ToDo
361     //     // don't yield the first time, we want a chance to run this
362     //     // thread for a bit, even if there are others banging at the
363     //     // door.
364     //     first = rtsFalse;
365     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
366     }
367
368   yield:
369     scheduleYield(&cap,task);
370     if (emptyRunQueue(cap)) continue; // look for work again
371 #endif
372
373 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
374     if ( emptyRunQueue(cap) ) {
375         ASSERT(sched_state >= SCHED_INTERRUPTING);
376     }
377 #endif
378
379     // 
380     // Get a thread to run
381     //
382     t = popRunQueue(cap);
383
384     // Sanity check the thread we're about to run.  This can be
385     // expensive if there is lots of thread switching going on...
386     IF_DEBUG(sanity,checkTSO(t));
387
388 #if defined(THREADED_RTS)
389     // Check whether we can run this thread in the current task.
390     // If not, we have to pass our capability to the right task.
391     {
392         Task *bound = t->bound;
393       
394         if (bound) {
395             if (bound == task) {
396                 debugTrace(DEBUG_sched,
397                            "### Running thread %lu in bound thread", (unsigned long)t->id);
398                 // yes, the Haskell thread is bound to the current native thread
399             } else {
400                 debugTrace(DEBUG_sched,
401                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
402                 // no, bound to a different Haskell thread: pass to that thread
403                 pushOnRunQueue(cap,t);
404                 continue;
405             }
406         } else {
407             // The thread we want to run is unbound.
408             if (task->tso) { 
409                 debugTrace(DEBUG_sched,
410                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
411                 // no, the current native thread is bound to a different
412                 // Haskell thread, so pass it to any worker thread
413                 pushOnRunQueue(cap,t);
414                 continue; 
415             }
416         }
417     }
418 #endif
419
420     // If we're shutting down, and this thread has not yet been
421     // killed, kill it now.  This sometimes happens when a finalizer
422     // thread is created by the final GC, or a thread previously
423     // in a foreign call returns.
424     if (sched_state >= SCHED_INTERRUPTING &&
425         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
426         deleteThread(cap,t);
427     }
428
429     /* context switches are initiated by the timer signal, unless
430      * the user specified "context switch as often as possible", with
431      * +RTS -C0
432      */
433     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
434         && !emptyThreadQueues(cap)) {
435         cap->context_switch = 1;
436     }
437          
438 run_thread:
439
440     // CurrentTSO is the thread to run.  t might be different if we
441     // loop back to run_thread, so make sure to set CurrentTSO after
442     // that.
443     cap->r.rCurrentTSO = t;
444
445     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
446                               (long)t->id, whatNext_strs[t->what_next]);
447
448     startHeapProfTimer();
449
450     // Check for exceptions blocked on this thread
451     maybePerformBlockedException (cap, t);
452
453     // ----------------------------------------------------------------------
454     // Run the current thread 
455
456     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
457     ASSERT(t->cap == cap);
458     ASSERT(t->bound ? t->bound->cap == cap : 1);
459
460     prev_what_next = t->what_next;
461
462     errno = t->saved_errno;
463 #if mingw32_HOST_OS
464     SetLastError(t->saved_winerror);
465 #endif
466
467     cap->in_haskell = rtsTrue;
468
469     dirty_TSO(cap,t);
470
471 #if defined(THREADED_RTS)
472     if (recent_activity == ACTIVITY_DONE_GC) {
473         // ACTIVITY_DONE_GC means we turned off the timer signal to
474         // conserve power (see #1623).  Re-enable it here.
475         nat prev;
476         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
477         if (prev == ACTIVITY_DONE_GC) {
478             startTimer();
479         }
480     } else {
481         recent_activity = ACTIVITY_YES;
482     }
483 #endif
484
485     postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
486
487     switch (prev_what_next) {
488         
489     case ThreadKilled:
490     case ThreadComplete:
491         /* Thread already finished, return to scheduler. */
492         ret = ThreadFinished;
493         break;
494         
495     case ThreadRunGHC:
496     {
497         StgRegTable *r;
498         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
499         cap = regTableToCapability(r);
500         ret = r->rRet;
501         break;
502     }
503     
504     case ThreadInterpret:
505         cap = interpretBCO(cap);
506         ret = cap->r.rRet;
507         break;
508         
509     default:
510         barf("schedule: invalid what_next field");
511     }
512
513     cap->in_haskell = rtsFalse;
514
515     // The TSO might have moved, eg. if it re-entered the RTS and a GC
516     // happened.  So find the new location:
517     t = cap->r.rCurrentTSO;
518
519     // We have run some Haskell code: there might be blackhole-blocked
520     // threads to wake up now.
521     // Lock-free test here should be ok, we're just setting a flag.
522     if ( blackhole_queue != END_TSO_QUEUE ) {
523         blackholes_need_checking = rtsTrue;
524     }
525     
526     // And save the current errno in this thread.
527     // XXX: possibly bogus for SMP because this thread might already
528     // be running again, see code below.
529     t->saved_errno = errno;
530 #if mingw32_HOST_OS
531     // Similarly for Windows error code
532     t->saved_winerror = GetLastError();
533 #endif
534
535     postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
536
537 #if defined(THREADED_RTS)
538     // If ret is ThreadBlocked, and this Task is bound to the TSO that
539     // blocked, we are in limbo - the TSO is now owned by whatever it
540     // is blocked on, and may in fact already have been woken up,
541     // perhaps even on a different Capability.  It may be the case
542     // that task->cap != cap.  We better yield this Capability
543     // immediately and return to normaility.
544     if (ret == ThreadBlocked) {
545         debugTrace(DEBUG_sched,
546                    "--<< thread %lu (%s) stopped: blocked",
547                    (unsigned long)t->id, whatNext_strs[t->what_next]);
548         goto yield;
549     }
550 #endif
551
552     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
553     ASSERT(t->cap == cap);
554
555     // ----------------------------------------------------------------------
556     
557     // Costs for the scheduler are assigned to CCS_SYSTEM
558     stopHeapProfTimer();
559 #if defined(PROFILING)
560     CCCS = CCS_SYSTEM;
561 #endif
562     
563     schedulePostRunThread(cap,t);
564
565     if (ret != StackOverflow) {
566         t = threadStackUnderflow(task,t);
567     }
568
569     ready_to_gc = rtsFalse;
570
571     switch (ret) {
572     case HeapOverflow:
573         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
574         break;
575
576     case StackOverflow:
577         scheduleHandleStackOverflow(cap,task,t);
578         break;
579
580     case ThreadYielding:
581         if (scheduleHandleYield(cap, t, prev_what_next)) {
582             // shortcut for switching between compiler/interpreter:
583             goto run_thread; 
584         }
585         break;
586
587     case ThreadBlocked:
588         scheduleHandleThreadBlocked(t);
589         break;
590
591     case ThreadFinished:
592         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
593         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
594         break;
595
596     default:
597       barf("schedule: invalid thread return code %d", (int)ret);
598     }
599
600     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
601       cap = scheduleDoGC(cap,task,rtsFalse);
602     }
603   } /* end of while() */
604 }
605
606 /* ----------------------------------------------------------------------------
607  * Setting up the scheduler loop
608  * ------------------------------------------------------------------------- */
609
610 static void
611 schedulePreLoop(void)
612 {
613   // initialisation for scheduler - what cannot go into initScheduler()  
614 }
615
616 /* -----------------------------------------------------------------------------
617  * scheduleFindWork()
618  *
619  * Search for work to do, and handle messages from elsewhere.
620  * -------------------------------------------------------------------------- */
621
622 static void
623 scheduleFindWork (Capability *cap)
624 {
625     scheduleStartSignalHandlers(cap);
626
627     // Only check the black holes here if we've nothing else to do.
628     // During normal execution, the black hole list only gets checked
629     // at GC time, to avoid repeatedly traversing this possibly long
630     // list each time around the scheduler.
631     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
632
633     scheduleCheckWakeupThreads(cap);
634
635     scheduleCheckBlockedThreads(cap);
636
637 #if defined(THREADED_RTS)
638     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
639 #endif
640 }
641
642 #if defined(THREADED_RTS)
643 STATIC_INLINE rtsBool
644 shouldYieldCapability (Capability *cap, Task *task)
645 {
646     // we need to yield this capability to someone else if..
647     //   - another thread is initiating a GC
648     //   - another Task is returning from a foreign call
649     //   - the thread at the head of the run queue cannot be run
650     //     by this Task (it is bound to another Task, or it is unbound
651     //     and this task it bound).
652     return (waiting_for_gc || 
653             cap->returning_tasks_hd != NULL ||
654             (!emptyRunQueue(cap) && (task->tso == NULL
655                                      ? cap->run_queue_hd->bound != NULL
656                                      : cap->run_queue_hd->bound != task)));
657 }
658
659 // This is the single place where a Task goes to sleep.  There are
660 // two reasons it might need to sleep:
661 //    - there are no threads to run
662 //    - we need to yield this Capability to someone else 
663 //      (see shouldYieldCapability())
664 //
665 // Careful: the scheduler loop is quite delicate.  Make sure you run
666 // the tests in testsuite/concurrent (all ways) after modifying this,
667 // and also check the benchmarks in nofib/parallel for regressions.
668
669 static void
670 scheduleYield (Capability **pcap, Task *task)
671 {
672     Capability *cap = *pcap;
673
674     // if we have work, and we don't need to give up the Capability, continue.
675     if (!shouldYieldCapability(cap,task) && 
676         (!emptyRunQueue(cap) ||
677          !emptyWakeupQueue(cap) ||
678          blackholes_need_checking ||
679          sched_state >= SCHED_INTERRUPTING))
680         return;
681
682     // otherwise yield (sleep), and keep yielding if necessary.
683     do {
684         yieldCapability(&cap,task);
685     } 
686     while (shouldYieldCapability(cap,task));
687
688     // note there may still be no threads on the run queue at this
689     // point, the caller has to check.
690
691     *pcap = cap;
692     return;
693 }
694 #endif
695     
696 /* -----------------------------------------------------------------------------
697  * schedulePushWork()
698  *
699  * Push work to other Capabilities if we have some.
700  * -------------------------------------------------------------------------- */
701
702 static void
703 schedulePushWork(Capability *cap USED_IF_THREADS, 
704                  Task *task      USED_IF_THREADS)
705 {
706   /* following code not for PARALLEL_HASKELL. I kept the call general,
707      future GUM versions might use pushing in a distributed setup */
708 #if defined(THREADED_RTS)
709
710     Capability *free_caps[n_capabilities], *cap0;
711     nat i, n_free_caps;
712
713     // migration can be turned off with +RTS -qg
714     if (!RtsFlags.ParFlags.migrate) return;
715
716     // Check whether we have more threads on our run queue, or sparks
717     // in our pool, that we could hand to another Capability.
718     if (cap->run_queue_hd == END_TSO_QUEUE) {
719         if (sparkPoolSizeCap(cap) < 2) return;
720     } else {
721         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
722             sparkPoolSizeCap(cap) < 1) return;
723     }
724
725     // First grab as many free Capabilities as we can.
726     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
727         cap0 = &capabilities[i];
728         if (cap != cap0 && tryGrabCapability(cap0,task)) {
729             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
730                 // it already has some work, we just grabbed it at 
731                 // the wrong moment.  Or maybe it's deadlocked!
732                 releaseCapability(cap0);
733             } else {
734                 free_caps[n_free_caps++] = cap0;
735             }
736         }
737     }
738
739     // we now have n_free_caps free capabilities stashed in
740     // free_caps[].  Share our run queue equally with them.  This is
741     // probably the simplest thing we could do; improvements we might
742     // want to do include:
743     //
744     //   - giving high priority to moving relatively new threads, on 
745     //     the gournds that they haven't had time to build up a
746     //     working set in the cache on this CPU/Capability.
747     //
748     //   - giving low priority to moving long-lived threads
749
750     if (n_free_caps > 0) {
751         StgTSO *prev, *t, *next;
752         rtsBool pushed_to_all;
753
754         debugTrace(DEBUG_sched, 
755                    "cap %d: %s and %d free capabilities, sharing...", 
756                    cap->no, 
757                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
758                    "excess threads on run queue":"sparks to share (>=2)",
759                    n_free_caps);
760
761         i = 0;
762         pushed_to_all = rtsFalse;
763
764         if (cap->run_queue_hd != END_TSO_QUEUE) {
765             prev = cap->run_queue_hd;
766             t = prev->_link;
767             prev->_link = END_TSO_QUEUE;
768             for (; t != END_TSO_QUEUE; t = next) {
769                 next = t->_link;
770                 t->_link = END_TSO_QUEUE;
771                 if (t->what_next == ThreadRelocated
772                     || t->bound == task // don't move my bound thread
773                     || tsoLocked(t)) {  // don't move a locked thread
774                     setTSOLink(cap, prev, t);
775                     prev = t;
776                 } else if (i == n_free_caps) {
777                     pushed_to_all = rtsTrue;
778                     i = 0;
779                     // keep one for us
780                     setTSOLink(cap, prev, t);
781                     prev = t;
782                 } else {
783                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
784                     appendToRunQueue(free_caps[i],t);
785
786         postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
787
788                     if (t->bound) { t->bound->cap = free_caps[i]; }
789                     t->cap = free_caps[i];
790                     i++;
791                 }
792             }
793             cap->run_queue_tl = prev;
794         }
795
796 #ifdef SPARK_PUSHING
797         /* JB I left this code in place, it would work but is not necessary */
798
799         // If there are some free capabilities that we didn't push any
800         // threads to, then try to push a spark to each one.
801         if (!pushed_to_all) {
802             StgClosure *spark;
803             // i is the next free capability to push to
804             for (; i < n_free_caps; i++) {
805                 if (emptySparkPoolCap(free_caps[i])) {
806                     spark = tryStealSpark(cap->sparks);
807                     if (spark != NULL) {
808                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
809
810       postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
811
812                         newSpark(&(free_caps[i]->r), spark);
813                     }
814                 }
815             }
816         }
817 #endif /* SPARK_PUSHING */
818
819         // release the capabilities
820         for (i = 0; i < n_free_caps; i++) {
821             task->cap = free_caps[i];
822             releaseAndWakeupCapability(free_caps[i]);
823         }
824     }
825     task->cap = cap; // reset to point to our Capability.
826
827 #endif /* THREADED_RTS */
828
829 }
830
831 /* ----------------------------------------------------------------------------
832  * Start any pending signal handlers
833  * ------------------------------------------------------------------------- */
834
835 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
836 static void
837 scheduleStartSignalHandlers(Capability *cap)
838 {
839     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
840         // safe outside the lock
841         startSignalHandlers(cap);
842     }
843 }
844 #else
845 static void
846 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
847 {
848 }
849 #endif
850
851 /* ----------------------------------------------------------------------------
852  * Check for blocked threads that can be woken up.
853  * ------------------------------------------------------------------------- */
854
855 static void
856 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
857 {
858 #if !defined(THREADED_RTS)
859     //
860     // Check whether any waiting threads need to be woken up.  If the
861     // run queue is empty, and there are no other tasks running, we
862     // can wait indefinitely for something to happen.
863     //
864     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
865     {
866         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
867     }
868 #endif
869 }
870
871
872 /* ----------------------------------------------------------------------------
873  * Check for threads woken up by other Capabilities
874  * ------------------------------------------------------------------------- */
875
876 static void
877 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
878 {
879 #if defined(THREADED_RTS)
880     // Any threads that were woken up by other Capabilities get
881     // appended to our run queue.
882     if (!emptyWakeupQueue(cap)) {
883         ACQUIRE_LOCK(&cap->lock);
884         if (emptyRunQueue(cap)) {
885             cap->run_queue_hd = cap->wakeup_queue_hd;
886             cap->run_queue_tl = cap->wakeup_queue_tl;
887         } else {
888             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
889             cap->run_queue_tl = cap->wakeup_queue_tl;
890         }
891         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
892         RELEASE_LOCK(&cap->lock);
893     }
894 #endif
895 }
896
897 /* ----------------------------------------------------------------------------
898  * Check for threads blocked on BLACKHOLEs that can be woken up
899  * ------------------------------------------------------------------------- */
900 static void
901 scheduleCheckBlackHoles (Capability *cap)
902 {
903     if ( blackholes_need_checking ) // check without the lock first
904     {
905         ACQUIRE_LOCK(&sched_mutex);
906         if ( blackholes_need_checking ) {
907             blackholes_need_checking = rtsFalse;
908             // important that we reset the flag *before* checking the
909             // blackhole queue, otherwise we could get deadlock.  This
910             // happens as follows: we wake up a thread that
911             // immediately runs on another Capability, blocks on a
912             // blackhole, and then we reset the blackholes_need_checking flag.
913             checkBlackHoles(cap);
914         }
915         RELEASE_LOCK(&sched_mutex);
916     }
917 }
918
919 /* ----------------------------------------------------------------------------
920  * Detect deadlock conditions and attempt to resolve them.
921  * ------------------------------------------------------------------------- */
922
923 static void
924 scheduleDetectDeadlock (Capability *cap, Task *task)
925 {
926     /* 
927      * Detect deadlock: when we have no threads to run, there are no
928      * threads blocked, waiting for I/O, or sleeping, and all the
929      * other tasks are waiting for work, we must have a deadlock of
930      * some description.
931      */
932     if ( emptyThreadQueues(cap) )
933     {
934 #if defined(THREADED_RTS)
935         /* 
936          * In the threaded RTS, we only check for deadlock if there
937          * has been no activity in a complete timeslice.  This means
938          * we won't eagerly start a full GC just because we don't have
939          * any threads to run currently.
940          */
941         if (recent_activity != ACTIVITY_INACTIVE) return;
942 #endif
943
944         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
945
946         // Garbage collection can release some new threads due to
947         // either (a) finalizers or (b) threads resurrected because
948         // they are unreachable and will therefore be sent an
949         // exception.  Any threads thus released will be immediately
950         // runnable.
951         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
952         // when force_major == rtsTrue. scheduleDoGC sets
953         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
954         // signal.
955
956         if ( !emptyRunQueue(cap) ) return;
957
958 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
959         /* If we have user-installed signal handlers, then wait
960          * for signals to arrive rather then bombing out with a
961          * deadlock.
962          */
963         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
964             debugTrace(DEBUG_sched,
965                        "still deadlocked, waiting for signals...");
966
967             awaitUserSignals();
968
969             if (signals_pending()) {
970                 startSignalHandlers(cap);
971             }
972
973             // either we have threads to run, or we were interrupted:
974             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
975
976             return;
977         }
978 #endif
979
980 #if !defined(THREADED_RTS)
981         /* Probably a real deadlock.  Send the current main thread the
982          * Deadlock exception.
983          */
984         if (task->tso) {
985             switch (task->tso->why_blocked) {
986             case BlockedOnSTM:
987             case BlockedOnBlackHole:
988             case BlockedOnException:
989             case BlockedOnMVar:
990                 throwToSingleThreaded(cap, task->tso, 
991                                       (StgClosure *)nonTermination_closure);
992                 return;
993             default:
994                 barf("deadlock: main thread blocked in a strange way");
995             }
996         }
997         return;
998 #endif
999     }
1000 }
1001
1002
1003 /* ----------------------------------------------------------------------------
1004  * Send pending messages (PARALLEL_HASKELL only)
1005  * ------------------------------------------------------------------------- */
1006
1007 #if defined(PARALLEL_HASKELL)
1008 static void
1009 scheduleSendPendingMessages(void)
1010 {
1011
1012 # if defined(PAR) // global Mem.Mgmt., omit for now
1013     if (PendingFetches != END_BF_QUEUE) {
1014         processFetches();
1015     }
1016 # endif
1017     
1018     if (RtsFlags.ParFlags.BufferTime) {
1019         // if we use message buffering, we must send away all message
1020         // packets which have become too old...
1021         sendOldBuffers(); 
1022     }
1023 }
1024 #endif
1025
1026 /* ----------------------------------------------------------------------------
1027  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1028  * ------------------------------------------------------------------------- */
1029
1030 #if defined(THREADED_RTS)
1031 static void
1032 scheduleActivateSpark(Capability *cap)
1033 {
1034     if (anySparks())
1035     {
1036         createSparkThread(cap);
1037         debugTrace(DEBUG_sched, "creating a spark thread");
1038     }
1039 }
1040 #endif // PARALLEL_HASKELL || THREADED_RTS
1041
1042 /* ----------------------------------------------------------------------------
1043  * After running a thread...
1044  * ------------------------------------------------------------------------- */
1045
1046 static void
1047 schedulePostRunThread (Capability *cap, StgTSO *t)
1048 {
1049     // We have to be able to catch transactions that are in an
1050     // infinite loop as a result of seeing an inconsistent view of
1051     // memory, e.g. 
1052     //
1053     //   atomically $ do
1054     //       [a,b] <- mapM readTVar [ta,tb]
1055     //       when (a == b) loop
1056     //
1057     // and a is never equal to b given a consistent view of memory.
1058     //
1059     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1060         if (!stmValidateNestOfTransactions (t -> trec)) {
1061             debugTrace(DEBUG_sched | DEBUG_stm,
1062                        "trec %p found wasting its time", t);
1063             
1064             // strip the stack back to the
1065             // ATOMICALLY_FRAME, aborting the (nested)
1066             // transaction, and saving the stack of any
1067             // partially-evaluated thunks on the heap.
1068             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1069             
1070             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1071         }
1072     }
1073
1074   /* some statistics gathering in the parallel case */
1075 }
1076
1077 /* -----------------------------------------------------------------------------
1078  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1079  * -------------------------------------------------------------------------- */
1080
1081 static rtsBool
1082 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1083 {
1084     // did the task ask for a large block?
1085     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1086         // if so, get one and push it on the front of the nursery.
1087         bdescr *bd;
1088         lnat blocks;
1089         
1090         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1091         
1092         debugTrace(DEBUG_sched,
1093                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1094                    (long)t->id, whatNext_strs[t->what_next], blocks);
1095     
1096         // don't do this if the nursery is (nearly) full, we'll GC first.
1097         if (cap->r.rCurrentNursery->link != NULL ||
1098             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1099                                                // if the nursery has only one block.
1100             
1101             ACQUIRE_SM_LOCK
1102             bd = allocGroup( blocks );
1103             RELEASE_SM_LOCK
1104             cap->r.rNursery->n_blocks += blocks;
1105             
1106             // link the new group into the list
1107             bd->link = cap->r.rCurrentNursery;
1108             bd->u.back = cap->r.rCurrentNursery->u.back;
1109             if (cap->r.rCurrentNursery->u.back != NULL) {
1110                 cap->r.rCurrentNursery->u.back->link = bd;
1111             } else {
1112 #if !defined(THREADED_RTS)
1113                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1114                        g0s0 == cap->r.rNursery);
1115 #endif
1116                 cap->r.rNursery->blocks = bd;
1117             }             
1118             cap->r.rCurrentNursery->u.back = bd;
1119             
1120             // initialise it as a nursery block.  We initialise the
1121             // step, gen_no, and flags field of *every* sub-block in
1122             // this large block, because this is easier than making
1123             // sure that we always find the block head of a large
1124             // block whenever we call Bdescr() (eg. evacuate() and
1125             // isAlive() in the GC would both have to do this, at
1126             // least).
1127             { 
1128                 bdescr *x;
1129                 for (x = bd; x < bd + blocks; x++) {
1130                     x->step = cap->r.rNursery;
1131                     x->gen_no = 0;
1132                     x->flags = 0;
1133                 }
1134             }
1135             
1136             // This assert can be a killer if the app is doing lots
1137             // of large block allocations.
1138             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1139             
1140             // now update the nursery to point to the new block
1141             cap->r.rCurrentNursery = bd;
1142             
1143             // we might be unlucky and have another thread get on the
1144             // run queue before us and steal the large block, but in that
1145             // case the thread will just end up requesting another large
1146             // block.
1147             pushOnRunQueue(cap,t);
1148             return rtsFalse;  /* not actually GC'ing */
1149         }
1150     }
1151     
1152     debugTrace(DEBUG_sched,
1153                "--<< thread %ld (%s) stopped: HeapOverflow",
1154                (long)t->id, whatNext_strs[t->what_next]);
1155
1156     if (cap->r.rHpLim == NULL || cap->context_switch) {
1157         // Sometimes we miss a context switch, e.g. when calling
1158         // primitives in a tight loop, MAYBE_GC() doesn't check the
1159         // context switch flag, and we end up waiting for a GC.
1160         // See #1984, and concurrent/should_run/1984
1161         cap->context_switch = 0;
1162         addToRunQueue(cap,t);
1163     } else {
1164         pushOnRunQueue(cap,t);
1165     }
1166     return rtsTrue;
1167     /* actual GC is done at the end of the while loop in schedule() */
1168 }
1169
1170 /* -----------------------------------------------------------------------------
1171  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1172  * -------------------------------------------------------------------------- */
1173
1174 static void
1175 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1176 {
1177     debugTrace (DEBUG_sched,
1178                 "--<< thread %ld (%s) stopped, StackOverflow", 
1179                 (long)t->id, whatNext_strs[t->what_next]);
1180
1181     /* just adjust the stack for this thread, then pop it back
1182      * on the run queue.
1183      */
1184     { 
1185         /* enlarge the stack */
1186         StgTSO *new_t = threadStackOverflow(cap, t);
1187         
1188         /* The TSO attached to this Task may have moved, so update the
1189          * pointer to it.
1190          */
1191         if (task->tso == t) {
1192             task->tso = new_t;
1193         }
1194         pushOnRunQueue(cap,new_t);
1195     }
1196 }
1197
1198 /* -----------------------------------------------------------------------------
1199  * Handle a thread that returned to the scheduler with ThreadYielding
1200  * -------------------------------------------------------------------------- */
1201
1202 static rtsBool
1203 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1204 {
1205     // Reset the context switch flag.  We don't do this just before
1206     // running the thread, because that would mean we would lose ticks
1207     // during GC, which can lead to unfair scheduling (a thread hogs
1208     // the CPU because the tick always arrives during GC).  This way
1209     // penalises threads that do a lot of allocation, but that seems
1210     // better than the alternative.
1211     cap->context_switch = 0;
1212     
1213     /* put the thread back on the run queue.  Then, if we're ready to
1214      * GC, check whether this is the last task to stop.  If so, wake
1215      * up the GC thread.  getThread will block during a GC until the
1216      * GC is finished.
1217      */
1218 #ifdef DEBUG
1219     if (t->what_next != prev_what_next) {
1220         debugTrace(DEBUG_sched,
1221                    "--<< thread %ld (%s) stopped to switch evaluators", 
1222                    (long)t->id, whatNext_strs[t->what_next]);
1223     } else {
1224         debugTrace(DEBUG_sched,
1225                    "--<< thread %ld (%s) stopped, yielding",
1226                    (long)t->id, whatNext_strs[t->what_next]);
1227     }
1228 #endif
1229     
1230     IF_DEBUG(sanity,
1231              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1232              checkTSO(t));
1233     ASSERT(t->_link == END_TSO_QUEUE);
1234     
1235     // Shortcut if we're just switching evaluators: don't bother
1236     // doing stack squeezing (which can be expensive), just run the
1237     // thread.
1238     if (t->what_next != prev_what_next) {
1239         return rtsTrue;
1240     }
1241
1242     addToRunQueue(cap,t);
1243
1244     return rtsFalse;
1245 }
1246
1247 /* -----------------------------------------------------------------------------
1248  * Handle a thread that returned to the scheduler with ThreadBlocked
1249  * -------------------------------------------------------------------------- */
1250
1251 static void
1252 scheduleHandleThreadBlocked( StgTSO *t
1253 #if !defined(DEBUG)
1254     STG_UNUSED
1255 #endif
1256     )
1257 {
1258
1259       // We don't need to do anything.  The thread is blocked, and it
1260       // has tidied up its stack and placed itself on whatever queue
1261       // it needs to be on.
1262
1263     // ASSERT(t->why_blocked != NotBlocked);
1264     // Not true: for example,
1265     //    - in THREADED_RTS, the thread may already have been woken
1266     //      up by another Capability.  This actually happens: try
1267     //      conc023 +RTS -N2.
1268     //    - the thread may have woken itself up already, because
1269     //      threadPaused() might have raised a blocked throwTo
1270     //      exception, see maybePerformBlockedException().
1271
1272 #ifdef DEBUG
1273     if (traceClass(DEBUG_sched)) {
1274         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1275                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1276         printThreadBlockage(t);
1277         debugTraceEnd();
1278     }
1279 #endif
1280 }
1281
1282 /* -----------------------------------------------------------------------------
1283  * Handle a thread that returned to the scheduler with ThreadFinished
1284  * -------------------------------------------------------------------------- */
1285
1286 static rtsBool
1287 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1288 {
1289     /* Need to check whether this was a main thread, and if so,
1290      * return with the return value.
1291      *
1292      * We also end up here if the thread kills itself with an
1293      * uncaught exception, see Exception.cmm.
1294      */
1295     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1296                (unsigned long)t->id, whatNext_strs[t->what_next]);
1297
1298     // blocked exceptions can now complete, even if the thread was in
1299     // blocked mode (see #2910).  This unconditionally calls
1300     // lockTSO(), which ensures that we don't miss any threads that
1301     // are engaged in throwTo() with this thread as a target.
1302     awakenBlockedExceptionQueue (cap, t);
1303
1304       //
1305       // Check whether the thread that just completed was a bound
1306       // thread, and if so return with the result.  
1307       //
1308       // There is an assumption here that all thread completion goes
1309       // through this point; we need to make sure that if a thread
1310       // ends up in the ThreadKilled state, that it stays on the run
1311       // queue so it can be dealt with here.
1312       //
1313
1314       if (t->bound) {
1315
1316           if (t->bound != task) {
1317 #if !defined(THREADED_RTS)
1318               // Must be a bound thread that is not the topmost one.  Leave
1319               // it on the run queue until the stack has unwound to the
1320               // point where we can deal with this.  Leaving it on the run
1321               // queue also ensures that the garbage collector knows about
1322               // this thread and its return value (it gets dropped from the
1323               // step->threads list so there's no other way to find it).
1324               appendToRunQueue(cap,t);
1325               return rtsFalse;
1326 #else
1327               // this cannot happen in the threaded RTS, because a
1328               // bound thread can only be run by the appropriate Task.
1329               barf("finished bound thread that isn't mine");
1330 #endif
1331           }
1332
1333           ASSERT(task->tso == t);
1334
1335           if (t->what_next == ThreadComplete) {
1336               if (task->ret) {
1337                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1338                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1339               }
1340               task->stat = Success;
1341           } else {
1342               if (task->ret) {
1343                   *(task->ret) = NULL;
1344               }
1345               if (sched_state >= SCHED_INTERRUPTING) {
1346                   if (heap_overflow) {
1347                       task->stat = HeapExhausted;
1348                   } else {
1349                       task->stat = Interrupted;
1350                   }
1351               } else {
1352                   task->stat = Killed;
1353               }
1354           }
1355 #ifdef DEBUG
1356           removeThreadLabel((StgWord)task->tso->id);
1357 #endif
1358           return rtsTrue; // tells schedule() to return
1359       }
1360
1361       return rtsFalse;
1362 }
1363
1364 /* -----------------------------------------------------------------------------
1365  * Perform a heap census
1366  * -------------------------------------------------------------------------- */
1367
1368 static rtsBool
1369 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1370 {
1371     // When we have +RTS -i0 and we're heap profiling, do a census at
1372     // every GC.  This lets us get repeatable runs for debugging.
1373     if (performHeapProfile ||
1374         (RtsFlags.ProfFlags.profileInterval==0 &&
1375          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1376         return rtsTrue;
1377     } else {
1378         return rtsFalse;
1379     }
1380 }
1381
1382 /* -----------------------------------------------------------------------------
1383  * Perform a garbage collection if necessary
1384  * -------------------------------------------------------------------------- */
1385
1386 static Capability *
1387 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1388 {
1389     rtsBool heap_census;
1390 #ifdef THREADED_RTS
1391     /* extern static volatile StgWord waiting_for_gc; 
1392        lives inside capability.c */
1393     rtsBool gc_type, prev_pending_gc;
1394     nat i;
1395 #endif
1396
1397     if (sched_state == SCHED_SHUTTING_DOWN) {
1398         // The final GC has already been done, and the system is
1399         // shutting down.  We'll probably deadlock if we try to GC
1400         // now.
1401         return cap;
1402     }
1403
1404 #ifdef THREADED_RTS
1405     if (sched_state < SCHED_INTERRUPTING
1406         && RtsFlags.ParFlags.parGcEnabled
1407         && N >= RtsFlags.ParFlags.parGcGen
1408         && ! oldest_gen->steps[0].mark)
1409     {
1410         gc_type = PENDING_GC_PAR;
1411     } else {
1412         gc_type = PENDING_GC_SEQ;
1413     }
1414
1415     // In order to GC, there must be no threads running Haskell code.
1416     // Therefore, the GC thread needs to hold *all* the capabilities,
1417     // and release them after the GC has completed.  
1418     //
1419     // This seems to be the simplest way: previous attempts involved
1420     // making all the threads with capabilities give up their
1421     // capabilities and sleep except for the *last* one, which
1422     // actually did the GC.  But it's quite hard to arrange for all
1423     // the other tasks to sleep and stay asleep.
1424     //
1425
1426     /*  Other capabilities are prevented from running yet more Haskell
1427         threads if waiting_for_gc is set. Tested inside
1428         yieldCapability() and releaseCapability() in Capability.c */
1429
1430     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1431     if (prev_pending_gc) {
1432         do {
1433             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1434                        prev_pending_gc);
1435             ASSERT(cap);
1436             yieldCapability(&cap,task);
1437         } while (waiting_for_gc);
1438         return cap;  // NOTE: task->cap might have changed here
1439     }
1440
1441     setContextSwitches();
1442
1443     // The final shutdown GC is always single-threaded, because it's
1444     // possible that some of the Capabilities have no worker threads.
1445     
1446     if (gc_type == PENDING_GC_SEQ)
1447     {
1448         postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
1449         // single-threaded GC: grab all the capabilities
1450         for (i=0; i < n_capabilities; i++) {
1451             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1452             if (cap != &capabilities[i]) {
1453                 Capability *pcap = &capabilities[i];
1454                 // we better hope this task doesn't get migrated to
1455                 // another Capability while we're waiting for this one.
1456                 // It won't, because load balancing happens while we have
1457                 // all the Capabilities, but even so it's a slightly
1458                 // unsavoury invariant.
1459                 task->cap = pcap;
1460                 waitForReturnCapability(&pcap, task);
1461                 if (pcap != &capabilities[i]) {
1462                     barf("scheduleDoGC: got the wrong capability");
1463                 }
1464             }
1465         }
1466     }
1467     else
1468     {
1469         // multi-threaded GC: make sure all the Capabilities donate one
1470         // GC thread each.
1471         postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
1472         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1473
1474         waitForGcThreads(cap);
1475     }
1476 #endif
1477
1478     // so this happens periodically:
1479     if (cap) scheduleCheckBlackHoles(cap);
1480     
1481     IF_DEBUG(scheduler, printAllThreads());
1482
1483 delete_threads_and_gc:
1484     /*
1485      * We now have all the capabilities; if we're in an interrupting
1486      * state, then we should take the opportunity to delete all the
1487      * threads in the system.
1488      */
1489     if (sched_state == SCHED_INTERRUPTING) {
1490         deleteAllThreads(cap);
1491         sched_state = SCHED_SHUTTING_DOWN;
1492     }
1493     
1494     heap_census = scheduleNeedHeapProfile(rtsTrue);
1495
1496 #if defined(THREADED_RTS)
1497     postEvent(cap, EVENT_GC_START, 0, 0);
1498     debugTrace(DEBUG_sched, "doing GC");
1499     // reset waiting_for_gc *before* GC, so that when the GC threads
1500     // emerge they don't immediately re-enter the GC.
1501     waiting_for_gc = 0;
1502     GarbageCollect(force_major || heap_census, gc_type, cap);
1503 #else
1504     GarbageCollect(force_major || heap_census, 0, cap);
1505 #endif
1506     postEvent(cap, EVENT_GC_END, 0, 0);
1507
1508     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1509     {
1510         // We are doing a GC because the system has been idle for a
1511         // timeslice and we need to check for deadlock.  Record the
1512         // fact that we've done a GC and turn off the timer signal;
1513         // it will get re-enabled if we run any threads after the GC.
1514         recent_activity = ACTIVITY_DONE_GC;
1515         stopTimer();
1516     }
1517     else
1518     {
1519         // the GC might have taken long enough for the timer to set
1520         // recent_activity = ACTIVITY_INACTIVE, but we aren't
1521         // necessarily deadlocked:
1522         recent_activity = ACTIVITY_YES;
1523     }
1524
1525 #if defined(THREADED_RTS)
1526     if (gc_type == PENDING_GC_PAR)
1527     {
1528         releaseGCThreads(cap);
1529     }
1530 #endif
1531
1532     if (heap_census) {
1533         debugTrace(DEBUG_sched, "performing heap census");
1534         heapCensus();
1535         performHeapProfile = rtsFalse;
1536     }
1537
1538     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1539         // GC set the heap_overflow flag, so we should proceed with
1540         // an orderly shutdown now.  Ultimately we want the main
1541         // thread to return to its caller with HeapExhausted, at which
1542         // point the caller should call hs_exit().  The first step is
1543         // to delete all the threads.
1544         //
1545         // Another way to do this would be to raise an exception in
1546         // the main thread, which we really should do because it gives
1547         // the program a chance to clean up.  But how do we find the
1548         // main thread?  It should presumably be the same one that
1549         // gets ^C exceptions, but that's all done on the Haskell side
1550         // (GHC.TopHandler).
1551         sched_state = SCHED_INTERRUPTING;
1552         goto delete_threads_and_gc;
1553     }
1554
1555 #ifdef SPARKBALANCE
1556     /* JB 
1557        Once we are all together... this would be the place to balance all
1558        spark pools. No concurrent stealing or adding of new sparks can
1559        occur. Should be defined in Sparks.c. */
1560     balanceSparkPoolsCaps(n_capabilities, capabilities);
1561 #endif
1562
1563 #if defined(THREADED_RTS)
1564     if (gc_type == PENDING_GC_SEQ) {
1565         // release our stash of capabilities.
1566         for (i = 0; i < n_capabilities; i++) {
1567             if (cap != &capabilities[i]) {
1568                 task->cap = &capabilities[i];
1569                 releaseCapability(&capabilities[i]);
1570             }
1571         }
1572     }
1573     if (cap) {
1574         task->cap = cap;
1575     } else {
1576         task->cap = NULL;
1577     }
1578 #endif
1579
1580     return cap;
1581 }
1582
1583 /* ---------------------------------------------------------------------------
1584  * Singleton fork(). Do not copy any running threads.
1585  * ------------------------------------------------------------------------- */
1586
1587 pid_t
1588 forkProcess(HsStablePtr *entry
1589 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1590             STG_UNUSED
1591 #endif
1592            )
1593 {
1594 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1595     Task *task;
1596     pid_t pid;
1597     StgTSO* t,*next;
1598     Capability *cap;
1599     nat s;
1600     
1601 #if defined(THREADED_RTS)
1602     if (RtsFlags.ParFlags.nNodes > 1) {
1603         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1604         stg_exit(EXIT_FAILURE);
1605     }
1606 #endif
1607
1608     debugTrace(DEBUG_sched, "forking!");
1609     
1610     // ToDo: for SMP, we should probably acquire *all* the capabilities
1611     cap = rts_lock();
1612     
1613     // no funny business: hold locks while we fork, otherwise if some
1614     // other thread is holding a lock when the fork happens, the data
1615     // structure protected by the lock will forever be in an
1616     // inconsistent state in the child.  See also #1391.
1617     ACQUIRE_LOCK(&sched_mutex);
1618     ACQUIRE_LOCK(&cap->lock);
1619     ACQUIRE_LOCK(&cap->running_task->lock);
1620
1621     pid = fork();
1622     
1623     if (pid) { // parent
1624         
1625         RELEASE_LOCK(&sched_mutex);
1626         RELEASE_LOCK(&cap->lock);
1627         RELEASE_LOCK(&cap->running_task->lock);
1628
1629         // just return the pid
1630         rts_unlock(cap);
1631         return pid;
1632         
1633     } else { // child
1634         
1635 #if defined(THREADED_RTS)
1636         initMutex(&sched_mutex);
1637         initMutex(&cap->lock);
1638         initMutex(&cap->running_task->lock);
1639 #endif
1640
1641         // Now, all OS threads except the thread that forked are
1642         // stopped.  We need to stop all Haskell threads, including
1643         // those involved in foreign calls.  Also we need to delete
1644         // all Tasks, because they correspond to OS threads that are
1645         // now gone.
1646
1647         for (s = 0; s < total_steps; s++) {
1648           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1649             if (t->what_next == ThreadRelocated) {
1650                 next = t->_link;
1651             } else {
1652                 next = t->global_link;
1653                 // don't allow threads to catch the ThreadKilled
1654                 // exception, but we do want to raiseAsync() because these
1655                 // threads may be evaluating thunks that we need later.
1656                 deleteThread_(cap,t);
1657             }
1658           }
1659         }
1660         
1661         // Empty the run queue.  It seems tempting to let all the
1662         // killed threads stay on the run queue as zombies to be
1663         // cleaned up later, but some of them correspond to bound
1664         // threads for which the corresponding Task does not exist.
1665         cap->run_queue_hd = END_TSO_QUEUE;
1666         cap->run_queue_tl = END_TSO_QUEUE;
1667
1668         // Any suspended C-calling Tasks are no more, their OS threads
1669         // don't exist now:
1670         cap->suspended_ccalling_tasks = NULL;
1671
1672         // Empty the threads lists.  Otherwise, the garbage
1673         // collector may attempt to resurrect some of these threads.
1674         for (s = 0; s < total_steps; s++) {
1675             all_steps[s].threads = END_TSO_QUEUE;
1676         }
1677
1678         // Wipe the task list, except the current Task.
1679         ACQUIRE_LOCK(&sched_mutex);
1680         for (task = all_tasks; task != NULL; task=task->all_link) {
1681             if (task != cap->running_task) {
1682 #if defined(THREADED_RTS)
1683                 initMutex(&task->lock); // see #1391
1684 #endif
1685                 discardTask(task);
1686             }
1687         }
1688         RELEASE_LOCK(&sched_mutex);
1689
1690 #if defined(THREADED_RTS)
1691         // Wipe our spare workers list, they no longer exist.  New
1692         // workers will be created if necessary.
1693         cap->spare_workers = NULL;
1694         cap->returning_tasks_hd = NULL;
1695         cap->returning_tasks_tl = NULL;
1696 #endif
1697
1698         // On Unix, all timers are reset in the child, so we need to start
1699         // the timer again.
1700         initTimer();
1701         startTimer();
1702
1703         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1704         rts_checkSchedStatus("forkProcess",cap);
1705         
1706         rts_unlock(cap);
1707         hs_exit();                      // clean up and exit
1708         stg_exit(EXIT_SUCCESS);
1709     }
1710 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1711     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1712 #endif
1713 }
1714
1715 /* ---------------------------------------------------------------------------
1716  * Delete all the threads in the system
1717  * ------------------------------------------------------------------------- */
1718    
1719 static void
1720 deleteAllThreads ( Capability *cap )
1721 {
1722     // NOTE: only safe to call if we own all capabilities.
1723
1724     StgTSO* t, *next;
1725     nat s;
1726
1727     debugTrace(DEBUG_sched,"deleting all threads");
1728     for (s = 0; s < total_steps; s++) {
1729       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1730         if (t->what_next == ThreadRelocated) {
1731             next = t->_link;
1732         } else {
1733             next = t->global_link;
1734             deleteThread(cap,t);
1735         }
1736       }
1737     }      
1738
1739     // The run queue now contains a bunch of ThreadKilled threads.  We
1740     // must not throw these away: the main thread(s) will be in there
1741     // somewhere, and the main scheduler loop has to deal with it.
1742     // Also, the run queue is the only thing keeping these threads from
1743     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1744
1745 #if !defined(THREADED_RTS)
1746     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1747     ASSERT(sleeping_queue == END_TSO_QUEUE);
1748 #endif
1749 }
1750
1751 /* -----------------------------------------------------------------------------
1752    Managing the suspended_ccalling_tasks list.
1753    Locks required: sched_mutex
1754    -------------------------------------------------------------------------- */
1755
1756 STATIC_INLINE void
1757 suspendTask (Capability *cap, Task *task)
1758 {
1759     ASSERT(task->next == NULL && task->prev == NULL);
1760     task->next = cap->suspended_ccalling_tasks;
1761     task->prev = NULL;
1762     if (cap->suspended_ccalling_tasks) {
1763         cap->suspended_ccalling_tasks->prev = task;
1764     }
1765     cap->suspended_ccalling_tasks = task;
1766 }
1767
1768 STATIC_INLINE void
1769 recoverSuspendedTask (Capability *cap, Task *task)
1770 {
1771     if (task->prev) {
1772         task->prev->next = task->next;
1773     } else {
1774         ASSERT(cap->suspended_ccalling_tasks == task);
1775         cap->suspended_ccalling_tasks = task->next;
1776     }
1777     if (task->next) {
1778         task->next->prev = task->prev;
1779     }
1780     task->next = task->prev = NULL;
1781 }
1782
1783 /* ---------------------------------------------------------------------------
1784  * Suspending & resuming Haskell threads.
1785  * 
1786  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1787  * its capability before calling the C function.  This allows another
1788  * task to pick up the capability and carry on running Haskell
1789  * threads.  It also means that if the C call blocks, it won't lock
1790  * the whole system.
1791  *
1792  * The Haskell thread making the C call is put to sleep for the
1793  * duration of the call, on the susepended_ccalling_threads queue.  We
1794  * give out a token to the task, which it can use to resume the thread
1795  * on return from the C function.
1796  * ------------------------------------------------------------------------- */
1797    
1798 void *
1799 suspendThread (StgRegTable *reg)
1800 {
1801   Capability *cap;
1802   int saved_errno;
1803   StgTSO *tso;
1804   Task *task;
1805 #if mingw32_HOST_OS
1806   StgWord32 saved_winerror;
1807 #endif
1808
1809   saved_errno = errno;
1810 #if mingw32_HOST_OS
1811   saved_winerror = GetLastError();
1812 #endif
1813
1814   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1815    */
1816   cap = regTableToCapability(reg);
1817
1818   task = cap->running_task;
1819   tso = cap->r.rCurrentTSO;
1820
1821   postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
1822   debugTrace(DEBUG_sched, 
1823              "thread %lu did a safe foreign call", 
1824              (unsigned long)cap->r.rCurrentTSO->id);
1825
1826   // XXX this might not be necessary --SDM
1827   tso->what_next = ThreadRunGHC;
1828
1829   threadPaused(cap,tso);
1830
1831   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1832       tso->why_blocked = BlockedOnCCall;
1833       tso->flags |= TSO_BLOCKEX;
1834       tso->flags &= ~TSO_INTERRUPTIBLE;
1835   } else {
1836       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1837   }
1838
1839   // Hand back capability
1840   task->suspended_tso = tso;
1841
1842   ACQUIRE_LOCK(&cap->lock);
1843
1844   suspendTask(cap,task);
1845   cap->in_haskell = rtsFalse;
1846   releaseCapability_(cap,rtsFalse);
1847   
1848   RELEASE_LOCK(&cap->lock);
1849
1850 #if defined(THREADED_RTS)
1851   /* Preparing to leave the RTS, so ensure there's a native thread/task
1852      waiting to take over.
1853   */
1854   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1855 #endif
1856
1857   errno = saved_errno;
1858 #if mingw32_HOST_OS
1859   SetLastError(saved_winerror);
1860 #endif
1861   return task;
1862 }
1863
1864 StgRegTable *
1865 resumeThread (void *task_)
1866 {
1867     StgTSO *tso;
1868     Capability *cap;
1869     Task *task = task_;
1870     int saved_errno;
1871 #if mingw32_HOST_OS
1872     StgWord32 saved_winerror;
1873 #endif
1874
1875     saved_errno = errno;
1876 #if mingw32_HOST_OS
1877     saved_winerror = GetLastError();
1878 #endif
1879
1880     cap = task->cap;
1881     // Wait for permission to re-enter the RTS with the result.
1882     waitForReturnCapability(&cap,task);
1883     // we might be on a different capability now... but if so, our
1884     // entry on the suspended_ccalling_tasks list will also have been
1885     // migrated.
1886
1887     // Remove the thread from the suspended list
1888     recoverSuspendedTask(cap,task);
1889
1890     tso = task->suspended_tso;
1891     task->suspended_tso = NULL;
1892     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1893
1894     postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
1895     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1896     
1897     if (tso->why_blocked == BlockedOnCCall) {
1898         // avoid locking the TSO if we don't have to
1899         if (tso->blocked_exceptions != END_TSO_QUEUE) {
1900             awakenBlockedExceptionQueue(cap,tso);
1901         }
1902         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1903     }
1904     
1905     /* Reset blocking status */
1906     tso->why_blocked  = NotBlocked;
1907     
1908     cap->r.rCurrentTSO = tso;
1909     cap->in_haskell = rtsTrue;
1910     errno = saved_errno;
1911 #if mingw32_HOST_OS
1912     SetLastError(saved_winerror);
1913 #endif
1914
1915     /* We might have GC'd, mark the TSO dirty again */
1916     dirty_TSO(cap,tso);
1917
1918     IF_DEBUG(sanity, checkTSO(tso));
1919
1920     return &cap->r;
1921 }
1922
1923 /* ---------------------------------------------------------------------------
1924  * scheduleThread()
1925  *
1926  * scheduleThread puts a thread on the end  of the runnable queue.
1927  * This will usually be done immediately after a thread is created.
1928  * The caller of scheduleThread must create the thread using e.g.
1929  * createThread and push an appropriate closure
1930  * on this thread's stack before the scheduler is invoked.
1931  * ------------------------------------------------------------------------ */
1932
1933 void
1934 scheduleThread(Capability *cap, StgTSO *tso)
1935 {
1936     // The thread goes at the *end* of the run-queue, to avoid possible
1937     // starvation of any threads already on the queue.
1938     appendToRunQueue(cap,tso);
1939 }
1940
1941 void
1942 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1943 {
1944 #if defined(THREADED_RTS)
1945     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1946                               // move this thread from now on.
1947     cpu %= RtsFlags.ParFlags.nNodes;
1948     if (cpu == cap->no) {
1949         appendToRunQueue(cap,tso);
1950     } else {
1951         postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
1952         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1953     }
1954 #else
1955     appendToRunQueue(cap,tso);
1956 #endif
1957 }
1958
1959 Capability *
1960 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1961 {
1962     Task *task;
1963
1964     // We already created/initialised the Task
1965     task = cap->running_task;
1966
1967     // This TSO is now a bound thread; make the Task and TSO
1968     // point to each other.
1969     tso->bound = task;
1970     tso->cap = cap;
1971
1972     task->tso = tso;
1973     task->ret = ret;
1974     task->stat = NoStatus;
1975
1976     appendToRunQueue(cap,tso);
1977
1978     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1979
1980     cap = schedule(cap,task);
1981
1982     ASSERT(task->stat != NoStatus);
1983     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1984
1985     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1986     return cap;
1987 }
1988
1989 /* ----------------------------------------------------------------------------
1990  * Starting Tasks
1991  * ------------------------------------------------------------------------- */
1992
1993 #if defined(THREADED_RTS)
1994 void OSThreadProcAttr
1995 workerStart(Task *task)
1996 {
1997     Capability *cap;
1998
1999     // See startWorkerTask().
2000     ACQUIRE_LOCK(&task->lock);
2001     cap = task->cap;
2002     RELEASE_LOCK(&task->lock);
2003
2004     if (RtsFlags.ParFlags.setAffinity) {
2005         setThreadAffinity(cap->no, n_capabilities);
2006     }
2007
2008     // set the thread-local pointer to the Task:
2009     taskEnter(task);
2010
2011     // schedule() runs without a lock.
2012     cap = schedule(cap,task);
2013
2014     // On exit from schedule(), we have a Capability, but possibly not
2015     // the same one we started with.
2016
2017     // During shutdown, the requirement is that after all the
2018     // Capabilities are shut down, all workers that are shutting down
2019     // have finished workerTaskStop().  This is why we hold on to
2020     // cap->lock until we've finished workerTaskStop() below.
2021     //
2022     // There may be workers still involved in foreign calls; those
2023     // will just block in waitForReturnCapability() because the
2024     // Capability has been shut down.
2025     //
2026     ACQUIRE_LOCK(&cap->lock);
2027     releaseCapability_(cap,rtsFalse);
2028     workerTaskStop(task);
2029     RELEASE_LOCK(&cap->lock);
2030 }
2031 #endif
2032
2033 /* ---------------------------------------------------------------------------
2034  * initScheduler()
2035  *
2036  * Initialise the scheduler.  This resets all the queues - if the
2037  * queues contained any threads, they'll be garbage collected at the
2038  * next pass.
2039  *
2040  * ------------------------------------------------------------------------ */
2041
2042 void 
2043 initScheduler(void)
2044 {
2045 #if !defined(THREADED_RTS)
2046   blocked_queue_hd  = END_TSO_QUEUE;
2047   blocked_queue_tl  = END_TSO_QUEUE;
2048   sleeping_queue    = END_TSO_QUEUE;
2049 #endif
2050
2051   blackhole_queue   = END_TSO_QUEUE;
2052
2053   sched_state    = SCHED_RUNNING;
2054   recent_activity = ACTIVITY_YES;
2055
2056 #if defined(THREADED_RTS)
2057   /* Initialise the mutex and condition variables used by
2058    * the scheduler. */
2059   initMutex(&sched_mutex);
2060 #endif
2061   
2062   ACQUIRE_LOCK(&sched_mutex);
2063
2064   /* A capability holds the state a native thread needs in
2065    * order to execute STG code. At least one capability is
2066    * floating around (only THREADED_RTS builds have more than one).
2067    */
2068   initCapabilities();
2069
2070   initTaskManager();
2071
2072 #if defined(THREADED_RTS)
2073   initSparkPools();
2074 #endif
2075
2076 #if defined(THREADED_RTS)
2077   /*
2078    * Eagerly start one worker to run each Capability, except for
2079    * Capability 0.  The idea is that we're probably going to start a
2080    * bound thread on Capability 0 pretty soon, so we don't want a
2081    * worker task hogging it.
2082    */
2083   { 
2084       nat i;
2085       Capability *cap;
2086       for (i = 1; i < n_capabilities; i++) {
2087           cap = &capabilities[i];
2088           ACQUIRE_LOCK(&cap->lock);
2089           startWorkerTask(cap, workerStart);
2090           RELEASE_LOCK(&cap->lock);
2091       }
2092   }
2093 #endif
2094
2095   RELEASE_LOCK(&sched_mutex);
2096 }
2097
2098 void
2099 exitScheduler(
2100     rtsBool wait_foreign
2101 #if !defined(THREADED_RTS)
2102                          __attribute__((unused))
2103 #endif
2104 )
2105                /* see Capability.c, shutdownCapability() */
2106 {
2107     Task *task = NULL;
2108
2109     task = newBoundTask();
2110
2111     // If we haven't killed all the threads yet, do it now.
2112     if (sched_state < SCHED_SHUTTING_DOWN) {
2113         sched_state = SCHED_INTERRUPTING;
2114         waitForReturnCapability(&task->cap,task);
2115         scheduleDoGC(task->cap,task,rtsFalse);    
2116         releaseCapability(task->cap);
2117     }
2118     sched_state = SCHED_SHUTTING_DOWN;
2119
2120 #if defined(THREADED_RTS)
2121     { 
2122         nat i;
2123         
2124         for (i = 0; i < n_capabilities; i++) {
2125             shutdownCapability(&capabilities[i], task, wait_foreign);
2126         }
2127         boundTaskExiting(task);
2128     }
2129 #endif
2130 }
2131
2132 void
2133 freeScheduler( void )
2134 {
2135     nat still_running;
2136
2137     ACQUIRE_LOCK(&sched_mutex);
2138     still_running = freeTaskManager();
2139     // We can only free the Capabilities if there are no Tasks still
2140     // running.  We might have a Task about to return from a foreign
2141     // call into waitForReturnCapability(), for example (actually,
2142     // this should be the *only* thing that a still-running Task can
2143     // do at this point, and it will block waiting for the
2144     // Capability).
2145     if (still_running == 0) {
2146         freeCapabilities();
2147         if (n_capabilities != 1) {
2148             stgFree(capabilities);
2149         }
2150     }
2151     RELEASE_LOCK(&sched_mutex);
2152 #if defined(THREADED_RTS)
2153     closeMutex(&sched_mutex);
2154 #endif
2155 }
2156
2157 /* -----------------------------------------------------------------------------
2158    performGC
2159
2160    This is the interface to the garbage collector from Haskell land.
2161    We provide this so that external C code can allocate and garbage
2162    collect when called from Haskell via _ccall_GC.
2163    -------------------------------------------------------------------------- */
2164
2165 static void
2166 performGC_(rtsBool force_major)
2167 {
2168     Task *task;
2169
2170     // We must grab a new Task here, because the existing Task may be
2171     // associated with a particular Capability, and chained onto the 
2172     // suspended_ccalling_tasks queue.
2173     task = newBoundTask();
2174
2175     waitForReturnCapability(&task->cap,task);
2176     scheduleDoGC(task->cap,task,force_major);
2177     releaseCapability(task->cap);
2178     boundTaskExiting(task);
2179 }
2180
2181 void
2182 performGC(void)
2183 {
2184     performGC_(rtsFalse);
2185 }
2186
2187 void
2188 performMajorGC(void)
2189 {
2190     performGC_(rtsTrue);
2191 }
2192
2193 /* -----------------------------------------------------------------------------
2194    Stack overflow
2195
2196    If the thread has reached its maximum stack size, then raise the
2197    StackOverflow exception in the offending thread.  Otherwise
2198    relocate the TSO into a larger chunk of memory and adjust its stack
2199    size appropriately.
2200    -------------------------------------------------------------------------- */
2201
2202 static StgTSO *
2203 threadStackOverflow(Capability *cap, StgTSO *tso)
2204 {
2205   nat new_stack_size, stack_words;
2206   lnat new_tso_size;
2207   StgPtr new_sp;
2208   StgTSO *dest;
2209
2210   IF_DEBUG(sanity,checkTSO(tso));
2211
2212   // don't allow throwTo() to modify the blocked_exceptions queue
2213   // while we are moving the TSO:
2214   lockClosure((StgClosure *)tso);
2215
2216   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2217       // NB. never raise a StackOverflow exception if the thread is
2218       // inside Control.Exceptino.block.  It is impractical to protect
2219       // against stack overflow exceptions, since virtually anything
2220       // can raise one (even 'catch'), so this is the only sensible
2221       // thing to do here.  See bug #767.
2222
2223       debugTrace(DEBUG_gc,
2224                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2225                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2226       IF_DEBUG(gc,
2227                /* If we're debugging, just print out the top of the stack */
2228                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2229                                                 tso->sp+64)));
2230
2231       // Send this thread the StackOverflow exception
2232       unlockTSO(tso);
2233       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2234       return tso;
2235   }
2236
2237   /* Try to double the current stack size.  If that takes us over the
2238    * maximum stack size for this thread, then use the maximum instead
2239    * (that is, unless we're already at or over the max size and we
2240    * can't raise the StackOverflow exception (see above), in which
2241    * case just double the size). Finally round up so the TSO ends up as
2242    * a whole number of blocks.
2243    */
2244   if (tso->stack_size >= tso->max_stack_size) {
2245       new_stack_size = tso->stack_size * 2;
2246   } else { 
2247       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2248   }
2249   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2250                                        TSO_STRUCT_SIZE)/sizeof(W_);
2251   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2252   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2253
2254   debugTrace(DEBUG_sched, 
2255              "increasing stack size from %ld words to %d.",
2256              (long)tso->stack_size, new_stack_size);
2257
2258   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2259   TICK_ALLOC_TSO(new_stack_size,0);
2260
2261   /* copy the TSO block and the old stack into the new area */
2262   memcpy(dest,tso,TSO_STRUCT_SIZE);
2263   stack_words = tso->stack + tso->stack_size - tso->sp;
2264   new_sp = (P_)dest + new_tso_size - stack_words;
2265   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2266
2267   /* relocate the stack pointers... */
2268   dest->sp         = new_sp;
2269   dest->stack_size = new_stack_size;
2270         
2271   /* Mark the old TSO as relocated.  We have to check for relocated
2272    * TSOs in the garbage collector and any primops that deal with TSOs.
2273    *
2274    * It's important to set the sp value to just beyond the end
2275    * of the stack, so we don't attempt to scavenge any part of the
2276    * dead TSO's stack.
2277    */
2278   tso->what_next = ThreadRelocated;
2279   setTSOLink(cap,tso,dest);
2280   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2281   tso->why_blocked = NotBlocked;
2282
2283   unlockTSO(dest);
2284   unlockTSO(tso);
2285
2286   IF_DEBUG(sanity,checkTSO(dest));
2287 #if 0
2288   IF_DEBUG(scheduler,printTSO(dest));
2289 #endif
2290
2291   return dest;
2292 }
2293
2294 static StgTSO *
2295 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2296 {
2297     bdescr *bd, *new_bd;
2298     lnat free_w, tso_size_w;
2299     StgTSO *new_tso;
2300
2301     tso_size_w = tso_sizeW(tso);
2302
2303     if (tso_size_w < MBLOCK_SIZE_W ||
2304           // TSO is less than 2 mblocks (since the first mblock is
2305           // shorter than MBLOCK_SIZE_W)
2306         (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
2307           // or TSO is not a whole number of megablocks (ensuring
2308           // precondition of splitLargeBlock() below)
2309         (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
2310           // or TSO is smaller than the minimum stack size (rounded up)
2311         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2312           // or stack is using more than 1/4 of the available space
2313     {
2314         // then do nothing
2315         return tso;
2316     }
2317
2318     // don't allow throwTo() to modify the blocked_exceptions queue
2319     // while we are moving the TSO:
2320     lockClosure((StgClosure *)tso);
2321
2322     // this is the number of words we'll free
2323     free_w = round_to_mblocks(tso_size_w/2);
2324
2325     bd = Bdescr((StgPtr)tso);
2326     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2327     bd->free = bd->start + TSO_STRUCT_SIZEW;
2328
2329     new_tso = (StgTSO *)new_bd->start;
2330     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2331     new_tso->stack_size = new_bd->free - new_tso->stack;
2332
2333     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2334                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2335
2336     tso->what_next = ThreadRelocated;
2337     tso->_link = new_tso; // no write barrier reqd: same generation
2338
2339     // The TSO attached to this Task may have moved, so update the
2340     // pointer to it.
2341     if (task->tso == tso) {
2342         task->tso = new_tso;
2343     }
2344
2345     unlockTSO(new_tso);
2346     unlockTSO(tso);
2347
2348     IF_DEBUG(sanity,checkTSO(new_tso));
2349
2350     return new_tso;
2351 }
2352
2353 /* ---------------------------------------------------------------------------
2354    Interrupt execution
2355    - usually called inside a signal handler so it mustn't do anything fancy.   
2356    ------------------------------------------------------------------------ */
2357
2358 void
2359 interruptStgRts(void)
2360 {
2361     sched_state = SCHED_INTERRUPTING;
2362     setContextSwitches();
2363 #if defined(THREADED_RTS)
2364     wakeUpRts();
2365 #endif
2366 }
2367
2368 /* -----------------------------------------------------------------------------
2369    Wake up the RTS
2370    
2371    This function causes at least one OS thread to wake up and run the
2372    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2373    an external event has arrived that may need servicing (eg. a
2374    keyboard interrupt).
2375
2376    In the single-threaded RTS we don't do anything here; we only have
2377    one thread anyway, and the event that caused us to want to wake up
2378    will have interrupted any blocking system call in progress anyway.
2379    -------------------------------------------------------------------------- */
2380
2381 #if defined(THREADED_RTS)
2382 void wakeUpRts(void)
2383 {
2384     // This forces the IO Manager thread to wakeup, which will
2385     // in turn ensure that some OS thread wakes up and runs the
2386     // scheduler loop, which will cause a GC and deadlock check.
2387     ioManagerWakeup();
2388 }
2389 #endif
2390
2391 /* -----------------------------------------------------------------------------
2392  * checkBlackHoles()
2393  *
2394  * Check the blackhole_queue for threads that can be woken up.  We do
2395  * this periodically: before every GC, and whenever the run queue is
2396  * empty.
2397  *
2398  * An elegant solution might be to just wake up all the blocked
2399  * threads with awakenBlockedQueue occasionally: they'll go back to
2400  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2401  * doesn't give us a way to tell whether we've actually managed to
2402  * wake up any threads, so we would be busy-waiting.
2403  *
2404  * -------------------------------------------------------------------------- */
2405
2406 static rtsBool
2407 checkBlackHoles (Capability *cap)
2408 {
2409     StgTSO **prev, *t;
2410     rtsBool any_woke_up = rtsFalse;
2411     StgHalfWord type;
2412
2413     // blackhole_queue is global:
2414     ASSERT_LOCK_HELD(&sched_mutex);
2415
2416     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2417
2418     // ASSUMES: sched_mutex
2419     prev = &blackhole_queue;
2420     t = blackhole_queue;
2421     while (t != END_TSO_QUEUE) {
2422         if (t->what_next == ThreadRelocated) {
2423             t = t->_link;
2424             continue;
2425         }
2426         ASSERT(t->why_blocked == BlockedOnBlackHole);
2427         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2428         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2429             IF_DEBUG(sanity,checkTSO(t));
2430             t = unblockOne(cap, t);
2431             *prev = t;
2432             any_woke_up = rtsTrue;
2433         } else {
2434             prev = &t->_link;
2435             t = t->_link;
2436         }
2437     }
2438
2439     return any_woke_up;
2440 }
2441
2442 /* -----------------------------------------------------------------------------
2443    Deleting threads
2444
2445    This is used for interruption (^C) and forking, and corresponds to
2446    raising an exception but without letting the thread catch the
2447    exception.
2448    -------------------------------------------------------------------------- */
2449
2450 static void 
2451 deleteThread (Capability *cap, StgTSO *tso)
2452 {
2453     // NOTE: must only be called on a TSO that we have exclusive
2454     // access to, because we will call throwToSingleThreaded() below.
2455     // The TSO must be on the run queue of the Capability we own, or 
2456     // we must own all Capabilities.
2457
2458     if (tso->why_blocked != BlockedOnCCall &&
2459         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2460         throwToSingleThreaded(cap,tso,NULL);
2461     }
2462 }
2463
2464 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2465 static void 
2466 deleteThread_(Capability *cap, StgTSO *tso)
2467 { // for forkProcess only:
2468   // like deleteThread(), but we delete threads in foreign calls, too.
2469
2470     if (tso->why_blocked == BlockedOnCCall ||
2471         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2472         unblockOne(cap,tso);
2473         tso->what_next = ThreadKilled;
2474     } else {
2475         deleteThread(cap,tso);
2476     }
2477 }
2478 #endif
2479
2480 /* -----------------------------------------------------------------------------
2481    raiseExceptionHelper
2482    
2483    This function is called by the raise# primitve, just so that we can
2484    move some of the tricky bits of raising an exception from C-- into
2485    C.  Who knows, it might be a useful re-useable thing here too.
2486    -------------------------------------------------------------------------- */
2487
2488 StgWord
2489 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2490 {
2491     Capability *cap = regTableToCapability(reg);
2492     StgThunk *raise_closure = NULL;
2493     StgPtr p, next;
2494     StgRetInfoTable *info;
2495     //
2496     // This closure represents the expression 'raise# E' where E
2497     // is the exception raise.  It is used to overwrite all the
2498     // thunks which are currently under evaluataion.
2499     //
2500
2501     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2502     // LDV profiling: stg_raise_info has THUNK as its closure
2503     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2504     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2505     // 1 does not cause any problem unless profiling is performed.
2506     // However, when LDV profiling goes on, we need to linearly scan
2507     // small object pool, where raise_closure is stored, so we should
2508     // use MIN_UPD_SIZE.
2509     //
2510     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2511     //                                 sizeofW(StgClosure)+1);
2512     //
2513
2514     //
2515     // Walk up the stack, looking for the catch frame.  On the way,
2516     // we update any closures pointed to from update frames with the
2517     // raise closure that we just built.
2518     //
2519     p = tso->sp;
2520     while(1) {
2521         info = get_ret_itbl((StgClosure *)p);
2522         next = p + stack_frame_sizeW((StgClosure *)p);
2523         switch (info->i.type) {
2524             
2525         case UPDATE_FRAME:
2526             // Only create raise_closure if we need to.
2527             if (raise_closure == NULL) {
2528                 raise_closure = 
2529                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2530                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2531                 raise_closure->payload[0] = exception;
2532             }
2533             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2534             p = next;
2535             continue;
2536
2537         case ATOMICALLY_FRAME:
2538             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2539             tso->sp = p;
2540             return ATOMICALLY_FRAME;
2541             
2542         case CATCH_FRAME:
2543             tso->sp = p;
2544             return CATCH_FRAME;
2545
2546         case CATCH_STM_FRAME:
2547             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2548             tso->sp = p;
2549             return CATCH_STM_FRAME;
2550             
2551         case STOP_FRAME:
2552             tso->sp = p;
2553             return STOP_FRAME;
2554
2555         case CATCH_RETRY_FRAME:
2556         default:
2557             p = next; 
2558             continue;
2559         }
2560     }
2561 }
2562
2563
2564 /* -----------------------------------------------------------------------------
2565    findRetryFrameHelper
2566
2567    This function is called by the retry# primitive.  It traverses the stack
2568    leaving tso->sp referring to the frame which should handle the retry.  
2569
2570    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2571    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2572
2573    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2574    create) because retries are not considered to be exceptions, despite the
2575    similar implementation.
2576
2577    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2578    not be created within memory transactions.
2579    -------------------------------------------------------------------------- */
2580
2581 StgWord
2582 findRetryFrameHelper (StgTSO *tso)
2583 {
2584   StgPtr           p, next;
2585   StgRetInfoTable *info;
2586
2587   p = tso -> sp;
2588   while (1) {
2589     info = get_ret_itbl((StgClosure *)p);
2590     next = p + stack_frame_sizeW((StgClosure *)p);
2591     switch (info->i.type) {
2592       
2593     case ATOMICALLY_FRAME:
2594         debugTrace(DEBUG_stm,
2595                    "found ATOMICALLY_FRAME at %p during retry", p);
2596         tso->sp = p;
2597         return ATOMICALLY_FRAME;
2598       
2599     case CATCH_RETRY_FRAME:
2600         debugTrace(DEBUG_stm,
2601                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2602         tso->sp = p;
2603         return CATCH_RETRY_FRAME;
2604       
2605     case CATCH_STM_FRAME: {
2606         StgTRecHeader *trec = tso -> trec;
2607         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2608         debugTrace(DEBUG_stm,
2609                    "found CATCH_STM_FRAME at %p during retry", p);
2610         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2611         stmAbortTransaction(tso -> cap, trec);
2612         stmFreeAbortedTRec(tso -> cap, trec);
2613         tso -> trec = outer;
2614         p = next; 
2615         continue;
2616     }
2617       
2618
2619     default:
2620       ASSERT(info->i.type != CATCH_FRAME);
2621       ASSERT(info->i.type != STOP_FRAME);
2622       p = next; 
2623       continue;
2624     }
2625   }
2626 }
2627
2628 /* -----------------------------------------------------------------------------
2629    resurrectThreads is called after garbage collection on the list of
2630    threads found to be garbage.  Each of these threads will be woken
2631    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2632    on an MVar, or NonTermination if the thread was blocked on a Black
2633    Hole.
2634
2635    Locks: assumes we hold *all* the capabilities.
2636    -------------------------------------------------------------------------- */
2637
2638 void
2639 resurrectThreads (StgTSO *threads)
2640 {
2641     StgTSO *tso, *next;
2642     Capability *cap;
2643     step *step;
2644
2645     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2646         next = tso->global_link;
2647
2648         step = Bdescr((P_)tso)->step;
2649         tso->global_link = step->threads;
2650         step->threads = tso;
2651
2652         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2653         
2654         // Wake up the thread on the Capability it was last on
2655         cap = tso->cap;
2656         
2657         switch (tso->why_blocked) {
2658         case BlockedOnMVar:
2659         case BlockedOnException:
2660             /* Called by GC - sched_mutex lock is currently held. */
2661             throwToSingleThreaded(cap, tso,
2662                                   (StgClosure *)blockedOnDeadMVar_closure);
2663             break;
2664         case BlockedOnBlackHole:
2665             throwToSingleThreaded(cap, tso,
2666                                   (StgClosure *)nonTermination_closure);
2667             break;
2668         case BlockedOnSTM:
2669             throwToSingleThreaded(cap, tso,
2670                                   (StgClosure *)blockedIndefinitely_closure);
2671             break;
2672         case NotBlocked:
2673             /* This might happen if the thread was blocked on a black hole
2674              * belonging to a thread that we've just woken up (raiseAsync
2675              * can wake up threads, remember...).
2676              */
2677             continue;
2678         default:
2679             barf("resurrectThreads: thread blocked in a strange way");
2680         }
2681     }
2682 }
2683
2684 /* -----------------------------------------------------------------------------
2685    performPendingThrowTos is called after garbage collection, and
2686    passed a list of threads that were found to have pending throwTos
2687    (tso->blocked_exceptions was not empty), and were blocked.
2688    Normally this doesn't happen, because we would deliver the
2689    exception directly if the target thread is blocked, but there are
2690    small windows where it might occur on a multiprocessor (see
2691    throwTo()).
2692
2693    NB. we must be holding all the capabilities at this point, just
2694    like resurrectThreads().
2695    -------------------------------------------------------------------------- */
2696
2697 void
2698 performPendingThrowTos (StgTSO *threads)
2699 {
2700     StgTSO *tso, *next;
2701     Capability *cap;
2702     step *step;
2703
2704     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2705         next = tso->global_link;
2706
2707         step = Bdescr((P_)tso)->step;
2708         tso->global_link = step->threads;
2709         step->threads = tso;
2710
2711         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2712         
2713         cap = tso->cap;
2714         maybePerformBlockedException(cap, tso);
2715     }
2716 }