Fix a bug which sometimes caused extra major GCs to be performed
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #include "GC.h"
35 #include "Weak.h"
36
37 /* PARALLEL_HASKELL includes go here */
38
39 #include "Sparks.h"
40 #include "Capability.h"
41 #include "Task.h"
42 #include "AwaitEvent.h"
43 #if defined(mingw32_HOST_OS)
44 #include "win32/IOManager.h"
45 #endif
46 #include "Trace.h"
47 #include "RaiseAsync.h"
48 #include "Threads.h"
49 #include "ThrIOManager.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 /* -----------------------------------------------------------------------------
73  * Global variables
74  * -------------------------------------------------------------------------- */
75
76 #if !defined(THREADED_RTS)
77 // Blocked/sleeping thrads
78 StgTSO *blocked_queue_hd = NULL;
79 StgTSO *blocked_queue_tl = NULL;
80 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
81 #endif
82
83 /* Threads blocked on blackholes.
84  * LOCK: sched_mutex+capability, or all capabilities
85  */
86 StgTSO *blackhole_queue = NULL;
87
88 /* The blackhole_queue should be checked for threads to wake up.  See
89  * Schedule.h for more thorough comment.
90  * LOCK: none (doesn't matter if we miss an update)
91  */
92 rtsBool blackholes_need_checking = rtsFalse;
93
94 /* Set to true when the latest garbage collection failed to reclaim
95  * enough space, and the runtime should proceed to shut itself down in
96  * an orderly fashion (emitting profiling info etc.)
97  */
98 rtsBool heap_overflow = rtsFalse;
99
100 /* flag that tracks whether we have done any execution in this time slice.
101  * LOCK: currently none, perhaps we should lock (but needs to be
102  * updated in the fast path of the scheduler).
103  *
104  * NB. must be StgWord, we do xchg() on it.
105  */
106 volatile StgWord recent_activity = ACTIVITY_YES;
107
108 /* if this flag is set as well, give up execution
109  * LOCK: none (changes monotonically)
110  */
111 volatile StgWord sched_state = SCHED_RUNNING;
112
113 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
114  *  exists - earlier gccs apparently didn't.
115  *  -= chak
116  */
117 StgTSO dummy_tso;
118
119 /*
120  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
121  * in an MT setting, needed to signal that a worker thread shouldn't hang around
122  * in the scheduler when it is out of work.
123  */
124 rtsBool shutting_down_scheduler = rtsFalse;
125
126 /*
127  * This mutex protects most of the global scheduler data in
128  * the THREADED_RTS runtime.
129  */
130 #if defined(THREADED_RTS)
131 Mutex sched_mutex;
132 #endif
133
134 #if !defined(mingw32_HOST_OS)
135 #define FORKPROCESS_PRIMOP_SUPPORTED
136 #endif
137
138 /* -----------------------------------------------------------------------------
139  * static function prototypes
140  * -------------------------------------------------------------------------- */
141
142 static Capability *schedule (Capability *initialCapability, Task *task);
143
144 //
145 // These function all encapsulate parts of the scheduler loop, and are
146 // abstracted only to make the structure and control flow of the
147 // scheduler clearer.
148 //
149 static void schedulePreLoop (void);
150 static void scheduleFindWork (Capability *cap);
151 #if defined(THREADED_RTS)
152 static void scheduleYield (Capability **pcap, Task *task);
153 #endif
154 static void scheduleStartSignalHandlers (Capability *cap);
155 static void scheduleCheckBlockedThreads (Capability *cap);
156 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
157 static void scheduleCheckBlackHoles (Capability *cap);
158 static void scheduleDetectDeadlock (Capability *cap, Task *task);
159 static void schedulePushWork(Capability *cap, Task *task);
160 #if defined(PARALLEL_HASKELL)
161 static rtsBool scheduleGetRemoteWork(Capability *cap);
162 static void scheduleSendPendingMessages(void);
163 #endif
164 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
165 static void scheduleActivateSpark(Capability *cap);
166 #endif
167 static void schedulePostRunThread(Capability *cap, StgTSO *t);
168 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
169 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
170                                          StgTSO *t);
171 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
172                                     nat prev_what_next );
173 static void scheduleHandleThreadBlocked( StgTSO *t );
174 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
175                                              StgTSO *t );
176 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
177 static Capability *scheduleDoGC(Capability *cap, Task *task,
178                                 rtsBool force_major);
179
180 static rtsBool checkBlackHoles(Capability *cap);
181
182 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
183 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
184
185 static void deleteThread (Capability *cap, StgTSO *tso);
186 static void deleteAllThreads (Capability *cap);
187
188 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
189 static void deleteThread_(Capability *cap, StgTSO *tso);
190 #endif
191
192 #ifdef DEBUG
193 static char *whatNext_strs[] = {
194   "(unknown)",
195   "ThreadRunGHC",
196   "ThreadInterpret",
197   "ThreadKilled",
198   "ThreadRelocated",
199   "ThreadComplete"
200 };
201 #endif
202
203 /* -----------------------------------------------------------------------------
204  * Putting a thread on the run queue: different scheduling policies
205  * -------------------------------------------------------------------------- */
206
207 STATIC_INLINE void
208 addToRunQueue( Capability *cap, StgTSO *t )
209 {
210 #if defined(PARALLEL_HASKELL)
211     if (RtsFlags.ParFlags.doFairScheduling) { 
212         // this does round-robin scheduling; good for concurrency
213         appendToRunQueue(cap,t);
214     } else {
215         // this does unfair scheduling; good for parallelism
216         pushOnRunQueue(cap,t);
217     }
218 #else
219     // this does round-robin scheduling; good for concurrency
220     appendToRunQueue(cap,t);
221 #endif
222 }
223
224 /* ---------------------------------------------------------------------------
225    Main scheduling loop.
226
227    We use round-robin scheduling, each thread returning to the
228    scheduler loop when one of these conditions is detected:
229
230       * out of heap space
231       * timer expires (thread yields)
232       * thread blocks
233       * thread ends
234       * stack overflow
235
236    GRAN version:
237      In a GranSim setup this loop iterates over the global event queue.
238      This revolves around the global event queue, which determines what 
239      to do next. Therefore, it's more complicated than either the 
240      concurrent or the parallel (GUM) setup.
241   This version has been entirely removed (JB 2008/08).
242
243    GUM version:
244      GUM iterates over incoming messages.
245      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
246      and sends out a fish whenever it has nothing to do; in-between
247      doing the actual reductions (shared code below) it processes the
248      incoming messages and deals with delayed operations 
249      (see PendingFetches).
250      This is not the ugliest code you could imagine, but it's bloody close.
251
252   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
253   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
254   as well as future GUM versions. This file has been refurbished to
255   only contain valid code, which is however incomplete, refers to
256   invalid includes etc.
257
258    ------------------------------------------------------------------------ */
259
260 static Capability *
261 schedule (Capability *initialCapability, Task *task)
262 {
263   StgTSO *t;
264   Capability *cap;
265   StgThreadReturnCode ret;
266 #if defined(PARALLEL_HASKELL)
267   rtsBool receivedFinish = rtsFalse;
268 #endif
269   nat prev_what_next;
270   rtsBool ready_to_gc;
271 #if defined(THREADED_RTS)
272   rtsBool first = rtsTrue;
273 #endif
274   
275   cap = initialCapability;
276
277   // Pre-condition: this task owns initialCapability.
278   // The sched_mutex is *NOT* held
279   // NB. on return, we still hold a capability.
280
281   debugTrace (DEBUG_sched, 
282               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
283               task, initialCapability);
284
285   if (running_finalizers) {
286       errorBelch("error: a C finalizer called back into Haskell.\n"
287                  "   use Foreign.Concurrent.newForeignPtr for Haskell finalizers.");
288       stg_exit(EXIT_FAILURE);
289   }
290
291   schedulePreLoop();
292
293   // -----------------------------------------------------------
294   // Scheduler loop starts here:
295
296 #if defined(PARALLEL_HASKELL)
297 #define TERMINATION_CONDITION        (!receivedFinish)
298 #else
299 #define TERMINATION_CONDITION        rtsTrue
300 #endif
301
302   while (TERMINATION_CONDITION) {
303
304     // Check whether we have re-entered the RTS from Haskell without
305     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
306     // call).
307     if (cap->in_haskell) {
308           errorBelch("schedule: re-entered unsafely.\n"
309                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
310           stg_exit(EXIT_FAILURE);
311     }
312
313     // The interruption / shutdown sequence.
314     // 
315     // In order to cleanly shut down the runtime, we want to:
316     //   * make sure that all main threads return to their callers
317     //     with the state 'Interrupted'.
318     //   * clean up all OS threads assocated with the runtime
319     //   * free all memory etc.
320     //
321     // So the sequence for ^C goes like this:
322     //
323     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
324     //     arranges for some Capability to wake up
325     //
326     //   * all threads in the system are halted, and the zombies are
327     //     placed on the run queue for cleaning up.  We acquire all
328     //     the capabilities in order to delete the threads, this is
329     //     done by scheduleDoGC() for convenience (because GC already
330     //     needs to acquire all the capabilities).  We can't kill
331     //     threads involved in foreign calls.
332     // 
333     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
334     //
335     //   * sched_state := SCHED_SHUTTING_DOWN
336     //
337     //   * all workers exit when the run queue on their capability
338     //     drains.  All main threads will also exit when their TSO
339     //     reaches the head of the run queue and they can return.
340     //
341     //   * eventually all Capabilities will shut down, and the RTS can
342     //     exit.
343     //
344     //   * We might be left with threads blocked in foreign calls, 
345     //     we should really attempt to kill these somehow (TODO);
346     
347     switch (sched_state) {
348     case SCHED_RUNNING:
349         break;
350     case SCHED_INTERRUPTING:
351         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
352 #if defined(THREADED_RTS)
353         discardSparksCap(cap);
354 #endif
355         /* scheduleDoGC() deletes all the threads */
356         cap = scheduleDoGC(cap,task,rtsFalse);
357
358         // after scheduleDoGC(), we must be shutting down.  Either some
359         // other Capability did the final GC, or we did it above,
360         // either way we can fall through to the SCHED_SHUTTING_DOWN
361         // case now.
362         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
363         // fall through
364
365     case SCHED_SHUTTING_DOWN:
366         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
367         // If we are a worker, just exit.  If we're a bound thread
368         // then we will exit below when we've removed our TSO from
369         // the run queue.
370         if (task->tso == NULL && emptyRunQueue(cap)) {
371             return cap;
372         }
373         break;
374     default:
375         barf("sched_state: %d", sched_state);
376     }
377
378     scheduleFindWork(cap);
379
380     /* work pushing, currently relevant only for THREADED_RTS:
381        (pushes threads, wakes up idle capabilities for stealing) */
382     schedulePushWork(cap,task);
383
384 #if defined(PARALLEL_HASKELL)
385     /* since we perform a blocking receive and continue otherwise,
386        either we never reach here or we definitely have work! */
387     // from here: non-empty run queue
388     ASSERT(!emptyRunQueue(cap));
389
390     if (PacketsWaiting()) {  /* now process incoming messages, if any
391                                 pending...  
392
393                                 CAUTION: scheduleGetRemoteWork called
394                                 above, waits for messages as well! */
395       processMessages(cap, &receivedFinish);
396     }
397 #endif // PARALLEL_HASKELL: non-empty run queue!
398
399     scheduleDetectDeadlock(cap,task);
400
401 #if defined(THREADED_RTS)
402     cap = task->cap;    // reload cap, it might have changed
403 #endif
404
405     // Normally, the only way we can get here with no threads to
406     // run is if a keyboard interrupt received during 
407     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
408     // Additionally, it is not fatal for the
409     // threaded RTS to reach here with no threads to run.
410     //
411     // win32: might be here due to awaitEvent() being abandoned
412     // as a result of a console event having been delivered.
413     
414 #if defined(THREADED_RTS)
415     if (first) 
416     {
417     // XXX: ToDo
418     //     // don't yield the first time, we want a chance to run this
419     //     // thread for a bit, even if there are others banging at the
420     //     // door.
421     //     first = rtsFalse;
422     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
423     }
424
425   yield:
426     scheduleYield(&cap,task);
427     if (emptyRunQueue(cap)) continue; // look for work again
428 #endif
429
430 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
431     if ( emptyRunQueue(cap) ) {
432         ASSERT(sched_state >= SCHED_INTERRUPTING);
433     }
434 #endif
435
436     // 
437     // Get a thread to run
438     //
439     t = popRunQueue(cap);
440
441     // Sanity check the thread we're about to run.  This can be
442     // expensive if there is lots of thread switching going on...
443     IF_DEBUG(sanity,checkTSO(t));
444
445 #if defined(THREADED_RTS)
446     // Check whether we can run this thread in the current task.
447     // If not, we have to pass our capability to the right task.
448     {
449         Task *bound = t->bound;
450       
451         if (bound) {
452             if (bound == task) {
453                 debugTrace(DEBUG_sched,
454                            "### Running thread %lu in bound thread", (unsigned long)t->id);
455                 // yes, the Haskell thread is bound to the current native thread
456             } else {
457                 debugTrace(DEBUG_sched,
458                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
459                 // no, bound to a different Haskell thread: pass to that thread
460                 pushOnRunQueue(cap,t);
461                 continue;
462             }
463         } else {
464             // The thread we want to run is unbound.
465             if (task->tso) { 
466                 debugTrace(DEBUG_sched,
467                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
468                 // no, the current native thread is bound to a different
469                 // Haskell thread, so pass it to any worker thread
470                 pushOnRunQueue(cap,t);
471                 continue; 
472             }
473         }
474     }
475 #endif
476
477     // If we're shutting down, and this thread has not yet been
478     // killed, kill it now.  This sometimes happens when a finalizer
479     // thread is created by the final GC, or a thread previously
480     // in a foreign call returns.
481     if (sched_state >= SCHED_INTERRUPTING &&
482         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
483         deleteThread(cap,t);
484     }
485
486     /* context switches are initiated by the timer signal, unless
487      * the user specified "context switch as often as possible", with
488      * +RTS -C0
489      */
490     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
491         && !emptyThreadQueues(cap)) {
492         cap->context_switch = 1;
493     }
494          
495 run_thread:
496
497     // CurrentTSO is the thread to run.  t might be different if we
498     // loop back to run_thread, so make sure to set CurrentTSO after
499     // that.
500     cap->r.rCurrentTSO = t;
501
502     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
503                               (long)t->id, whatNext_strs[t->what_next]);
504
505     startHeapProfTimer();
506
507     // Check for exceptions blocked on this thread
508     maybePerformBlockedException (cap, t);
509
510     // ----------------------------------------------------------------------
511     // Run the current thread 
512
513     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
514     ASSERT(t->cap == cap);
515     ASSERT(t->bound ? t->bound->cap == cap : 1);
516
517     prev_what_next = t->what_next;
518
519     errno = t->saved_errno;
520 #if mingw32_HOST_OS
521     SetLastError(t->saved_winerror);
522 #endif
523
524     cap->in_haskell = rtsTrue;
525
526     dirty_TSO(cap,t);
527
528 #if defined(THREADED_RTS)
529     if (recent_activity == ACTIVITY_DONE_GC) {
530         // ACTIVITY_DONE_GC means we turned off the timer signal to
531         // conserve power (see #1623).  Re-enable it here.
532         nat prev;
533         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
534         if (prev == ACTIVITY_DONE_GC) {
535             startTimer();
536         }
537     } else {
538         recent_activity = ACTIVITY_YES;
539     }
540 #endif
541
542     switch (prev_what_next) {
543         
544     case ThreadKilled:
545     case ThreadComplete:
546         /* Thread already finished, return to scheduler. */
547         ret = ThreadFinished;
548         break;
549         
550     case ThreadRunGHC:
551     {
552         StgRegTable *r;
553         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
554         cap = regTableToCapability(r);
555         ret = r->rRet;
556         break;
557     }
558     
559     case ThreadInterpret:
560         cap = interpretBCO(cap);
561         ret = cap->r.rRet;
562         break;
563         
564     default:
565         barf("schedule: invalid what_next field");
566     }
567
568     cap->in_haskell = rtsFalse;
569
570     // The TSO might have moved, eg. if it re-entered the RTS and a GC
571     // happened.  So find the new location:
572     t = cap->r.rCurrentTSO;
573
574     // We have run some Haskell code: there might be blackhole-blocked
575     // threads to wake up now.
576     // Lock-free test here should be ok, we're just setting a flag.
577     if ( blackhole_queue != END_TSO_QUEUE ) {
578         blackholes_need_checking = rtsTrue;
579     }
580     
581     // And save the current errno in this thread.
582     // XXX: possibly bogus for SMP because this thread might already
583     // be running again, see code below.
584     t->saved_errno = errno;
585 #if mingw32_HOST_OS
586     // Similarly for Windows error code
587     t->saved_winerror = GetLastError();
588 #endif
589
590 #if defined(THREADED_RTS)
591     // If ret is ThreadBlocked, and this Task is bound to the TSO that
592     // blocked, we are in limbo - the TSO is now owned by whatever it
593     // is blocked on, and may in fact already have been woken up,
594     // perhaps even on a different Capability.  It may be the case
595     // that task->cap != cap.  We better yield this Capability
596     // immediately and return to normaility.
597     if (ret == ThreadBlocked) {
598         debugTrace(DEBUG_sched,
599                    "--<< thread %lu (%s) stopped: blocked",
600                    (unsigned long)t->id, whatNext_strs[t->what_next]);
601         goto yield;
602     }
603 #endif
604
605     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
606     ASSERT(t->cap == cap);
607
608     // ----------------------------------------------------------------------
609     
610     // Costs for the scheduler are assigned to CCS_SYSTEM
611     stopHeapProfTimer();
612 #if defined(PROFILING)
613     CCCS = CCS_SYSTEM;
614 #endif
615     
616     schedulePostRunThread(cap,t);
617
618     t = threadStackUnderflow(task,t);
619
620     ready_to_gc = rtsFalse;
621
622     switch (ret) {
623     case HeapOverflow:
624         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
625         break;
626
627     case StackOverflow:
628         scheduleHandleStackOverflow(cap,task,t);
629         break;
630
631     case ThreadYielding:
632         if (scheduleHandleYield(cap, t, prev_what_next)) {
633             // shortcut for switching between compiler/interpreter:
634             goto run_thread; 
635         }
636         break;
637
638     case ThreadBlocked:
639         scheduleHandleThreadBlocked(t);
640         break;
641
642     case ThreadFinished:
643         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
644         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
645         break;
646
647     default:
648       barf("schedule: invalid thread return code %d", (int)ret);
649     }
650
651     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
652       cap = scheduleDoGC(cap,task,rtsFalse);
653     }
654   } /* end of while() */
655 }
656
657 /* ----------------------------------------------------------------------------
658  * Setting up the scheduler loop
659  * ------------------------------------------------------------------------- */
660
661 static void
662 schedulePreLoop(void)
663 {
664   // initialisation for scheduler - what cannot go into initScheduler()  
665 }
666
667 /* -----------------------------------------------------------------------------
668  * scheduleFindWork()
669  *
670  * Search for work to do, and handle messages from elsewhere.
671  * -------------------------------------------------------------------------- */
672
673 static void
674 scheduleFindWork (Capability *cap)
675 {
676     scheduleStartSignalHandlers(cap);
677
678     // Only check the black holes here if we've nothing else to do.
679     // During normal execution, the black hole list only gets checked
680     // at GC time, to avoid repeatedly traversing this possibly long
681     // list each time around the scheduler.
682     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
683
684     scheduleCheckWakeupThreads(cap);
685
686     scheduleCheckBlockedThreads(cap);
687
688 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
689     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
690 #endif
691
692 #if defined(PARALLEL_HASKELL)
693     // if messages have been buffered...
694     scheduleSendPendingMessages();
695 #endif
696
697 #if defined(PARALLEL_HASKELL)
698     if (emptyRunQueue(cap)) {
699         receivedFinish = scheduleGetRemoteWork(cap);
700         continue; //  a new round, (hopefully) with new work
701         /* 
702            in GUM, this a) sends out a FISH and returns IF no fish is
703                            out already
704                         b) (blocking) awaits and receives messages
705            
706            in Eden, this is only the blocking receive, as b) in GUM.
707         */
708     }
709 #endif
710 }
711
712 #if defined(THREADED_RTS)
713 STATIC_INLINE rtsBool
714 shouldYieldCapability (Capability *cap, Task *task)
715 {
716     // we need to yield this capability to someone else if..
717     //   - another thread is initiating a GC
718     //   - another Task is returning from a foreign call
719     //   - the thread at the head of the run queue cannot be run
720     //     by this Task (it is bound to another Task, or it is unbound
721     //     and this task it bound).
722     return (waiting_for_gc || 
723             cap->returning_tasks_hd != NULL ||
724             (!emptyRunQueue(cap) && (task->tso == NULL
725                                      ? cap->run_queue_hd->bound != NULL
726                                      : cap->run_queue_hd->bound != task)));
727 }
728
729 // This is the single place where a Task goes to sleep.  There are
730 // two reasons it might need to sleep:
731 //    - there are no threads to run
732 //    - we need to yield this Capability to someone else 
733 //      (see shouldYieldCapability())
734 //
735 // Careful: the scheduler loop is quite delicate.  Make sure you run
736 // the tests in testsuite/concurrent (all ways) after modifying this,
737 // and also check the benchmarks in nofib/parallel for regressions.
738
739 static void
740 scheduleYield (Capability **pcap, Task *task)
741 {
742     Capability *cap = *pcap;
743
744     // if we have work, and we don't need to give up the Capability, continue.
745     if (!shouldYieldCapability(cap,task) && 
746         (!emptyRunQueue(cap) ||
747          !emptyWakeupQueue(cap) ||
748          blackholes_need_checking ||
749          sched_state >= SCHED_INTERRUPTING))
750         return;
751
752     // otherwise yield (sleep), and keep yielding if necessary.
753     do {
754         yieldCapability(&cap,task);
755     } 
756     while (shouldYieldCapability(cap,task));
757
758     // note there may still be no threads on the run queue at this
759     // point, the caller has to check.
760
761     *pcap = cap;
762     return;
763 }
764 #endif
765     
766 /* -----------------------------------------------------------------------------
767  * schedulePushWork()
768  *
769  * Push work to other Capabilities if we have some.
770  * -------------------------------------------------------------------------- */
771
772 static void
773 schedulePushWork(Capability *cap USED_IF_THREADS, 
774                  Task *task      USED_IF_THREADS)
775 {
776   /* following code not for PARALLEL_HASKELL. I kept the call general,
777      future GUM versions might use pushing in a distributed setup */
778 #if defined(THREADED_RTS)
779
780     Capability *free_caps[n_capabilities], *cap0;
781     nat i, n_free_caps;
782
783     // migration can be turned off with +RTS -qg
784     if (!RtsFlags.ParFlags.migrate) return;
785
786     // Check whether we have more threads on our run queue, or sparks
787     // in our pool, that we could hand to another Capability.
788     if (cap->run_queue_hd == END_TSO_QUEUE) {
789         if (sparkPoolSizeCap(cap) < 2) return;
790     } else {
791         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
792             sparkPoolSizeCap(cap) < 1) return;
793     }
794
795     // First grab as many free Capabilities as we can.
796     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
797         cap0 = &capabilities[i];
798         if (cap != cap0 && tryGrabCapability(cap0,task)) {
799             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
800                 // it already has some work, we just grabbed it at 
801                 // the wrong moment.  Or maybe it's deadlocked!
802                 releaseCapability(cap0);
803             } else {
804                 free_caps[n_free_caps++] = cap0;
805             }
806         }
807     }
808
809     // we now have n_free_caps free capabilities stashed in
810     // free_caps[].  Share our run queue equally with them.  This is
811     // probably the simplest thing we could do; improvements we might
812     // want to do include:
813     //
814     //   - giving high priority to moving relatively new threads, on 
815     //     the gournds that they haven't had time to build up a
816     //     working set in the cache on this CPU/Capability.
817     //
818     //   - giving low priority to moving long-lived threads
819
820     if (n_free_caps > 0) {
821         StgTSO *prev, *t, *next;
822         rtsBool pushed_to_all;
823
824         debugTrace(DEBUG_sched, 
825                    "cap %d: %s and %d free capabilities, sharing...", 
826                    cap->no, 
827                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
828                    "excess threads on run queue":"sparks to share (>=2)",
829                    n_free_caps);
830
831         i = 0;
832         pushed_to_all = rtsFalse;
833
834         if (cap->run_queue_hd != END_TSO_QUEUE) {
835             prev = cap->run_queue_hd;
836             t = prev->_link;
837             prev->_link = END_TSO_QUEUE;
838             for (; t != END_TSO_QUEUE; t = next) {
839                 next = t->_link;
840                 t->_link = END_TSO_QUEUE;
841                 if (t->what_next == ThreadRelocated
842                     || t->bound == task // don't move my bound thread
843                     || tsoLocked(t)) {  // don't move a locked thread
844                     setTSOLink(cap, prev, t);
845                     prev = t;
846                 } else if (i == n_free_caps) {
847                     pushed_to_all = rtsTrue;
848                     i = 0;
849                     // keep one for us
850                     setTSOLink(cap, prev, t);
851                     prev = t;
852                 } else {
853                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
854                     appendToRunQueue(free_caps[i],t);
855                     if (t->bound) { t->bound->cap = free_caps[i]; }
856                     t->cap = free_caps[i];
857                     i++;
858                 }
859             }
860             cap->run_queue_tl = prev;
861         }
862
863 #ifdef SPARK_PUSHING
864         /* JB I left this code in place, it would work but is not necessary */
865
866         // If there are some free capabilities that we didn't push any
867         // threads to, then try to push a spark to each one.
868         if (!pushed_to_all) {
869             StgClosure *spark;
870             // i is the next free capability to push to
871             for (; i < n_free_caps; i++) {
872                 if (emptySparkPoolCap(free_caps[i])) {
873                     spark = tryStealSpark(cap->sparks);
874                     if (spark != NULL) {
875                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
876                         newSpark(&(free_caps[i]->r), spark);
877                     }
878                 }
879             }
880         }
881 #endif /* SPARK_PUSHING */
882
883         // release the capabilities
884         for (i = 0; i < n_free_caps; i++) {
885             task->cap = free_caps[i];
886             releaseAndWakeupCapability(free_caps[i]);
887         }
888     }
889     task->cap = cap; // reset to point to our Capability.
890
891 #endif /* THREADED_RTS */
892
893 }
894
895 /* ----------------------------------------------------------------------------
896  * Start any pending signal handlers
897  * ------------------------------------------------------------------------- */
898
899 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
900 static void
901 scheduleStartSignalHandlers(Capability *cap)
902 {
903     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
904         // safe outside the lock
905         startSignalHandlers(cap);
906     }
907 }
908 #else
909 static void
910 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
911 {
912 }
913 #endif
914
915 /* ----------------------------------------------------------------------------
916  * Check for blocked threads that can be woken up.
917  * ------------------------------------------------------------------------- */
918
919 static void
920 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
921 {
922 #if !defined(THREADED_RTS)
923     //
924     // Check whether any waiting threads need to be woken up.  If the
925     // run queue is empty, and there are no other tasks running, we
926     // can wait indefinitely for something to happen.
927     //
928     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
929     {
930         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
931     }
932 #endif
933 }
934
935
936 /* ----------------------------------------------------------------------------
937  * Check for threads woken up by other Capabilities
938  * ------------------------------------------------------------------------- */
939
940 static void
941 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
942 {
943 #if defined(THREADED_RTS)
944     // Any threads that were woken up by other Capabilities get
945     // appended to our run queue.
946     if (!emptyWakeupQueue(cap)) {
947         ACQUIRE_LOCK(&cap->lock);
948         if (emptyRunQueue(cap)) {
949             cap->run_queue_hd = cap->wakeup_queue_hd;
950             cap->run_queue_tl = cap->wakeup_queue_tl;
951         } else {
952             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
953             cap->run_queue_tl = cap->wakeup_queue_tl;
954         }
955         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
956         RELEASE_LOCK(&cap->lock);
957     }
958 #endif
959 }
960
961 /* ----------------------------------------------------------------------------
962  * Check for threads blocked on BLACKHOLEs that can be woken up
963  * ------------------------------------------------------------------------- */
964 static void
965 scheduleCheckBlackHoles (Capability *cap)
966 {
967     if ( blackholes_need_checking ) // check without the lock first
968     {
969         ACQUIRE_LOCK(&sched_mutex);
970         if ( blackholes_need_checking ) {
971             blackholes_need_checking = rtsFalse;
972             // important that we reset the flag *before* checking the
973             // blackhole queue, otherwise we could get deadlock.  This
974             // happens as follows: we wake up a thread that
975             // immediately runs on another Capability, blocks on a
976             // blackhole, and then we reset the blackholes_need_checking flag.
977             checkBlackHoles(cap);
978         }
979         RELEASE_LOCK(&sched_mutex);
980     }
981 }
982
983 /* ----------------------------------------------------------------------------
984  * Detect deadlock conditions and attempt to resolve them.
985  * ------------------------------------------------------------------------- */
986
987 static void
988 scheduleDetectDeadlock (Capability *cap, Task *task)
989 {
990
991 #if defined(PARALLEL_HASKELL)
992     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
993     return;
994 #endif
995
996     /* 
997      * Detect deadlock: when we have no threads to run, there are no
998      * threads blocked, waiting for I/O, or sleeping, and all the
999      * other tasks are waiting for work, we must have a deadlock of
1000      * some description.
1001      */
1002     if ( emptyThreadQueues(cap) )
1003     {
1004 #if defined(THREADED_RTS)
1005         /* 
1006          * In the threaded RTS, we only check for deadlock if there
1007          * has been no activity in a complete timeslice.  This means
1008          * we won't eagerly start a full GC just because we don't have
1009          * any threads to run currently.
1010          */
1011         if (recent_activity != ACTIVITY_INACTIVE) return;
1012 #endif
1013
1014         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1015
1016         // Garbage collection can release some new threads due to
1017         // either (a) finalizers or (b) threads resurrected because
1018         // they are unreachable and will therefore be sent an
1019         // exception.  Any threads thus released will be immediately
1020         // runnable.
1021         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1022         // when force_major == rtsTrue. scheduleDoGC sets
1023         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1024         // signal.
1025
1026         if ( !emptyRunQueue(cap) ) return;
1027
1028 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1029         /* If we have user-installed signal handlers, then wait
1030          * for signals to arrive rather then bombing out with a
1031          * deadlock.
1032          */
1033         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1034             debugTrace(DEBUG_sched,
1035                        "still deadlocked, waiting for signals...");
1036
1037             awaitUserSignals();
1038
1039             if (signals_pending()) {
1040                 startSignalHandlers(cap);
1041             }
1042
1043             // either we have threads to run, or we were interrupted:
1044             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1045
1046             return;
1047         }
1048 #endif
1049
1050 #if !defined(THREADED_RTS)
1051         /* Probably a real deadlock.  Send the current main thread the
1052          * Deadlock exception.
1053          */
1054         if (task->tso) {
1055             switch (task->tso->why_blocked) {
1056             case BlockedOnSTM:
1057             case BlockedOnBlackHole:
1058             case BlockedOnException:
1059             case BlockedOnMVar:
1060                 throwToSingleThreaded(cap, task->tso, 
1061                                       (StgClosure *)nonTermination_closure);
1062                 return;
1063             default:
1064                 barf("deadlock: main thread blocked in a strange way");
1065             }
1066         }
1067         return;
1068 #endif
1069     }
1070 }
1071
1072
1073 /* ----------------------------------------------------------------------------
1074  * Send pending messages (PARALLEL_HASKELL only)
1075  * ------------------------------------------------------------------------- */
1076
1077 #if defined(PARALLEL_HASKELL)
1078 static void
1079 scheduleSendPendingMessages(void)
1080 {
1081
1082 # if defined(PAR) // global Mem.Mgmt., omit for now
1083     if (PendingFetches != END_BF_QUEUE) {
1084         processFetches();
1085     }
1086 # endif
1087     
1088     if (RtsFlags.ParFlags.BufferTime) {
1089         // if we use message buffering, we must send away all message
1090         // packets which have become too old...
1091         sendOldBuffers(); 
1092     }
1093 }
1094 #endif
1095
1096 /* ----------------------------------------------------------------------------
1097  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1098  * ------------------------------------------------------------------------- */
1099
1100 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1101 static void
1102 scheduleActivateSpark(Capability *cap)
1103 {
1104     if (anySparks())
1105     {
1106         createSparkThread(cap);
1107         debugTrace(DEBUG_sched, "creating a spark thread");
1108     }
1109 }
1110 #endif // PARALLEL_HASKELL || THREADED_RTS
1111
1112 /* ----------------------------------------------------------------------------
1113  * Get work from a remote node (PARALLEL_HASKELL only)
1114  * ------------------------------------------------------------------------- */
1115     
1116 #if defined(PARALLEL_HASKELL)
1117 static rtsBool /* return value used in PARALLEL_HASKELL only */
1118 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1119 {
1120 #if defined(PARALLEL_HASKELL)
1121   rtsBool receivedFinish = rtsFalse;
1122
1123   // idle() , i.e. send all buffers, wait for work
1124   if (RtsFlags.ParFlags.BufferTime) {
1125         IF_PAR_DEBUG(verbose, 
1126                 debugBelch("...send all pending data,"));
1127         {
1128           nat i;
1129           for (i=1; i<=nPEs; i++)
1130             sendImmediately(i); // send all messages away immediately
1131         }
1132   }
1133
1134   /* this would be the place for fishing in GUM... 
1135
1136      if (no-earlier-fish-around) 
1137           sendFish(choosePe());
1138    */
1139
1140   // Eden:just look for incoming messages (blocking receive)
1141   IF_PAR_DEBUG(verbose, 
1142                debugBelch("...wait for incoming messages...\n"));
1143   processMessages(cap, &receivedFinish); // blocking receive...
1144
1145
1146   return receivedFinish;
1147   // reenter scheduling look after having received something
1148
1149 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1150
1151   return rtsFalse; /* return value unused in THREADED_RTS */
1152
1153 #endif /* PARALLEL_HASKELL */
1154 }
1155 #endif // PARALLEL_HASKELL || THREADED_RTS
1156
1157 /* ----------------------------------------------------------------------------
1158  * After running a thread...
1159  * ------------------------------------------------------------------------- */
1160
1161 static void
1162 schedulePostRunThread (Capability *cap, StgTSO *t)
1163 {
1164     // We have to be able to catch transactions that are in an
1165     // infinite loop as a result of seeing an inconsistent view of
1166     // memory, e.g. 
1167     //
1168     //   atomically $ do
1169     //       [a,b] <- mapM readTVar [ta,tb]
1170     //       when (a == b) loop
1171     //
1172     // and a is never equal to b given a consistent view of memory.
1173     //
1174     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1175         if (!stmValidateNestOfTransactions (t -> trec)) {
1176             debugTrace(DEBUG_sched | DEBUG_stm,
1177                        "trec %p found wasting its time", t);
1178             
1179             // strip the stack back to the
1180             // ATOMICALLY_FRAME, aborting the (nested)
1181             // transaction, and saving the stack of any
1182             // partially-evaluated thunks on the heap.
1183             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1184             
1185             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1186         }
1187     }
1188
1189   /* some statistics gathering in the parallel case */
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1194  * -------------------------------------------------------------------------- */
1195
1196 static rtsBool
1197 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1198 {
1199     // did the task ask for a large block?
1200     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1201         // if so, get one and push it on the front of the nursery.
1202         bdescr *bd;
1203         lnat blocks;
1204         
1205         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1206         
1207         debugTrace(DEBUG_sched,
1208                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1209                    (long)t->id, whatNext_strs[t->what_next], blocks);
1210     
1211         // don't do this if the nursery is (nearly) full, we'll GC first.
1212         if (cap->r.rCurrentNursery->link != NULL ||
1213             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1214                                                // if the nursery has only one block.
1215             
1216             ACQUIRE_SM_LOCK
1217             bd = allocGroup( blocks );
1218             RELEASE_SM_LOCK
1219             cap->r.rNursery->n_blocks += blocks;
1220             
1221             // link the new group into the list
1222             bd->link = cap->r.rCurrentNursery;
1223             bd->u.back = cap->r.rCurrentNursery->u.back;
1224             if (cap->r.rCurrentNursery->u.back != NULL) {
1225                 cap->r.rCurrentNursery->u.back->link = bd;
1226             } else {
1227 #if !defined(THREADED_RTS)
1228                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1229                        g0s0 == cap->r.rNursery);
1230 #endif
1231                 cap->r.rNursery->blocks = bd;
1232             }             
1233             cap->r.rCurrentNursery->u.back = bd;
1234             
1235             // initialise it as a nursery block.  We initialise the
1236             // step, gen_no, and flags field of *every* sub-block in
1237             // this large block, because this is easier than making
1238             // sure that we always find the block head of a large
1239             // block whenever we call Bdescr() (eg. evacuate() and
1240             // isAlive() in the GC would both have to do this, at
1241             // least).
1242             { 
1243                 bdescr *x;
1244                 for (x = bd; x < bd + blocks; x++) {
1245                     x->step = cap->r.rNursery;
1246                     x->gen_no = 0;
1247                     x->flags = 0;
1248                 }
1249             }
1250             
1251             // This assert can be a killer if the app is doing lots
1252             // of large block allocations.
1253             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1254             
1255             // now update the nursery to point to the new block
1256             cap->r.rCurrentNursery = bd;
1257             
1258             // we might be unlucky and have another thread get on the
1259             // run queue before us and steal the large block, but in that
1260             // case the thread will just end up requesting another large
1261             // block.
1262             pushOnRunQueue(cap,t);
1263             return rtsFalse;  /* not actually GC'ing */
1264         }
1265     }
1266     
1267     debugTrace(DEBUG_sched,
1268                "--<< thread %ld (%s) stopped: HeapOverflow",
1269                (long)t->id, whatNext_strs[t->what_next]);
1270
1271     if (cap->context_switch) {
1272         // Sometimes we miss a context switch, e.g. when calling
1273         // primitives in a tight loop, MAYBE_GC() doesn't check the
1274         // context switch flag, and we end up waiting for a GC.
1275         // See #1984, and concurrent/should_run/1984
1276         cap->context_switch = 0;
1277         addToRunQueue(cap,t);
1278     } else {
1279         pushOnRunQueue(cap,t);
1280     }
1281     return rtsTrue;
1282     /* actual GC is done at the end of the while loop in schedule() */
1283 }
1284
1285 /* -----------------------------------------------------------------------------
1286  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1287  * -------------------------------------------------------------------------- */
1288
1289 static void
1290 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1291 {
1292     debugTrace (DEBUG_sched,
1293                 "--<< thread %ld (%s) stopped, StackOverflow", 
1294                 (long)t->id, whatNext_strs[t->what_next]);
1295
1296     /* just adjust the stack for this thread, then pop it back
1297      * on the run queue.
1298      */
1299     { 
1300         /* enlarge the stack */
1301         StgTSO *new_t = threadStackOverflow(cap, t);
1302         
1303         /* The TSO attached to this Task may have moved, so update the
1304          * pointer to it.
1305          */
1306         if (task->tso == t) {
1307             task->tso = new_t;
1308         }
1309         pushOnRunQueue(cap,new_t);
1310     }
1311 }
1312
1313 /* -----------------------------------------------------------------------------
1314  * Handle a thread that returned to the scheduler with ThreadYielding
1315  * -------------------------------------------------------------------------- */
1316
1317 static rtsBool
1318 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1319 {
1320     // Reset the context switch flag.  We don't do this just before
1321     // running the thread, because that would mean we would lose ticks
1322     // during GC, which can lead to unfair scheduling (a thread hogs
1323     // the CPU because the tick always arrives during GC).  This way
1324     // penalises threads that do a lot of allocation, but that seems
1325     // better than the alternative.
1326     cap->context_switch = 0;
1327     
1328     /* put the thread back on the run queue.  Then, if we're ready to
1329      * GC, check whether this is the last task to stop.  If so, wake
1330      * up the GC thread.  getThread will block during a GC until the
1331      * GC is finished.
1332      */
1333 #ifdef DEBUG
1334     if (t->what_next != prev_what_next) {
1335         debugTrace(DEBUG_sched,
1336                    "--<< thread %ld (%s) stopped to switch evaluators", 
1337                    (long)t->id, whatNext_strs[t->what_next]);
1338     } else {
1339         debugTrace(DEBUG_sched,
1340                    "--<< thread %ld (%s) stopped, yielding",
1341                    (long)t->id, whatNext_strs[t->what_next]);
1342     }
1343 #endif
1344     
1345     IF_DEBUG(sanity,
1346              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1347              checkTSO(t));
1348     ASSERT(t->_link == END_TSO_QUEUE);
1349     
1350     // Shortcut if we're just switching evaluators: don't bother
1351     // doing stack squeezing (which can be expensive), just run the
1352     // thread.
1353     if (t->what_next != prev_what_next) {
1354         return rtsTrue;
1355     }
1356
1357     addToRunQueue(cap,t);
1358
1359     return rtsFalse;
1360 }
1361
1362 /* -----------------------------------------------------------------------------
1363  * Handle a thread that returned to the scheduler with ThreadBlocked
1364  * -------------------------------------------------------------------------- */
1365
1366 static void
1367 scheduleHandleThreadBlocked( StgTSO *t
1368 #if !defined(GRAN) && !defined(DEBUG)
1369     STG_UNUSED
1370 #endif
1371     )
1372 {
1373
1374       // We don't need to do anything.  The thread is blocked, and it
1375       // has tidied up its stack and placed itself on whatever queue
1376       // it needs to be on.
1377
1378     // ASSERT(t->why_blocked != NotBlocked);
1379     // Not true: for example,
1380     //    - in THREADED_RTS, the thread may already have been woken
1381     //      up by another Capability.  This actually happens: try
1382     //      conc023 +RTS -N2.
1383     //    - the thread may have woken itself up already, because
1384     //      threadPaused() might have raised a blocked throwTo
1385     //      exception, see maybePerformBlockedException().
1386
1387 #ifdef DEBUG
1388     if (traceClass(DEBUG_sched)) {
1389         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1390                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1391         printThreadBlockage(t);
1392         debugTraceEnd();
1393     }
1394 #endif
1395 }
1396
1397 /* -----------------------------------------------------------------------------
1398  * Handle a thread that returned to the scheduler with ThreadFinished
1399  * -------------------------------------------------------------------------- */
1400
1401 static rtsBool
1402 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1403 {
1404     /* Need to check whether this was a main thread, and if so,
1405      * return with the return value.
1406      *
1407      * We also end up here if the thread kills itself with an
1408      * uncaught exception, see Exception.cmm.
1409      */
1410     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1411                (unsigned long)t->id, whatNext_strs[t->what_next]);
1412
1413     // blocked exceptions can now complete, even if the thread was in
1414     // blocked mode (see #2910).  This unconditionally calls
1415     // lockTSO(), which ensures that we don't miss any threads that
1416     // are engaged in throwTo() with this thread as a target.
1417     awakenBlockedExceptionQueue (cap, t);
1418
1419       //
1420       // Check whether the thread that just completed was a bound
1421       // thread, and if so return with the result.  
1422       //
1423       // There is an assumption here that all thread completion goes
1424       // through this point; we need to make sure that if a thread
1425       // ends up in the ThreadKilled state, that it stays on the run
1426       // queue so it can be dealt with here.
1427       //
1428
1429       if (t->bound) {
1430
1431           if (t->bound != task) {
1432 #if !defined(THREADED_RTS)
1433               // Must be a bound thread that is not the topmost one.  Leave
1434               // it on the run queue until the stack has unwound to the
1435               // point where we can deal with this.  Leaving it on the run
1436               // queue also ensures that the garbage collector knows about
1437               // this thread and its return value (it gets dropped from the
1438               // step->threads list so there's no other way to find it).
1439               appendToRunQueue(cap,t);
1440               return rtsFalse;
1441 #else
1442               // this cannot happen in the threaded RTS, because a
1443               // bound thread can only be run by the appropriate Task.
1444               barf("finished bound thread that isn't mine");
1445 #endif
1446           }
1447
1448           ASSERT(task->tso == t);
1449
1450           if (t->what_next == ThreadComplete) {
1451               if (task->ret) {
1452                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1453                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1454               }
1455               task->stat = Success;
1456           } else {
1457               if (task->ret) {
1458                   *(task->ret) = NULL;
1459               }
1460               if (sched_state >= SCHED_INTERRUPTING) {
1461                   if (heap_overflow) {
1462                       task->stat = HeapExhausted;
1463                   } else {
1464                       task->stat = Interrupted;
1465                   }
1466               } else {
1467                   task->stat = Killed;
1468               }
1469           }
1470 #ifdef DEBUG
1471           removeThreadLabel((StgWord)task->tso->id);
1472 #endif
1473           return rtsTrue; // tells schedule() to return
1474       }
1475
1476       return rtsFalse;
1477 }
1478
1479 /* -----------------------------------------------------------------------------
1480  * Perform a heap census
1481  * -------------------------------------------------------------------------- */
1482
1483 static rtsBool
1484 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1485 {
1486     // When we have +RTS -i0 and we're heap profiling, do a census at
1487     // every GC.  This lets us get repeatable runs for debugging.
1488     if (performHeapProfile ||
1489         (RtsFlags.ProfFlags.profileInterval==0 &&
1490          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1491         return rtsTrue;
1492     } else {
1493         return rtsFalse;
1494     }
1495 }
1496
1497 /* -----------------------------------------------------------------------------
1498  * Perform a garbage collection if necessary
1499  * -------------------------------------------------------------------------- */
1500
1501 static Capability *
1502 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1503 {
1504     rtsBool heap_census;
1505 #ifdef THREADED_RTS
1506     /* extern static volatile StgWord waiting_for_gc; 
1507        lives inside capability.c */
1508     rtsBool gc_type, prev_pending_gc;
1509     nat i;
1510 #endif
1511
1512     if (sched_state == SCHED_SHUTTING_DOWN) {
1513         // The final GC has already been done, and the system is
1514         // shutting down.  We'll probably deadlock if we try to GC
1515         // now.
1516         return cap;
1517     }
1518
1519 #ifdef THREADED_RTS
1520     if (sched_state < SCHED_INTERRUPTING
1521         && RtsFlags.ParFlags.parGcEnabled
1522         && N >= RtsFlags.ParFlags.parGcGen
1523         && ! oldest_gen->steps[0].mark)
1524     {
1525         gc_type = PENDING_GC_PAR;
1526     } else {
1527         gc_type = PENDING_GC_SEQ;
1528     }
1529
1530     // In order to GC, there must be no threads running Haskell code.
1531     // Therefore, the GC thread needs to hold *all* the capabilities,
1532     // and release them after the GC has completed.  
1533     //
1534     // This seems to be the simplest way: previous attempts involved
1535     // making all the threads with capabilities give up their
1536     // capabilities and sleep except for the *last* one, which
1537     // actually did the GC.  But it's quite hard to arrange for all
1538     // the other tasks to sleep and stay asleep.
1539     //
1540
1541     /*  Other capabilities are prevented from running yet more Haskell
1542         threads if waiting_for_gc is set. Tested inside
1543         yieldCapability() and releaseCapability() in Capability.c */
1544
1545     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1546     if (prev_pending_gc) {
1547         do {
1548             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1549                        prev_pending_gc);
1550             ASSERT(cap);
1551             yieldCapability(&cap,task);
1552         } while (waiting_for_gc);
1553         return cap;  // NOTE: task->cap might have changed here
1554     }
1555
1556     setContextSwitches();
1557
1558     // The final shutdown GC is always single-threaded, because it's
1559     // possible that some of the Capabilities have no worker threads.
1560     
1561     if (gc_type == PENDING_GC_SEQ)
1562     {
1563         // single-threaded GC: grab all the capabilities
1564         for (i=0; i < n_capabilities; i++) {
1565             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1566             if (cap != &capabilities[i]) {
1567                 Capability *pcap = &capabilities[i];
1568                 // we better hope this task doesn't get migrated to
1569                 // another Capability while we're waiting for this one.
1570                 // It won't, because load balancing happens while we have
1571                 // all the Capabilities, but even so it's a slightly
1572                 // unsavoury invariant.
1573                 task->cap = pcap;
1574                 waitForReturnCapability(&pcap, task);
1575                 if (pcap != &capabilities[i]) {
1576                     barf("scheduleDoGC: got the wrong capability");
1577                 }
1578             }
1579         }
1580     }
1581     else
1582     {
1583         // multi-threaded GC: make sure all the Capabilities donate one
1584         // GC thread each.
1585         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1586
1587         waitForGcThreads(cap);
1588     }
1589 #endif
1590
1591     // so this happens periodically:
1592     if (cap) scheduleCheckBlackHoles(cap);
1593     
1594     IF_DEBUG(scheduler, printAllThreads());
1595
1596 delete_threads_and_gc:
1597     /*
1598      * We now have all the capabilities; if we're in an interrupting
1599      * state, then we should take the opportunity to delete all the
1600      * threads in the system.
1601      */
1602     if (sched_state == SCHED_INTERRUPTING) {
1603         deleteAllThreads(cap);
1604         sched_state = SCHED_SHUTTING_DOWN;
1605     }
1606     
1607     heap_census = scheduleNeedHeapProfile(rtsTrue);
1608
1609 #if defined(THREADED_RTS)
1610     debugTrace(DEBUG_sched, "doing GC");
1611     // reset waiting_for_gc *before* GC, so that when the GC threads
1612     // emerge they don't immediately re-enter the GC.
1613     waiting_for_gc = 0;
1614     GarbageCollect(force_major || heap_census, gc_type, cap);
1615 #else
1616     GarbageCollect(force_major || heap_census, 0, cap);
1617 #endif
1618
1619     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1620     {
1621         // We are doing a GC because the system has been idle for a
1622         // timeslice and we need to check for deadlock.  Record the
1623         // fact that we've done a GC and turn off the timer signal;
1624         // it will get re-enabled if we run any threads after the GC.
1625         recent_activity = ACTIVITY_DONE_GC;
1626         stopTimer();
1627     }
1628     else
1629     {
1630         // the GC might have taken long enough for the timer to set
1631         // recent_activity = ACTIVITY_INACTIVE, but we aren't
1632         // necessarily deadlocked:
1633         recent_activity = ACTIVITY_YES;
1634     }
1635
1636 #if defined(THREADED_RTS)
1637     if (gc_type == PENDING_GC_PAR)
1638     {
1639         releaseGCThreads(cap);
1640     }
1641 #endif
1642
1643     if (heap_census) {
1644         debugTrace(DEBUG_sched, "performing heap census");
1645         heapCensus();
1646         performHeapProfile = rtsFalse;
1647     }
1648
1649     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1650         // GC set the heap_overflow flag, so we should proceed with
1651         // an orderly shutdown now.  Ultimately we want the main
1652         // thread to return to its caller with HeapExhausted, at which
1653         // point the caller should call hs_exit().  The first step is
1654         // to delete all the threads.
1655         //
1656         // Another way to do this would be to raise an exception in
1657         // the main thread, which we really should do because it gives
1658         // the program a chance to clean up.  But how do we find the
1659         // main thread?  It should presumably be the same one that
1660         // gets ^C exceptions, but that's all done on the Haskell side
1661         // (GHC.TopHandler).
1662         sched_state = SCHED_INTERRUPTING;
1663         goto delete_threads_and_gc;
1664     }
1665
1666 #ifdef SPARKBALANCE
1667     /* JB 
1668        Once we are all together... this would be the place to balance all
1669        spark pools. No concurrent stealing or adding of new sparks can
1670        occur. Should be defined in Sparks.c. */
1671     balanceSparkPoolsCaps(n_capabilities, capabilities);
1672 #endif
1673
1674 #if defined(THREADED_RTS)
1675     if (gc_type == PENDING_GC_SEQ) {
1676         // release our stash of capabilities.
1677         for (i = 0; i < n_capabilities; i++) {
1678             if (cap != &capabilities[i]) {
1679                 task->cap = &capabilities[i];
1680                 releaseCapability(&capabilities[i]);
1681             }
1682         }
1683     }
1684     if (cap) {
1685         task->cap = cap;
1686     } else {
1687         task->cap = NULL;
1688     }
1689 #endif
1690
1691     return cap;
1692 }
1693
1694 /* ---------------------------------------------------------------------------
1695  * Singleton fork(). Do not copy any running threads.
1696  * ------------------------------------------------------------------------- */
1697
1698 pid_t
1699 forkProcess(HsStablePtr *entry
1700 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1701             STG_UNUSED
1702 #endif
1703            )
1704 {
1705 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1706     Task *task;
1707     pid_t pid;
1708     StgTSO* t,*next;
1709     Capability *cap;
1710     nat s;
1711     
1712 #if defined(THREADED_RTS)
1713     if (RtsFlags.ParFlags.nNodes > 1) {
1714         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1715         stg_exit(EXIT_FAILURE);
1716     }
1717 #endif
1718
1719     debugTrace(DEBUG_sched, "forking!");
1720     
1721     // ToDo: for SMP, we should probably acquire *all* the capabilities
1722     cap = rts_lock();
1723     
1724     // no funny business: hold locks while we fork, otherwise if some
1725     // other thread is holding a lock when the fork happens, the data
1726     // structure protected by the lock will forever be in an
1727     // inconsistent state in the child.  See also #1391.
1728     ACQUIRE_LOCK(&sched_mutex);
1729     ACQUIRE_LOCK(&cap->lock);
1730     ACQUIRE_LOCK(&cap->running_task->lock);
1731
1732     pid = fork();
1733     
1734     if (pid) { // parent
1735         
1736         RELEASE_LOCK(&sched_mutex);
1737         RELEASE_LOCK(&cap->lock);
1738         RELEASE_LOCK(&cap->running_task->lock);
1739
1740         // just return the pid
1741         rts_unlock(cap);
1742         return pid;
1743         
1744     } else { // child
1745         
1746 #if defined(THREADED_RTS)
1747         initMutex(&sched_mutex);
1748         initMutex(&cap->lock);
1749         initMutex(&cap->running_task->lock);
1750 #endif
1751
1752         // Now, all OS threads except the thread that forked are
1753         // stopped.  We need to stop all Haskell threads, including
1754         // those involved in foreign calls.  Also we need to delete
1755         // all Tasks, because they correspond to OS threads that are
1756         // now gone.
1757
1758         for (s = 0; s < total_steps; s++) {
1759           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1760             if (t->what_next == ThreadRelocated) {
1761                 next = t->_link;
1762             } else {
1763                 next = t->global_link;
1764                 // don't allow threads to catch the ThreadKilled
1765                 // exception, but we do want to raiseAsync() because these
1766                 // threads may be evaluating thunks that we need later.
1767                 deleteThread_(cap,t);
1768             }
1769           }
1770         }
1771         
1772         // Empty the run queue.  It seems tempting to let all the
1773         // killed threads stay on the run queue as zombies to be
1774         // cleaned up later, but some of them correspond to bound
1775         // threads for which the corresponding Task does not exist.
1776         cap->run_queue_hd = END_TSO_QUEUE;
1777         cap->run_queue_tl = END_TSO_QUEUE;
1778
1779         // Any suspended C-calling Tasks are no more, their OS threads
1780         // don't exist now:
1781         cap->suspended_ccalling_tasks = NULL;
1782
1783         // Empty the threads lists.  Otherwise, the garbage
1784         // collector may attempt to resurrect some of these threads.
1785         for (s = 0; s < total_steps; s++) {
1786             all_steps[s].threads = END_TSO_QUEUE;
1787         }
1788
1789         // Wipe the task list, except the current Task.
1790         ACQUIRE_LOCK(&sched_mutex);
1791         for (task = all_tasks; task != NULL; task=task->all_link) {
1792             if (task != cap->running_task) {
1793 #if defined(THREADED_RTS)
1794                 initMutex(&task->lock); // see #1391
1795 #endif
1796                 discardTask(task);
1797             }
1798         }
1799         RELEASE_LOCK(&sched_mutex);
1800
1801 #if defined(THREADED_RTS)
1802         // Wipe our spare workers list, they no longer exist.  New
1803         // workers will be created if necessary.
1804         cap->spare_workers = NULL;
1805         cap->returning_tasks_hd = NULL;
1806         cap->returning_tasks_tl = NULL;
1807 #endif
1808
1809         // On Unix, all timers are reset in the child, so we need to start
1810         // the timer again.
1811         initTimer();
1812         startTimer();
1813
1814         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1815         rts_checkSchedStatus("forkProcess",cap);
1816         
1817         rts_unlock(cap);
1818         hs_exit();                      // clean up and exit
1819         stg_exit(EXIT_SUCCESS);
1820     }
1821 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1822     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1823     return -1;
1824 #endif
1825 }
1826
1827 /* ---------------------------------------------------------------------------
1828  * Delete all the threads in the system
1829  * ------------------------------------------------------------------------- */
1830    
1831 static void
1832 deleteAllThreads ( Capability *cap )
1833 {
1834     // NOTE: only safe to call if we own all capabilities.
1835
1836     StgTSO* t, *next;
1837     nat s;
1838
1839     debugTrace(DEBUG_sched,"deleting all threads");
1840     for (s = 0; s < total_steps; s++) {
1841       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1842         if (t->what_next == ThreadRelocated) {
1843             next = t->_link;
1844         } else {
1845             next = t->global_link;
1846             deleteThread(cap,t);
1847         }
1848       }
1849     }      
1850
1851     // The run queue now contains a bunch of ThreadKilled threads.  We
1852     // must not throw these away: the main thread(s) will be in there
1853     // somewhere, and the main scheduler loop has to deal with it.
1854     // Also, the run queue is the only thing keeping these threads from
1855     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1856
1857 #if !defined(THREADED_RTS)
1858     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1859     ASSERT(sleeping_queue == END_TSO_QUEUE);
1860 #endif
1861 }
1862
1863 /* -----------------------------------------------------------------------------
1864    Managing the suspended_ccalling_tasks list.
1865    Locks required: sched_mutex
1866    -------------------------------------------------------------------------- */
1867
1868 STATIC_INLINE void
1869 suspendTask (Capability *cap, Task *task)
1870 {
1871     ASSERT(task->next == NULL && task->prev == NULL);
1872     task->next = cap->suspended_ccalling_tasks;
1873     task->prev = NULL;
1874     if (cap->suspended_ccalling_tasks) {
1875         cap->suspended_ccalling_tasks->prev = task;
1876     }
1877     cap->suspended_ccalling_tasks = task;
1878 }
1879
1880 STATIC_INLINE void
1881 recoverSuspendedTask (Capability *cap, Task *task)
1882 {
1883     if (task->prev) {
1884         task->prev->next = task->next;
1885     } else {
1886         ASSERT(cap->suspended_ccalling_tasks == task);
1887         cap->suspended_ccalling_tasks = task->next;
1888     }
1889     if (task->next) {
1890         task->next->prev = task->prev;
1891     }
1892     task->next = task->prev = NULL;
1893 }
1894
1895 /* ---------------------------------------------------------------------------
1896  * Suspending & resuming Haskell threads.
1897  * 
1898  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1899  * its capability before calling the C function.  This allows another
1900  * task to pick up the capability and carry on running Haskell
1901  * threads.  It also means that if the C call blocks, it won't lock
1902  * the whole system.
1903  *
1904  * The Haskell thread making the C call is put to sleep for the
1905  * duration of the call, on the susepended_ccalling_threads queue.  We
1906  * give out a token to the task, which it can use to resume the thread
1907  * on return from the C function.
1908  * ------------------------------------------------------------------------- */
1909    
1910 void *
1911 suspendThread (StgRegTable *reg)
1912 {
1913   Capability *cap;
1914   int saved_errno;
1915   StgTSO *tso;
1916   Task *task;
1917 #if mingw32_HOST_OS
1918   StgWord32 saved_winerror;
1919 #endif
1920
1921   saved_errno = errno;
1922 #if mingw32_HOST_OS
1923   saved_winerror = GetLastError();
1924 #endif
1925
1926   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1927    */
1928   cap = regTableToCapability(reg);
1929
1930   task = cap->running_task;
1931   tso = cap->r.rCurrentTSO;
1932
1933   debugTrace(DEBUG_sched, 
1934              "thread %lu did a safe foreign call", 
1935              (unsigned long)cap->r.rCurrentTSO->id);
1936
1937   // XXX this might not be necessary --SDM
1938   tso->what_next = ThreadRunGHC;
1939
1940   threadPaused(cap,tso);
1941
1942   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1943       tso->why_blocked = BlockedOnCCall;
1944       tso->flags |= TSO_BLOCKEX;
1945       tso->flags &= ~TSO_INTERRUPTIBLE;
1946   } else {
1947       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1948   }
1949
1950   // Hand back capability
1951   task->suspended_tso = tso;
1952
1953   ACQUIRE_LOCK(&cap->lock);
1954
1955   suspendTask(cap,task);
1956   cap->in_haskell = rtsFalse;
1957   releaseCapability_(cap,rtsFalse);
1958   
1959   RELEASE_LOCK(&cap->lock);
1960
1961 #if defined(THREADED_RTS)
1962   /* Preparing to leave the RTS, so ensure there's a native thread/task
1963      waiting to take over.
1964   */
1965   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1966 #endif
1967
1968   errno = saved_errno;
1969 #if mingw32_HOST_OS
1970   SetLastError(saved_winerror);
1971 #endif
1972   return task;
1973 }
1974
1975 StgRegTable *
1976 resumeThread (void *task_)
1977 {
1978     StgTSO *tso;
1979     Capability *cap;
1980     Task *task = task_;
1981     int saved_errno;
1982 #if mingw32_HOST_OS
1983     StgWord32 saved_winerror;
1984 #endif
1985
1986     saved_errno = errno;
1987 #if mingw32_HOST_OS
1988     saved_winerror = GetLastError();
1989 #endif
1990
1991     cap = task->cap;
1992     // Wait for permission to re-enter the RTS with the result.
1993     waitForReturnCapability(&cap,task);
1994     // we might be on a different capability now... but if so, our
1995     // entry on the suspended_ccalling_tasks list will also have been
1996     // migrated.
1997
1998     // Remove the thread from the suspended list
1999     recoverSuspendedTask(cap,task);
2000
2001     tso = task->suspended_tso;
2002     task->suspended_tso = NULL;
2003     tso->_link = END_TSO_QUEUE; // no write barrier reqd
2004     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2005     
2006     if (tso->why_blocked == BlockedOnCCall) {
2007         // avoid locking the TSO if we don't have to
2008         if (tso->blocked_exceptions != END_TSO_QUEUE) {
2009             awakenBlockedExceptionQueue(cap,tso);
2010         }
2011         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2012     }
2013     
2014     /* Reset blocking status */
2015     tso->why_blocked  = NotBlocked;
2016     
2017     cap->r.rCurrentTSO = tso;
2018     cap->in_haskell = rtsTrue;
2019     errno = saved_errno;
2020 #if mingw32_HOST_OS
2021     SetLastError(saved_winerror);
2022 #endif
2023
2024     /* We might have GC'd, mark the TSO dirty again */
2025     dirty_TSO(cap,tso);
2026
2027     IF_DEBUG(sanity, checkTSO(tso));
2028
2029     return &cap->r;
2030 }
2031
2032 /* ---------------------------------------------------------------------------
2033  * scheduleThread()
2034  *
2035  * scheduleThread puts a thread on the end  of the runnable queue.
2036  * This will usually be done immediately after a thread is created.
2037  * The caller of scheduleThread must create the thread using e.g.
2038  * createThread and push an appropriate closure
2039  * on this thread's stack before the scheduler is invoked.
2040  * ------------------------------------------------------------------------ */
2041
2042 void
2043 scheduleThread(Capability *cap, StgTSO *tso)
2044 {
2045     // The thread goes at the *end* of the run-queue, to avoid possible
2046     // starvation of any threads already on the queue.
2047     appendToRunQueue(cap,tso);
2048 }
2049
2050 void
2051 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2052 {
2053 #if defined(THREADED_RTS)
2054     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2055                               // move this thread from now on.
2056     cpu %= RtsFlags.ParFlags.nNodes;
2057     if (cpu == cap->no) {
2058         appendToRunQueue(cap,tso);
2059     } else {
2060         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2061     }
2062 #else
2063     appendToRunQueue(cap,tso);
2064 #endif
2065 }
2066
2067 Capability *
2068 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2069 {
2070     Task *task;
2071
2072     // We already created/initialised the Task
2073     task = cap->running_task;
2074
2075     // This TSO is now a bound thread; make the Task and TSO
2076     // point to each other.
2077     tso->bound = task;
2078     tso->cap = cap;
2079
2080     task->tso = tso;
2081     task->ret = ret;
2082     task->stat = NoStatus;
2083
2084     appendToRunQueue(cap,tso);
2085
2086     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2087
2088     cap = schedule(cap,task);
2089
2090     ASSERT(task->stat != NoStatus);
2091     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2092
2093     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2094     return cap;
2095 }
2096
2097 /* ----------------------------------------------------------------------------
2098  * Starting Tasks
2099  * ------------------------------------------------------------------------- */
2100
2101 #if defined(THREADED_RTS)
2102 void OSThreadProcAttr
2103 workerStart(Task *task)
2104 {
2105     Capability *cap;
2106
2107     // See startWorkerTask().
2108     ACQUIRE_LOCK(&task->lock);
2109     cap = task->cap;
2110     RELEASE_LOCK(&task->lock);
2111
2112     // set the thread-local pointer to the Task:
2113     taskEnter(task);
2114
2115     // schedule() runs without a lock.
2116     cap = schedule(cap,task);
2117
2118     // On exit from schedule(), we have a Capability, but possibly not
2119     // the same one we started with.
2120
2121     // During shutdown, the requirement is that after all the
2122     // Capabilities are shut down, all workers that are shutting down
2123     // have finished workerTaskStop().  This is why we hold on to
2124     // cap->lock until we've finished workerTaskStop() below.
2125     //
2126     // There may be workers still involved in foreign calls; those
2127     // will just block in waitForReturnCapability() because the
2128     // Capability has been shut down.
2129     //
2130     ACQUIRE_LOCK(&cap->lock);
2131     releaseCapability_(cap,rtsFalse);
2132     workerTaskStop(task);
2133     RELEASE_LOCK(&cap->lock);
2134 }
2135 #endif
2136
2137 /* ---------------------------------------------------------------------------
2138  * initScheduler()
2139  *
2140  * Initialise the scheduler.  This resets all the queues - if the
2141  * queues contained any threads, they'll be garbage collected at the
2142  * next pass.
2143  *
2144  * ------------------------------------------------------------------------ */
2145
2146 void 
2147 initScheduler(void)
2148 {
2149 #if !defined(THREADED_RTS)
2150   blocked_queue_hd  = END_TSO_QUEUE;
2151   blocked_queue_tl  = END_TSO_QUEUE;
2152   sleeping_queue    = END_TSO_QUEUE;
2153 #endif
2154
2155   blackhole_queue   = END_TSO_QUEUE;
2156
2157   sched_state    = SCHED_RUNNING;
2158   recent_activity = ACTIVITY_YES;
2159
2160 #if defined(THREADED_RTS)
2161   /* Initialise the mutex and condition variables used by
2162    * the scheduler. */
2163   initMutex(&sched_mutex);
2164 #endif
2165   
2166   ACQUIRE_LOCK(&sched_mutex);
2167
2168   /* A capability holds the state a native thread needs in
2169    * order to execute STG code. At least one capability is
2170    * floating around (only THREADED_RTS builds have more than one).
2171    */
2172   initCapabilities();
2173
2174   initTaskManager();
2175
2176 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2177   initSparkPools();
2178 #endif
2179
2180 #if defined(THREADED_RTS)
2181   /*
2182    * Eagerly start one worker to run each Capability, except for
2183    * Capability 0.  The idea is that we're probably going to start a
2184    * bound thread on Capability 0 pretty soon, so we don't want a
2185    * worker task hogging it.
2186    */
2187   { 
2188       nat i;
2189       Capability *cap;
2190       for (i = 1; i < n_capabilities; i++) {
2191           cap = &capabilities[i];
2192           ACQUIRE_LOCK(&cap->lock);
2193           startWorkerTask(cap, workerStart);
2194           RELEASE_LOCK(&cap->lock);
2195       }
2196   }
2197 #endif
2198
2199   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2200
2201   RELEASE_LOCK(&sched_mutex);
2202 }
2203
2204 void
2205 exitScheduler(
2206     rtsBool wait_foreign
2207 #if !defined(THREADED_RTS)
2208                          __attribute__((unused))
2209 #endif
2210 )
2211                /* see Capability.c, shutdownCapability() */
2212 {
2213     Task *task = NULL;
2214
2215     ACQUIRE_LOCK(&sched_mutex);
2216     task = newBoundTask();
2217     RELEASE_LOCK(&sched_mutex);
2218
2219     // If we haven't killed all the threads yet, do it now.
2220     if (sched_state < SCHED_SHUTTING_DOWN) {
2221         sched_state = SCHED_INTERRUPTING;
2222         waitForReturnCapability(&task->cap,task);
2223         scheduleDoGC(task->cap,task,rtsFalse);    
2224         releaseCapability(task->cap);
2225     }
2226     sched_state = SCHED_SHUTTING_DOWN;
2227
2228 #if defined(THREADED_RTS)
2229     { 
2230         nat i;
2231         
2232         for (i = 0; i < n_capabilities; i++) {
2233             shutdownCapability(&capabilities[i], task, wait_foreign);
2234         }
2235         boundTaskExiting(task);
2236     }
2237 #endif
2238 }
2239
2240 void
2241 freeScheduler( void )
2242 {
2243     nat still_running;
2244
2245     ACQUIRE_LOCK(&sched_mutex);
2246     still_running = freeTaskManager();
2247     // We can only free the Capabilities if there are no Tasks still
2248     // running.  We might have a Task about to return from a foreign
2249     // call into waitForReturnCapability(), for example (actually,
2250     // this should be the *only* thing that a still-running Task can
2251     // do at this point, and it will block waiting for the
2252     // Capability).
2253     if (still_running == 0) {
2254         freeCapabilities();
2255         if (n_capabilities != 1) {
2256             stgFree(capabilities);
2257         }
2258     }
2259     RELEASE_LOCK(&sched_mutex);
2260 #if defined(THREADED_RTS)
2261     closeMutex(&sched_mutex);
2262 #endif
2263 }
2264
2265 /* -----------------------------------------------------------------------------
2266    performGC
2267
2268    This is the interface to the garbage collector from Haskell land.
2269    We provide this so that external C code can allocate and garbage
2270    collect when called from Haskell via _ccall_GC.
2271    -------------------------------------------------------------------------- */
2272
2273 static void
2274 performGC_(rtsBool force_major)
2275 {
2276     Task *task;
2277
2278     // We must grab a new Task here, because the existing Task may be
2279     // associated with a particular Capability, and chained onto the 
2280     // suspended_ccalling_tasks queue.
2281     ACQUIRE_LOCK(&sched_mutex);
2282     task = newBoundTask();
2283     RELEASE_LOCK(&sched_mutex);
2284
2285     waitForReturnCapability(&task->cap,task);
2286     scheduleDoGC(task->cap,task,force_major);
2287     releaseCapability(task->cap);
2288     boundTaskExiting(task);
2289 }
2290
2291 void
2292 performGC(void)
2293 {
2294     performGC_(rtsFalse);
2295 }
2296
2297 void
2298 performMajorGC(void)
2299 {
2300     performGC_(rtsTrue);
2301 }
2302
2303 /* -----------------------------------------------------------------------------
2304    Stack overflow
2305
2306    If the thread has reached its maximum stack size, then raise the
2307    StackOverflow exception in the offending thread.  Otherwise
2308    relocate the TSO into a larger chunk of memory and adjust its stack
2309    size appropriately.
2310    -------------------------------------------------------------------------- */
2311
2312 static StgTSO *
2313 threadStackOverflow(Capability *cap, StgTSO *tso)
2314 {
2315   nat new_stack_size, stack_words;
2316   lnat new_tso_size;
2317   StgPtr new_sp;
2318   StgTSO *dest;
2319
2320   IF_DEBUG(sanity,checkTSO(tso));
2321
2322   // don't allow throwTo() to modify the blocked_exceptions queue
2323   // while we are moving the TSO:
2324   lockClosure((StgClosure *)tso);
2325
2326   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2327       // NB. never raise a StackOverflow exception if the thread is
2328       // inside Control.Exceptino.block.  It is impractical to protect
2329       // against stack overflow exceptions, since virtually anything
2330       // can raise one (even 'catch'), so this is the only sensible
2331       // thing to do here.  See bug #767.
2332
2333       debugTrace(DEBUG_gc,
2334                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2335                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2336       IF_DEBUG(gc,
2337                /* If we're debugging, just print out the top of the stack */
2338                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2339                                                 tso->sp+64)));
2340
2341       // Send this thread the StackOverflow exception
2342       unlockTSO(tso);
2343       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2344       return tso;
2345   }
2346
2347   /* Try to double the current stack size.  If that takes us over the
2348    * maximum stack size for this thread, then use the maximum instead
2349    * (that is, unless we're already at or over the max size and we
2350    * can't raise the StackOverflow exception (see above), in which
2351    * case just double the size). Finally round up so the TSO ends up as
2352    * a whole number of blocks.
2353    */
2354   if (tso->stack_size >= tso->max_stack_size) {
2355       new_stack_size = tso->stack_size * 2;
2356   } else { 
2357       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2358   }
2359   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2360                                        TSO_STRUCT_SIZE)/sizeof(W_);
2361   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2362   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2363
2364   debugTrace(DEBUG_sched, 
2365              "increasing stack size from %ld words to %d.",
2366              (long)tso->stack_size, new_stack_size);
2367
2368   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2369   TICK_ALLOC_TSO(new_stack_size,0);
2370
2371   /* copy the TSO block and the old stack into the new area */
2372   memcpy(dest,tso,TSO_STRUCT_SIZE);
2373   stack_words = tso->stack + tso->stack_size - tso->sp;
2374   new_sp = (P_)dest + new_tso_size - stack_words;
2375   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2376
2377   /* relocate the stack pointers... */
2378   dest->sp         = new_sp;
2379   dest->stack_size = new_stack_size;
2380         
2381   /* Mark the old TSO as relocated.  We have to check for relocated
2382    * TSOs in the garbage collector and any primops that deal with TSOs.
2383    *
2384    * It's important to set the sp value to just beyond the end
2385    * of the stack, so we don't attempt to scavenge any part of the
2386    * dead TSO's stack.
2387    */
2388   tso->what_next = ThreadRelocated;
2389   setTSOLink(cap,tso,dest);
2390   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2391   tso->why_blocked = NotBlocked;
2392
2393   IF_PAR_DEBUG(verbose,
2394                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2395                      tso->id, tso, tso->stack_size);
2396                /* If we're debugging, just print out the top of the stack */
2397                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2398                                                 tso->sp+64)));
2399   
2400   unlockTSO(dest);
2401   unlockTSO(tso);
2402
2403   IF_DEBUG(sanity,checkTSO(dest));
2404 #if 0
2405   IF_DEBUG(scheduler,printTSO(dest));
2406 #endif
2407
2408   return dest;
2409 }
2410
2411 static StgTSO *
2412 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2413 {
2414     bdescr *bd, *new_bd;
2415     lnat free_w, tso_size_w;
2416     StgTSO *new_tso;
2417
2418     tso_size_w = tso_sizeW(tso);
2419
2420     if (tso_size_w < MBLOCK_SIZE_W || 
2421         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2422     {
2423         return tso;
2424     }
2425
2426     // don't allow throwTo() to modify the blocked_exceptions queue
2427     // while we are moving the TSO:
2428     lockClosure((StgClosure *)tso);
2429
2430     // this is the number of words we'll free
2431     free_w = round_to_mblocks(tso_size_w/2);
2432
2433     bd = Bdescr((StgPtr)tso);
2434     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2435     bd->free = bd->start + TSO_STRUCT_SIZEW;
2436
2437     new_tso = (StgTSO *)new_bd->start;
2438     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2439     new_tso->stack_size = new_bd->free - new_tso->stack;
2440
2441     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2442                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2443
2444     tso->what_next = ThreadRelocated;
2445     tso->_link = new_tso; // no write barrier reqd: same generation
2446
2447     // The TSO attached to this Task may have moved, so update the
2448     // pointer to it.
2449     if (task->tso == tso) {
2450         task->tso = new_tso;
2451     }
2452
2453     unlockTSO(new_tso);
2454     unlockTSO(tso);
2455
2456     IF_DEBUG(sanity,checkTSO(new_tso));
2457
2458     return new_tso;
2459 }
2460
2461 /* ---------------------------------------------------------------------------
2462    Interrupt execution
2463    - usually called inside a signal handler so it mustn't do anything fancy.   
2464    ------------------------------------------------------------------------ */
2465
2466 void
2467 interruptStgRts(void)
2468 {
2469     sched_state = SCHED_INTERRUPTING;
2470     setContextSwitches();
2471     wakeUpRts();
2472 }
2473
2474 /* -----------------------------------------------------------------------------
2475    Wake up the RTS
2476    
2477    This function causes at least one OS thread to wake up and run the
2478    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2479    an external event has arrived that may need servicing (eg. a
2480    keyboard interrupt).
2481
2482    In the single-threaded RTS we don't do anything here; we only have
2483    one thread anyway, and the event that caused us to want to wake up
2484    will have interrupted any blocking system call in progress anyway.
2485    -------------------------------------------------------------------------- */
2486
2487 void
2488 wakeUpRts(void)
2489 {
2490 #if defined(THREADED_RTS)
2491     // This forces the IO Manager thread to wakeup, which will
2492     // in turn ensure that some OS thread wakes up and runs the
2493     // scheduler loop, which will cause a GC and deadlock check.
2494     ioManagerWakeup();
2495 #endif
2496 }
2497
2498 /* -----------------------------------------------------------------------------
2499  * checkBlackHoles()
2500  *
2501  * Check the blackhole_queue for threads that can be woken up.  We do
2502  * this periodically: before every GC, and whenever the run queue is
2503  * empty.
2504  *
2505  * An elegant solution might be to just wake up all the blocked
2506  * threads with awakenBlockedQueue occasionally: they'll go back to
2507  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2508  * doesn't give us a way to tell whether we've actually managed to
2509  * wake up any threads, so we would be busy-waiting.
2510  *
2511  * -------------------------------------------------------------------------- */
2512
2513 static rtsBool
2514 checkBlackHoles (Capability *cap)
2515 {
2516     StgTSO **prev, *t;
2517     rtsBool any_woke_up = rtsFalse;
2518     StgHalfWord type;
2519
2520     // blackhole_queue is global:
2521     ASSERT_LOCK_HELD(&sched_mutex);
2522
2523     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2524
2525     // ASSUMES: sched_mutex
2526     prev = &blackhole_queue;
2527     t = blackhole_queue;
2528     while (t != END_TSO_QUEUE) {
2529         ASSERT(t->why_blocked == BlockedOnBlackHole);
2530         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2531         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2532             IF_DEBUG(sanity,checkTSO(t));
2533             t = unblockOne(cap, t);
2534             *prev = t;
2535             any_woke_up = rtsTrue;
2536         } else {
2537             prev = &t->_link;
2538             t = t->_link;
2539         }
2540     }
2541
2542     return any_woke_up;
2543 }
2544
2545 /* -----------------------------------------------------------------------------
2546    Deleting threads
2547
2548    This is used for interruption (^C) and forking, and corresponds to
2549    raising an exception but without letting the thread catch the
2550    exception.
2551    -------------------------------------------------------------------------- */
2552
2553 static void 
2554 deleteThread (Capability *cap, StgTSO *tso)
2555 {
2556     // NOTE: must only be called on a TSO that we have exclusive
2557     // access to, because we will call throwToSingleThreaded() below.
2558     // The TSO must be on the run queue of the Capability we own, or 
2559     // we must own all Capabilities.
2560
2561     if (tso->why_blocked != BlockedOnCCall &&
2562         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2563         throwToSingleThreaded(cap,tso,NULL);
2564     }
2565 }
2566
2567 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2568 static void 
2569 deleteThread_(Capability *cap, StgTSO *tso)
2570 { // for forkProcess only:
2571   // like deleteThread(), but we delete threads in foreign calls, too.
2572
2573     if (tso->why_blocked == BlockedOnCCall ||
2574         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2575         unblockOne(cap,tso);
2576         tso->what_next = ThreadKilled;
2577     } else {
2578         deleteThread(cap,tso);
2579     }
2580 }
2581 #endif
2582
2583 /* -----------------------------------------------------------------------------
2584    raiseExceptionHelper
2585    
2586    This function is called by the raise# primitve, just so that we can
2587    move some of the tricky bits of raising an exception from C-- into
2588    C.  Who knows, it might be a useful re-useable thing here too.
2589    -------------------------------------------------------------------------- */
2590
2591 StgWord
2592 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2593 {
2594     Capability *cap = regTableToCapability(reg);
2595     StgThunk *raise_closure = NULL;
2596     StgPtr p, next;
2597     StgRetInfoTable *info;
2598     //
2599     // This closure represents the expression 'raise# E' where E
2600     // is the exception raise.  It is used to overwrite all the
2601     // thunks which are currently under evaluataion.
2602     //
2603
2604     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2605     // LDV profiling: stg_raise_info has THUNK as its closure
2606     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2607     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2608     // 1 does not cause any problem unless profiling is performed.
2609     // However, when LDV profiling goes on, we need to linearly scan
2610     // small object pool, where raise_closure is stored, so we should
2611     // use MIN_UPD_SIZE.
2612     //
2613     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2614     //                                 sizeofW(StgClosure)+1);
2615     //
2616
2617     //
2618     // Walk up the stack, looking for the catch frame.  On the way,
2619     // we update any closures pointed to from update frames with the
2620     // raise closure that we just built.
2621     //
2622     p = tso->sp;
2623     while(1) {
2624         info = get_ret_itbl((StgClosure *)p);
2625         next = p + stack_frame_sizeW((StgClosure *)p);
2626         switch (info->i.type) {
2627             
2628         case UPDATE_FRAME:
2629             // Only create raise_closure if we need to.
2630             if (raise_closure == NULL) {
2631                 raise_closure = 
2632                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2633                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2634                 raise_closure->payload[0] = exception;
2635             }
2636             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2637             p = next;
2638             continue;
2639
2640         case ATOMICALLY_FRAME:
2641             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2642             tso->sp = p;
2643             return ATOMICALLY_FRAME;
2644             
2645         case CATCH_FRAME:
2646             tso->sp = p;
2647             return CATCH_FRAME;
2648
2649         case CATCH_STM_FRAME:
2650             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2651             tso->sp = p;
2652             return CATCH_STM_FRAME;
2653             
2654         case STOP_FRAME:
2655             tso->sp = p;
2656             return STOP_FRAME;
2657
2658         case CATCH_RETRY_FRAME:
2659         default:
2660             p = next; 
2661             continue;
2662         }
2663     }
2664 }
2665
2666
2667 /* -----------------------------------------------------------------------------
2668    findRetryFrameHelper
2669
2670    This function is called by the retry# primitive.  It traverses the stack
2671    leaving tso->sp referring to the frame which should handle the retry.  
2672
2673    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2674    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2675
2676    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2677    create) because retries are not considered to be exceptions, despite the
2678    similar implementation.
2679
2680    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2681    not be created within memory transactions.
2682    -------------------------------------------------------------------------- */
2683
2684 StgWord
2685 findRetryFrameHelper (StgTSO *tso)
2686 {
2687   StgPtr           p, next;
2688   StgRetInfoTable *info;
2689
2690   p = tso -> sp;
2691   while (1) {
2692     info = get_ret_itbl((StgClosure *)p);
2693     next = p + stack_frame_sizeW((StgClosure *)p);
2694     switch (info->i.type) {
2695       
2696     case ATOMICALLY_FRAME:
2697         debugTrace(DEBUG_stm,
2698                    "found ATOMICALLY_FRAME at %p during retry", p);
2699         tso->sp = p;
2700         return ATOMICALLY_FRAME;
2701       
2702     case CATCH_RETRY_FRAME:
2703         debugTrace(DEBUG_stm,
2704                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2705         tso->sp = p;
2706         return CATCH_RETRY_FRAME;
2707       
2708     case CATCH_STM_FRAME: {
2709         StgTRecHeader *trec = tso -> trec;
2710         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2711         debugTrace(DEBUG_stm,
2712                    "found CATCH_STM_FRAME at %p during retry", p);
2713         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2714         stmAbortTransaction(tso -> cap, trec);
2715         stmFreeAbortedTRec(tso -> cap, trec);
2716         tso -> trec = outer;
2717         p = next; 
2718         continue;
2719     }
2720       
2721
2722     default:
2723       ASSERT(info->i.type != CATCH_FRAME);
2724       ASSERT(info->i.type != STOP_FRAME);
2725       p = next; 
2726       continue;
2727     }
2728   }
2729 }
2730
2731 /* -----------------------------------------------------------------------------
2732    resurrectThreads is called after garbage collection on the list of
2733    threads found to be garbage.  Each of these threads will be woken
2734    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2735    on an MVar, or NonTermination if the thread was blocked on a Black
2736    Hole.
2737
2738    Locks: assumes we hold *all* the capabilities.
2739    -------------------------------------------------------------------------- */
2740
2741 void
2742 resurrectThreads (StgTSO *threads)
2743 {
2744     StgTSO *tso, *next;
2745     Capability *cap;
2746     step *step;
2747
2748     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2749         next = tso->global_link;
2750
2751         step = Bdescr((P_)tso)->step;
2752         tso->global_link = step->threads;
2753         step->threads = tso;
2754
2755         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2756         
2757         // Wake up the thread on the Capability it was last on
2758         cap = tso->cap;
2759         
2760         switch (tso->why_blocked) {
2761         case BlockedOnMVar:
2762         case BlockedOnException:
2763             /* Called by GC - sched_mutex lock is currently held. */
2764             throwToSingleThreaded(cap, tso,
2765                                   (StgClosure *)blockedOnDeadMVar_closure);
2766             break;
2767         case BlockedOnBlackHole:
2768             throwToSingleThreaded(cap, tso,
2769                                   (StgClosure *)nonTermination_closure);
2770             break;
2771         case BlockedOnSTM:
2772             throwToSingleThreaded(cap, tso,
2773                                   (StgClosure *)blockedIndefinitely_closure);
2774             break;
2775         case NotBlocked:
2776             /* This might happen if the thread was blocked on a black hole
2777              * belonging to a thread that we've just woken up (raiseAsync
2778              * can wake up threads, remember...).
2779              */
2780             continue;
2781         default:
2782             barf("resurrectThreads: thread blocked in a strange way");
2783         }
2784     }
2785 }
2786
2787 /* -----------------------------------------------------------------------------
2788    performPendingThrowTos is called after garbage collection, and
2789    passed a list of threads that were found to have pending throwTos
2790    (tso->blocked_exceptions was not empty), and were blocked.
2791    Normally this doesn't happen, because we would deliver the
2792    exception directly if the target thread is blocked, but there are
2793    small windows where it might occur on a multiprocessor (see
2794    throwTo()).
2795
2796    NB. we must be holding all the capabilities at this point, just
2797    like resurrectThreads().
2798    -------------------------------------------------------------------------- */
2799
2800 void
2801 performPendingThrowTos (StgTSO *threads)
2802 {
2803     StgTSO *tso, *next;
2804     Capability *cap;
2805     step *step;
2806
2807     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2808         next = tso->global_link;
2809
2810         step = Bdescr((P_)tso)->step;
2811         tso->global_link = step->threads;
2812         step->threads = tso;
2813
2814         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2815         
2816         cap = tso->cap;
2817         maybePerformBlockedException(cap, tso);
2818     }
2819 }