Detect when a C finalizer calls back to Haskell
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #include "GC.h"
35 #include "Weak.h"
36
37 /* PARALLEL_HASKELL includes go here */
38
39 #include "Sparks.h"
40 #include "Capability.h"
41 #include "Task.h"
42 #include "AwaitEvent.h"
43 #if defined(mingw32_HOST_OS)
44 #include "win32/IOManager.h"
45 #endif
46 #include "Trace.h"
47 #include "RaiseAsync.h"
48 #include "Threads.h"
49 #include "ThrIOManager.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 /* -----------------------------------------------------------------------------
73  * Global variables
74  * -------------------------------------------------------------------------- */
75
76 #if !defined(THREADED_RTS)
77 // Blocked/sleeping thrads
78 StgTSO *blocked_queue_hd = NULL;
79 StgTSO *blocked_queue_tl = NULL;
80 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
81 #endif
82
83 /* Threads blocked on blackholes.
84  * LOCK: sched_mutex+capability, or all capabilities
85  */
86 StgTSO *blackhole_queue = NULL;
87
88 /* The blackhole_queue should be checked for threads to wake up.  See
89  * Schedule.h for more thorough comment.
90  * LOCK: none (doesn't matter if we miss an update)
91  */
92 rtsBool blackholes_need_checking = rtsFalse;
93
94 /* Set to true when the latest garbage collection failed to reclaim
95  * enough space, and the runtime should proceed to shut itself down in
96  * an orderly fashion (emitting profiling info etc.)
97  */
98 rtsBool heap_overflow = rtsFalse;
99
100 /* flag that tracks whether we have done any execution in this time slice.
101  * LOCK: currently none, perhaps we should lock (but needs to be
102  * updated in the fast path of the scheduler).
103  *
104  * NB. must be StgWord, we do xchg() on it.
105  */
106 volatile StgWord recent_activity = ACTIVITY_YES;
107
108 /* if this flag is set as well, give up execution
109  * LOCK: none (changes monotonically)
110  */
111 volatile StgWord sched_state = SCHED_RUNNING;
112
113 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
114  *  exists - earlier gccs apparently didn't.
115  *  -= chak
116  */
117 StgTSO dummy_tso;
118
119 /*
120  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
121  * in an MT setting, needed to signal that a worker thread shouldn't hang around
122  * in the scheduler when it is out of work.
123  */
124 rtsBool shutting_down_scheduler = rtsFalse;
125
126 /*
127  * This mutex protects most of the global scheduler data in
128  * the THREADED_RTS runtime.
129  */
130 #if defined(THREADED_RTS)
131 Mutex sched_mutex;
132 #endif
133
134 #if !defined(mingw32_HOST_OS)
135 #define FORKPROCESS_PRIMOP_SUPPORTED
136 #endif
137
138 /* -----------------------------------------------------------------------------
139  * static function prototypes
140  * -------------------------------------------------------------------------- */
141
142 static Capability *schedule (Capability *initialCapability, Task *task);
143
144 //
145 // These function all encapsulate parts of the scheduler loop, and are
146 // abstracted only to make the structure and control flow of the
147 // scheduler clearer.
148 //
149 static void schedulePreLoop (void);
150 static void scheduleFindWork (Capability *cap);
151 #if defined(THREADED_RTS)
152 static void scheduleYield (Capability **pcap, Task *task);
153 #endif
154 static void scheduleStartSignalHandlers (Capability *cap);
155 static void scheduleCheckBlockedThreads (Capability *cap);
156 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
157 static void scheduleCheckBlackHoles (Capability *cap);
158 static void scheduleDetectDeadlock (Capability *cap, Task *task);
159 static void schedulePushWork(Capability *cap, Task *task);
160 #if defined(PARALLEL_HASKELL)
161 static rtsBool scheduleGetRemoteWork(Capability *cap);
162 static void scheduleSendPendingMessages(void);
163 #endif
164 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
165 static void scheduleActivateSpark(Capability *cap);
166 #endif
167 static void schedulePostRunThread(Capability *cap, StgTSO *t);
168 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
169 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
170                                          StgTSO *t);
171 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
172                                     nat prev_what_next );
173 static void scheduleHandleThreadBlocked( StgTSO *t );
174 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
175                                              StgTSO *t );
176 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
177 static Capability *scheduleDoGC(Capability *cap, Task *task,
178                                 rtsBool force_major);
179
180 static rtsBool checkBlackHoles(Capability *cap);
181
182 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
183 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
184
185 static void deleteThread (Capability *cap, StgTSO *tso);
186 static void deleteAllThreads (Capability *cap);
187
188 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
189 static void deleteThread_(Capability *cap, StgTSO *tso);
190 #endif
191
192 #ifdef DEBUG
193 static char *whatNext_strs[] = {
194   "(unknown)",
195   "ThreadRunGHC",
196   "ThreadInterpret",
197   "ThreadKilled",
198   "ThreadRelocated",
199   "ThreadComplete"
200 };
201 #endif
202
203 /* -----------------------------------------------------------------------------
204  * Putting a thread on the run queue: different scheduling policies
205  * -------------------------------------------------------------------------- */
206
207 STATIC_INLINE void
208 addToRunQueue( Capability *cap, StgTSO *t )
209 {
210 #if defined(PARALLEL_HASKELL)
211     if (RtsFlags.ParFlags.doFairScheduling) { 
212         // this does round-robin scheduling; good for concurrency
213         appendToRunQueue(cap,t);
214     } else {
215         // this does unfair scheduling; good for parallelism
216         pushOnRunQueue(cap,t);
217     }
218 #else
219     // this does round-robin scheduling; good for concurrency
220     appendToRunQueue(cap,t);
221 #endif
222 }
223
224 /* ---------------------------------------------------------------------------
225    Main scheduling loop.
226
227    We use round-robin scheduling, each thread returning to the
228    scheduler loop when one of these conditions is detected:
229
230       * out of heap space
231       * timer expires (thread yields)
232       * thread blocks
233       * thread ends
234       * stack overflow
235
236    GRAN version:
237      In a GranSim setup this loop iterates over the global event queue.
238      This revolves around the global event queue, which determines what 
239      to do next. Therefore, it's more complicated than either the 
240      concurrent or the parallel (GUM) setup.
241   This version has been entirely removed (JB 2008/08).
242
243    GUM version:
244      GUM iterates over incoming messages.
245      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
246      and sends out a fish whenever it has nothing to do; in-between
247      doing the actual reductions (shared code below) it processes the
248      incoming messages and deals with delayed operations 
249      (see PendingFetches).
250      This is not the ugliest code you could imagine, but it's bloody close.
251
252   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
253   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
254   as well as future GUM versions. This file has been refurbished to
255   only contain valid code, which is however incomplete, refers to
256   invalid includes etc.
257
258    ------------------------------------------------------------------------ */
259
260 static Capability *
261 schedule (Capability *initialCapability, Task *task)
262 {
263   StgTSO *t;
264   Capability *cap;
265   StgThreadReturnCode ret;
266 #if defined(PARALLEL_HASKELL)
267   rtsBool receivedFinish = rtsFalse;
268 #endif
269   nat prev_what_next;
270   rtsBool ready_to_gc;
271 #if defined(THREADED_RTS)
272   rtsBool first = rtsTrue;
273 #endif
274   
275   cap = initialCapability;
276
277   // Pre-condition: this task owns initialCapability.
278   // The sched_mutex is *NOT* held
279   // NB. on return, we still hold a capability.
280
281   debugTrace (DEBUG_sched, 
282               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
283               task, initialCapability);
284
285   if (running_finalizers) {
286       errorBelch("error: a C finalizer called back into Haskell.\n"
287                  "   use Foreign.Concurrent.newForeignPtr for Haskell finalizers.");
288       stg_exit(EXIT_FAILURE);
289   }
290
291   schedulePreLoop();
292
293   // -----------------------------------------------------------
294   // Scheduler loop starts here:
295
296 #if defined(PARALLEL_HASKELL)
297 #define TERMINATION_CONDITION        (!receivedFinish)
298 #else
299 #define TERMINATION_CONDITION        rtsTrue
300 #endif
301
302   while (TERMINATION_CONDITION) {
303
304     // Check whether we have re-entered the RTS from Haskell without
305     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
306     // call).
307     if (cap->in_haskell) {
308           errorBelch("schedule: re-entered unsafely.\n"
309                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
310           stg_exit(EXIT_FAILURE);
311     }
312
313     // The interruption / shutdown sequence.
314     // 
315     // In order to cleanly shut down the runtime, we want to:
316     //   * make sure that all main threads return to their callers
317     //     with the state 'Interrupted'.
318     //   * clean up all OS threads assocated with the runtime
319     //   * free all memory etc.
320     //
321     // So the sequence for ^C goes like this:
322     //
323     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
324     //     arranges for some Capability to wake up
325     //
326     //   * all threads in the system are halted, and the zombies are
327     //     placed on the run queue for cleaning up.  We acquire all
328     //     the capabilities in order to delete the threads, this is
329     //     done by scheduleDoGC() for convenience (because GC already
330     //     needs to acquire all the capabilities).  We can't kill
331     //     threads involved in foreign calls.
332     // 
333     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
334     //
335     //   * sched_state := SCHED_SHUTTING_DOWN
336     //
337     //   * all workers exit when the run queue on their capability
338     //     drains.  All main threads will also exit when their TSO
339     //     reaches the head of the run queue and they can return.
340     //
341     //   * eventually all Capabilities will shut down, and the RTS can
342     //     exit.
343     //
344     //   * We might be left with threads blocked in foreign calls, 
345     //     we should really attempt to kill these somehow (TODO);
346     
347     switch (sched_state) {
348     case SCHED_RUNNING:
349         break;
350     case SCHED_INTERRUPTING:
351         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
352 #if defined(THREADED_RTS)
353         discardSparksCap(cap);
354 #endif
355         /* scheduleDoGC() deletes all the threads */
356         cap = scheduleDoGC(cap,task,rtsFalse);
357
358         // after scheduleDoGC(), we must be shutting down.  Either some
359         // other Capability did the final GC, or we did it above,
360         // either way we can fall through to the SCHED_SHUTTING_DOWN
361         // case now.
362         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
363         // fall through
364
365     case SCHED_SHUTTING_DOWN:
366         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
367         // If we are a worker, just exit.  If we're a bound thread
368         // then we will exit below when we've removed our TSO from
369         // the run queue.
370         if (task->tso == NULL && emptyRunQueue(cap)) {
371             return cap;
372         }
373         break;
374     default:
375         barf("sched_state: %d", sched_state);
376     }
377
378     scheduleFindWork(cap);
379
380     /* work pushing, currently relevant only for THREADED_RTS:
381        (pushes threads, wakes up idle capabilities for stealing) */
382     schedulePushWork(cap,task);
383
384 #if defined(PARALLEL_HASKELL)
385     /* since we perform a blocking receive and continue otherwise,
386        either we never reach here or we definitely have work! */
387     // from here: non-empty run queue
388     ASSERT(!emptyRunQueue(cap));
389
390     if (PacketsWaiting()) {  /* now process incoming messages, if any
391                                 pending...  
392
393                                 CAUTION: scheduleGetRemoteWork called
394                                 above, waits for messages as well! */
395       processMessages(cap, &receivedFinish);
396     }
397 #endif // PARALLEL_HASKELL: non-empty run queue!
398
399     scheduleDetectDeadlock(cap,task);
400
401 #if defined(THREADED_RTS)
402     cap = task->cap;    // reload cap, it might have changed
403 #endif
404
405     // Normally, the only way we can get here with no threads to
406     // run is if a keyboard interrupt received during 
407     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
408     // Additionally, it is not fatal for the
409     // threaded RTS to reach here with no threads to run.
410     //
411     // win32: might be here due to awaitEvent() being abandoned
412     // as a result of a console event having been delivered.
413     
414 #if defined(THREADED_RTS)
415     if (first) 
416     {
417     // XXX: ToDo
418     //     // don't yield the first time, we want a chance to run this
419     //     // thread for a bit, even if there are others banging at the
420     //     // door.
421     //     first = rtsFalse;
422     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
423     }
424
425   yield:
426     scheduleYield(&cap,task);
427     if (emptyRunQueue(cap)) continue; // look for work again
428 #endif
429
430 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
431     if ( emptyRunQueue(cap) ) {
432         ASSERT(sched_state >= SCHED_INTERRUPTING);
433     }
434 #endif
435
436     // 
437     // Get a thread to run
438     //
439     t = popRunQueue(cap);
440
441     // Sanity check the thread we're about to run.  This can be
442     // expensive if there is lots of thread switching going on...
443     IF_DEBUG(sanity,checkTSO(t));
444
445 #if defined(THREADED_RTS)
446     // Check whether we can run this thread in the current task.
447     // If not, we have to pass our capability to the right task.
448     {
449         Task *bound = t->bound;
450       
451         if (bound) {
452             if (bound == task) {
453                 debugTrace(DEBUG_sched,
454                            "### Running thread %lu in bound thread", (unsigned long)t->id);
455                 // yes, the Haskell thread is bound to the current native thread
456             } else {
457                 debugTrace(DEBUG_sched,
458                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
459                 // no, bound to a different Haskell thread: pass to that thread
460                 pushOnRunQueue(cap,t);
461                 continue;
462             }
463         } else {
464             // The thread we want to run is unbound.
465             if (task->tso) { 
466                 debugTrace(DEBUG_sched,
467                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
468                 // no, the current native thread is bound to a different
469                 // Haskell thread, so pass it to any worker thread
470                 pushOnRunQueue(cap,t);
471                 continue; 
472             }
473         }
474     }
475 #endif
476
477     // If we're shutting down, and this thread has not yet been
478     // killed, kill it now.  This sometimes happens when a finalizer
479     // thread is created by the final GC, or a thread previously
480     // in a foreign call returns.
481     if (sched_state >= SCHED_INTERRUPTING &&
482         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
483         deleteThread(cap,t);
484     }
485
486     /* context switches are initiated by the timer signal, unless
487      * the user specified "context switch as often as possible", with
488      * +RTS -C0
489      */
490     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
491         && !emptyThreadQueues(cap)) {
492         cap->context_switch = 1;
493     }
494          
495 run_thread:
496
497     // CurrentTSO is the thread to run.  t might be different if we
498     // loop back to run_thread, so make sure to set CurrentTSO after
499     // that.
500     cap->r.rCurrentTSO = t;
501
502     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
503                               (long)t->id, whatNext_strs[t->what_next]);
504
505     startHeapProfTimer();
506
507     // Check for exceptions blocked on this thread
508     maybePerformBlockedException (cap, t);
509
510     // ----------------------------------------------------------------------
511     // Run the current thread 
512
513     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
514     ASSERT(t->cap == cap);
515     ASSERT(t->bound ? t->bound->cap == cap : 1);
516
517     prev_what_next = t->what_next;
518
519     errno = t->saved_errno;
520 #if mingw32_HOST_OS
521     SetLastError(t->saved_winerror);
522 #endif
523
524     cap->in_haskell = rtsTrue;
525
526     dirty_TSO(cap,t);
527
528 #if defined(THREADED_RTS)
529     if (recent_activity == ACTIVITY_DONE_GC) {
530         // ACTIVITY_DONE_GC means we turned off the timer signal to
531         // conserve power (see #1623).  Re-enable it here.
532         nat prev;
533         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
534         if (prev == ACTIVITY_DONE_GC) {
535             startTimer();
536         }
537     } else {
538         recent_activity = ACTIVITY_YES;
539     }
540 #endif
541
542     switch (prev_what_next) {
543         
544     case ThreadKilled:
545     case ThreadComplete:
546         /* Thread already finished, return to scheduler. */
547         ret = ThreadFinished;
548         break;
549         
550     case ThreadRunGHC:
551     {
552         StgRegTable *r;
553         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
554         cap = regTableToCapability(r);
555         ret = r->rRet;
556         break;
557     }
558     
559     case ThreadInterpret:
560         cap = interpretBCO(cap);
561         ret = cap->r.rRet;
562         break;
563         
564     default:
565         barf("schedule: invalid what_next field");
566     }
567
568     cap->in_haskell = rtsFalse;
569
570     // The TSO might have moved, eg. if it re-entered the RTS and a GC
571     // happened.  So find the new location:
572     t = cap->r.rCurrentTSO;
573
574     // We have run some Haskell code: there might be blackhole-blocked
575     // threads to wake up now.
576     // Lock-free test here should be ok, we're just setting a flag.
577     if ( blackhole_queue != END_TSO_QUEUE ) {
578         blackholes_need_checking = rtsTrue;
579     }
580     
581     // And save the current errno in this thread.
582     // XXX: possibly bogus for SMP because this thread might already
583     // be running again, see code below.
584     t->saved_errno = errno;
585 #if mingw32_HOST_OS
586     // Similarly for Windows error code
587     t->saved_winerror = GetLastError();
588 #endif
589
590 #if defined(THREADED_RTS)
591     // If ret is ThreadBlocked, and this Task is bound to the TSO that
592     // blocked, we are in limbo - the TSO is now owned by whatever it
593     // is blocked on, and may in fact already have been woken up,
594     // perhaps even on a different Capability.  It may be the case
595     // that task->cap != cap.  We better yield this Capability
596     // immediately and return to normaility.
597     if (ret == ThreadBlocked) {
598         debugTrace(DEBUG_sched,
599                    "--<< thread %lu (%s) stopped: blocked",
600                    (unsigned long)t->id, whatNext_strs[t->what_next]);
601         goto yield;
602     }
603 #endif
604
605     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
606     ASSERT(t->cap == cap);
607
608     // ----------------------------------------------------------------------
609     
610     // Costs for the scheduler are assigned to CCS_SYSTEM
611     stopHeapProfTimer();
612 #if defined(PROFILING)
613     CCCS = CCS_SYSTEM;
614 #endif
615     
616     schedulePostRunThread(cap,t);
617
618     t = threadStackUnderflow(task,t);
619
620     ready_to_gc = rtsFalse;
621
622     switch (ret) {
623     case HeapOverflow:
624         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
625         break;
626
627     case StackOverflow:
628         scheduleHandleStackOverflow(cap,task,t);
629         break;
630
631     case ThreadYielding:
632         if (scheduleHandleYield(cap, t, prev_what_next)) {
633             // shortcut for switching between compiler/interpreter:
634             goto run_thread; 
635         }
636         break;
637
638     case ThreadBlocked:
639         scheduleHandleThreadBlocked(t);
640         break;
641
642     case ThreadFinished:
643         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
644         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
645         break;
646
647     default:
648       barf("schedule: invalid thread return code %d", (int)ret);
649     }
650
651     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
652       cap = scheduleDoGC(cap,task,rtsFalse);
653     }
654   } /* end of while() */
655 }
656
657 /* ----------------------------------------------------------------------------
658  * Setting up the scheduler loop
659  * ------------------------------------------------------------------------- */
660
661 static void
662 schedulePreLoop(void)
663 {
664   // initialisation for scheduler - what cannot go into initScheduler()  
665 }
666
667 /* -----------------------------------------------------------------------------
668  * scheduleFindWork()
669  *
670  * Search for work to do, and handle messages from elsewhere.
671  * -------------------------------------------------------------------------- */
672
673 static void
674 scheduleFindWork (Capability *cap)
675 {
676     scheduleStartSignalHandlers(cap);
677
678     // Only check the black holes here if we've nothing else to do.
679     // During normal execution, the black hole list only gets checked
680     // at GC time, to avoid repeatedly traversing this possibly long
681     // list each time around the scheduler.
682     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
683
684     scheduleCheckWakeupThreads(cap);
685
686     scheduleCheckBlockedThreads(cap);
687
688 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
689     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
690 #endif
691
692 #if defined(PARALLEL_HASKELL)
693     // if messages have been buffered...
694     scheduleSendPendingMessages();
695 #endif
696
697 #if defined(PARALLEL_HASKELL)
698     if (emptyRunQueue(cap)) {
699         receivedFinish = scheduleGetRemoteWork(cap);
700         continue; //  a new round, (hopefully) with new work
701         /* 
702            in GUM, this a) sends out a FISH and returns IF no fish is
703                            out already
704                         b) (blocking) awaits and receives messages
705            
706            in Eden, this is only the blocking receive, as b) in GUM.
707         */
708     }
709 #endif
710 }
711
712 #if defined(THREADED_RTS)
713 STATIC_INLINE rtsBool
714 shouldYieldCapability (Capability *cap, Task *task)
715 {
716     // we need to yield this capability to someone else if..
717     //   - another thread is initiating a GC
718     //   - another Task is returning from a foreign call
719     //   - the thread at the head of the run queue cannot be run
720     //     by this Task (it is bound to another Task, or it is unbound
721     //     and this task it bound).
722     return (waiting_for_gc || 
723             cap->returning_tasks_hd != NULL ||
724             (!emptyRunQueue(cap) && (task->tso == NULL
725                                      ? cap->run_queue_hd->bound != NULL
726                                      : cap->run_queue_hd->bound != task)));
727 }
728
729 // This is the single place where a Task goes to sleep.  There are
730 // two reasons it might need to sleep:
731 //    - there are no threads to run
732 //    - we need to yield this Capability to someone else 
733 //      (see shouldYieldCapability())
734 //
735 // Careful: the scheduler loop is quite delicate.  Make sure you run
736 // the tests in testsuite/concurrent (all ways) after modifying this,
737 // and also check the benchmarks in nofib/parallel for regressions.
738
739 static void
740 scheduleYield (Capability **pcap, Task *task)
741 {
742     Capability *cap = *pcap;
743
744     // if we have work, and we don't need to give up the Capability, continue.
745     if (!shouldYieldCapability(cap,task) && 
746         (!emptyRunQueue(cap) ||
747          blackholes_need_checking ||
748          sched_state >= SCHED_INTERRUPTING))
749         return;
750
751     // otherwise yield (sleep), and keep yielding if necessary.
752     do {
753         yieldCapability(&cap,task);
754     } 
755     while (shouldYieldCapability(cap,task));
756
757     // note there may still be no threads on the run queue at this
758     // point, the caller has to check.
759
760     *pcap = cap;
761     return;
762 }
763 #endif
764     
765 /* -----------------------------------------------------------------------------
766  * schedulePushWork()
767  *
768  * Push work to other Capabilities if we have some.
769  * -------------------------------------------------------------------------- */
770
771 static void
772 schedulePushWork(Capability *cap USED_IF_THREADS, 
773                  Task *task      USED_IF_THREADS)
774 {
775   /* following code not for PARALLEL_HASKELL. I kept the call general,
776      future GUM versions might use pushing in a distributed setup */
777 #if defined(THREADED_RTS)
778
779     Capability *free_caps[n_capabilities], *cap0;
780     nat i, n_free_caps;
781
782     // migration can be turned off with +RTS -qg
783     if (!RtsFlags.ParFlags.migrate) return;
784
785     // Check whether we have more threads on our run queue, or sparks
786     // in our pool, that we could hand to another Capability.
787     if (cap->run_queue_hd == END_TSO_QUEUE) {
788         if (sparkPoolSizeCap(cap) < 2) return;
789     } else {
790         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
791             sparkPoolSizeCap(cap) < 1) return;
792     }
793
794     // First grab as many free Capabilities as we can.
795     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
796         cap0 = &capabilities[i];
797         if (cap != cap0 && tryGrabCapability(cap0,task)) {
798             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
799                 // it already has some work, we just grabbed it at 
800                 // the wrong moment.  Or maybe it's deadlocked!
801                 releaseCapability(cap0);
802             } else {
803                 free_caps[n_free_caps++] = cap0;
804             }
805         }
806     }
807
808     // we now have n_free_caps free capabilities stashed in
809     // free_caps[].  Share our run queue equally with them.  This is
810     // probably the simplest thing we could do; improvements we might
811     // want to do include:
812     //
813     //   - giving high priority to moving relatively new threads, on 
814     //     the gournds that they haven't had time to build up a
815     //     working set in the cache on this CPU/Capability.
816     //
817     //   - giving low priority to moving long-lived threads
818
819     if (n_free_caps > 0) {
820         StgTSO *prev, *t, *next;
821         rtsBool pushed_to_all;
822
823         debugTrace(DEBUG_sched, 
824                    "cap %d: %s and %d free capabilities, sharing...", 
825                    cap->no, 
826                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
827                    "excess threads on run queue":"sparks to share (>=2)",
828                    n_free_caps);
829
830         i = 0;
831         pushed_to_all = rtsFalse;
832
833         if (cap->run_queue_hd != END_TSO_QUEUE) {
834             prev = cap->run_queue_hd;
835             t = prev->_link;
836             prev->_link = END_TSO_QUEUE;
837             for (; t != END_TSO_QUEUE; t = next) {
838                 next = t->_link;
839                 t->_link = END_TSO_QUEUE;
840                 if (t->what_next == ThreadRelocated
841                     || t->bound == task // don't move my bound thread
842                     || tsoLocked(t)) {  // don't move a locked thread
843                     setTSOLink(cap, prev, t);
844                     prev = t;
845                 } else if (i == n_free_caps) {
846                     pushed_to_all = rtsTrue;
847                     i = 0;
848                     // keep one for us
849                     setTSOLink(cap, prev, t);
850                     prev = t;
851                 } else {
852                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
853                     appendToRunQueue(free_caps[i],t);
854                     if (t->bound) { t->bound->cap = free_caps[i]; }
855                     t->cap = free_caps[i];
856                     i++;
857                 }
858             }
859             cap->run_queue_tl = prev;
860         }
861
862 #ifdef SPARK_PUSHING
863         /* JB I left this code in place, it would work but is not necessary */
864
865         // If there are some free capabilities that we didn't push any
866         // threads to, then try to push a spark to each one.
867         if (!pushed_to_all) {
868             StgClosure *spark;
869             // i is the next free capability to push to
870             for (; i < n_free_caps; i++) {
871                 if (emptySparkPoolCap(free_caps[i])) {
872                     spark = tryStealSpark(cap->sparks);
873                     if (spark != NULL) {
874                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
875                         newSpark(&(free_caps[i]->r), spark);
876                     }
877                 }
878             }
879         }
880 #endif /* SPARK_PUSHING */
881
882         // release the capabilities
883         for (i = 0; i < n_free_caps; i++) {
884             task->cap = free_caps[i];
885             releaseAndWakeupCapability(free_caps[i]);
886         }
887     }
888     task->cap = cap; // reset to point to our Capability.
889
890 #endif /* THREADED_RTS */
891
892 }
893
894 /* ----------------------------------------------------------------------------
895  * Start any pending signal handlers
896  * ------------------------------------------------------------------------- */
897
898 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
899 static void
900 scheduleStartSignalHandlers(Capability *cap)
901 {
902     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
903         // safe outside the lock
904         startSignalHandlers(cap);
905     }
906 }
907 #else
908 static void
909 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
910 {
911 }
912 #endif
913
914 /* ----------------------------------------------------------------------------
915  * Check for blocked threads that can be woken up.
916  * ------------------------------------------------------------------------- */
917
918 static void
919 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
920 {
921 #if !defined(THREADED_RTS)
922     //
923     // Check whether any waiting threads need to be woken up.  If the
924     // run queue is empty, and there are no other tasks running, we
925     // can wait indefinitely for something to happen.
926     //
927     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
928     {
929         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
930     }
931 #endif
932 }
933
934
935 /* ----------------------------------------------------------------------------
936  * Check for threads woken up by other Capabilities
937  * ------------------------------------------------------------------------- */
938
939 static void
940 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
941 {
942 #if defined(THREADED_RTS)
943     // Any threads that were woken up by other Capabilities get
944     // appended to our run queue.
945     if (!emptyWakeupQueue(cap)) {
946         ACQUIRE_LOCK(&cap->lock);
947         if (emptyRunQueue(cap)) {
948             cap->run_queue_hd = cap->wakeup_queue_hd;
949             cap->run_queue_tl = cap->wakeup_queue_tl;
950         } else {
951             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
952             cap->run_queue_tl = cap->wakeup_queue_tl;
953         }
954         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
955         RELEASE_LOCK(&cap->lock);
956     }
957 #endif
958 }
959
960 /* ----------------------------------------------------------------------------
961  * Check for threads blocked on BLACKHOLEs that can be woken up
962  * ------------------------------------------------------------------------- */
963 static void
964 scheduleCheckBlackHoles (Capability *cap)
965 {
966     if ( blackholes_need_checking ) // check without the lock first
967     {
968         ACQUIRE_LOCK(&sched_mutex);
969         if ( blackholes_need_checking ) {
970             blackholes_need_checking = rtsFalse;
971             // important that we reset the flag *before* checking the
972             // blackhole queue, otherwise we could get deadlock.  This
973             // happens as follows: we wake up a thread that
974             // immediately runs on another Capability, blocks on a
975             // blackhole, and then we reset the blackholes_need_checking flag.
976             checkBlackHoles(cap);
977         }
978         RELEASE_LOCK(&sched_mutex);
979     }
980 }
981
982 /* ----------------------------------------------------------------------------
983  * Detect deadlock conditions and attempt to resolve them.
984  * ------------------------------------------------------------------------- */
985
986 static void
987 scheduleDetectDeadlock (Capability *cap, Task *task)
988 {
989
990 #if defined(PARALLEL_HASKELL)
991     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
992     return;
993 #endif
994
995     /* 
996      * Detect deadlock: when we have no threads to run, there are no
997      * threads blocked, waiting for I/O, or sleeping, and all the
998      * other tasks are waiting for work, we must have a deadlock of
999      * some description.
1000      */
1001     if ( emptyThreadQueues(cap) )
1002     {
1003 #if defined(THREADED_RTS)
1004         /* 
1005          * In the threaded RTS, we only check for deadlock if there
1006          * has been no activity in a complete timeslice.  This means
1007          * we won't eagerly start a full GC just because we don't have
1008          * any threads to run currently.
1009          */
1010         if (recent_activity != ACTIVITY_INACTIVE) return;
1011 #endif
1012
1013         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1014
1015         // Garbage collection can release some new threads due to
1016         // either (a) finalizers or (b) threads resurrected because
1017         // they are unreachable and will therefore be sent an
1018         // exception.  Any threads thus released will be immediately
1019         // runnable.
1020         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1021         // when force_major == rtsTrue. scheduleDoGC sets
1022         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1023         // signal.
1024
1025         if ( !emptyRunQueue(cap) ) return;
1026
1027 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1028         /* If we have user-installed signal handlers, then wait
1029          * for signals to arrive rather then bombing out with a
1030          * deadlock.
1031          */
1032         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1033             debugTrace(DEBUG_sched,
1034                        "still deadlocked, waiting for signals...");
1035
1036             awaitUserSignals();
1037
1038             if (signals_pending()) {
1039                 startSignalHandlers(cap);
1040             }
1041
1042             // either we have threads to run, or we were interrupted:
1043             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1044
1045             return;
1046         }
1047 #endif
1048
1049 #if !defined(THREADED_RTS)
1050         /* Probably a real deadlock.  Send the current main thread the
1051          * Deadlock exception.
1052          */
1053         if (task->tso) {
1054             switch (task->tso->why_blocked) {
1055             case BlockedOnSTM:
1056             case BlockedOnBlackHole:
1057             case BlockedOnException:
1058             case BlockedOnMVar:
1059                 throwToSingleThreaded(cap, task->tso, 
1060                                       (StgClosure *)nonTermination_closure);
1061                 return;
1062             default:
1063                 barf("deadlock: main thread blocked in a strange way");
1064             }
1065         }
1066         return;
1067 #endif
1068     }
1069 }
1070
1071
1072 /* ----------------------------------------------------------------------------
1073  * Send pending messages (PARALLEL_HASKELL only)
1074  * ------------------------------------------------------------------------- */
1075
1076 #if defined(PARALLEL_HASKELL)
1077 static void
1078 scheduleSendPendingMessages(void)
1079 {
1080
1081 # if defined(PAR) // global Mem.Mgmt., omit for now
1082     if (PendingFetches != END_BF_QUEUE) {
1083         processFetches();
1084     }
1085 # endif
1086     
1087     if (RtsFlags.ParFlags.BufferTime) {
1088         // if we use message buffering, we must send away all message
1089         // packets which have become too old...
1090         sendOldBuffers(); 
1091     }
1092 }
1093 #endif
1094
1095 /* ----------------------------------------------------------------------------
1096  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1097  * ------------------------------------------------------------------------- */
1098
1099 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1100 static void
1101 scheduleActivateSpark(Capability *cap)
1102 {
1103     if (anySparks())
1104     {
1105         createSparkThread(cap);
1106         debugTrace(DEBUG_sched, "creating a spark thread");
1107     }
1108 }
1109 #endif // PARALLEL_HASKELL || THREADED_RTS
1110
1111 /* ----------------------------------------------------------------------------
1112  * Get work from a remote node (PARALLEL_HASKELL only)
1113  * ------------------------------------------------------------------------- */
1114     
1115 #if defined(PARALLEL_HASKELL)
1116 static rtsBool /* return value used in PARALLEL_HASKELL only */
1117 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1118 {
1119 #if defined(PARALLEL_HASKELL)
1120   rtsBool receivedFinish = rtsFalse;
1121
1122   // idle() , i.e. send all buffers, wait for work
1123   if (RtsFlags.ParFlags.BufferTime) {
1124         IF_PAR_DEBUG(verbose, 
1125                 debugBelch("...send all pending data,"));
1126         {
1127           nat i;
1128           for (i=1; i<=nPEs; i++)
1129             sendImmediately(i); // send all messages away immediately
1130         }
1131   }
1132
1133   /* this would be the place for fishing in GUM... 
1134
1135      if (no-earlier-fish-around) 
1136           sendFish(choosePe());
1137    */
1138
1139   // Eden:just look for incoming messages (blocking receive)
1140   IF_PAR_DEBUG(verbose, 
1141                debugBelch("...wait for incoming messages...\n"));
1142   processMessages(cap, &receivedFinish); // blocking receive...
1143
1144
1145   return receivedFinish;
1146   // reenter scheduling look after having received something
1147
1148 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1149
1150   return rtsFalse; /* return value unused in THREADED_RTS */
1151
1152 #endif /* PARALLEL_HASKELL */
1153 }
1154 #endif // PARALLEL_HASKELL || THREADED_RTS
1155
1156 /* ----------------------------------------------------------------------------
1157  * After running a thread...
1158  * ------------------------------------------------------------------------- */
1159
1160 static void
1161 schedulePostRunThread (Capability *cap, StgTSO *t)
1162 {
1163     // We have to be able to catch transactions that are in an
1164     // infinite loop as a result of seeing an inconsistent view of
1165     // memory, e.g. 
1166     //
1167     //   atomically $ do
1168     //       [a,b] <- mapM readTVar [ta,tb]
1169     //       when (a == b) loop
1170     //
1171     // and a is never equal to b given a consistent view of memory.
1172     //
1173     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1174         if (!stmValidateNestOfTransactions (t -> trec)) {
1175             debugTrace(DEBUG_sched | DEBUG_stm,
1176                        "trec %p found wasting its time", t);
1177             
1178             // strip the stack back to the
1179             // ATOMICALLY_FRAME, aborting the (nested)
1180             // transaction, and saving the stack of any
1181             // partially-evaluated thunks on the heap.
1182             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1183             
1184             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1185         }
1186     }
1187
1188   /* some statistics gathering in the parallel case */
1189 }
1190
1191 /* -----------------------------------------------------------------------------
1192  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1193  * -------------------------------------------------------------------------- */
1194
1195 static rtsBool
1196 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1197 {
1198     // did the task ask for a large block?
1199     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1200         // if so, get one and push it on the front of the nursery.
1201         bdescr *bd;
1202         lnat blocks;
1203         
1204         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1205         
1206         debugTrace(DEBUG_sched,
1207                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1208                    (long)t->id, whatNext_strs[t->what_next], blocks);
1209     
1210         // don't do this if the nursery is (nearly) full, we'll GC first.
1211         if (cap->r.rCurrentNursery->link != NULL ||
1212             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1213                                                // if the nursery has only one block.
1214             
1215             ACQUIRE_SM_LOCK
1216             bd = allocGroup( blocks );
1217             RELEASE_SM_LOCK
1218             cap->r.rNursery->n_blocks += blocks;
1219             
1220             // link the new group into the list
1221             bd->link = cap->r.rCurrentNursery;
1222             bd->u.back = cap->r.rCurrentNursery->u.back;
1223             if (cap->r.rCurrentNursery->u.back != NULL) {
1224                 cap->r.rCurrentNursery->u.back->link = bd;
1225             } else {
1226 #if !defined(THREADED_RTS)
1227                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1228                        g0s0 == cap->r.rNursery);
1229 #endif
1230                 cap->r.rNursery->blocks = bd;
1231             }             
1232             cap->r.rCurrentNursery->u.back = bd;
1233             
1234             // initialise it as a nursery block.  We initialise the
1235             // step, gen_no, and flags field of *every* sub-block in
1236             // this large block, because this is easier than making
1237             // sure that we always find the block head of a large
1238             // block whenever we call Bdescr() (eg. evacuate() and
1239             // isAlive() in the GC would both have to do this, at
1240             // least).
1241             { 
1242                 bdescr *x;
1243                 for (x = bd; x < bd + blocks; x++) {
1244                     x->step = cap->r.rNursery;
1245                     x->gen_no = 0;
1246                     x->flags = 0;
1247                 }
1248             }
1249             
1250             // This assert can be a killer if the app is doing lots
1251             // of large block allocations.
1252             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1253             
1254             // now update the nursery to point to the new block
1255             cap->r.rCurrentNursery = bd;
1256             
1257             // we might be unlucky and have another thread get on the
1258             // run queue before us and steal the large block, but in that
1259             // case the thread will just end up requesting another large
1260             // block.
1261             pushOnRunQueue(cap,t);
1262             return rtsFalse;  /* not actually GC'ing */
1263         }
1264     }
1265     
1266     debugTrace(DEBUG_sched,
1267                "--<< thread %ld (%s) stopped: HeapOverflow",
1268                (long)t->id, whatNext_strs[t->what_next]);
1269
1270     if (cap->context_switch) {
1271         // Sometimes we miss a context switch, e.g. when calling
1272         // primitives in a tight loop, MAYBE_GC() doesn't check the
1273         // context switch flag, and we end up waiting for a GC.
1274         // See #1984, and concurrent/should_run/1984
1275         cap->context_switch = 0;
1276         addToRunQueue(cap,t);
1277     } else {
1278         pushOnRunQueue(cap,t);
1279     }
1280     return rtsTrue;
1281     /* actual GC is done at the end of the while loop in schedule() */
1282 }
1283
1284 /* -----------------------------------------------------------------------------
1285  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1286  * -------------------------------------------------------------------------- */
1287
1288 static void
1289 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1290 {
1291     debugTrace (DEBUG_sched,
1292                 "--<< thread %ld (%s) stopped, StackOverflow", 
1293                 (long)t->id, whatNext_strs[t->what_next]);
1294
1295     /* just adjust the stack for this thread, then pop it back
1296      * on the run queue.
1297      */
1298     { 
1299         /* enlarge the stack */
1300         StgTSO *new_t = threadStackOverflow(cap, t);
1301         
1302         /* The TSO attached to this Task may have moved, so update the
1303          * pointer to it.
1304          */
1305         if (task->tso == t) {
1306             task->tso = new_t;
1307         }
1308         pushOnRunQueue(cap,new_t);
1309     }
1310 }
1311
1312 /* -----------------------------------------------------------------------------
1313  * Handle a thread that returned to the scheduler with ThreadYielding
1314  * -------------------------------------------------------------------------- */
1315
1316 static rtsBool
1317 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1318 {
1319     // Reset the context switch flag.  We don't do this just before
1320     // running the thread, because that would mean we would lose ticks
1321     // during GC, which can lead to unfair scheduling (a thread hogs
1322     // the CPU because the tick always arrives during GC).  This way
1323     // penalises threads that do a lot of allocation, but that seems
1324     // better than the alternative.
1325     cap->context_switch = 0;
1326     
1327     /* put the thread back on the run queue.  Then, if we're ready to
1328      * GC, check whether this is the last task to stop.  If so, wake
1329      * up the GC thread.  getThread will block during a GC until the
1330      * GC is finished.
1331      */
1332 #ifdef DEBUG
1333     if (t->what_next != prev_what_next) {
1334         debugTrace(DEBUG_sched,
1335                    "--<< thread %ld (%s) stopped to switch evaluators", 
1336                    (long)t->id, whatNext_strs[t->what_next]);
1337     } else {
1338         debugTrace(DEBUG_sched,
1339                    "--<< thread %ld (%s) stopped, yielding",
1340                    (long)t->id, whatNext_strs[t->what_next]);
1341     }
1342 #endif
1343     
1344     IF_DEBUG(sanity,
1345              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1346              checkTSO(t));
1347     ASSERT(t->_link == END_TSO_QUEUE);
1348     
1349     // Shortcut if we're just switching evaluators: don't bother
1350     // doing stack squeezing (which can be expensive), just run the
1351     // thread.
1352     if (t->what_next != prev_what_next) {
1353         return rtsTrue;
1354     }
1355
1356     addToRunQueue(cap,t);
1357
1358     return rtsFalse;
1359 }
1360
1361 /* -----------------------------------------------------------------------------
1362  * Handle a thread that returned to the scheduler with ThreadBlocked
1363  * -------------------------------------------------------------------------- */
1364
1365 static void
1366 scheduleHandleThreadBlocked( StgTSO *t
1367 #if !defined(GRAN) && !defined(DEBUG)
1368     STG_UNUSED
1369 #endif
1370     )
1371 {
1372
1373       // We don't need to do anything.  The thread is blocked, and it
1374       // has tidied up its stack and placed itself on whatever queue
1375       // it needs to be on.
1376
1377     // ASSERT(t->why_blocked != NotBlocked);
1378     // Not true: for example,
1379     //    - in THREADED_RTS, the thread may already have been woken
1380     //      up by another Capability.  This actually happens: try
1381     //      conc023 +RTS -N2.
1382     //    - the thread may have woken itself up already, because
1383     //      threadPaused() might have raised a blocked throwTo
1384     //      exception, see maybePerformBlockedException().
1385
1386 #ifdef DEBUG
1387     if (traceClass(DEBUG_sched)) {
1388         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1389                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1390         printThreadBlockage(t);
1391         debugTraceEnd();
1392     }
1393 #endif
1394 }
1395
1396 /* -----------------------------------------------------------------------------
1397  * Handle a thread that returned to the scheduler with ThreadFinished
1398  * -------------------------------------------------------------------------- */
1399
1400 static rtsBool
1401 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1402 {
1403     /* Need to check whether this was a main thread, and if so,
1404      * return with the return value.
1405      *
1406      * We also end up here if the thread kills itself with an
1407      * uncaught exception, see Exception.cmm.
1408      */
1409     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1410                (unsigned long)t->id, whatNext_strs[t->what_next]);
1411
1412     // blocked exceptions can now complete, even if the thread was in
1413     // blocked mode (see #2910).  This unconditionally calls
1414     // lockTSO(), which ensures that we don't miss any threads that
1415     // are engaged in throwTo() with this thread as a target.
1416     awakenBlockedExceptionQueue (cap, t);
1417
1418       //
1419       // Check whether the thread that just completed was a bound
1420       // thread, and if so return with the result.  
1421       //
1422       // There is an assumption here that all thread completion goes
1423       // through this point; we need to make sure that if a thread
1424       // ends up in the ThreadKilled state, that it stays on the run
1425       // queue so it can be dealt with here.
1426       //
1427
1428       if (t->bound) {
1429
1430           if (t->bound != task) {
1431 #if !defined(THREADED_RTS)
1432               // Must be a bound thread that is not the topmost one.  Leave
1433               // it on the run queue until the stack has unwound to the
1434               // point where we can deal with this.  Leaving it on the run
1435               // queue also ensures that the garbage collector knows about
1436               // this thread and its return value (it gets dropped from the
1437               // step->threads list so there's no other way to find it).
1438               appendToRunQueue(cap,t);
1439               return rtsFalse;
1440 #else
1441               // this cannot happen in the threaded RTS, because a
1442               // bound thread can only be run by the appropriate Task.
1443               barf("finished bound thread that isn't mine");
1444 #endif
1445           }
1446
1447           ASSERT(task->tso == t);
1448
1449           if (t->what_next == ThreadComplete) {
1450               if (task->ret) {
1451                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1452                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1453               }
1454               task->stat = Success;
1455           } else {
1456               if (task->ret) {
1457                   *(task->ret) = NULL;
1458               }
1459               if (sched_state >= SCHED_INTERRUPTING) {
1460                   if (heap_overflow) {
1461                       task->stat = HeapExhausted;
1462                   } else {
1463                       task->stat = Interrupted;
1464                   }
1465               } else {
1466                   task->stat = Killed;
1467               }
1468           }
1469 #ifdef DEBUG
1470           removeThreadLabel((StgWord)task->tso->id);
1471 #endif
1472           return rtsTrue; // tells schedule() to return
1473       }
1474
1475       return rtsFalse;
1476 }
1477
1478 /* -----------------------------------------------------------------------------
1479  * Perform a heap census
1480  * -------------------------------------------------------------------------- */
1481
1482 static rtsBool
1483 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1484 {
1485     // When we have +RTS -i0 and we're heap profiling, do a census at
1486     // every GC.  This lets us get repeatable runs for debugging.
1487     if (performHeapProfile ||
1488         (RtsFlags.ProfFlags.profileInterval==0 &&
1489          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1490         return rtsTrue;
1491     } else {
1492         return rtsFalse;
1493     }
1494 }
1495
1496 /* -----------------------------------------------------------------------------
1497  * Perform a garbage collection if necessary
1498  * -------------------------------------------------------------------------- */
1499
1500 static Capability *
1501 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1502 {
1503     rtsBool heap_census;
1504 #ifdef THREADED_RTS
1505     /* extern static volatile StgWord waiting_for_gc; 
1506        lives inside capability.c */
1507     rtsBool gc_type, prev_pending_gc;
1508     nat i;
1509 #endif
1510
1511     if (sched_state == SCHED_SHUTTING_DOWN) {
1512         // The final GC has already been done, and the system is
1513         // shutting down.  We'll probably deadlock if we try to GC
1514         // now.
1515         return cap;
1516     }
1517
1518 #ifdef THREADED_RTS
1519     if (sched_state < SCHED_INTERRUPTING
1520         && RtsFlags.ParFlags.parGcEnabled
1521         && N >= RtsFlags.ParFlags.parGcGen
1522         && ! oldest_gen->steps[0].mark)
1523     {
1524         gc_type = PENDING_GC_PAR;
1525     } else {
1526         gc_type = PENDING_GC_SEQ;
1527     }
1528
1529     // In order to GC, there must be no threads running Haskell code.
1530     // Therefore, the GC thread needs to hold *all* the capabilities,
1531     // and release them after the GC has completed.  
1532     //
1533     // This seems to be the simplest way: previous attempts involved
1534     // making all the threads with capabilities give up their
1535     // capabilities and sleep except for the *last* one, which
1536     // actually did the GC.  But it's quite hard to arrange for all
1537     // the other tasks to sleep and stay asleep.
1538     //
1539
1540     /*  Other capabilities are prevented from running yet more Haskell
1541         threads if waiting_for_gc is set. Tested inside
1542         yieldCapability() and releaseCapability() in Capability.c */
1543
1544     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1545     if (prev_pending_gc) {
1546         do {
1547             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1548                        prev_pending_gc);
1549             ASSERT(cap);
1550             yieldCapability(&cap,task);
1551         } while (waiting_for_gc);
1552         return cap;  // NOTE: task->cap might have changed here
1553     }
1554
1555     setContextSwitches();
1556
1557     // The final shutdown GC is always single-threaded, because it's
1558     // possible that some of the Capabilities have no worker threads.
1559     
1560     if (gc_type == PENDING_GC_SEQ)
1561     {
1562         // single-threaded GC: grab all the capabilities
1563         for (i=0; i < n_capabilities; i++) {
1564             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1565             if (cap != &capabilities[i]) {
1566                 Capability *pcap = &capabilities[i];
1567                 // we better hope this task doesn't get migrated to
1568                 // another Capability while we're waiting for this one.
1569                 // It won't, because load balancing happens while we have
1570                 // all the Capabilities, but even so it's a slightly
1571                 // unsavoury invariant.
1572                 task->cap = pcap;
1573                 waitForReturnCapability(&pcap, task);
1574                 if (pcap != &capabilities[i]) {
1575                     barf("scheduleDoGC: got the wrong capability");
1576                 }
1577             }
1578         }
1579     }
1580     else
1581     {
1582         // multi-threaded GC: make sure all the Capabilities donate one
1583         // GC thread each.
1584         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1585
1586         waitForGcThreads(cap);
1587     }
1588 #endif
1589
1590     // so this happens periodically:
1591     if (cap) scheduleCheckBlackHoles(cap);
1592     
1593     IF_DEBUG(scheduler, printAllThreads());
1594
1595 delete_threads_and_gc:
1596     /*
1597      * We now have all the capabilities; if we're in an interrupting
1598      * state, then we should take the opportunity to delete all the
1599      * threads in the system.
1600      */
1601     if (sched_state == SCHED_INTERRUPTING) {
1602         deleteAllThreads(cap);
1603         sched_state = SCHED_SHUTTING_DOWN;
1604     }
1605     
1606     heap_census = scheduleNeedHeapProfile(rtsTrue);
1607
1608     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1609     {
1610         // We are doing a GC because the system has been idle for a
1611         // timeslice and we need to check for deadlock.  Record the
1612         // fact that we've done a GC and turn off the timer signal;
1613         // it will get re-enabled if we run any threads after the GC.
1614         //
1615         // Note: this is done before GC, because after GC there might
1616         // be threads already running (GarbageCollect() releases the
1617         // GC threads when it completes), so we risk turning off the
1618         // timer signal when it should really be on.
1619         recent_activity = ACTIVITY_DONE_GC;
1620         stopTimer();
1621     }
1622
1623 #if defined(THREADED_RTS)
1624     debugTrace(DEBUG_sched, "doing GC");
1625     // reset waiting_for_gc *before* GC, so that when the GC threads
1626     // emerge they don't immediately re-enter the GC.
1627     waiting_for_gc = 0;
1628     GarbageCollect(force_major || heap_census, gc_type, cap);
1629 #else
1630     GarbageCollect(force_major || heap_census, 0, cap);
1631 #endif
1632
1633     if (heap_census) {
1634         debugTrace(DEBUG_sched, "performing heap census");
1635         heapCensus();
1636         performHeapProfile = rtsFalse;
1637     }
1638
1639     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1640         // GC set the heap_overflow flag, so we should proceed with
1641         // an orderly shutdown now.  Ultimately we want the main
1642         // thread to return to its caller with HeapExhausted, at which
1643         // point the caller should call hs_exit().  The first step is
1644         // to delete all the threads.
1645         //
1646         // Another way to do this would be to raise an exception in
1647         // the main thread, which we really should do because it gives
1648         // the program a chance to clean up.  But how do we find the
1649         // main thread?  It should presumably be the same one that
1650         // gets ^C exceptions, but that's all done on the Haskell side
1651         // (GHC.TopHandler).
1652         sched_state = SCHED_INTERRUPTING;
1653         goto delete_threads_and_gc;
1654     }
1655
1656 #ifdef SPARKBALANCE
1657     /* JB 
1658        Once we are all together... this would be the place to balance all
1659        spark pools. No concurrent stealing or adding of new sparks can
1660        occur. Should be defined in Sparks.c. */
1661     balanceSparkPoolsCaps(n_capabilities, capabilities);
1662 #endif
1663
1664 #if defined(THREADED_RTS)
1665     if (gc_type == PENDING_GC_SEQ) {
1666         // release our stash of capabilities.
1667         for (i = 0; i < n_capabilities; i++) {
1668             if (cap != &capabilities[i]) {
1669                 task->cap = &capabilities[i];
1670                 releaseCapability(&capabilities[i]);
1671             }
1672         }
1673     }
1674     if (cap) {
1675         task->cap = cap;
1676     } else {
1677         task->cap = NULL;
1678     }
1679 #endif
1680
1681     return cap;
1682 }
1683
1684 /* ---------------------------------------------------------------------------
1685  * Singleton fork(). Do not copy any running threads.
1686  * ------------------------------------------------------------------------- */
1687
1688 pid_t
1689 forkProcess(HsStablePtr *entry
1690 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1691             STG_UNUSED
1692 #endif
1693            )
1694 {
1695 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1696     Task *task;
1697     pid_t pid;
1698     StgTSO* t,*next;
1699     Capability *cap;
1700     nat s;
1701     
1702 #if defined(THREADED_RTS)
1703     if (RtsFlags.ParFlags.nNodes > 1) {
1704         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1705         stg_exit(EXIT_FAILURE);
1706     }
1707 #endif
1708
1709     debugTrace(DEBUG_sched, "forking!");
1710     
1711     // ToDo: for SMP, we should probably acquire *all* the capabilities
1712     cap = rts_lock();
1713     
1714     // no funny business: hold locks while we fork, otherwise if some
1715     // other thread is holding a lock when the fork happens, the data
1716     // structure protected by the lock will forever be in an
1717     // inconsistent state in the child.  See also #1391.
1718     ACQUIRE_LOCK(&sched_mutex);
1719     ACQUIRE_LOCK(&cap->lock);
1720     ACQUIRE_LOCK(&cap->running_task->lock);
1721
1722     pid = fork();
1723     
1724     if (pid) { // parent
1725         
1726         RELEASE_LOCK(&sched_mutex);
1727         RELEASE_LOCK(&cap->lock);
1728         RELEASE_LOCK(&cap->running_task->lock);
1729
1730         // just return the pid
1731         rts_unlock(cap);
1732         return pid;
1733         
1734     } else { // child
1735         
1736 #if defined(THREADED_RTS)
1737         initMutex(&sched_mutex);
1738         initMutex(&cap->lock);
1739         initMutex(&cap->running_task->lock);
1740 #endif
1741
1742         // Now, all OS threads except the thread that forked are
1743         // stopped.  We need to stop all Haskell threads, including
1744         // those involved in foreign calls.  Also we need to delete
1745         // all Tasks, because they correspond to OS threads that are
1746         // now gone.
1747
1748         for (s = 0; s < total_steps; s++) {
1749           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1750             if (t->what_next == ThreadRelocated) {
1751                 next = t->_link;
1752             } else {
1753                 next = t->global_link;
1754                 // don't allow threads to catch the ThreadKilled
1755                 // exception, but we do want to raiseAsync() because these
1756                 // threads may be evaluating thunks that we need later.
1757                 deleteThread_(cap,t);
1758             }
1759           }
1760         }
1761         
1762         // Empty the run queue.  It seems tempting to let all the
1763         // killed threads stay on the run queue as zombies to be
1764         // cleaned up later, but some of them correspond to bound
1765         // threads for which the corresponding Task does not exist.
1766         cap->run_queue_hd = END_TSO_QUEUE;
1767         cap->run_queue_tl = END_TSO_QUEUE;
1768
1769         // Any suspended C-calling Tasks are no more, their OS threads
1770         // don't exist now:
1771         cap->suspended_ccalling_tasks = NULL;
1772
1773         // Empty the threads lists.  Otherwise, the garbage
1774         // collector may attempt to resurrect some of these threads.
1775         for (s = 0; s < total_steps; s++) {
1776             all_steps[s].threads = END_TSO_QUEUE;
1777         }
1778
1779         // Wipe the task list, except the current Task.
1780         ACQUIRE_LOCK(&sched_mutex);
1781         for (task = all_tasks; task != NULL; task=task->all_link) {
1782             if (task != cap->running_task) {
1783 #if defined(THREADED_RTS)
1784                 initMutex(&task->lock); // see #1391
1785 #endif
1786                 discardTask(task);
1787             }
1788         }
1789         RELEASE_LOCK(&sched_mutex);
1790
1791 #if defined(THREADED_RTS)
1792         // Wipe our spare workers list, they no longer exist.  New
1793         // workers will be created if necessary.
1794         cap->spare_workers = NULL;
1795         cap->returning_tasks_hd = NULL;
1796         cap->returning_tasks_tl = NULL;
1797 #endif
1798
1799         // On Unix, all timers are reset in the child, so we need to start
1800         // the timer again.
1801         initTimer();
1802         startTimer();
1803
1804         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1805         rts_checkSchedStatus("forkProcess",cap);
1806         
1807         rts_unlock(cap);
1808         hs_exit();                      // clean up and exit
1809         stg_exit(EXIT_SUCCESS);
1810     }
1811 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1812     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1813     return -1;
1814 #endif
1815 }
1816
1817 /* ---------------------------------------------------------------------------
1818  * Delete all the threads in the system
1819  * ------------------------------------------------------------------------- */
1820    
1821 static void
1822 deleteAllThreads ( Capability *cap )
1823 {
1824     // NOTE: only safe to call if we own all capabilities.
1825
1826     StgTSO* t, *next;
1827     nat s;
1828
1829     debugTrace(DEBUG_sched,"deleting all threads");
1830     for (s = 0; s < total_steps; s++) {
1831       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1832         if (t->what_next == ThreadRelocated) {
1833             next = t->_link;
1834         } else {
1835             next = t->global_link;
1836             deleteThread(cap,t);
1837         }
1838       }
1839     }      
1840
1841     // The run queue now contains a bunch of ThreadKilled threads.  We
1842     // must not throw these away: the main thread(s) will be in there
1843     // somewhere, and the main scheduler loop has to deal with it.
1844     // Also, the run queue is the only thing keeping these threads from
1845     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1846
1847 #if !defined(THREADED_RTS)
1848     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1849     ASSERT(sleeping_queue == END_TSO_QUEUE);
1850 #endif
1851 }
1852
1853 /* -----------------------------------------------------------------------------
1854    Managing the suspended_ccalling_tasks list.
1855    Locks required: sched_mutex
1856    -------------------------------------------------------------------------- */
1857
1858 STATIC_INLINE void
1859 suspendTask (Capability *cap, Task *task)
1860 {
1861     ASSERT(task->next == NULL && task->prev == NULL);
1862     task->next = cap->suspended_ccalling_tasks;
1863     task->prev = NULL;
1864     if (cap->suspended_ccalling_tasks) {
1865         cap->suspended_ccalling_tasks->prev = task;
1866     }
1867     cap->suspended_ccalling_tasks = task;
1868 }
1869
1870 STATIC_INLINE void
1871 recoverSuspendedTask (Capability *cap, Task *task)
1872 {
1873     if (task->prev) {
1874         task->prev->next = task->next;
1875     } else {
1876         ASSERT(cap->suspended_ccalling_tasks == task);
1877         cap->suspended_ccalling_tasks = task->next;
1878     }
1879     if (task->next) {
1880         task->next->prev = task->prev;
1881     }
1882     task->next = task->prev = NULL;
1883 }
1884
1885 /* ---------------------------------------------------------------------------
1886  * Suspending & resuming Haskell threads.
1887  * 
1888  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1889  * its capability before calling the C function.  This allows another
1890  * task to pick up the capability and carry on running Haskell
1891  * threads.  It also means that if the C call blocks, it won't lock
1892  * the whole system.
1893  *
1894  * The Haskell thread making the C call is put to sleep for the
1895  * duration of the call, on the susepended_ccalling_threads queue.  We
1896  * give out a token to the task, which it can use to resume the thread
1897  * on return from the C function.
1898  * ------------------------------------------------------------------------- */
1899    
1900 void *
1901 suspendThread (StgRegTable *reg)
1902 {
1903   Capability *cap;
1904   int saved_errno;
1905   StgTSO *tso;
1906   Task *task;
1907 #if mingw32_HOST_OS
1908   StgWord32 saved_winerror;
1909 #endif
1910
1911   saved_errno = errno;
1912 #if mingw32_HOST_OS
1913   saved_winerror = GetLastError();
1914 #endif
1915
1916   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1917    */
1918   cap = regTableToCapability(reg);
1919
1920   task = cap->running_task;
1921   tso = cap->r.rCurrentTSO;
1922
1923   debugTrace(DEBUG_sched, 
1924              "thread %lu did a safe foreign call", 
1925              (unsigned long)cap->r.rCurrentTSO->id);
1926
1927   // XXX this might not be necessary --SDM
1928   tso->what_next = ThreadRunGHC;
1929
1930   threadPaused(cap,tso);
1931
1932   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1933       tso->why_blocked = BlockedOnCCall;
1934       tso->flags |= TSO_BLOCKEX;
1935       tso->flags &= ~TSO_INTERRUPTIBLE;
1936   } else {
1937       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1938   }
1939
1940   // Hand back capability
1941   task->suspended_tso = tso;
1942
1943   ACQUIRE_LOCK(&cap->lock);
1944
1945   suspendTask(cap,task);
1946   cap->in_haskell = rtsFalse;
1947   releaseCapability_(cap,rtsFalse);
1948   
1949   RELEASE_LOCK(&cap->lock);
1950
1951 #if defined(THREADED_RTS)
1952   /* Preparing to leave the RTS, so ensure there's a native thread/task
1953      waiting to take over.
1954   */
1955   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1956 #endif
1957
1958   errno = saved_errno;
1959 #if mingw32_HOST_OS
1960   SetLastError(saved_winerror);
1961 #endif
1962   return task;
1963 }
1964
1965 StgRegTable *
1966 resumeThread (void *task_)
1967 {
1968     StgTSO *tso;
1969     Capability *cap;
1970     Task *task = task_;
1971     int saved_errno;
1972 #if mingw32_HOST_OS
1973     StgWord32 saved_winerror;
1974 #endif
1975
1976     saved_errno = errno;
1977 #if mingw32_HOST_OS
1978     saved_winerror = GetLastError();
1979 #endif
1980
1981     cap = task->cap;
1982     // Wait for permission to re-enter the RTS with the result.
1983     waitForReturnCapability(&cap,task);
1984     // we might be on a different capability now... but if so, our
1985     // entry on the suspended_ccalling_tasks list will also have been
1986     // migrated.
1987
1988     // Remove the thread from the suspended list
1989     recoverSuspendedTask(cap,task);
1990
1991     tso = task->suspended_tso;
1992     task->suspended_tso = NULL;
1993     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1994     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1995     
1996     if (tso->why_blocked == BlockedOnCCall) {
1997         // avoid locking the TSO if we don't have to
1998         if (tso->blocked_exceptions != END_TSO_QUEUE) {
1999             awakenBlockedExceptionQueue(cap,tso);
2000         }
2001         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2002     }
2003     
2004     /* Reset blocking status */
2005     tso->why_blocked  = NotBlocked;
2006     
2007     cap->r.rCurrentTSO = tso;
2008     cap->in_haskell = rtsTrue;
2009     errno = saved_errno;
2010 #if mingw32_HOST_OS
2011     SetLastError(saved_winerror);
2012 #endif
2013
2014     /* We might have GC'd, mark the TSO dirty again */
2015     dirty_TSO(cap,tso);
2016
2017     IF_DEBUG(sanity, checkTSO(tso));
2018
2019     return &cap->r;
2020 }
2021
2022 /* ---------------------------------------------------------------------------
2023  * scheduleThread()
2024  *
2025  * scheduleThread puts a thread on the end  of the runnable queue.
2026  * This will usually be done immediately after a thread is created.
2027  * The caller of scheduleThread must create the thread using e.g.
2028  * createThread and push an appropriate closure
2029  * on this thread's stack before the scheduler is invoked.
2030  * ------------------------------------------------------------------------ */
2031
2032 void
2033 scheduleThread(Capability *cap, StgTSO *tso)
2034 {
2035     // The thread goes at the *end* of the run-queue, to avoid possible
2036     // starvation of any threads already on the queue.
2037     appendToRunQueue(cap,tso);
2038 }
2039
2040 void
2041 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2042 {
2043 #if defined(THREADED_RTS)
2044     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2045                               // move this thread from now on.
2046     cpu %= RtsFlags.ParFlags.nNodes;
2047     if (cpu == cap->no) {
2048         appendToRunQueue(cap,tso);
2049     } else {
2050         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2051     }
2052 #else
2053     appendToRunQueue(cap,tso);
2054 #endif
2055 }
2056
2057 Capability *
2058 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2059 {
2060     Task *task;
2061
2062     // We already created/initialised the Task
2063     task = cap->running_task;
2064
2065     // This TSO is now a bound thread; make the Task and TSO
2066     // point to each other.
2067     tso->bound = task;
2068     tso->cap = cap;
2069
2070     task->tso = tso;
2071     task->ret = ret;
2072     task->stat = NoStatus;
2073
2074     appendToRunQueue(cap,tso);
2075
2076     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2077
2078     cap = schedule(cap,task);
2079
2080     ASSERT(task->stat != NoStatus);
2081     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2082
2083     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2084     return cap;
2085 }
2086
2087 /* ----------------------------------------------------------------------------
2088  * Starting Tasks
2089  * ------------------------------------------------------------------------- */
2090
2091 #if defined(THREADED_RTS)
2092 void OSThreadProcAttr
2093 workerStart(Task *task)
2094 {
2095     Capability *cap;
2096
2097     // See startWorkerTask().
2098     ACQUIRE_LOCK(&task->lock);
2099     cap = task->cap;
2100     RELEASE_LOCK(&task->lock);
2101
2102     // set the thread-local pointer to the Task:
2103     taskEnter(task);
2104
2105     // schedule() runs without a lock.
2106     cap = schedule(cap,task);
2107
2108     // On exit from schedule(), we have a Capability, but possibly not
2109     // the same one we started with.
2110
2111     // During shutdown, the requirement is that after all the
2112     // Capabilities are shut down, all workers that are shutting down
2113     // have finished workerTaskStop().  This is why we hold on to
2114     // cap->lock until we've finished workerTaskStop() below.
2115     //
2116     // There may be workers still involved in foreign calls; those
2117     // will just block in waitForReturnCapability() because the
2118     // Capability has been shut down.
2119     //
2120     ACQUIRE_LOCK(&cap->lock);
2121     releaseCapability_(cap,rtsFalse);
2122     workerTaskStop(task);
2123     RELEASE_LOCK(&cap->lock);
2124 }
2125 #endif
2126
2127 /* ---------------------------------------------------------------------------
2128  * initScheduler()
2129  *
2130  * Initialise the scheduler.  This resets all the queues - if the
2131  * queues contained any threads, they'll be garbage collected at the
2132  * next pass.
2133  *
2134  * ------------------------------------------------------------------------ */
2135
2136 void 
2137 initScheduler(void)
2138 {
2139 #if !defined(THREADED_RTS)
2140   blocked_queue_hd  = END_TSO_QUEUE;
2141   blocked_queue_tl  = END_TSO_QUEUE;
2142   sleeping_queue    = END_TSO_QUEUE;
2143 #endif
2144
2145   blackhole_queue   = END_TSO_QUEUE;
2146
2147   sched_state    = SCHED_RUNNING;
2148   recent_activity = ACTIVITY_YES;
2149
2150 #if defined(THREADED_RTS)
2151   /* Initialise the mutex and condition variables used by
2152    * the scheduler. */
2153   initMutex(&sched_mutex);
2154 #endif
2155   
2156   ACQUIRE_LOCK(&sched_mutex);
2157
2158   /* A capability holds the state a native thread needs in
2159    * order to execute STG code. At least one capability is
2160    * floating around (only THREADED_RTS builds have more than one).
2161    */
2162   initCapabilities();
2163
2164   initTaskManager();
2165
2166 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2167   initSparkPools();
2168 #endif
2169
2170 #if defined(THREADED_RTS)
2171   /*
2172    * Eagerly start one worker to run each Capability, except for
2173    * Capability 0.  The idea is that we're probably going to start a
2174    * bound thread on Capability 0 pretty soon, so we don't want a
2175    * worker task hogging it.
2176    */
2177   { 
2178       nat i;
2179       Capability *cap;
2180       for (i = 1; i < n_capabilities; i++) {
2181           cap = &capabilities[i];
2182           ACQUIRE_LOCK(&cap->lock);
2183           startWorkerTask(cap, workerStart);
2184           RELEASE_LOCK(&cap->lock);
2185       }
2186   }
2187 #endif
2188
2189   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2190
2191   RELEASE_LOCK(&sched_mutex);
2192 }
2193
2194 void
2195 exitScheduler(
2196     rtsBool wait_foreign
2197 #if !defined(THREADED_RTS)
2198                          __attribute__((unused))
2199 #endif
2200 )
2201                /* see Capability.c, shutdownCapability() */
2202 {
2203     Task *task = NULL;
2204
2205 #if defined(THREADED_RTS)
2206     ACQUIRE_LOCK(&sched_mutex);
2207     task = newBoundTask();
2208     RELEASE_LOCK(&sched_mutex);
2209 #endif
2210
2211     // If we haven't killed all the threads yet, do it now.
2212     if (sched_state < SCHED_SHUTTING_DOWN) {
2213         sched_state = SCHED_INTERRUPTING;
2214 #if defined(THREADED_RTS)
2215         waitForReturnCapability(&task->cap,task);
2216         scheduleDoGC(task->cap,task,rtsFalse);    
2217         releaseCapability(task->cap);
2218 #else
2219         scheduleDoGC(&MainCapability,task,rtsFalse);    
2220 #endif
2221     }
2222     sched_state = SCHED_SHUTTING_DOWN;
2223
2224 #if defined(THREADED_RTS)
2225     { 
2226         nat i;
2227         
2228         for (i = 0; i < n_capabilities; i++) {
2229             shutdownCapability(&capabilities[i], task, wait_foreign);
2230         }
2231         boundTaskExiting(task);
2232     }
2233 #endif
2234 }
2235
2236 void
2237 freeScheduler( void )
2238 {
2239     nat still_running;
2240
2241     ACQUIRE_LOCK(&sched_mutex);
2242     still_running = freeTaskManager();
2243     // We can only free the Capabilities if there are no Tasks still
2244     // running.  We might have a Task about to return from a foreign
2245     // call into waitForReturnCapability(), for example (actually,
2246     // this should be the *only* thing that a still-running Task can
2247     // do at this point, and it will block waiting for the
2248     // Capability).
2249     if (still_running == 0) {
2250         freeCapabilities();
2251         if (n_capabilities != 1) {
2252             stgFree(capabilities);
2253         }
2254     }
2255     RELEASE_LOCK(&sched_mutex);
2256 #if defined(THREADED_RTS)
2257     closeMutex(&sched_mutex);
2258 #endif
2259 }
2260
2261 /* -----------------------------------------------------------------------------
2262    performGC
2263
2264    This is the interface to the garbage collector from Haskell land.
2265    We provide this so that external C code can allocate and garbage
2266    collect when called from Haskell via _ccall_GC.
2267    -------------------------------------------------------------------------- */
2268
2269 static void
2270 performGC_(rtsBool force_major)
2271 {
2272     Task *task;
2273
2274     // We must grab a new Task here, because the existing Task may be
2275     // associated with a particular Capability, and chained onto the 
2276     // suspended_ccalling_tasks queue.
2277     ACQUIRE_LOCK(&sched_mutex);
2278     task = newBoundTask();
2279     RELEASE_LOCK(&sched_mutex);
2280
2281     waitForReturnCapability(&task->cap,task);
2282     scheduleDoGC(task->cap,task,force_major);
2283     releaseCapability(task->cap);
2284     boundTaskExiting(task);
2285 }
2286
2287 void
2288 performGC(void)
2289 {
2290     performGC_(rtsFalse);
2291 }
2292
2293 void
2294 performMajorGC(void)
2295 {
2296     performGC_(rtsTrue);
2297 }
2298
2299 /* -----------------------------------------------------------------------------
2300    Stack overflow
2301
2302    If the thread has reached its maximum stack size, then raise the
2303    StackOverflow exception in the offending thread.  Otherwise
2304    relocate the TSO into a larger chunk of memory and adjust its stack
2305    size appropriately.
2306    -------------------------------------------------------------------------- */
2307
2308 static StgTSO *
2309 threadStackOverflow(Capability *cap, StgTSO *tso)
2310 {
2311   nat new_stack_size, stack_words;
2312   lnat new_tso_size;
2313   StgPtr new_sp;
2314   StgTSO *dest;
2315
2316   IF_DEBUG(sanity,checkTSO(tso));
2317
2318   // don't allow throwTo() to modify the blocked_exceptions queue
2319   // while we are moving the TSO:
2320   lockClosure((StgClosure *)tso);
2321
2322   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2323       // NB. never raise a StackOverflow exception if the thread is
2324       // inside Control.Exceptino.block.  It is impractical to protect
2325       // against stack overflow exceptions, since virtually anything
2326       // can raise one (even 'catch'), so this is the only sensible
2327       // thing to do here.  See bug #767.
2328
2329       debugTrace(DEBUG_gc,
2330                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2331                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2332       IF_DEBUG(gc,
2333                /* If we're debugging, just print out the top of the stack */
2334                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2335                                                 tso->sp+64)));
2336
2337       // Send this thread the StackOverflow exception
2338       unlockTSO(tso);
2339       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2340       return tso;
2341   }
2342
2343   /* Try to double the current stack size.  If that takes us over the
2344    * maximum stack size for this thread, then use the maximum instead
2345    * (that is, unless we're already at or over the max size and we
2346    * can't raise the StackOverflow exception (see above), in which
2347    * case just double the size). Finally round up so the TSO ends up as
2348    * a whole number of blocks.
2349    */
2350   if (tso->stack_size >= tso->max_stack_size) {
2351       new_stack_size = tso->stack_size * 2;
2352   } else { 
2353       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2354   }
2355   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2356                                        TSO_STRUCT_SIZE)/sizeof(W_);
2357   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2358   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2359
2360   debugTrace(DEBUG_sched, 
2361              "increasing stack size from %ld words to %d.",
2362              (long)tso->stack_size, new_stack_size);
2363
2364   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2365   TICK_ALLOC_TSO(new_stack_size,0);
2366
2367   /* copy the TSO block and the old stack into the new area */
2368   memcpy(dest,tso,TSO_STRUCT_SIZE);
2369   stack_words = tso->stack + tso->stack_size - tso->sp;
2370   new_sp = (P_)dest + new_tso_size - stack_words;
2371   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2372
2373   /* relocate the stack pointers... */
2374   dest->sp         = new_sp;
2375   dest->stack_size = new_stack_size;
2376         
2377   /* Mark the old TSO as relocated.  We have to check for relocated
2378    * TSOs in the garbage collector and any primops that deal with TSOs.
2379    *
2380    * It's important to set the sp value to just beyond the end
2381    * of the stack, so we don't attempt to scavenge any part of the
2382    * dead TSO's stack.
2383    */
2384   tso->what_next = ThreadRelocated;
2385   setTSOLink(cap,tso,dest);
2386   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2387   tso->why_blocked = NotBlocked;
2388
2389   IF_PAR_DEBUG(verbose,
2390                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2391                      tso->id, tso, tso->stack_size);
2392                /* If we're debugging, just print out the top of the stack */
2393                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2394                                                 tso->sp+64)));
2395   
2396   unlockTSO(dest);
2397   unlockTSO(tso);
2398
2399   IF_DEBUG(sanity,checkTSO(dest));
2400 #if 0
2401   IF_DEBUG(scheduler,printTSO(dest));
2402 #endif
2403
2404   return dest;
2405 }
2406
2407 static StgTSO *
2408 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2409 {
2410     bdescr *bd, *new_bd;
2411     lnat free_w, tso_size_w;
2412     StgTSO *new_tso;
2413
2414     tso_size_w = tso_sizeW(tso);
2415
2416     if (tso_size_w < MBLOCK_SIZE_W || 
2417         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2418     {
2419         return tso;
2420     }
2421
2422     // don't allow throwTo() to modify the blocked_exceptions queue
2423     // while we are moving the TSO:
2424     lockClosure((StgClosure *)tso);
2425
2426     // this is the number of words we'll free
2427     free_w = round_to_mblocks(tso_size_w/2);
2428
2429     bd = Bdescr((StgPtr)tso);
2430     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2431     bd->free = bd->start + TSO_STRUCT_SIZEW;
2432
2433     new_tso = (StgTSO *)new_bd->start;
2434     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2435     new_tso->stack_size = new_bd->free - new_tso->stack;
2436
2437     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2438                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2439
2440     tso->what_next = ThreadRelocated;
2441     tso->_link = new_tso; // no write barrier reqd: same generation
2442
2443     // The TSO attached to this Task may have moved, so update the
2444     // pointer to it.
2445     if (task->tso == tso) {
2446         task->tso = new_tso;
2447     }
2448
2449     unlockTSO(new_tso);
2450     unlockTSO(tso);
2451
2452     IF_DEBUG(sanity,checkTSO(new_tso));
2453
2454     return new_tso;
2455 }
2456
2457 /* ---------------------------------------------------------------------------
2458    Interrupt execution
2459    - usually called inside a signal handler so it mustn't do anything fancy.   
2460    ------------------------------------------------------------------------ */
2461
2462 void
2463 interruptStgRts(void)
2464 {
2465     sched_state = SCHED_INTERRUPTING;
2466     setContextSwitches();
2467     wakeUpRts();
2468 }
2469
2470 /* -----------------------------------------------------------------------------
2471    Wake up the RTS
2472    
2473    This function causes at least one OS thread to wake up and run the
2474    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2475    an external event has arrived that may need servicing (eg. a
2476    keyboard interrupt).
2477
2478    In the single-threaded RTS we don't do anything here; we only have
2479    one thread anyway, and the event that caused us to want to wake up
2480    will have interrupted any blocking system call in progress anyway.
2481    -------------------------------------------------------------------------- */
2482
2483 void
2484 wakeUpRts(void)
2485 {
2486 #if defined(THREADED_RTS)
2487     // This forces the IO Manager thread to wakeup, which will
2488     // in turn ensure that some OS thread wakes up and runs the
2489     // scheduler loop, which will cause a GC and deadlock check.
2490     ioManagerWakeup();
2491 #endif
2492 }
2493
2494 /* -----------------------------------------------------------------------------
2495  * checkBlackHoles()
2496  *
2497  * Check the blackhole_queue for threads that can be woken up.  We do
2498  * this periodically: before every GC, and whenever the run queue is
2499  * empty.
2500  *
2501  * An elegant solution might be to just wake up all the blocked
2502  * threads with awakenBlockedQueue occasionally: they'll go back to
2503  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2504  * doesn't give us a way to tell whether we've actually managed to
2505  * wake up any threads, so we would be busy-waiting.
2506  *
2507  * -------------------------------------------------------------------------- */
2508
2509 static rtsBool
2510 checkBlackHoles (Capability *cap)
2511 {
2512     StgTSO **prev, *t;
2513     rtsBool any_woke_up = rtsFalse;
2514     StgHalfWord type;
2515
2516     // blackhole_queue is global:
2517     ASSERT_LOCK_HELD(&sched_mutex);
2518
2519     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2520
2521     // ASSUMES: sched_mutex
2522     prev = &blackhole_queue;
2523     t = blackhole_queue;
2524     while (t != END_TSO_QUEUE) {
2525         ASSERT(t->why_blocked == BlockedOnBlackHole);
2526         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2527         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2528             IF_DEBUG(sanity,checkTSO(t));
2529             t = unblockOne(cap, t);
2530             *prev = t;
2531             any_woke_up = rtsTrue;
2532         } else {
2533             prev = &t->_link;
2534             t = t->_link;
2535         }
2536     }
2537
2538     return any_woke_up;
2539 }
2540
2541 /* -----------------------------------------------------------------------------
2542    Deleting threads
2543
2544    This is used for interruption (^C) and forking, and corresponds to
2545    raising an exception but without letting the thread catch the
2546    exception.
2547    -------------------------------------------------------------------------- */
2548
2549 static void 
2550 deleteThread (Capability *cap, StgTSO *tso)
2551 {
2552     // NOTE: must only be called on a TSO that we have exclusive
2553     // access to, because we will call throwToSingleThreaded() below.
2554     // The TSO must be on the run queue of the Capability we own, or 
2555     // we must own all Capabilities.
2556
2557     if (tso->why_blocked != BlockedOnCCall &&
2558         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2559         throwToSingleThreaded(cap,tso,NULL);
2560     }
2561 }
2562
2563 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2564 static void 
2565 deleteThread_(Capability *cap, StgTSO *tso)
2566 { // for forkProcess only:
2567   // like deleteThread(), but we delete threads in foreign calls, too.
2568
2569     if (tso->why_blocked == BlockedOnCCall ||
2570         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2571         unblockOne(cap,tso);
2572         tso->what_next = ThreadKilled;
2573     } else {
2574         deleteThread(cap,tso);
2575     }
2576 }
2577 #endif
2578
2579 /* -----------------------------------------------------------------------------
2580    raiseExceptionHelper
2581    
2582    This function is called by the raise# primitve, just so that we can
2583    move some of the tricky bits of raising an exception from C-- into
2584    C.  Who knows, it might be a useful re-useable thing here too.
2585    -------------------------------------------------------------------------- */
2586
2587 StgWord
2588 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2589 {
2590     Capability *cap = regTableToCapability(reg);
2591     StgThunk *raise_closure = NULL;
2592     StgPtr p, next;
2593     StgRetInfoTable *info;
2594     //
2595     // This closure represents the expression 'raise# E' where E
2596     // is the exception raise.  It is used to overwrite all the
2597     // thunks which are currently under evaluataion.
2598     //
2599
2600     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2601     // LDV profiling: stg_raise_info has THUNK as its closure
2602     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2603     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2604     // 1 does not cause any problem unless profiling is performed.
2605     // However, when LDV profiling goes on, we need to linearly scan
2606     // small object pool, where raise_closure is stored, so we should
2607     // use MIN_UPD_SIZE.
2608     //
2609     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2610     //                                 sizeofW(StgClosure)+1);
2611     //
2612
2613     //
2614     // Walk up the stack, looking for the catch frame.  On the way,
2615     // we update any closures pointed to from update frames with the
2616     // raise closure that we just built.
2617     //
2618     p = tso->sp;
2619     while(1) {
2620         info = get_ret_itbl((StgClosure *)p);
2621         next = p + stack_frame_sizeW((StgClosure *)p);
2622         switch (info->i.type) {
2623             
2624         case UPDATE_FRAME:
2625             // Only create raise_closure if we need to.
2626             if (raise_closure == NULL) {
2627                 raise_closure = 
2628                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2629                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2630                 raise_closure->payload[0] = exception;
2631             }
2632             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2633             p = next;
2634             continue;
2635
2636         case ATOMICALLY_FRAME:
2637             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2638             tso->sp = p;
2639             return ATOMICALLY_FRAME;
2640             
2641         case CATCH_FRAME:
2642             tso->sp = p;
2643             return CATCH_FRAME;
2644
2645         case CATCH_STM_FRAME:
2646             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2647             tso->sp = p;
2648             return CATCH_STM_FRAME;
2649             
2650         case STOP_FRAME:
2651             tso->sp = p;
2652             return STOP_FRAME;
2653
2654         case CATCH_RETRY_FRAME:
2655         default:
2656             p = next; 
2657             continue;
2658         }
2659     }
2660 }
2661
2662
2663 /* -----------------------------------------------------------------------------
2664    findRetryFrameHelper
2665
2666    This function is called by the retry# primitive.  It traverses the stack
2667    leaving tso->sp referring to the frame which should handle the retry.  
2668
2669    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2670    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2671
2672    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2673    create) because retries are not considered to be exceptions, despite the
2674    similar implementation.
2675
2676    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2677    not be created within memory transactions.
2678    -------------------------------------------------------------------------- */
2679
2680 StgWord
2681 findRetryFrameHelper (StgTSO *tso)
2682 {
2683   StgPtr           p, next;
2684   StgRetInfoTable *info;
2685
2686   p = tso -> sp;
2687   while (1) {
2688     info = get_ret_itbl((StgClosure *)p);
2689     next = p + stack_frame_sizeW((StgClosure *)p);
2690     switch (info->i.type) {
2691       
2692     case ATOMICALLY_FRAME:
2693         debugTrace(DEBUG_stm,
2694                    "found ATOMICALLY_FRAME at %p during retry", p);
2695         tso->sp = p;
2696         return ATOMICALLY_FRAME;
2697       
2698     case CATCH_RETRY_FRAME:
2699         debugTrace(DEBUG_stm,
2700                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2701         tso->sp = p;
2702         return CATCH_RETRY_FRAME;
2703       
2704     case CATCH_STM_FRAME: {
2705         StgTRecHeader *trec = tso -> trec;
2706         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2707         debugTrace(DEBUG_stm,
2708                    "found CATCH_STM_FRAME at %p during retry", p);
2709         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2710         stmAbortTransaction(tso -> cap, trec);
2711         stmFreeAbortedTRec(tso -> cap, trec);
2712         tso -> trec = outer;
2713         p = next; 
2714         continue;
2715     }
2716       
2717
2718     default:
2719       ASSERT(info->i.type != CATCH_FRAME);
2720       ASSERT(info->i.type != STOP_FRAME);
2721       p = next; 
2722       continue;
2723     }
2724   }
2725 }
2726
2727 /* -----------------------------------------------------------------------------
2728    resurrectThreads is called after garbage collection on the list of
2729    threads found to be garbage.  Each of these threads will be woken
2730    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2731    on an MVar, or NonTermination if the thread was blocked on a Black
2732    Hole.
2733
2734    Locks: assumes we hold *all* the capabilities.
2735    -------------------------------------------------------------------------- */
2736
2737 void
2738 resurrectThreads (StgTSO *threads)
2739 {
2740     StgTSO *tso, *next;
2741     Capability *cap;
2742     step *step;
2743
2744     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2745         next = tso->global_link;
2746
2747         step = Bdescr((P_)tso)->step;
2748         tso->global_link = step->threads;
2749         step->threads = tso;
2750
2751         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2752         
2753         // Wake up the thread on the Capability it was last on
2754         cap = tso->cap;
2755         
2756         switch (tso->why_blocked) {
2757         case BlockedOnMVar:
2758         case BlockedOnException:
2759             /* Called by GC - sched_mutex lock is currently held. */
2760             throwToSingleThreaded(cap, tso,
2761                                   (StgClosure *)blockedOnDeadMVar_closure);
2762             break;
2763         case BlockedOnBlackHole:
2764             throwToSingleThreaded(cap, tso,
2765                                   (StgClosure *)nonTermination_closure);
2766             break;
2767         case BlockedOnSTM:
2768             throwToSingleThreaded(cap, tso,
2769                                   (StgClosure *)blockedIndefinitely_closure);
2770             break;
2771         case NotBlocked:
2772             /* This might happen if the thread was blocked on a black hole
2773              * belonging to a thread that we've just woken up (raiseAsync
2774              * can wake up threads, remember...).
2775              */
2776             continue;
2777         default:
2778             barf("resurrectThreads: thread blocked in a strange way");
2779         }
2780     }
2781 }
2782
2783 /* -----------------------------------------------------------------------------
2784    performPendingThrowTos is called after garbage collection, and
2785    passed a list of threads that were found to have pending throwTos
2786    (tso->blocked_exceptions was not empty), and were blocked.
2787    Normally this doesn't happen, because we would deliver the
2788    exception directly if the target thread is blocked, but there are
2789    small windows where it might occur on a multiprocessor (see
2790    throwTo()).
2791
2792    NB. we must be holding all the capabilities at this point, just
2793    like resurrectThreads().
2794    -------------------------------------------------------------------------- */
2795
2796 void
2797 performPendingThrowTos (StgTSO *threads)
2798 {
2799     StgTSO *tso, *next;
2800     Capability *cap;
2801     step *step;
2802
2803     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2804         next = tso->global_link;
2805
2806         step = Bdescr((P_)tso)->step;
2807         tso->global_link = step->threads;
2808         step->threads = tso;
2809
2810         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2811         
2812         cap = tso->cap;
2813         maybePerformBlockedException(cap, tso);
2814     }
2815 }