scheduleYield(): check the wakeup queue before yielding
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #include "GC.h"
35 #include "Weak.h"
36
37 /* PARALLEL_HASKELL includes go here */
38
39 #include "Sparks.h"
40 #include "Capability.h"
41 #include "Task.h"
42 #include "AwaitEvent.h"
43 #if defined(mingw32_HOST_OS)
44 #include "win32/IOManager.h"
45 #endif
46 #include "Trace.h"
47 #include "RaiseAsync.h"
48 #include "Threads.h"
49 #include "ThrIOManager.h"
50
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
53 #endif
54 #ifdef HAVE_UNISTD_H
55 #include <unistd.h>
56 #endif
57
58 #include <string.h>
59 #include <stdlib.h>
60 #include <stdarg.h>
61
62 #ifdef HAVE_ERRNO_H
63 #include <errno.h>
64 #endif
65
66 // Turn off inlining when debugging - it obfuscates things
67 #ifdef DEBUG
68 # undef  STATIC_INLINE
69 # define STATIC_INLINE static
70 #endif
71
72 /* -----------------------------------------------------------------------------
73  * Global variables
74  * -------------------------------------------------------------------------- */
75
76 #if !defined(THREADED_RTS)
77 // Blocked/sleeping thrads
78 StgTSO *blocked_queue_hd = NULL;
79 StgTSO *blocked_queue_tl = NULL;
80 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
81 #endif
82
83 /* Threads blocked on blackholes.
84  * LOCK: sched_mutex+capability, or all capabilities
85  */
86 StgTSO *blackhole_queue = NULL;
87
88 /* The blackhole_queue should be checked for threads to wake up.  See
89  * Schedule.h for more thorough comment.
90  * LOCK: none (doesn't matter if we miss an update)
91  */
92 rtsBool blackholes_need_checking = rtsFalse;
93
94 /* Set to true when the latest garbage collection failed to reclaim
95  * enough space, and the runtime should proceed to shut itself down in
96  * an orderly fashion (emitting profiling info etc.)
97  */
98 rtsBool heap_overflow = rtsFalse;
99
100 /* flag that tracks whether we have done any execution in this time slice.
101  * LOCK: currently none, perhaps we should lock (but needs to be
102  * updated in the fast path of the scheduler).
103  *
104  * NB. must be StgWord, we do xchg() on it.
105  */
106 volatile StgWord recent_activity = ACTIVITY_YES;
107
108 /* if this flag is set as well, give up execution
109  * LOCK: none (changes monotonically)
110  */
111 volatile StgWord sched_state = SCHED_RUNNING;
112
113 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
114  *  exists - earlier gccs apparently didn't.
115  *  -= chak
116  */
117 StgTSO dummy_tso;
118
119 /*
120  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
121  * in an MT setting, needed to signal that a worker thread shouldn't hang around
122  * in the scheduler when it is out of work.
123  */
124 rtsBool shutting_down_scheduler = rtsFalse;
125
126 /*
127  * This mutex protects most of the global scheduler data in
128  * the THREADED_RTS runtime.
129  */
130 #if defined(THREADED_RTS)
131 Mutex sched_mutex;
132 #endif
133
134 #if !defined(mingw32_HOST_OS)
135 #define FORKPROCESS_PRIMOP_SUPPORTED
136 #endif
137
138 /* -----------------------------------------------------------------------------
139  * static function prototypes
140  * -------------------------------------------------------------------------- */
141
142 static Capability *schedule (Capability *initialCapability, Task *task);
143
144 //
145 // These function all encapsulate parts of the scheduler loop, and are
146 // abstracted only to make the structure and control flow of the
147 // scheduler clearer.
148 //
149 static void schedulePreLoop (void);
150 static void scheduleFindWork (Capability *cap);
151 #if defined(THREADED_RTS)
152 static void scheduleYield (Capability **pcap, Task *task);
153 #endif
154 static void scheduleStartSignalHandlers (Capability *cap);
155 static void scheduleCheckBlockedThreads (Capability *cap);
156 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
157 static void scheduleCheckBlackHoles (Capability *cap);
158 static void scheduleDetectDeadlock (Capability *cap, Task *task);
159 static void schedulePushWork(Capability *cap, Task *task);
160 #if defined(PARALLEL_HASKELL)
161 static rtsBool scheduleGetRemoteWork(Capability *cap);
162 static void scheduleSendPendingMessages(void);
163 #endif
164 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
165 static void scheduleActivateSpark(Capability *cap);
166 #endif
167 static void schedulePostRunThread(Capability *cap, StgTSO *t);
168 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
169 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
170                                          StgTSO *t);
171 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
172                                     nat prev_what_next );
173 static void scheduleHandleThreadBlocked( StgTSO *t );
174 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
175                                              StgTSO *t );
176 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
177 static Capability *scheduleDoGC(Capability *cap, Task *task,
178                                 rtsBool force_major);
179
180 static rtsBool checkBlackHoles(Capability *cap);
181
182 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
183 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
184
185 static void deleteThread (Capability *cap, StgTSO *tso);
186 static void deleteAllThreads (Capability *cap);
187
188 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
189 static void deleteThread_(Capability *cap, StgTSO *tso);
190 #endif
191
192 #ifdef DEBUG
193 static char *whatNext_strs[] = {
194   "(unknown)",
195   "ThreadRunGHC",
196   "ThreadInterpret",
197   "ThreadKilled",
198   "ThreadRelocated",
199   "ThreadComplete"
200 };
201 #endif
202
203 /* -----------------------------------------------------------------------------
204  * Putting a thread on the run queue: different scheduling policies
205  * -------------------------------------------------------------------------- */
206
207 STATIC_INLINE void
208 addToRunQueue( Capability *cap, StgTSO *t )
209 {
210 #if defined(PARALLEL_HASKELL)
211     if (RtsFlags.ParFlags.doFairScheduling) { 
212         // this does round-robin scheduling; good for concurrency
213         appendToRunQueue(cap,t);
214     } else {
215         // this does unfair scheduling; good for parallelism
216         pushOnRunQueue(cap,t);
217     }
218 #else
219     // this does round-robin scheduling; good for concurrency
220     appendToRunQueue(cap,t);
221 #endif
222 }
223
224 /* ---------------------------------------------------------------------------
225    Main scheduling loop.
226
227    We use round-robin scheduling, each thread returning to the
228    scheduler loop when one of these conditions is detected:
229
230       * out of heap space
231       * timer expires (thread yields)
232       * thread blocks
233       * thread ends
234       * stack overflow
235
236    GRAN version:
237      In a GranSim setup this loop iterates over the global event queue.
238      This revolves around the global event queue, which determines what 
239      to do next. Therefore, it's more complicated than either the 
240      concurrent or the parallel (GUM) setup.
241   This version has been entirely removed (JB 2008/08).
242
243    GUM version:
244      GUM iterates over incoming messages.
245      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
246      and sends out a fish whenever it has nothing to do; in-between
247      doing the actual reductions (shared code below) it processes the
248      incoming messages and deals with delayed operations 
249      (see PendingFetches).
250      This is not the ugliest code you could imagine, but it's bloody close.
251
252   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
253   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
254   as well as future GUM versions. This file has been refurbished to
255   only contain valid code, which is however incomplete, refers to
256   invalid includes etc.
257
258    ------------------------------------------------------------------------ */
259
260 static Capability *
261 schedule (Capability *initialCapability, Task *task)
262 {
263   StgTSO *t;
264   Capability *cap;
265   StgThreadReturnCode ret;
266 #if defined(PARALLEL_HASKELL)
267   rtsBool receivedFinish = rtsFalse;
268 #endif
269   nat prev_what_next;
270   rtsBool ready_to_gc;
271 #if defined(THREADED_RTS)
272   rtsBool first = rtsTrue;
273 #endif
274   
275   cap = initialCapability;
276
277   // Pre-condition: this task owns initialCapability.
278   // The sched_mutex is *NOT* held
279   // NB. on return, we still hold a capability.
280
281   debugTrace (DEBUG_sched, 
282               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
283               task, initialCapability);
284
285   if (running_finalizers) {
286       errorBelch("error: a C finalizer called back into Haskell.\n"
287                  "   use Foreign.Concurrent.newForeignPtr for Haskell finalizers.");
288       stg_exit(EXIT_FAILURE);
289   }
290
291   schedulePreLoop();
292
293   // -----------------------------------------------------------
294   // Scheduler loop starts here:
295
296 #if defined(PARALLEL_HASKELL)
297 #define TERMINATION_CONDITION        (!receivedFinish)
298 #else
299 #define TERMINATION_CONDITION        rtsTrue
300 #endif
301
302   while (TERMINATION_CONDITION) {
303
304     // Check whether we have re-entered the RTS from Haskell without
305     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
306     // call).
307     if (cap->in_haskell) {
308           errorBelch("schedule: re-entered unsafely.\n"
309                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
310           stg_exit(EXIT_FAILURE);
311     }
312
313     // The interruption / shutdown sequence.
314     // 
315     // In order to cleanly shut down the runtime, we want to:
316     //   * make sure that all main threads return to their callers
317     //     with the state 'Interrupted'.
318     //   * clean up all OS threads assocated with the runtime
319     //   * free all memory etc.
320     //
321     // So the sequence for ^C goes like this:
322     //
323     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
324     //     arranges for some Capability to wake up
325     //
326     //   * all threads in the system are halted, and the zombies are
327     //     placed on the run queue for cleaning up.  We acquire all
328     //     the capabilities in order to delete the threads, this is
329     //     done by scheduleDoGC() for convenience (because GC already
330     //     needs to acquire all the capabilities).  We can't kill
331     //     threads involved in foreign calls.
332     // 
333     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
334     //
335     //   * sched_state := SCHED_SHUTTING_DOWN
336     //
337     //   * all workers exit when the run queue on their capability
338     //     drains.  All main threads will also exit when their TSO
339     //     reaches the head of the run queue and they can return.
340     //
341     //   * eventually all Capabilities will shut down, and the RTS can
342     //     exit.
343     //
344     //   * We might be left with threads blocked in foreign calls, 
345     //     we should really attempt to kill these somehow (TODO);
346     
347     switch (sched_state) {
348     case SCHED_RUNNING:
349         break;
350     case SCHED_INTERRUPTING:
351         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
352 #if defined(THREADED_RTS)
353         discardSparksCap(cap);
354 #endif
355         /* scheduleDoGC() deletes all the threads */
356         cap = scheduleDoGC(cap,task,rtsFalse);
357
358         // after scheduleDoGC(), we must be shutting down.  Either some
359         // other Capability did the final GC, or we did it above,
360         // either way we can fall through to the SCHED_SHUTTING_DOWN
361         // case now.
362         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
363         // fall through
364
365     case SCHED_SHUTTING_DOWN:
366         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
367         // If we are a worker, just exit.  If we're a bound thread
368         // then we will exit below when we've removed our TSO from
369         // the run queue.
370         if (task->tso == NULL && emptyRunQueue(cap)) {
371             return cap;
372         }
373         break;
374     default:
375         barf("sched_state: %d", sched_state);
376     }
377
378     scheduleFindWork(cap);
379
380     /* work pushing, currently relevant only for THREADED_RTS:
381        (pushes threads, wakes up idle capabilities for stealing) */
382     schedulePushWork(cap,task);
383
384 #if defined(PARALLEL_HASKELL)
385     /* since we perform a blocking receive and continue otherwise,
386        either we never reach here or we definitely have work! */
387     // from here: non-empty run queue
388     ASSERT(!emptyRunQueue(cap));
389
390     if (PacketsWaiting()) {  /* now process incoming messages, if any
391                                 pending...  
392
393                                 CAUTION: scheduleGetRemoteWork called
394                                 above, waits for messages as well! */
395       processMessages(cap, &receivedFinish);
396     }
397 #endif // PARALLEL_HASKELL: non-empty run queue!
398
399     scheduleDetectDeadlock(cap,task);
400
401 #if defined(THREADED_RTS)
402     cap = task->cap;    // reload cap, it might have changed
403 #endif
404
405     // Normally, the only way we can get here with no threads to
406     // run is if a keyboard interrupt received during 
407     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
408     // Additionally, it is not fatal for the
409     // threaded RTS to reach here with no threads to run.
410     //
411     // win32: might be here due to awaitEvent() being abandoned
412     // as a result of a console event having been delivered.
413     
414 #if defined(THREADED_RTS)
415     if (first) 
416     {
417     // XXX: ToDo
418     //     // don't yield the first time, we want a chance to run this
419     //     // thread for a bit, even if there are others banging at the
420     //     // door.
421     //     first = rtsFalse;
422     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
423     }
424
425   yield:
426     scheduleYield(&cap,task);
427     if (emptyRunQueue(cap)) continue; // look for work again
428 #endif
429
430 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
431     if ( emptyRunQueue(cap) ) {
432         ASSERT(sched_state >= SCHED_INTERRUPTING);
433     }
434 #endif
435
436     // 
437     // Get a thread to run
438     //
439     t = popRunQueue(cap);
440
441     // Sanity check the thread we're about to run.  This can be
442     // expensive if there is lots of thread switching going on...
443     IF_DEBUG(sanity,checkTSO(t));
444
445 #if defined(THREADED_RTS)
446     // Check whether we can run this thread in the current task.
447     // If not, we have to pass our capability to the right task.
448     {
449         Task *bound = t->bound;
450       
451         if (bound) {
452             if (bound == task) {
453                 debugTrace(DEBUG_sched,
454                            "### Running thread %lu in bound thread", (unsigned long)t->id);
455                 // yes, the Haskell thread is bound to the current native thread
456             } else {
457                 debugTrace(DEBUG_sched,
458                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
459                 // no, bound to a different Haskell thread: pass to that thread
460                 pushOnRunQueue(cap,t);
461                 continue;
462             }
463         } else {
464             // The thread we want to run is unbound.
465             if (task->tso) { 
466                 debugTrace(DEBUG_sched,
467                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
468                 // no, the current native thread is bound to a different
469                 // Haskell thread, so pass it to any worker thread
470                 pushOnRunQueue(cap,t);
471                 continue; 
472             }
473         }
474     }
475 #endif
476
477     // If we're shutting down, and this thread has not yet been
478     // killed, kill it now.  This sometimes happens when a finalizer
479     // thread is created by the final GC, or a thread previously
480     // in a foreign call returns.
481     if (sched_state >= SCHED_INTERRUPTING &&
482         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
483         deleteThread(cap,t);
484     }
485
486     /* context switches are initiated by the timer signal, unless
487      * the user specified "context switch as often as possible", with
488      * +RTS -C0
489      */
490     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
491         && !emptyThreadQueues(cap)) {
492         cap->context_switch = 1;
493     }
494          
495 run_thread:
496
497     // CurrentTSO is the thread to run.  t might be different if we
498     // loop back to run_thread, so make sure to set CurrentTSO after
499     // that.
500     cap->r.rCurrentTSO = t;
501
502     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
503                               (long)t->id, whatNext_strs[t->what_next]);
504
505     startHeapProfTimer();
506
507     // Check for exceptions blocked on this thread
508     maybePerformBlockedException (cap, t);
509
510     // ----------------------------------------------------------------------
511     // Run the current thread 
512
513     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
514     ASSERT(t->cap == cap);
515     ASSERT(t->bound ? t->bound->cap == cap : 1);
516
517     prev_what_next = t->what_next;
518
519     errno = t->saved_errno;
520 #if mingw32_HOST_OS
521     SetLastError(t->saved_winerror);
522 #endif
523
524     cap->in_haskell = rtsTrue;
525
526     dirty_TSO(cap,t);
527
528 #if defined(THREADED_RTS)
529     if (recent_activity == ACTIVITY_DONE_GC) {
530         // ACTIVITY_DONE_GC means we turned off the timer signal to
531         // conserve power (see #1623).  Re-enable it here.
532         nat prev;
533         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
534         if (prev == ACTIVITY_DONE_GC) {
535             startTimer();
536         }
537     } else {
538         recent_activity = ACTIVITY_YES;
539     }
540 #endif
541
542     switch (prev_what_next) {
543         
544     case ThreadKilled:
545     case ThreadComplete:
546         /* Thread already finished, return to scheduler. */
547         ret = ThreadFinished;
548         break;
549         
550     case ThreadRunGHC:
551     {
552         StgRegTable *r;
553         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
554         cap = regTableToCapability(r);
555         ret = r->rRet;
556         break;
557     }
558     
559     case ThreadInterpret:
560         cap = interpretBCO(cap);
561         ret = cap->r.rRet;
562         break;
563         
564     default:
565         barf("schedule: invalid what_next field");
566     }
567
568     cap->in_haskell = rtsFalse;
569
570     // The TSO might have moved, eg. if it re-entered the RTS and a GC
571     // happened.  So find the new location:
572     t = cap->r.rCurrentTSO;
573
574     // We have run some Haskell code: there might be blackhole-blocked
575     // threads to wake up now.
576     // Lock-free test here should be ok, we're just setting a flag.
577     if ( blackhole_queue != END_TSO_QUEUE ) {
578         blackholes_need_checking = rtsTrue;
579     }
580     
581     // And save the current errno in this thread.
582     // XXX: possibly bogus for SMP because this thread might already
583     // be running again, see code below.
584     t->saved_errno = errno;
585 #if mingw32_HOST_OS
586     // Similarly for Windows error code
587     t->saved_winerror = GetLastError();
588 #endif
589
590 #if defined(THREADED_RTS)
591     // If ret is ThreadBlocked, and this Task is bound to the TSO that
592     // blocked, we are in limbo - the TSO is now owned by whatever it
593     // is blocked on, and may in fact already have been woken up,
594     // perhaps even on a different Capability.  It may be the case
595     // that task->cap != cap.  We better yield this Capability
596     // immediately and return to normaility.
597     if (ret == ThreadBlocked) {
598         debugTrace(DEBUG_sched,
599                    "--<< thread %lu (%s) stopped: blocked",
600                    (unsigned long)t->id, whatNext_strs[t->what_next]);
601         goto yield;
602     }
603 #endif
604
605     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
606     ASSERT(t->cap == cap);
607
608     // ----------------------------------------------------------------------
609     
610     // Costs for the scheduler are assigned to CCS_SYSTEM
611     stopHeapProfTimer();
612 #if defined(PROFILING)
613     CCCS = CCS_SYSTEM;
614 #endif
615     
616     schedulePostRunThread(cap,t);
617
618     t = threadStackUnderflow(task,t);
619
620     ready_to_gc = rtsFalse;
621
622     switch (ret) {
623     case HeapOverflow:
624         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
625         break;
626
627     case StackOverflow:
628         scheduleHandleStackOverflow(cap,task,t);
629         break;
630
631     case ThreadYielding:
632         if (scheduleHandleYield(cap, t, prev_what_next)) {
633             // shortcut for switching between compiler/interpreter:
634             goto run_thread; 
635         }
636         break;
637
638     case ThreadBlocked:
639         scheduleHandleThreadBlocked(t);
640         break;
641
642     case ThreadFinished:
643         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
644         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
645         break;
646
647     default:
648       barf("schedule: invalid thread return code %d", (int)ret);
649     }
650
651     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
652       cap = scheduleDoGC(cap,task,rtsFalse);
653     }
654   } /* end of while() */
655 }
656
657 /* ----------------------------------------------------------------------------
658  * Setting up the scheduler loop
659  * ------------------------------------------------------------------------- */
660
661 static void
662 schedulePreLoop(void)
663 {
664   // initialisation for scheduler - what cannot go into initScheduler()  
665 }
666
667 /* -----------------------------------------------------------------------------
668  * scheduleFindWork()
669  *
670  * Search for work to do, and handle messages from elsewhere.
671  * -------------------------------------------------------------------------- */
672
673 static void
674 scheduleFindWork (Capability *cap)
675 {
676     scheduleStartSignalHandlers(cap);
677
678     // Only check the black holes here if we've nothing else to do.
679     // During normal execution, the black hole list only gets checked
680     // at GC time, to avoid repeatedly traversing this possibly long
681     // list each time around the scheduler.
682     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
683
684     scheduleCheckWakeupThreads(cap);
685
686     scheduleCheckBlockedThreads(cap);
687
688 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
689     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
690 #endif
691
692 #if defined(PARALLEL_HASKELL)
693     // if messages have been buffered...
694     scheduleSendPendingMessages();
695 #endif
696
697 #if defined(PARALLEL_HASKELL)
698     if (emptyRunQueue(cap)) {
699         receivedFinish = scheduleGetRemoteWork(cap);
700         continue; //  a new round, (hopefully) with new work
701         /* 
702            in GUM, this a) sends out a FISH and returns IF no fish is
703                            out already
704                         b) (blocking) awaits and receives messages
705            
706            in Eden, this is only the blocking receive, as b) in GUM.
707         */
708     }
709 #endif
710 }
711
712 #if defined(THREADED_RTS)
713 STATIC_INLINE rtsBool
714 shouldYieldCapability (Capability *cap, Task *task)
715 {
716     // we need to yield this capability to someone else if..
717     //   - another thread is initiating a GC
718     //   - another Task is returning from a foreign call
719     //   - the thread at the head of the run queue cannot be run
720     //     by this Task (it is bound to another Task, or it is unbound
721     //     and this task it bound).
722     return (waiting_for_gc || 
723             cap->returning_tasks_hd != NULL ||
724             (!emptyRunQueue(cap) && (task->tso == NULL
725                                      ? cap->run_queue_hd->bound != NULL
726                                      : cap->run_queue_hd->bound != task)));
727 }
728
729 // This is the single place where a Task goes to sleep.  There are
730 // two reasons it might need to sleep:
731 //    - there are no threads to run
732 //    - we need to yield this Capability to someone else 
733 //      (see shouldYieldCapability())
734 //
735 // Careful: the scheduler loop is quite delicate.  Make sure you run
736 // the tests in testsuite/concurrent (all ways) after modifying this,
737 // and also check the benchmarks in nofib/parallel for regressions.
738
739 static void
740 scheduleYield (Capability **pcap, Task *task)
741 {
742     Capability *cap = *pcap;
743
744     // if we have work, and we don't need to give up the Capability, continue.
745     if (!shouldYieldCapability(cap,task) && 
746         (!emptyRunQueue(cap) ||
747          !emptyWakeupQueue(cap) ||
748          blackholes_need_checking ||
749          sched_state >= SCHED_INTERRUPTING))
750         return;
751
752     // otherwise yield (sleep), and keep yielding if necessary.
753     do {
754         yieldCapability(&cap,task);
755     } 
756     while (shouldYieldCapability(cap,task));
757
758     // note there may still be no threads on the run queue at this
759     // point, the caller has to check.
760
761     *pcap = cap;
762     return;
763 }
764 #endif
765     
766 /* -----------------------------------------------------------------------------
767  * schedulePushWork()
768  *
769  * Push work to other Capabilities if we have some.
770  * -------------------------------------------------------------------------- */
771
772 static void
773 schedulePushWork(Capability *cap USED_IF_THREADS, 
774                  Task *task      USED_IF_THREADS)
775 {
776   /* following code not for PARALLEL_HASKELL. I kept the call general,
777      future GUM versions might use pushing in a distributed setup */
778 #if defined(THREADED_RTS)
779
780     Capability *free_caps[n_capabilities], *cap0;
781     nat i, n_free_caps;
782
783     // migration can be turned off with +RTS -qg
784     if (!RtsFlags.ParFlags.migrate) return;
785
786     // Check whether we have more threads on our run queue, or sparks
787     // in our pool, that we could hand to another Capability.
788     if (cap->run_queue_hd == END_TSO_QUEUE) {
789         if (sparkPoolSizeCap(cap) < 2) return;
790     } else {
791         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
792             sparkPoolSizeCap(cap) < 1) return;
793     }
794
795     // First grab as many free Capabilities as we can.
796     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
797         cap0 = &capabilities[i];
798         if (cap != cap0 && tryGrabCapability(cap0,task)) {
799             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
800                 // it already has some work, we just grabbed it at 
801                 // the wrong moment.  Or maybe it's deadlocked!
802                 releaseCapability(cap0);
803             } else {
804                 free_caps[n_free_caps++] = cap0;
805             }
806         }
807     }
808
809     // we now have n_free_caps free capabilities stashed in
810     // free_caps[].  Share our run queue equally with them.  This is
811     // probably the simplest thing we could do; improvements we might
812     // want to do include:
813     //
814     //   - giving high priority to moving relatively new threads, on 
815     //     the gournds that they haven't had time to build up a
816     //     working set in the cache on this CPU/Capability.
817     //
818     //   - giving low priority to moving long-lived threads
819
820     if (n_free_caps > 0) {
821         StgTSO *prev, *t, *next;
822         rtsBool pushed_to_all;
823
824         debugTrace(DEBUG_sched, 
825                    "cap %d: %s and %d free capabilities, sharing...", 
826                    cap->no, 
827                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
828                    "excess threads on run queue":"sparks to share (>=2)",
829                    n_free_caps);
830
831         i = 0;
832         pushed_to_all = rtsFalse;
833
834         if (cap->run_queue_hd != END_TSO_QUEUE) {
835             prev = cap->run_queue_hd;
836             t = prev->_link;
837             prev->_link = END_TSO_QUEUE;
838             for (; t != END_TSO_QUEUE; t = next) {
839                 next = t->_link;
840                 t->_link = END_TSO_QUEUE;
841                 if (t->what_next == ThreadRelocated
842                     || t->bound == task // don't move my bound thread
843                     || tsoLocked(t)) {  // don't move a locked thread
844                     setTSOLink(cap, prev, t);
845                     prev = t;
846                 } else if (i == n_free_caps) {
847                     pushed_to_all = rtsTrue;
848                     i = 0;
849                     // keep one for us
850                     setTSOLink(cap, prev, t);
851                     prev = t;
852                 } else {
853                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
854                     appendToRunQueue(free_caps[i],t);
855                     if (t->bound) { t->bound->cap = free_caps[i]; }
856                     t->cap = free_caps[i];
857                     i++;
858                 }
859             }
860             cap->run_queue_tl = prev;
861         }
862
863 #ifdef SPARK_PUSHING
864         /* JB I left this code in place, it would work but is not necessary */
865
866         // If there are some free capabilities that we didn't push any
867         // threads to, then try to push a spark to each one.
868         if (!pushed_to_all) {
869             StgClosure *spark;
870             // i is the next free capability to push to
871             for (; i < n_free_caps; i++) {
872                 if (emptySparkPoolCap(free_caps[i])) {
873                     spark = tryStealSpark(cap->sparks);
874                     if (spark != NULL) {
875                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
876                         newSpark(&(free_caps[i]->r), spark);
877                     }
878                 }
879             }
880         }
881 #endif /* SPARK_PUSHING */
882
883         // release the capabilities
884         for (i = 0; i < n_free_caps; i++) {
885             task->cap = free_caps[i];
886             releaseAndWakeupCapability(free_caps[i]);
887         }
888     }
889     task->cap = cap; // reset to point to our Capability.
890
891 #endif /* THREADED_RTS */
892
893 }
894
895 /* ----------------------------------------------------------------------------
896  * Start any pending signal handlers
897  * ------------------------------------------------------------------------- */
898
899 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
900 static void
901 scheduleStartSignalHandlers(Capability *cap)
902 {
903     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
904         // safe outside the lock
905         startSignalHandlers(cap);
906     }
907 }
908 #else
909 static void
910 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
911 {
912 }
913 #endif
914
915 /* ----------------------------------------------------------------------------
916  * Check for blocked threads that can be woken up.
917  * ------------------------------------------------------------------------- */
918
919 static void
920 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
921 {
922 #if !defined(THREADED_RTS)
923     //
924     // Check whether any waiting threads need to be woken up.  If the
925     // run queue is empty, and there are no other tasks running, we
926     // can wait indefinitely for something to happen.
927     //
928     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
929     {
930         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
931     }
932 #endif
933 }
934
935
936 /* ----------------------------------------------------------------------------
937  * Check for threads woken up by other Capabilities
938  * ------------------------------------------------------------------------- */
939
940 static void
941 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
942 {
943 #if defined(THREADED_RTS)
944     // Any threads that were woken up by other Capabilities get
945     // appended to our run queue.
946     if (!emptyWakeupQueue(cap)) {
947         ACQUIRE_LOCK(&cap->lock);
948         if (emptyRunQueue(cap)) {
949             cap->run_queue_hd = cap->wakeup_queue_hd;
950             cap->run_queue_tl = cap->wakeup_queue_tl;
951         } else {
952             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
953             cap->run_queue_tl = cap->wakeup_queue_tl;
954         }
955         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
956         RELEASE_LOCK(&cap->lock);
957     }
958 #endif
959 }
960
961 /* ----------------------------------------------------------------------------
962  * Check for threads blocked on BLACKHOLEs that can be woken up
963  * ------------------------------------------------------------------------- */
964 static void
965 scheduleCheckBlackHoles (Capability *cap)
966 {
967     if ( blackholes_need_checking ) // check without the lock first
968     {
969         ACQUIRE_LOCK(&sched_mutex);
970         if ( blackholes_need_checking ) {
971             blackholes_need_checking = rtsFalse;
972             // important that we reset the flag *before* checking the
973             // blackhole queue, otherwise we could get deadlock.  This
974             // happens as follows: we wake up a thread that
975             // immediately runs on another Capability, blocks on a
976             // blackhole, and then we reset the blackholes_need_checking flag.
977             checkBlackHoles(cap);
978         }
979         RELEASE_LOCK(&sched_mutex);
980     }
981 }
982
983 /* ----------------------------------------------------------------------------
984  * Detect deadlock conditions and attempt to resolve them.
985  * ------------------------------------------------------------------------- */
986
987 static void
988 scheduleDetectDeadlock (Capability *cap, Task *task)
989 {
990
991 #if defined(PARALLEL_HASKELL)
992     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
993     return;
994 #endif
995
996     /* 
997      * Detect deadlock: when we have no threads to run, there are no
998      * threads blocked, waiting for I/O, or sleeping, and all the
999      * other tasks are waiting for work, we must have a deadlock of
1000      * some description.
1001      */
1002     if ( emptyThreadQueues(cap) )
1003     {
1004 #if defined(THREADED_RTS)
1005         /* 
1006          * In the threaded RTS, we only check for deadlock if there
1007          * has been no activity in a complete timeslice.  This means
1008          * we won't eagerly start a full GC just because we don't have
1009          * any threads to run currently.
1010          */
1011         if (recent_activity != ACTIVITY_INACTIVE) return;
1012 #endif
1013
1014         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1015
1016         // Garbage collection can release some new threads due to
1017         // either (a) finalizers or (b) threads resurrected because
1018         // they are unreachable and will therefore be sent an
1019         // exception.  Any threads thus released will be immediately
1020         // runnable.
1021         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1022         // when force_major == rtsTrue. scheduleDoGC sets
1023         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1024         // signal.
1025
1026         if ( !emptyRunQueue(cap) ) return;
1027
1028 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1029         /* If we have user-installed signal handlers, then wait
1030          * for signals to arrive rather then bombing out with a
1031          * deadlock.
1032          */
1033         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1034             debugTrace(DEBUG_sched,
1035                        "still deadlocked, waiting for signals...");
1036
1037             awaitUserSignals();
1038
1039             if (signals_pending()) {
1040                 startSignalHandlers(cap);
1041             }
1042
1043             // either we have threads to run, or we were interrupted:
1044             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1045
1046             return;
1047         }
1048 #endif
1049
1050 #if !defined(THREADED_RTS)
1051         /* Probably a real deadlock.  Send the current main thread the
1052          * Deadlock exception.
1053          */
1054         if (task->tso) {
1055             switch (task->tso->why_blocked) {
1056             case BlockedOnSTM:
1057             case BlockedOnBlackHole:
1058             case BlockedOnException:
1059             case BlockedOnMVar:
1060                 throwToSingleThreaded(cap, task->tso, 
1061                                       (StgClosure *)nonTermination_closure);
1062                 return;
1063             default:
1064                 barf("deadlock: main thread blocked in a strange way");
1065             }
1066         }
1067         return;
1068 #endif
1069     }
1070 }
1071
1072
1073 /* ----------------------------------------------------------------------------
1074  * Send pending messages (PARALLEL_HASKELL only)
1075  * ------------------------------------------------------------------------- */
1076
1077 #if defined(PARALLEL_HASKELL)
1078 static void
1079 scheduleSendPendingMessages(void)
1080 {
1081
1082 # if defined(PAR) // global Mem.Mgmt., omit for now
1083     if (PendingFetches != END_BF_QUEUE) {
1084         processFetches();
1085     }
1086 # endif
1087     
1088     if (RtsFlags.ParFlags.BufferTime) {
1089         // if we use message buffering, we must send away all message
1090         // packets which have become too old...
1091         sendOldBuffers(); 
1092     }
1093 }
1094 #endif
1095
1096 /* ----------------------------------------------------------------------------
1097  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1098  * ------------------------------------------------------------------------- */
1099
1100 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1101 static void
1102 scheduleActivateSpark(Capability *cap)
1103 {
1104     if (anySparks())
1105     {
1106         createSparkThread(cap);
1107         debugTrace(DEBUG_sched, "creating a spark thread");
1108     }
1109 }
1110 #endif // PARALLEL_HASKELL || THREADED_RTS
1111
1112 /* ----------------------------------------------------------------------------
1113  * Get work from a remote node (PARALLEL_HASKELL only)
1114  * ------------------------------------------------------------------------- */
1115     
1116 #if defined(PARALLEL_HASKELL)
1117 static rtsBool /* return value used in PARALLEL_HASKELL only */
1118 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1119 {
1120 #if defined(PARALLEL_HASKELL)
1121   rtsBool receivedFinish = rtsFalse;
1122
1123   // idle() , i.e. send all buffers, wait for work
1124   if (RtsFlags.ParFlags.BufferTime) {
1125         IF_PAR_DEBUG(verbose, 
1126                 debugBelch("...send all pending data,"));
1127         {
1128           nat i;
1129           for (i=1; i<=nPEs; i++)
1130             sendImmediately(i); // send all messages away immediately
1131         }
1132   }
1133
1134   /* this would be the place for fishing in GUM... 
1135
1136      if (no-earlier-fish-around) 
1137           sendFish(choosePe());
1138    */
1139
1140   // Eden:just look for incoming messages (blocking receive)
1141   IF_PAR_DEBUG(verbose, 
1142                debugBelch("...wait for incoming messages...\n"));
1143   processMessages(cap, &receivedFinish); // blocking receive...
1144
1145
1146   return receivedFinish;
1147   // reenter scheduling look after having received something
1148
1149 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1150
1151   return rtsFalse; /* return value unused in THREADED_RTS */
1152
1153 #endif /* PARALLEL_HASKELL */
1154 }
1155 #endif // PARALLEL_HASKELL || THREADED_RTS
1156
1157 /* ----------------------------------------------------------------------------
1158  * After running a thread...
1159  * ------------------------------------------------------------------------- */
1160
1161 static void
1162 schedulePostRunThread (Capability *cap, StgTSO *t)
1163 {
1164     // We have to be able to catch transactions that are in an
1165     // infinite loop as a result of seeing an inconsistent view of
1166     // memory, e.g. 
1167     //
1168     //   atomically $ do
1169     //       [a,b] <- mapM readTVar [ta,tb]
1170     //       when (a == b) loop
1171     //
1172     // and a is never equal to b given a consistent view of memory.
1173     //
1174     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1175         if (!stmValidateNestOfTransactions (t -> trec)) {
1176             debugTrace(DEBUG_sched | DEBUG_stm,
1177                        "trec %p found wasting its time", t);
1178             
1179             // strip the stack back to the
1180             // ATOMICALLY_FRAME, aborting the (nested)
1181             // transaction, and saving the stack of any
1182             // partially-evaluated thunks on the heap.
1183             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1184             
1185             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1186         }
1187     }
1188
1189   /* some statistics gathering in the parallel case */
1190 }
1191
1192 /* -----------------------------------------------------------------------------
1193  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1194  * -------------------------------------------------------------------------- */
1195
1196 static rtsBool
1197 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1198 {
1199     // did the task ask for a large block?
1200     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1201         // if so, get one and push it on the front of the nursery.
1202         bdescr *bd;
1203         lnat blocks;
1204         
1205         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1206         
1207         debugTrace(DEBUG_sched,
1208                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1209                    (long)t->id, whatNext_strs[t->what_next], blocks);
1210     
1211         // don't do this if the nursery is (nearly) full, we'll GC first.
1212         if (cap->r.rCurrentNursery->link != NULL ||
1213             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1214                                                // if the nursery has only one block.
1215             
1216             ACQUIRE_SM_LOCK
1217             bd = allocGroup( blocks );
1218             RELEASE_SM_LOCK
1219             cap->r.rNursery->n_blocks += blocks;
1220             
1221             // link the new group into the list
1222             bd->link = cap->r.rCurrentNursery;
1223             bd->u.back = cap->r.rCurrentNursery->u.back;
1224             if (cap->r.rCurrentNursery->u.back != NULL) {
1225                 cap->r.rCurrentNursery->u.back->link = bd;
1226             } else {
1227 #if !defined(THREADED_RTS)
1228                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1229                        g0s0 == cap->r.rNursery);
1230 #endif
1231                 cap->r.rNursery->blocks = bd;
1232             }             
1233             cap->r.rCurrentNursery->u.back = bd;
1234             
1235             // initialise it as a nursery block.  We initialise the
1236             // step, gen_no, and flags field of *every* sub-block in
1237             // this large block, because this is easier than making
1238             // sure that we always find the block head of a large
1239             // block whenever we call Bdescr() (eg. evacuate() and
1240             // isAlive() in the GC would both have to do this, at
1241             // least).
1242             { 
1243                 bdescr *x;
1244                 for (x = bd; x < bd + blocks; x++) {
1245                     x->step = cap->r.rNursery;
1246                     x->gen_no = 0;
1247                     x->flags = 0;
1248                 }
1249             }
1250             
1251             // This assert can be a killer if the app is doing lots
1252             // of large block allocations.
1253             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1254             
1255             // now update the nursery to point to the new block
1256             cap->r.rCurrentNursery = bd;
1257             
1258             // we might be unlucky and have another thread get on the
1259             // run queue before us and steal the large block, but in that
1260             // case the thread will just end up requesting another large
1261             // block.
1262             pushOnRunQueue(cap,t);
1263             return rtsFalse;  /* not actually GC'ing */
1264         }
1265     }
1266     
1267     debugTrace(DEBUG_sched,
1268                "--<< thread %ld (%s) stopped: HeapOverflow",
1269                (long)t->id, whatNext_strs[t->what_next]);
1270
1271     if (cap->context_switch) {
1272         // Sometimes we miss a context switch, e.g. when calling
1273         // primitives in a tight loop, MAYBE_GC() doesn't check the
1274         // context switch flag, and we end up waiting for a GC.
1275         // See #1984, and concurrent/should_run/1984
1276         cap->context_switch = 0;
1277         addToRunQueue(cap,t);
1278     } else {
1279         pushOnRunQueue(cap,t);
1280     }
1281     return rtsTrue;
1282     /* actual GC is done at the end of the while loop in schedule() */
1283 }
1284
1285 /* -----------------------------------------------------------------------------
1286  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1287  * -------------------------------------------------------------------------- */
1288
1289 static void
1290 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1291 {
1292     debugTrace (DEBUG_sched,
1293                 "--<< thread %ld (%s) stopped, StackOverflow", 
1294                 (long)t->id, whatNext_strs[t->what_next]);
1295
1296     /* just adjust the stack for this thread, then pop it back
1297      * on the run queue.
1298      */
1299     { 
1300         /* enlarge the stack */
1301         StgTSO *new_t = threadStackOverflow(cap, t);
1302         
1303         /* The TSO attached to this Task may have moved, so update the
1304          * pointer to it.
1305          */
1306         if (task->tso == t) {
1307             task->tso = new_t;
1308         }
1309         pushOnRunQueue(cap,new_t);
1310     }
1311 }
1312
1313 /* -----------------------------------------------------------------------------
1314  * Handle a thread that returned to the scheduler with ThreadYielding
1315  * -------------------------------------------------------------------------- */
1316
1317 static rtsBool
1318 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1319 {
1320     // Reset the context switch flag.  We don't do this just before
1321     // running the thread, because that would mean we would lose ticks
1322     // during GC, which can lead to unfair scheduling (a thread hogs
1323     // the CPU because the tick always arrives during GC).  This way
1324     // penalises threads that do a lot of allocation, but that seems
1325     // better than the alternative.
1326     cap->context_switch = 0;
1327     
1328     /* put the thread back on the run queue.  Then, if we're ready to
1329      * GC, check whether this is the last task to stop.  If so, wake
1330      * up the GC thread.  getThread will block during a GC until the
1331      * GC is finished.
1332      */
1333 #ifdef DEBUG
1334     if (t->what_next != prev_what_next) {
1335         debugTrace(DEBUG_sched,
1336                    "--<< thread %ld (%s) stopped to switch evaluators", 
1337                    (long)t->id, whatNext_strs[t->what_next]);
1338     } else {
1339         debugTrace(DEBUG_sched,
1340                    "--<< thread %ld (%s) stopped, yielding",
1341                    (long)t->id, whatNext_strs[t->what_next]);
1342     }
1343 #endif
1344     
1345     IF_DEBUG(sanity,
1346              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1347              checkTSO(t));
1348     ASSERT(t->_link == END_TSO_QUEUE);
1349     
1350     // Shortcut if we're just switching evaluators: don't bother
1351     // doing stack squeezing (which can be expensive), just run the
1352     // thread.
1353     if (t->what_next != prev_what_next) {
1354         return rtsTrue;
1355     }
1356
1357     addToRunQueue(cap,t);
1358
1359     return rtsFalse;
1360 }
1361
1362 /* -----------------------------------------------------------------------------
1363  * Handle a thread that returned to the scheduler with ThreadBlocked
1364  * -------------------------------------------------------------------------- */
1365
1366 static void
1367 scheduleHandleThreadBlocked( StgTSO *t
1368 #if !defined(GRAN) && !defined(DEBUG)
1369     STG_UNUSED
1370 #endif
1371     )
1372 {
1373
1374       // We don't need to do anything.  The thread is blocked, and it
1375       // has tidied up its stack and placed itself on whatever queue
1376       // it needs to be on.
1377
1378     // ASSERT(t->why_blocked != NotBlocked);
1379     // Not true: for example,
1380     //    - in THREADED_RTS, the thread may already have been woken
1381     //      up by another Capability.  This actually happens: try
1382     //      conc023 +RTS -N2.
1383     //    - the thread may have woken itself up already, because
1384     //      threadPaused() might have raised a blocked throwTo
1385     //      exception, see maybePerformBlockedException().
1386
1387 #ifdef DEBUG
1388     if (traceClass(DEBUG_sched)) {
1389         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1390                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1391         printThreadBlockage(t);
1392         debugTraceEnd();
1393     }
1394 #endif
1395 }
1396
1397 /* -----------------------------------------------------------------------------
1398  * Handle a thread that returned to the scheduler with ThreadFinished
1399  * -------------------------------------------------------------------------- */
1400
1401 static rtsBool
1402 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1403 {
1404     /* Need to check whether this was a main thread, and if so,
1405      * return with the return value.
1406      *
1407      * We also end up here if the thread kills itself with an
1408      * uncaught exception, see Exception.cmm.
1409      */
1410     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1411                (unsigned long)t->id, whatNext_strs[t->what_next]);
1412
1413     // blocked exceptions can now complete, even if the thread was in
1414     // blocked mode (see #2910).  This unconditionally calls
1415     // lockTSO(), which ensures that we don't miss any threads that
1416     // are engaged in throwTo() with this thread as a target.
1417     awakenBlockedExceptionQueue (cap, t);
1418
1419       //
1420       // Check whether the thread that just completed was a bound
1421       // thread, and if so return with the result.  
1422       //
1423       // There is an assumption here that all thread completion goes
1424       // through this point; we need to make sure that if a thread
1425       // ends up in the ThreadKilled state, that it stays on the run
1426       // queue so it can be dealt with here.
1427       //
1428
1429       if (t->bound) {
1430
1431           if (t->bound != task) {
1432 #if !defined(THREADED_RTS)
1433               // Must be a bound thread that is not the topmost one.  Leave
1434               // it on the run queue until the stack has unwound to the
1435               // point where we can deal with this.  Leaving it on the run
1436               // queue also ensures that the garbage collector knows about
1437               // this thread and its return value (it gets dropped from the
1438               // step->threads list so there's no other way to find it).
1439               appendToRunQueue(cap,t);
1440               return rtsFalse;
1441 #else
1442               // this cannot happen in the threaded RTS, because a
1443               // bound thread can only be run by the appropriate Task.
1444               barf("finished bound thread that isn't mine");
1445 #endif
1446           }
1447
1448           ASSERT(task->tso == t);
1449
1450           if (t->what_next == ThreadComplete) {
1451               if (task->ret) {
1452                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1453                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1454               }
1455               task->stat = Success;
1456           } else {
1457               if (task->ret) {
1458                   *(task->ret) = NULL;
1459               }
1460               if (sched_state >= SCHED_INTERRUPTING) {
1461                   if (heap_overflow) {
1462                       task->stat = HeapExhausted;
1463                   } else {
1464                       task->stat = Interrupted;
1465                   }
1466               } else {
1467                   task->stat = Killed;
1468               }
1469           }
1470 #ifdef DEBUG
1471           removeThreadLabel((StgWord)task->tso->id);
1472 #endif
1473           return rtsTrue; // tells schedule() to return
1474       }
1475
1476       return rtsFalse;
1477 }
1478
1479 /* -----------------------------------------------------------------------------
1480  * Perform a heap census
1481  * -------------------------------------------------------------------------- */
1482
1483 static rtsBool
1484 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1485 {
1486     // When we have +RTS -i0 and we're heap profiling, do a census at
1487     // every GC.  This lets us get repeatable runs for debugging.
1488     if (performHeapProfile ||
1489         (RtsFlags.ProfFlags.profileInterval==0 &&
1490          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1491         return rtsTrue;
1492     } else {
1493         return rtsFalse;
1494     }
1495 }
1496
1497 /* -----------------------------------------------------------------------------
1498  * Perform a garbage collection if necessary
1499  * -------------------------------------------------------------------------- */
1500
1501 static Capability *
1502 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1503 {
1504     rtsBool heap_census;
1505 #ifdef THREADED_RTS
1506     /* extern static volatile StgWord waiting_for_gc; 
1507        lives inside capability.c */
1508     rtsBool gc_type, prev_pending_gc;
1509     nat i;
1510 #endif
1511
1512     if (sched_state == SCHED_SHUTTING_DOWN) {
1513         // The final GC has already been done, and the system is
1514         // shutting down.  We'll probably deadlock if we try to GC
1515         // now.
1516         return cap;
1517     }
1518
1519 #ifdef THREADED_RTS
1520     if (sched_state < SCHED_INTERRUPTING
1521         && RtsFlags.ParFlags.parGcEnabled
1522         && N >= RtsFlags.ParFlags.parGcGen
1523         && ! oldest_gen->steps[0].mark)
1524     {
1525         gc_type = PENDING_GC_PAR;
1526     } else {
1527         gc_type = PENDING_GC_SEQ;
1528     }
1529
1530     // In order to GC, there must be no threads running Haskell code.
1531     // Therefore, the GC thread needs to hold *all* the capabilities,
1532     // and release them after the GC has completed.  
1533     //
1534     // This seems to be the simplest way: previous attempts involved
1535     // making all the threads with capabilities give up their
1536     // capabilities and sleep except for the *last* one, which
1537     // actually did the GC.  But it's quite hard to arrange for all
1538     // the other tasks to sleep and stay asleep.
1539     //
1540
1541     /*  Other capabilities are prevented from running yet more Haskell
1542         threads if waiting_for_gc is set. Tested inside
1543         yieldCapability() and releaseCapability() in Capability.c */
1544
1545     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1546     if (prev_pending_gc) {
1547         do {
1548             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1549                        prev_pending_gc);
1550             ASSERT(cap);
1551             yieldCapability(&cap,task);
1552         } while (waiting_for_gc);
1553         return cap;  // NOTE: task->cap might have changed here
1554     }
1555
1556     setContextSwitches();
1557
1558     // The final shutdown GC is always single-threaded, because it's
1559     // possible that some of the Capabilities have no worker threads.
1560     
1561     if (gc_type == PENDING_GC_SEQ)
1562     {
1563         // single-threaded GC: grab all the capabilities
1564         for (i=0; i < n_capabilities; i++) {
1565             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1566             if (cap != &capabilities[i]) {
1567                 Capability *pcap = &capabilities[i];
1568                 // we better hope this task doesn't get migrated to
1569                 // another Capability while we're waiting for this one.
1570                 // It won't, because load balancing happens while we have
1571                 // all the Capabilities, but even so it's a slightly
1572                 // unsavoury invariant.
1573                 task->cap = pcap;
1574                 waitForReturnCapability(&pcap, task);
1575                 if (pcap != &capabilities[i]) {
1576                     barf("scheduleDoGC: got the wrong capability");
1577                 }
1578             }
1579         }
1580     }
1581     else
1582     {
1583         // multi-threaded GC: make sure all the Capabilities donate one
1584         // GC thread each.
1585         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1586
1587         waitForGcThreads(cap);
1588     }
1589 #endif
1590
1591     // so this happens periodically:
1592     if (cap) scheduleCheckBlackHoles(cap);
1593     
1594     IF_DEBUG(scheduler, printAllThreads());
1595
1596 delete_threads_and_gc:
1597     /*
1598      * We now have all the capabilities; if we're in an interrupting
1599      * state, then we should take the opportunity to delete all the
1600      * threads in the system.
1601      */
1602     if (sched_state == SCHED_INTERRUPTING) {
1603         deleteAllThreads(cap);
1604         sched_state = SCHED_SHUTTING_DOWN;
1605     }
1606     
1607     heap_census = scheduleNeedHeapProfile(rtsTrue);
1608
1609     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1610     {
1611         // We are doing a GC because the system has been idle for a
1612         // timeslice and we need to check for deadlock.  Record the
1613         // fact that we've done a GC and turn off the timer signal;
1614         // it will get re-enabled if we run any threads after the GC.
1615         //
1616         // Note: this is done before GC, because after GC there might
1617         // be threads already running (GarbageCollect() releases the
1618         // GC threads when it completes), so we risk turning off the
1619         // timer signal when it should really be on.
1620         recent_activity = ACTIVITY_DONE_GC;
1621         stopTimer();
1622     }
1623
1624 #if defined(THREADED_RTS)
1625     debugTrace(DEBUG_sched, "doing GC");
1626     // reset waiting_for_gc *before* GC, so that when the GC threads
1627     // emerge they don't immediately re-enter the GC.
1628     waiting_for_gc = 0;
1629     GarbageCollect(force_major || heap_census, gc_type, cap);
1630 #else
1631     GarbageCollect(force_major || heap_census, 0, cap);
1632 #endif
1633
1634     if (heap_census) {
1635         debugTrace(DEBUG_sched, "performing heap census");
1636         heapCensus();
1637         performHeapProfile = rtsFalse;
1638     }
1639
1640     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1641         // GC set the heap_overflow flag, so we should proceed with
1642         // an orderly shutdown now.  Ultimately we want the main
1643         // thread to return to its caller with HeapExhausted, at which
1644         // point the caller should call hs_exit().  The first step is
1645         // to delete all the threads.
1646         //
1647         // Another way to do this would be to raise an exception in
1648         // the main thread, which we really should do because it gives
1649         // the program a chance to clean up.  But how do we find the
1650         // main thread?  It should presumably be the same one that
1651         // gets ^C exceptions, but that's all done on the Haskell side
1652         // (GHC.TopHandler).
1653         sched_state = SCHED_INTERRUPTING;
1654         goto delete_threads_and_gc;
1655     }
1656
1657 #ifdef SPARKBALANCE
1658     /* JB 
1659        Once we are all together... this would be the place to balance all
1660        spark pools. No concurrent stealing or adding of new sparks can
1661        occur. Should be defined in Sparks.c. */
1662     balanceSparkPoolsCaps(n_capabilities, capabilities);
1663 #endif
1664
1665 #if defined(THREADED_RTS)
1666     if (gc_type == PENDING_GC_SEQ) {
1667         // release our stash of capabilities.
1668         for (i = 0; i < n_capabilities; i++) {
1669             if (cap != &capabilities[i]) {
1670                 task->cap = &capabilities[i];
1671                 releaseCapability(&capabilities[i]);
1672             }
1673         }
1674     }
1675     if (cap) {
1676         task->cap = cap;
1677     } else {
1678         task->cap = NULL;
1679     }
1680 #endif
1681
1682     return cap;
1683 }
1684
1685 /* ---------------------------------------------------------------------------
1686  * Singleton fork(). Do not copy any running threads.
1687  * ------------------------------------------------------------------------- */
1688
1689 pid_t
1690 forkProcess(HsStablePtr *entry
1691 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1692             STG_UNUSED
1693 #endif
1694            )
1695 {
1696 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1697     Task *task;
1698     pid_t pid;
1699     StgTSO* t,*next;
1700     Capability *cap;
1701     nat s;
1702     
1703 #if defined(THREADED_RTS)
1704     if (RtsFlags.ParFlags.nNodes > 1) {
1705         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1706         stg_exit(EXIT_FAILURE);
1707     }
1708 #endif
1709
1710     debugTrace(DEBUG_sched, "forking!");
1711     
1712     // ToDo: for SMP, we should probably acquire *all* the capabilities
1713     cap = rts_lock();
1714     
1715     // no funny business: hold locks while we fork, otherwise if some
1716     // other thread is holding a lock when the fork happens, the data
1717     // structure protected by the lock will forever be in an
1718     // inconsistent state in the child.  See also #1391.
1719     ACQUIRE_LOCK(&sched_mutex);
1720     ACQUIRE_LOCK(&cap->lock);
1721     ACQUIRE_LOCK(&cap->running_task->lock);
1722
1723     pid = fork();
1724     
1725     if (pid) { // parent
1726         
1727         RELEASE_LOCK(&sched_mutex);
1728         RELEASE_LOCK(&cap->lock);
1729         RELEASE_LOCK(&cap->running_task->lock);
1730
1731         // just return the pid
1732         rts_unlock(cap);
1733         return pid;
1734         
1735     } else { // child
1736         
1737 #if defined(THREADED_RTS)
1738         initMutex(&sched_mutex);
1739         initMutex(&cap->lock);
1740         initMutex(&cap->running_task->lock);
1741 #endif
1742
1743         // Now, all OS threads except the thread that forked are
1744         // stopped.  We need to stop all Haskell threads, including
1745         // those involved in foreign calls.  Also we need to delete
1746         // all Tasks, because they correspond to OS threads that are
1747         // now gone.
1748
1749         for (s = 0; s < total_steps; s++) {
1750           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1751             if (t->what_next == ThreadRelocated) {
1752                 next = t->_link;
1753             } else {
1754                 next = t->global_link;
1755                 // don't allow threads to catch the ThreadKilled
1756                 // exception, but we do want to raiseAsync() because these
1757                 // threads may be evaluating thunks that we need later.
1758                 deleteThread_(cap,t);
1759             }
1760           }
1761         }
1762         
1763         // Empty the run queue.  It seems tempting to let all the
1764         // killed threads stay on the run queue as zombies to be
1765         // cleaned up later, but some of them correspond to bound
1766         // threads for which the corresponding Task does not exist.
1767         cap->run_queue_hd = END_TSO_QUEUE;
1768         cap->run_queue_tl = END_TSO_QUEUE;
1769
1770         // Any suspended C-calling Tasks are no more, their OS threads
1771         // don't exist now:
1772         cap->suspended_ccalling_tasks = NULL;
1773
1774         // Empty the threads lists.  Otherwise, the garbage
1775         // collector may attempt to resurrect some of these threads.
1776         for (s = 0; s < total_steps; s++) {
1777             all_steps[s].threads = END_TSO_QUEUE;
1778         }
1779
1780         // Wipe the task list, except the current Task.
1781         ACQUIRE_LOCK(&sched_mutex);
1782         for (task = all_tasks; task != NULL; task=task->all_link) {
1783             if (task != cap->running_task) {
1784 #if defined(THREADED_RTS)
1785                 initMutex(&task->lock); // see #1391
1786 #endif
1787                 discardTask(task);
1788             }
1789         }
1790         RELEASE_LOCK(&sched_mutex);
1791
1792 #if defined(THREADED_RTS)
1793         // Wipe our spare workers list, they no longer exist.  New
1794         // workers will be created if necessary.
1795         cap->spare_workers = NULL;
1796         cap->returning_tasks_hd = NULL;
1797         cap->returning_tasks_tl = NULL;
1798 #endif
1799
1800         // On Unix, all timers are reset in the child, so we need to start
1801         // the timer again.
1802         initTimer();
1803         startTimer();
1804
1805         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1806         rts_checkSchedStatus("forkProcess",cap);
1807         
1808         rts_unlock(cap);
1809         hs_exit();                      // clean up and exit
1810         stg_exit(EXIT_SUCCESS);
1811     }
1812 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1813     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1814     return -1;
1815 #endif
1816 }
1817
1818 /* ---------------------------------------------------------------------------
1819  * Delete all the threads in the system
1820  * ------------------------------------------------------------------------- */
1821    
1822 static void
1823 deleteAllThreads ( Capability *cap )
1824 {
1825     // NOTE: only safe to call if we own all capabilities.
1826
1827     StgTSO* t, *next;
1828     nat s;
1829
1830     debugTrace(DEBUG_sched,"deleting all threads");
1831     for (s = 0; s < total_steps; s++) {
1832       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1833         if (t->what_next == ThreadRelocated) {
1834             next = t->_link;
1835         } else {
1836             next = t->global_link;
1837             deleteThread(cap,t);
1838         }
1839       }
1840     }      
1841
1842     // The run queue now contains a bunch of ThreadKilled threads.  We
1843     // must not throw these away: the main thread(s) will be in there
1844     // somewhere, and the main scheduler loop has to deal with it.
1845     // Also, the run queue is the only thing keeping these threads from
1846     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1847
1848 #if !defined(THREADED_RTS)
1849     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1850     ASSERT(sleeping_queue == END_TSO_QUEUE);
1851 #endif
1852 }
1853
1854 /* -----------------------------------------------------------------------------
1855    Managing the suspended_ccalling_tasks list.
1856    Locks required: sched_mutex
1857    -------------------------------------------------------------------------- */
1858
1859 STATIC_INLINE void
1860 suspendTask (Capability *cap, Task *task)
1861 {
1862     ASSERT(task->next == NULL && task->prev == NULL);
1863     task->next = cap->suspended_ccalling_tasks;
1864     task->prev = NULL;
1865     if (cap->suspended_ccalling_tasks) {
1866         cap->suspended_ccalling_tasks->prev = task;
1867     }
1868     cap->suspended_ccalling_tasks = task;
1869 }
1870
1871 STATIC_INLINE void
1872 recoverSuspendedTask (Capability *cap, Task *task)
1873 {
1874     if (task->prev) {
1875         task->prev->next = task->next;
1876     } else {
1877         ASSERT(cap->suspended_ccalling_tasks == task);
1878         cap->suspended_ccalling_tasks = task->next;
1879     }
1880     if (task->next) {
1881         task->next->prev = task->prev;
1882     }
1883     task->next = task->prev = NULL;
1884 }
1885
1886 /* ---------------------------------------------------------------------------
1887  * Suspending & resuming Haskell threads.
1888  * 
1889  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1890  * its capability before calling the C function.  This allows another
1891  * task to pick up the capability and carry on running Haskell
1892  * threads.  It also means that if the C call blocks, it won't lock
1893  * the whole system.
1894  *
1895  * The Haskell thread making the C call is put to sleep for the
1896  * duration of the call, on the susepended_ccalling_threads queue.  We
1897  * give out a token to the task, which it can use to resume the thread
1898  * on return from the C function.
1899  * ------------------------------------------------------------------------- */
1900    
1901 void *
1902 suspendThread (StgRegTable *reg)
1903 {
1904   Capability *cap;
1905   int saved_errno;
1906   StgTSO *tso;
1907   Task *task;
1908 #if mingw32_HOST_OS
1909   StgWord32 saved_winerror;
1910 #endif
1911
1912   saved_errno = errno;
1913 #if mingw32_HOST_OS
1914   saved_winerror = GetLastError();
1915 #endif
1916
1917   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1918    */
1919   cap = regTableToCapability(reg);
1920
1921   task = cap->running_task;
1922   tso = cap->r.rCurrentTSO;
1923
1924   debugTrace(DEBUG_sched, 
1925              "thread %lu did a safe foreign call", 
1926              (unsigned long)cap->r.rCurrentTSO->id);
1927
1928   // XXX this might not be necessary --SDM
1929   tso->what_next = ThreadRunGHC;
1930
1931   threadPaused(cap,tso);
1932
1933   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1934       tso->why_blocked = BlockedOnCCall;
1935       tso->flags |= TSO_BLOCKEX;
1936       tso->flags &= ~TSO_INTERRUPTIBLE;
1937   } else {
1938       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1939   }
1940
1941   // Hand back capability
1942   task->suspended_tso = tso;
1943
1944   ACQUIRE_LOCK(&cap->lock);
1945
1946   suspendTask(cap,task);
1947   cap->in_haskell = rtsFalse;
1948   releaseCapability_(cap,rtsFalse);
1949   
1950   RELEASE_LOCK(&cap->lock);
1951
1952 #if defined(THREADED_RTS)
1953   /* Preparing to leave the RTS, so ensure there's a native thread/task
1954      waiting to take over.
1955   */
1956   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1957 #endif
1958
1959   errno = saved_errno;
1960 #if mingw32_HOST_OS
1961   SetLastError(saved_winerror);
1962 #endif
1963   return task;
1964 }
1965
1966 StgRegTable *
1967 resumeThread (void *task_)
1968 {
1969     StgTSO *tso;
1970     Capability *cap;
1971     Task *task = task_;
1972     int saved_errno;
1973 #if mingw32_HOST_OS
1974     StgWord32 saved_winerror;
1975 #endif
1976
1977     saved_errno = errno;
1978 #if mingw32_HOST_OS
1979     saved_winerror = GetLastError();
1980 #endif
1981
1982     cap = task->cap;
1983     // Wait for permission to re-enter the RTS with the result.
1984     waitForReturnCapability(&cap,task);
1985     // we might be on a different capability now... but if so, our
1986     // entry on the suspended_ccalling_tasks list will also have been
1987     // migrated.
1988
1989     // Remove the thread from the suspended list
1990     recoverSuspendedTask(cap,task);
1991
1992     tso = task->suspended_tso;
1993     task->suspended_tso = NULL;
1994     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1995     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1996     
1997     if (tso->why_blocked == BlockedOnCCall) {
1998         // avoid locking the TSO if we don't have to
1999         if (tso->blocked_exceptions != END_TSO_QUEUE) {
2000             awakenBlockedExceptionQueue(cap,tso);
2001         }
2002         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2003     }
2004     
2005     /* Reset blocking status */
2006     tso->why_blocked  = NotBlocked;
2007     
2008     cap->r.rCurrentTSO = tso;
2009     cap->in_haskell = rtsTrue;
2010     errno = saved_errno;
2011 #if mingw32_HOST_OS
2012     SetLastError(saved_winerror);
2013 #endif
2014
2015     /* We might have GC'd, mark the TSO dirty again */
2016     dirty_TSO(cap,tso);
2017
2018     IF_DEBUG(sanity, checkTSO(tso));
2019
2020     return &cap->r;
2021 }
2022
2023 /* ---------------------------------------------------------------------------
2024  * scheduleThread()
2025  *
2026  * scheduleThread puts a thread on the end  of the runnable queue.
2027  * This will usually be done immediately after a thread is created.
2028  * The caller of scheduleThread must create the thread using e.g.
2029  * createThread and push an appropriate closure
2030  * on this thread's stack before the scheduler is invoked.
2031  * ------------------------------------------------------------------------ */
2032
2033 void
2034 scheduleThread(Capability *cap, StgTSO *tso)
2035 {
2036     // The thread goes at the *end* of the run-queue, to avoid possible
2037     // starvation of any threads already on the queue.
2038     appendToRunQueue(cap,tso);
2039 }
2040
2041 void
2042 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2043 {
2044 #if defined(THREADED_RTS)
2045     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2046                               // move this thread from now on.
2047     cpu %= RtsFlags.ParFlags.nNodes;
2048     if (cpu == cap->no) {
2049         appendToRunQueue(cap,tso);
2050     } else {
2051         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2052     }
2053 #else
2054     appendToRunQueue(cap,tso);
2055 #endif
2056 }
2057
2058 Capability *
2059 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2060 {
2061     Task *task;
2062
2063     // We already created/initialised the Task
2064     task = cap->running_task;
2065
2066     // This TSO is now a bound thread; make the Task and TSO
2067     // point to each other.
2068     tso->bound = task;
2069     tso->cap = cap;
2070
2071     task->tso = tso;
2072     task->ret = ret;
2073     task->stat = NoStatus;
2074
2075     appendToRunQueue(cap,tso);
2076
2077     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2078
2079     cap = schedule(cap,task);
2080
2081     ASSERT(task->stat != NoStatus);
2082     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2083
2084     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2085     return cap;
2086 }
2087
2088 /* ----------------------------------------------------------------------------
2089  * Starting Tasks
2090  * ------------------------------------------------------------------------- */
2091
2092 #if defined(THREADED_RTS)
2093 void OSThreadProcAttr
2094 workerStart(Task *task)
2095 {
2096     Capability *cap;
2097
2098     // See startWorkerTask().
2099     ACQUIRE_LOCK(&task->lock);
2100     cap = task->cap;
2101     RELEASE_LOCK(&task->lock);
2102
2103     // set the thread-local pointer to the Task:
2104     taskEnter(task);
2105
2106     // schedule() runs without a lock.
2107     cap = schedule(cap,task);
2108
2109     // On exit from schedule(), we have a Capability, but possibly not
2110     // the same one we started with.
2111
2112     // During shutdown, the requirement is that after all the
2113     // Capabilities are shut down, all workers that are shutting down
2114     // have finished workerTaskStop().  This is why we hold on to
2115     // cap->lock until we've finished workerTaskStop() below.
2116     //
2117     // There may be workers still involved in foreign calls; those
2118     // will just block in waitForReturnCapability() because the
2119     // Capability has been shut down.
2120     //
2121     ACQUIRE_LOCK(&cap->lock);
2122     releaseCapability_(cap,rtsFalse);
2123     workerTaskStop(task);
2124     RELEASE_LOCK(&cap->lock);
2125 }
2126 #endif
2127
2128 /* ---------------------------------------------------------------------------
2129  * initScheduler()
2130  *
2131  * Initialise the scheduler.  This resets all the queues - if the
2132  * queues contained any threads, they'll be garbage collected at the
2133  * next pass.
2134  *
2135  * ------------------------------------------------------------------------ */
2136
2137 void 
2138 initScheduler(void)
2139 {
2140 #if !defined(THREADED_RTS)
2141   blocked_queue_hd  = END_TSO_QUEUE;
2142   blocked_queue_tl  = END_TSO_QUEUE;
2143   sleeping_queue    = END_TSO_QUEUE;
2144 #endif
2145
2146   blackhole_queue   = END_TSO_QUEUE;
2147
2148   sched_state    = SCHED_RUNNING;
2149   recent_activity = ACTIVITY_YES;
2150
2151 #if defined(THREADED_RTS)
2152   /* Initialise the mutex and condition variables used by
2153    * the scheduler. */
2154   initMutex(&sched_mutex);
2155 #endif
2156   
2157   ACQUIRE_LOCK(&sched_mutex);
2158
2159   /* A capability holds the state a native thread needs in
2160    * order to execute STG code. At least one capability is
2161    * floating around (only THREADED_RTS builds have more than one).
2162    */
2163   initCapabilities();
2164
2165   initTaskManager();
2166
2167 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2168   initSparkPools();
2169 #endif
2170
2171 #if defined(THREADED_RTS)
2172   /*
2173    * Eagerly start one worker to run each Capability, except for
2174    * Capability 0.  The idea is that we're probably going to start a
2175    * bound thread on Capability 0 pretty soon, so we don't want a
2176    * worker task hogging it.
2177    */
2178   { 
2179       nat i;
2180       Capability *cap;
2181       for (i = 1; i < n_capabilities; i++) {
2182           cap = &capabilities[i];
2183           ACQUIRE_LOCK(&cap->lock);
2184           startWorkerTask(cap, workerStart);
2185           RELEASE_LOCK(&cap->lock);
2186       }
2187   }
2188 #endif
2189
2190   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2191
2192   RELEASE_LOCK(&sched_mutex);
2193 }
2194
2195 void
2196 exitScheduler(
2197     rtsBool wait_foreign
2198 #if !defined(THREADED_RTS)
2199                          __attribute__((unused))
2200 #endif
2201 )
2202                /* see Capability.c, shutdownCapability() */
2203 {
2204     Task *task = NULL;
2205
2206 #if defined(THREADED_RTS)
2207     ACQUIRE_LOCK(&sched_mutex);
2208     task = newBoundTask();
2209     RELEASE_LOCK(&sched_mutex);
2210 #endif
2211
2212     // If we haven't killed all the threads yet, do it now.
2213     if (sched_state < SCHED_SHUTTING_DOWN) {
2214         sched_state = SCHED_INTERRUPTING;
2215 #if defined(THREADED_RTS)
2216         waitForReturnCapability(&task->cap,task);
2217         scheduleDoGC(task->cap,task,rtsFalse);    
2218         releaseCapability(task->cap);
2219 #else
2220         scheduleDoGC(&MainCapability,task,rtsFalse);    
2221 #endif
2222     }
2223     sched_state = SCHED_SHUTTING_DOWN;
2224
2225 #if defined(THREADED_RTS)
2226     { 
2227         nat i;
2228         
2229         for (i = 0; i < n_capabilities; i++) {
2230             shutdownCapability(&capabilities[i], task, wait_foreign);
2231         }
2232         boundTaskExiting(task);
2233     }
2234 #endif
2235 }
2236
2237 void
2238 freeScheduler( void )
2239 {
2240     nat still_running;
2241
2242     ACQUIRE_LOCK(&sched_mutex);
2243     still_running = freeTaskManager();
2244     // We can only free the Capabilities if there are no Tasks still
2245     // running.  We might have a Task about to return from a foreign
2246     // call into waitForReturnCapability(), for example (actually,
2247     // this should be the *only* thing that a still-running Task can
2248     // do at this point, and it will block waiting for the
2249     // Capability).
2250     if (still_running == 0) {
2251         freeCapabilities();
2252         if (n_capabilities != 1) {
2253             stgFree(capabilities);
2254         }
2255     }
2256     RELEASE_LOCK(&sched_mutex);
2257 #if defined(THREADED_RTS)
2258     closeMutex(&sched_mutex);
2259 #endif
2260 }
2261
2262 /* -----------------------------------------------------------------------------
2263    performGC
2264
2265    This is the interface to the garbage collector from Haskell land.
2266    We provide this so that external C code can allocate and garbage
2267    collect when called from Haskell via _ccall_GC.
2268    -------------------------------------------------------------------------- */
2269
2270 static void
2271 performGC_(rtsBool force_major)
2272 {
2273     Task *task;
2274
2275     // We must grab a new Task here, because the existing Task may be
2276     // associated with a particular Capability, and chained onto the 
2277     // suspended_ccalling_tasks queue.
2278     ACQUIRE_LOCK(&sched_mutex);
2279     task = newBoundTask();
2280     RELEASE_LOCK(&sched_mutex);
2281
2282     waitForReturnCapability(&task->cap,task);
2283     scheduleDoGC(task->cap,task,force_major);
2284     releaseCapability(task->cap);
2285     boundTaskExiting(task);
2286 }
2287
2288 void
2289 performGC(void)
2290 {
2291     performGC_(rtsFalse);
2292 }
2293
2294 void
2295 performMajorGC(void)
2296 {
2297     performGC_(rtsTrue);
2298 }
2299
2300 /* -----------------------------------------------------------------------------
2301    Stack overflow
2302
2303    If the thread has reached its maximum stack size, then raise the
2304    StackOverflow exception in the offending thread.  Otherwise
2305    relocate the TSO into a larger chunk of memory and adjust its stack
2306    size appropriately.
2307    -------------------------------------------------------------------------- */
2308
2309 static StgTSO *
2310 threadStackOverflow(Capability *cap, StgTSO *tso)
2311 {
2312   nat new_stack_size, stack_words;
2313   lnat new_tso_size;
2314   StgPtr new_sp;
2315   StgTSO *dest;
2316
2317   IF_DEBUG(sanity,checkTSO(tso));
2318
2319   // don't allow throwTo() to modify the blocked_exceptions queue
2320   // while we are moving the TSO:
2321   lockClosure((StgClosure *)tso);
2322
2323   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2324       // NB. never raise a StackOverflow exception if the thread is
2325       // inside Control.Exceptino.block.  It is impractical to protect
2326       // against stack overflow exceptions, since virtually anything
2327       // can raise one (even 'catch'), so this is the only sensible
2328       // thing to do here.  See bug #767.
2329
2330       debugTrace(DEBUG_gc,
2331                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2332                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2333       IF_DEBUG(gc,
2334                /* If we're debugging, just print out the top of the stack */
2335                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2336                                                 tso->sp+64)));
2337
2338       // Send this thread the StackOverflow exception
2339       unlockTSO(tso);
2340       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2341       return tso;
2342   }
2343
2344   /* Try to double the current stack size.  If that takes us over the
2345    * maximum stack size for this thread, then use the maximum instead
2346    * (that is, unless we're already at or over the max size and we
2347    * can't raise the StackOverflow exception (see above), in which
2348    * case just double the size). Finally round up so the TSO ends up as
2349    * a whole number of blocks.
2350    */
2351   if (tso->stack_size >= tso->max_stack_size) {
2352       new_stack_size = tso->stack_size * 2;
2353   } else { 
2354       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2355   }
2356   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2357                                        TSO_STRUCT_SIZE)/sizeof(W_);
2358   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2359   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2360
2361   debugTrace(DEBUG_sched, 
2362              "increasing stack size from %ld words to %d.",
2363              (long)tso->stack_size, new_stack_size);
2364
2365   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2366   TICK_ALLOC_TSO(new_stack_size,0);
2367
2368   /* copy the TSO block and the old stack into the new area */
2369   memcpy(dest,tso,TSO_STRUCT_SIZE);
2370   stack_words = tso->stack + tso->stack_size - tso->sp;
2371   new_sp = (P_)dest + new_tso_size - stack_words;
2372   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2373
2374   /* relocate the stack pointers... */
2375   dest->sp         = new_sp;
2376   dest->stack_size = new_stack_size;
2377         
2378   /* Mark the old TSO as relocated.  We have to check for relocated
2379    * TSOs in the garbage collector and any primops that deal with TSOs.
2380    *
2381    * It's important to set the sp value to just beyond the end
2382    * of the stack, so we don't attempt to scavenge any part of the
2383    * dead TSO's stack.
2384    */
2385   tso->what_next = ThreadRelocated;
2386   setTSOLink(cap,tso,dest);
2387   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2388   tso->why_blocked = NotBlocked;
2389
2390   IF_PAR_DEBUG(verbose,
2391                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2392                      tso->id, tso, tso->stack_size);
2393                /* If we're debugging, just print out the top of the stack */
2394                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2395                                                 tso->sp+64)));
2396   
2397   unlockTSO(dest);
2398   unlockTSO(tso);
2399
2400   IF_DEBUG(sanity,checkTSO(dest));
2401 #if 0
2402   IF_DEBUG(scheduler,printTSO(dest));
2403 #endif
2404
2405   return dest;
2406 }
2407
2408 static StgTSO *
2409 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2410 {
2411     bdescr *bd, *new_bd;
2412     lnat free_w, tso_size_w;
2413     StgTSO *new_tso;
2414
2415     tso_size_w = tso_sizeW(tso);
2416
2417     if (tso_size_w < MBLOCK_SIZE_W || 
2418         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2419     {
2420         return tso;
2421     }
2422
2423     // don't allow throwTo() to modify the blocked_exceptions queue
2424     // while we are moving the TSO:
2425     lockClosure((StgClosure *)tso);
2426
2427     // this is the number of words we'll free
2428     free_w = round_to_mblocks(tso_size_w/2);
2429
2430     bd = Bdescr((StgPtr)tso);
2431     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2432     bd->free = bd->start + TSO_STRUCT_SIZEW;
2433
2434     new_tso = (StgTSO *)new_bd->start;
2435     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2436     new_tso->stack_size = new_bd->free - new_tso->stack;
2437
2438     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2439                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2440
2441     tso->what_next = ThreadRelocated;
2442     tso->_link = new_tso; // no write barrier reqd: same generation
2443
2444     // The TSO attached to this Task may have moved, so update the
2445     // pointer to it.
2446     if (task->tso == tso) {
2447         task->tso = new_tso;
2448     }
2449
2450     unlockTSO(new_tso);
2451     unlockTSO(tso);
2452
2453     IF_DEBUG(sanity,checkTSO(new_tso));
2454
2455     return new_tso;
2456 }
2457
2458 /* ---------------------------------------------------------------------------
2459    Interrupt execution
2460    - usually called inside a signal handler so it mustn't do anything fancy.   
2461    ------------------------------------------------------------------------ */
2462
2463 void
2464 interruptStgRts(void)
2465 {
2466     sched_state = SCHED_INTERRUPTING;
2467     setContextSwitches();
2468     wakeUpRts();
2469 }
2470
2471 /* -----------------------------------------------------------------------------
2472    Wake up the RTS
2473    
2474    This function causes at least one OS thread to wake up and run the
2475    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2476    an external event has arrived that may need servicing (eg. a
2477    keyboard interrupt).
2478
2479    In the single-threaded RTS we don't do anything here; we only have
2480    one thread anyway, and the event that caused us to want to wake up
2481    will have interrupted any blocking system call in progress anyway.
2482    -------------------------------------------------------------------------- */
2483
2484 void
2485 wakeUpRts(void)
2486 {
2487 #if defined(THREADED_RTS)
2488     // This forces the IO Manager thread to wakeup, which will
2489     // in turn ensure that some OS thread wakes up and runs the
2490     // scheduler loop, which will cause a GC and deadlock check.
2491     ioManagerWakeup();
2492 #endif
2493 }
2494
2495 /* -----------------------------------------------------------------------------
2496  * checkBlackHoles()
2497  *
2498  * Check the blackhole_queue for threads that can be woken up.  We do
2499  * this periodically: before every GC, and whenever the run queue is
2500  * empty.
2501  *
2502  * An elegant solution might be to just wake up all the blocked
2503  * threads with awakenBlockedQueue occasionally: they'll go back to
2504  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2505  * doesn't give us a way to tell whether we've actually managed to
2506  * wake up any threads, so we would be busy-waiting.
2507  *
2508  * -------------------------------------------------------------------------- */
2509
2510 static rtsBool
2511 checkBlackHoles (Capability *cap)
2512 {
2513     StgTSO **prev, *t;
2514     rtsBool any_woke_up = rtsFalse;
2515     StgHalfWord type;
2516
2517     // blackhole_queue is global:
2518     ASSERT_LOCK_HELD(&sched_mutex);
2519
2520     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2521
2522     // ASSUMES: sched_mutex
2523     prev = &blackhole_queue;
2524     t = blackhole_queue;
2525     while (t != END_TSO_QUEUE) {
2526         ASSERT(t->why_blocked == BlockedOnBlackHole);
2527         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2528         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2529             IF_DEBUG(sanity,checkTSO(t));
2530             t = unblockOne(cap, t);
2531             *prev = t;
2532             any_woke_up = rtsTrue;
2533         } else {
2534             prev = &t->_link;
2535             t = t->_link;
2536         }
2537     }
2538
2539     return any_woke_up;
2540 }
2541
2542 /* -----------------------------------------------------------------------------
2543    Deleting threads
2544
2545    This is used for interruption (^C) and forking, and corresponds to
2546    raising an exception but without letting the thread catch the
2547    exception.
2548    -------------------------------------------------------------------------- */
2549
2550 static void 
2551 deleteThread (Capability *cap, StgTSO *tso)
2552 {
2553     // NOTE: must only be called on a TSO that we have exclusive
2554     // access to, because we will call throwToSingleThreaded() below.
2555     // The TSO must be on the run queue of the Capability we own, or 
2556     // we must own all Capabilities.
2557
2558     if (tso->why_blocked != BlockedOnCCall &&
2559         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2560         throwToSingleThreaded(cap,tso,NULL);
2561     }
2562 }
2563
2564 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2565 static void 
2566 deleteThread_(Capability *cap, StgTSO *tso)
2567 { // for forkProcess only:
2568   // like deleteThread(), but we delete threads in foreign calls, too.
2569
2570     if (tso->why_blocked == BlockedOnCCall ||
2571         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2572         unblockOne(cap,tso);
2573         tso->what_next = ThreadKilled;
2574     } else {
2575         deleteThread(cap,tso);
2576     }
2577 }
2578 #endif
2579
2580 /* -----------------------------------------------------------------------------
2581    raiseExceptionHelper
2582    
2583    This function is called by the raise# primitve, just so that we can
2584    move some of the tricky bits of raising an exception from C-- into
2585    C.  Who knows, it might be a useful re-useable thing here too.
2586    -------------------------------------------------------------------------- */
2587
2588 StgWord
2589 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2590 {
2591     Capability *cap = regTableToCapability(reg);
2592     StgThunk *raise_closure = NULL;
2593     StgPtr p, next;
2594     StgRetInfoTable *info;
2595     //
2596     // This closure represents the expression 'raise# E' where E
2597     // is the exception raise.  It is used to overwrite all the
2598     // thunks which are currently under evaluataion.
2599     //
2600
2601     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2602     // LDV profiling: stg_raise_info has THUNK as its closure
2603     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2604     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2605     // 1 does not cause any problem unless profiling is performed.
2606     // However, when LDV profiling goes on, we need to linearly scan
2607     // small object pool, where raise_closure is stored, so we should
2608     // use MIN_UPD_SIZE.
2609     //
2610     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2611     //                                 sizeofW(StgClosure)+1);
2612     //
2613
2614     //
2615     // Walk up the stack, looking for the catch frame.  On the way,
2616     // we update any closures pointed to from update frames with the
2617     // raise closure that we just built.
2618     //
2619     p = tso->sp;
2620     while(1) {
2621         info = get_ret_itbl((StgClosure *)p);
2622         next = p + stack_frame_sizeW((StgClosure *)p);
2623         switch (info->i.type) {
2624             
2625         case UPDATE_FRAME:
2626             // Only create raise_closure if we need to.
2627             if (raise_closure == NULL) {
2628                 raise_closure = 
2629                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2630                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2631                 raise_closure->payload[0] = exception;
2632             }
2633             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2634             p = next;
2635             continue;
2636
2637         case ATOMICALLY_FRAME:
2638             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2639             tso->sp = p;
2640             return ATOMICALLY_FRAME;
2641             
2642         case CATCH_FRAME:
2643             tso->sp = p;
2644             return CATCH_FRAME;
2645
2646         case CATCH_STM_FRAME:
2647             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2648             tso->sp = p;
2649             return CATCH_STM_FRAME;
2650             
2651         case STOP_FRAME:
2652             tso->sp = p;
2653             return STOP_FRAME;
2654
2655         case CATCH_RETRY_FRAME:
2656         default:
2657             p = next; 
2658             continue;
2659         }
2660     }
2661 }
2662
2663
2664 /* -----------------------------------------------------------------------------
2665    findRetryFrameHelper
2666
2667    This function is called by the retry# primitive.  It traverses the stack
2668    leaving tso->sp referring to the frame which should handle the retry.  
2669
2670    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2671    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2672
2673    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2674    create) because retries are not considered to be exceptions, despite the
2675    similar implementation.
2676
2677    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2678    not be created within memory transactions.
2679    -------------------------------------------------------------------------- */
2680
2681 StgWord
2682 findRetryFrameHelper (StgTSO *tso)
2683 {
2684   StgPtr           p, next;
2685   StgRetInfoTable *info;
2686
2687   p = tso -> sp;
2688   while (1) {
2689     info = get_ret_itbl((StgClosure *)p);
2690     next = p + stack_frame_sizeW((StgClosure *)p);
2691     switch (info->i.type) {
2692       
2693     case ATOMICALLY_FRAME:
2694         debugTrace(DEBUG_stm,
2695                    "found ATOMICALLY_FRAME at %p during retry", p);
2696         tso->sp = p;
2697         return ATOMICALLY_FRAME;
2698       
2699     case CATCH_RETRY_FRAME:
2700         debugTrace(DEBUG_stm,
2701                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2702         tso->sp = p;
2703         return CATCH_RETRY_FRAME;
2704       
2705     case CATCH_STM_FRAME: {
2706         StgTRecHeader *trec = tso -> trec;
2707         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2708         debugTrace(DEBUG_stm,
2709                    "found CATCH_STM_FRAME at %p during retry", p);
2710         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2711         stmAbortTransaction(tso -> cap, trec);
2712         stmFreeAbortedTRec(tso -> cap, trec);
2713         tso -> trec = outer;
2714         p = next; 
2715         continue;
2716     }
2717       
2718
2719     default:
2720       ASSERT(info->i.type != CATCH_FRAME);
2721       ASSERT(info->i.type != STOP_FRAME);
2722       p = next; 
2723       continue;
2724     }
2725   }
2726 }
2727
2728 /* -----------------------------------------------------------------------------
2729    resurrectThreads is called after garbage collection on the list of
2730    threads found to be garbage.  Each of these threads will be woken
2731    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2732    on an MVar, or NonTermination if the thread was blocked on a Black
2733    Hole.
2734
2735    Locks: assumes we hold *all* the capabilities.
2736    -------------------------------------------------------------------------- */
2737
2738 void
2739 resurrectThreads (StgTSO *threads)
2740 {
2741     StgTSO *tso, *next;
2742     Capability *cap;
2743     step *step;
2744
2745     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2746         next = tso->global_link;
2747
2748         step = Bdescr((P_)tso)->step;
2749         tso->global_link = step->threads;
2750         step->threads = tso;
2751
2752         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2753         
2754         // Wake up the thread on the Capability it was last on
2755         cap = tso->cap;
2756         
2757         switch (tso->why_blocked) {
2758         case BlockedOnMVar:
2759         case BlockedOnException:
2760             /* Called by GC - sched_mutex lock is currently held. */
2761             throwToSingleThreaded(cap, tso,
2762                                   (StgClosure *)blockedOnDeadMVar_closure);
2763             break;
2764         case BlockedOnBlackHole:
2765             throwToSingleThreaded(cap, tso,
2766                                   (StgClosure *)nonTermination_closure);
2767             break;
2768         case BlockedOnSTM:
2769             throwToSingleThreaded(cap, tso,
2770                                   (StgClosure *)blockedIndefinitely_closure);
2771             break;
2772         case NotBlocked:
2773             /* This might happen if the thread was blocked on a black hole
2774              * belonging to a thread that we've just woken up (raiseAsync
2775              * can wake up threads, remember...).
2776              */
2777             continue;
2778         default:
2779             barf("resurrectThreads: thread blocked in a strange way");
2780         }
2781     }
2782 }
2783
2784 /* -----------------------------------------------------------------------------
2785    performPendingThrowTos is called after garbage collection, and
2786    passed a list of threads that were found to have pending throwTos
2787    (tso->blocked_exceptions was not empty), and were blocked.
2788    Normally this doesn't happen, because we would deliver the
2789    exception directly if the target thread is blocked, but there are
2790    small windows where it might occur on a multiprocessor (see
2791    throwTo()).
2792
2793    NB. we must be holding all the capabilities at this point, just
2794    like resurrectThreads().
2795    -------------------------------------------------------------------------- */
2796
2797 void
2798 performPendingThrowTos (StgTSO *threads)
2799 {
2800     StgTSO *tso, *next;
2801     Capability *cap;
2802     step *step;
2803
2804     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2805         next = tso->global_link;
2806
2807         step = Bdescr((P_)tso)->step;
2808         tso->global_link = step->threads;
2809         step->threads = tso;
2810
2811         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2812         
2813         cap = tso->cap;
2814         maybePerformBlockedException(cap, tso);
2815     }
2816 }