improve the finalizer callback error message
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #include "GC.h"
35 #include "Weak.h"
36 #include "EventLog.h"
37
38 /* PARALLEL_HASKELL includes go here */
39
40 #include "Sparks.h"
41 #include "Capability.h"
42 #include "Task.h"
43 #include "AwaitEvent.h"
44 #if defined(mingw32_HOST_OS)
45 #include "win32/IOManager.h"
46 #endif
47 #include "Trace.h"
48 #include "RaiseAsync.h"
49 #include "Threads.h"
50 #include "ThrIOManager.h"
51
52 #ifdef HAVE_SYS_TYPES_H
53 #include <sys/types.h>
54 #endif
55 #ifdef HAVE_UNISTD_H
56 #include <unistd.h>
57 #endif
58
59 #include <string.h>
60 #include <stdlib.h>
61 #include <stdarg.h>
62
63 #ifdef HAVE_ERRNO_H
64 #include <errno.h>
65 #endif
66
67 // Turn off inlining when debugging - it obfuscates things
68 #ifdef DEBUG
69 # undef  STATIC_INLINE
70 # define STATIC_INLINE static
71 #endif
72
73 /* -----------------------------------------------------------------------------
74  * Global variables
75  * -------------------------------------------------------------------------- */
76
77 #if !defined(THREADED_RTS)
78 // Blocked/sleeping thrads
79 StgTSO *blocked_queue_hd = NULL;
80 StgTSO *blocked_queue_tl = NULL;
81 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
82 #endif
83
84 /* Threads blocked on blackholes.
85  * LOCK: sched_mutex+capability, or all capabilities
86  */
87 StgTSO *blackhole_queue = NULL;
88
89 /* The blackhole_queue should be checked for threads to wake up.  See
90  * Schedule.h for more thorough comment.
91  * LOCK: none (doesn't matter if we miss an update)
92  */
93 rtsBool blackholes_need_checking = rtsFalse;
94
95 /* Set to true when the latest garbage collection failed to reclaim
96  * enough space, and the runtime should proceed to shut itself down in
97  * an orderly fashion (emitting profiling info etc.)
98  */
99 rtsBool heap_overflow = rtsFalse;
100
101 /* flag that tracks whether we have done any execution in this time slice.
102  * LOCK: currently none, perhaps we should lock (but needs to be
103  * updated in the fast path of the scheduler).
104  *
105  * NB. must be StgWord, we do xchg() on it.
106  */
107 volatile StgWord recent_activity = ACTIVITY_YES;
108
109 /* if this flag is set as well, give up execution
110  * LOCK: none (changes monotonically)
111  */
112 volatile StgWord sched_state = SCHED_RUNNING;
113
114 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
115  *  exists - earlier gccs apparently didn't.
116  *  -= chak
117  */
118 StgTSO dummy_tso;
119
120 /*
121  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
122  * in an MT setting, needed to signal that a worker thread shouldn't hang around
123  * in the scheduler when it is out of work.
124  */
125 rtsBool shutting_down_scheduler = rtsFalse;
126
127 /*
128  * This mutex protects most of the global scheduler data in
129  * the THREADED_RTS runtime.
130  */
131 #if defined(THREADED_RTS)
132 Mutex sched_mutex;
133 #endif
134
135 #if !defined(mingw32_HOST_OS)
136 #define FORKPROCESS_PRIMOP_SUPPORTED
137 #endif
138
139 /* -----------------------------------------------------------------------------
140  * static function prototypes
141  * -------------------------------------------------------------------------- */
142
143 static Capability *schedule (Capability *initialCapability, Task *task);
144
145 //
146 // These function all encapsulate parts of the scheduler loop, and are
147 // abstracted only to make the structure and control flow of the
148 // scheduler clearer.
149 //
150 static void schedulePreLoop (void);
151 static void scheduleFindWork (Capability *cap);
152 #if defined(THREADED_RTS)
153 static void scheduleYield (Capability **pcap, Task *task);
154 #endif
155 static void scheduleStartSignalHandlers (Capability *cap);
156 static void scheduleCheckBlockedThreads (Capability *cap);
157 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
158 static void scheduleCheckBlackHoles (Capability *cap);
159 static void scheduleDetectDeadlock (Capability *cap, Task *task);
160 static void schedulePushWork(Capability *cap, Task *task);
161 #if defined(PARALLEL_HASKELL)
162 static rtsBool scheduleGetRemoteWork(Capability *cap);
163 static void scheduleSendPendingMessages(void);
164 #endif
165 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
166 static void scheduleActivateSpark(Capability *cap);
167 #endif
168 static void schedulePostRunThread(Capability *cap, StgTSO *t);
169 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
170 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
171                                          StgTSO *t);
172 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
173                                     nat prev_what_next );
174 static void scheduleHandleThreadBlocked( StgTSO *t );
175 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
176                                              StgTSO *t );
177 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
178 static Capability *scheduleDoGC(Capability *cap, Task *task,
179                                 rtsBool force_major);
180
181 static rtsBool checkBlackHoles(Capability *cap);
182
183 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
184 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
185
186 static void deleteThread (Capability *cap, StgTSO *tso);
187 static void deleteAllThreads (Capability *cap);
188
189 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
190 static void deleteThread_(Capability *cap, StgTSO *tso);
191 #endif
192
193 #ifdef DEBUG
194 static char *whatNext_strs[] = {
195   "(unknown)",
196   "ThreadRunGHC",
197   "ThreadInterpret",
198   "ThreadKilled",
199   "ThreadRelocated",
200   "ThreadComplete"
201 };
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * Putting a thread on the run queue: different scheduling policies
206  * -------------------------------------------------------------------------- */
207
208 STATIC_INLINE void
209 addToRunQueue( Capability *cap, StgTSO *t )
210 {
211 #if defined(PARALLEL_HASKELL)
212     if (RtsFlags.ParFlags.doFairScheduling) { 
213         // this does round-robin scheduling; good for concurrency
214         appendToRunQueue(cap,t);
215     } else {
216         // this does unfair scheduling; good for parallelism
217         pushOnRunQueue(cap,t);
218     }
219 #else
220     // this does round-robin scheduling; good for concurrency
221     appendToRunQueue(cap,t);
222 #endif
223 }
224
225 /* ---------------------------------------------------------------------------
226    Main scheduling loop.
227
228    We use round-robin scheduling, each thread returning to the
229    scheduler loop when one of these conditions is detected:
230
231       * out of heap space
232       * timer expires (thread yields)
233       * thread blocks
234       * thread ends
235       * stack overflow
236
237    GRAN version:
238      In a GranSim setup this loop iterates over the global event queue.
239      This revolves around the global event queue, which determines what 
240      to do next. Therefore, it's more complicated than either the 
241      concurrent or the parallel (GUM) setup.
242   This version has been entirely removed (JB 2008/08).
243
244    GUM version:
245      GUM iterates over incoming messages.
246      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
247      and sends out a fish whenever it has nothing to do; in-between
248      doing the actual reductions (shared code below) it processes the
249      incoming messages and deals with delayed operations 
250      (see PendingFetches).
251      This is not the ugliest code you could imagine, but it's bloody close.
252
253   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
254   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
255   as well as future GUM versions. This file has been refurbished to
256   only contain valid code, which is however incomplete, refers to
257   invalid includes etc.
258
259    ------------------------------------------------------------------------ */
260
261 static Capability *
262 schedule (Capability *initialCapability, Task *task)
263 {
264   StgTSO *t;
265   Capability *cap;
266   StgThreadReturnCode ret;
267 #if defined(PARALLEL_HASKELL)
268   rtsBool receivedFinish = rtsFalse;
269 #endif
270   nat prev_what_next;
271   rtsBool ready_to_gc;
272 #if defined(THREADED_RTS)
273   rtsBool first = rtsTrue;
274 #endif
275   
276   cap = initialCapability;
277
278   // Pre-condition: this task owns initialCapability.
279   // The sched_mutex is *NOT* held
280   // NB. on return, we still hold a capability.
281
282   debugTrace (DEBUG_sched, 
283               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
284               task, initialCapability);
285
286   if (running_finalizers) {
287       errorBelch("error: a C finalizer called back into Haskell.\n"
288                  "   This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
289                  "   To create finalizers that may call back into Haskll, use\n"
290                  "   Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
291       stg_exit(EXIT_FAILURE);
292   }
293
294   schedulePreLoop();
295
296   // -----------------------------------------------------------
297   // Scheduler loop starts here:
298
299 #if defined(PARALLEL_HASKELL)
300 #define TERMINATION_CONDITION        (!receivedFinish)
301 #else
302 #define TERMINATION_CONDITION        rtsTrue
303 #endif
304
305   while (TERMINATION_CONDITION) {
306
307     // Check whether we have re-entered the RTS from Haskell without
308     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
309     // call).
310     if (cap->in_haskell) {
311           errorBelch("schedule: re-entered unsafely.\n"
312                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
313           stg_exit(EXIT_FAILURE);
314     }
315
316     // The interruption / shutdown sequence.
317     // 
318     // In order to cleanly shut down the runtime, we want to:
319     //   * make sure that all main threads return to their callers
320     //     with the state 'Interrupted'.
321     //   * clean up all OS threads assocated with the runtime
322     //   * free all memory etc.
323     //
324     // So the sequence for ^C goes like this:
325     //
326     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
327     //     arranges for some Capability to wake up
328     //
329     //   * all threads in the system are halted, and the zombies are
330     //     placed on the run queue for cleaning up.  We acquire all
331     //     the capabilities in order to delete the threads, this is
332     //     done by scheduleDoGC() for convenience (because GC already
333     //     needs to acquire all the capabilities).  We can't kill
334     //     threads involved in foreign calls.
335     // 
336     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
337     //
338     //   * sched_state := SCHED_SHUTTING_DOWN
339     //
340     //   * all workers exit when the run queue on their capability
341     //     drains.  All main threads will also exit when their TSO
342     //     reaches the head of the run queue and they can return.
343     //
344     //   * eventually all Capabilities will shut down, and the RTS can
345     //     exit.
346     //
347     //   * We might be left with threads blocked in foreign calls, 
348     //     we should really attempt to kill these somehow (TODO);
349     
350     switch (sched_state) {
351     case SCHED_RUNNING:
352         break;
353     case SCHED_INTERRUPTING:
354         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
355 #if defined(THREADED_RTS)
356         discardSparksCap(cap);
357 #endif
358         /* scheduleDoGC() deletes all the threads */
359         cap = scheduleDoGC(cap,task,rtsFalse);
360
361         // after scheduleDoGC(), we must be shutting down.  Either some
362         // other Capability did the final GC, or we did it above,
363         // either way we can fall through to the SCHED_SHUTTING_DOWN
364         // case now.
365         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
366         // fall through
367
368     case SCHED_SHUTTING_DOWN:
369         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
370         // If we are a worker, just exit.  If we're a bound thread
371         // then we will exit below when we've removed our TSO from
372         // the run queue.
373         if (task->tso == NULL && emptyRunQueue(cap)) {
374             return cap;
375         }
376         break;
377     default:
378         barf("sched_state: %d", sched_state);
379     }
380
381     scheduleFindWork(cap);
382
383     /* work pushing, currently relevant only for THREADED_RTS:
384        (pushes threads, wakes up idle capabilities for stealing) */
385     schedulePushWork(cap,task);
386
387 #if defined(PARALLEL_HASKELL)
388     /* since we perform a blocking receive and continue otherwise,
389        either we never reach here or we definitely have work! */
390     // from here: non-empty run queue
391     ASSERT(!emptyRunQueue(cap));
392
393     if (PacketsWaiting()) {  /* now process incoming messages, if any
394                                 pending...  
395
396                                 CAUTION: scheduleGetRemoteWork called
397                                 above, waits for messages as well! */
398       processMessages(cap, &receivedFinish);
399     }
400 #endif // PARALLEL_HASKELL: non-empty run queue!
401
402     scheduleDetectDeadlock(cap,task);
403
404 #if defined(THREADED_RTS)
405     cap = task->cap;    // reload cap, it might have changed
406 #endif
407
408     // Normally, the only way we can get here with no threads to
409     // run is if a keyboard interrupt received during 
410     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
411     // Additionally, it is not fatal for the
412     // threaded RTS to reach here with no threads to run.
413     //
414     // win32: might be here due to awaitEvent() being abandoned
415     // as a result of a console event having been delivered.
416     
417 #if defined(THREADED_RTS)
418     if (first) 
419     {
420     // XXX: ToDo
421     //     // don't yield the first time, we want a chance to run this
422     //     // thread for a bit, even if there are others banging at the
423     //     // door.
424     //     first = rtsFalse;
425     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
426     }
427
428   yield:
429     scheduleYield(&cap,task);
430     if (emptyRunQueue(cap)) continue; // look for work again
431 #endif
432
433 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
434     if ( emptyRunQueue(cap) ) {
435         ASSERT(sched_state >= SCHED_INTERRUPTING);
436     }
437 #endif
438
439     // 
440     // Get a thread to run
441     //
442     t = popRunQueue(cap);
443
444     // Sanity check the thread we're about to run.  This can be
445     // expensive if there is lots of thread switching going on...
446     IF_DEBUG(sanity,checkTSO(t));
447
448 #if defined(THREADED_RTS)
449     // Check whether we can run this thread in the current task.
450     // If not, we have to pass our capability to the right task.
451     {
452         Task *bound = t->bound;
453       
454         if (bound) {
455             if (bound == task) {
456                 debugTrace(DEBUG_sched,
457                            "### Running thread %lu in bound thread", (unsigned long)t->id);
458                 // yes, the Haskell thread is bound to the current native thread
459             } else {
460                 debugTrace(DEBUG_sched,
461                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
462                 // no, bound to a different Haskell thread: pass to that thread
463                 pushOnRunQueue(cap,t);
464                 continue;
465             }
466         } else {
467             // The thread we want to run is unbound.
468             if (task->tso) { 
469                 debugTrace(DEBUG_sched,
470                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
471                 // no, the current native thread is bound to a different
472                 // Haskell thread, so pass it to any worker thread
473                 pushOnRunQueue(cap,t);
474                 continue; 
475             }
476         }
477     }
478 #endif
479
480     // If we're shutting down, and this thread has not yet been
481     // killed, kill it now.  This sometimes happens when a finalizer
482     // thread is created by the final GC, or a thread previously
483     // in a foreign call returns.
484     if (sched_state >= SCHED_INTERRUPTING &&
485         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
486         deleteThread(cap,t);
487     }
488
489     /* context switches are initiated by the timer signal, unless
490      * the user specified "context switch as often as possible", with
491      * +RTS -C0
492      */
493     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
494         && !emptyThreadQueues(cap)) {
495         cap->context_switch = 1;
496     }
497          
498 run_thread:
499
500     // CurrentTSO is the thread to run.  t might be different if we
501     // loop back to run_thread, so make sure to set CurrentTSO after
502     // that.
503     cap->r.rCurrentTSO = t;
504
505     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
506                               (long)t->id, whatNext_strs[t->what_next]);
507
508     startHeapProfTimer();
509
510     // Check for exceptions blocked on this thread
511     maybePerformBlockedException (cap, t);
512
513     // ----------------------------------------------------------------------
514     // Run the current thread 
515
516     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
517     ASSERT(t->cap == cap);
518     ASSERT(t->bound ? t->bound->cap == cap : 1);
519
520     prev_what_next = t->what_next;
521
522     errno = t->saved_errno;
523 #if mingw32_HOST_OS
524     SetLastError(t->saved_winerror);
525 #endif
526
527     cap->in_haskell = rtsTrue;
528
529     dirty_TSO(cap,t);
530
531 #if defined(THREADED_RTS)
532     if (recent_activity == ACTIVITY_DONE_GC) {
533         // ACTIVITY_DONE_GC means we turned off the timer signal to
534         // conserve power (see #1623).  Re-enable it here.
535         nat prev;
536         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
537         if (prev == ACTIVITY_DONE_GC) {
538             startTimer();
539         }
540     } else {
541         recent_activity = ACTIVITY_YES;
542     }
543 #endif
544
545     postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
546
547     switch (prev_what_next) {
548         
549     case ThreadKilled:
550     case ThreadComplete:
551         /* Thread already finished, return to scheduler. */
552         ret = ThreadFinished;
553         break;
554         
555     case ThreadRunGHC:
556     {
557         StgRegTable *r;
558         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
559         cap = regTableToCapability(r);
560         ret = r->rRet;
561         break;
562     }
563     
564     case ThreadInterpret:
565         cap = interpretBCO(cap);
566         ret = cap->r.rRet;
567         break;
568         
569     default:
570         barf("schedule: invalid what_next field");
571     }
572
573     cap->in_haskell = rtsFalse;
574
575     // The TSO might have moved, eg. if it re-entered the RTS and a GC
576     // happened.  So find the new location:
577     t = cap->r.rCurrentTSO;
578
579     // We have run some Haskell code: there might be blackhole-blocked
580     // threads to wake up now.
581     // Lock-free test here should be ok, we're just setting a flag.
582     if ( blackhole_queue != END_TSO_QUEUE ) {
583         blackholes_need_checking = rtsTrue;
584     }
585     
586     // And save the current errno in this thread.
587     // XXX: possibly bogus for SMP because this thread might already
588     // be running again, see code below.
589     t->saved_errno = errno;
590 #if mingw32_HOST_OS
591     // Similarly for Windows error code
592     t->saved_winerror = GetLastError();
593 #endif
594
595     postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
596
597 #if defined(THREADED_RTS)
598     // If ret is ThreadBlocked, and this Task is bound to the TSO that
599     // blocked, we are in limbo - the TSO is now owned by whatever it
600     // is blocked on, and may in fact already have been woken up,
601     // perhaps even on a different Capability.  It may be the case
602     // that task->cap != cap.  We better yield this Capability
603     // immediately and return to normaility.
604     if (ret == ThreadBlocked) {
605         debugTrace(DEBUG_sched,
606                    "--<< thread %lu (%s) stopped: blocked",
607                    (unsigned long)t->id, whatNext_strs[t->what_next]);
608         goto yield;
609     }
610 #endif
611
612     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613     ASSERT(t->cap == cap);
614
615     // ----------------------------------------------------------------------
616     
617     // Costs for the scheduler are assigned to CCS_SYSTEM
618     stopHeapProfTimer();
619 #if defined(PROFILING)
620     CCCS = CCS_SYSTEM;
621 #endif
622     
623     schedulePostRunThread(cap,t);
624
625     t = threadStackUnderflow(task,t);
626
627     ready_to_gc = rtsFalse;
628
629     switch (ret) {
630     case HeapOverflow:
631         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
632         break;
633
634     case StackOverflow:
635         scheduleHandleStackOverflow(cap,task,t);
636         break;
637
638     case ThreadYielding:
639         if (scheduleHandleYield(cap, t, prev_what_next)) {
640             // shortcut for switching between compiler/interpreter:
641             goto run_thread; 
642         }
643         break;
644
645     case ThreadBlocked:
646         scheduleHandleThreadBlocked(t);
647         break;
648
649     case ThreadFinished:
650         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
651         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
652         break;
653
654     default:
655       barf("schedule: invalid thread return code %d", (int)ret);
656     }
657
658     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
659       cap = scheduleDoGC(cap,task,rtsFalse);
660     }
661   } /* end of while() */
662 }
663
664 /* ----------------------------------------------------------------------------
665  * Setting up the scheduler loop
666  * ------------------------------------------------------------------------- */
667
668 static void
669 schedulePreLoop(void)
670 {
671   // initialisation for scheduler - what cannot go into initScheduler()  
672 }
673
674 /* -----------------------------------------------------------------------------
675  * scheduleFindWork()
676  *
677  * Search for work to do, and handle messages from elsewhere.
678  * -------------------------------------------------------------------------- */
679
680 static void
681 scheduleFindWork (Capability *cap)
682 {
683     scheduleStartSignalHandlers(cap);
684
685     // Only check the black holes here if we've nothing else to do.
686     // During normal execution, the black hole list only gets checked
687     // at GC time, to avoid repeatedly traversing this possibly long
688     // list each time around the scheduler.
689     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
690
691     scheduleCheckWakeupThreads(cap);
692
693     scheduleCheckBlockedThreads(cap);
694
695 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
696     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
697 #endif
698
699 #if defined(PARALLEL_HASKELL)
700     // if messages have been buffered...
701     scheduleSendPendingMessages();
702 #endif
703
704 #if defined(PARALLEL_HASKELL)
705     if (emptyRunQueue(cap)) {
706         receivedFinish = scheduleGetRemoteWork(cap);
707         continue; //  a new round, (hopefully) with new work
708         /* 
709            in GUM, this a) sends out a FISH and returns IF no fish is
710                            out already
711                         b) (blocking) awaits and receives messages
712            
713            in Eden, this is only the blocking receive, as b) in GUM.
714         */
715     }
716 #endif
717 }
718
719 #if defined(THREADED_RTS)
720 STATIC_INLINE rtsBool
721 shouldYieldCapability (Capability *cap, Task *task)
722 {
723     // we need to yield this capability to someone else if..
724     //   - another thread is initiating a GC
725     //   - another Task is returning from a foreign call
726     //   - the thread at the head of the run queue cannot be run
727     //     by this Task (it is bound to another Task, or it is unbound
728     //     and this task it bound).
729     return (waiting_for_gc || 
730             cap->returning_tasks_hd != NULL ||
731             (!emptyRunQueue(cap) && (task->tso == NULL
732                                      ? cap->run_queue_hd->bound != NULL
733                                      : cap->run_queue_hd->bound != task)));
734 }
735
736 // This is the single place where a Task goes to sleep.  There are
737 // two reasons it might need to sleep:
738 //    - there are no threads to run
739 //    - we need to yield this Capability to someone else 
740 //      (see shouldYieldCapability())
741 //
742 // Careful: the scheduler loop is quite delicate.  Make sure you run
743 // the tests in testsuite/concurrent (all ways) after modifying this,
744 // and also check the benchmarks in nofib/parallel for regressions.
745
746 static void
747 scheduleYield (Capability **pcap, Task *task)
748 {
749     Capability *cap = *pcap;
750
751     // if we have work, and we don't need to give up the Capability, continue.
752     if (!shouldYieldCapability(cap,task) && 
753         (!emptyRunQueue(cap) ||
754          !emptyWakeupQueue(cap) ||
755          blackholes_need_checking ||
756          sched_state >= SCHED_INTERRUPTING))
757         return;
758
759     // otherwise yield (sleep), and keep yielding if necessary.
760     do {
761         yieldCapability(&cap,task);
762     } 
763     while (shouldYieldCapability(cap,task));
764
765     // note there may still be no threads on the run queue at this
766     // point, the caller has to check.
767
768     *pcap = cap;
769     return;
770 }
771 #endif
772     
773 /* -----------------------------------------------------------------------------
774  * schedulePushWork()
775  *
776  * Push work to other Capabilities if we have some.
777  * -------------------------------------------------------------------------- */
778
779 static void
780 schedulePushWork(Capability *cap USED_IF_THREADS, 
781                  Task *task      USED_IF_THREADS)
782 {
783   /* following code not for PARALLEL_HASKELL. I kept the call general,
784      future GUM versions might use pushing in a distributed setup */
785 #if defined(THREADED_RTS)
786
787     Capability *free_caps[n_capabilities], *cap0;
788     nat i, n_free_caps;
789
790     // migration can be turned off with +RTS -qg
791     if (!RtsFlags.ParFlags.migrate) return;
792
793     // Check whether we have more threads on our run queue, or sparks
794     // in our pool, that we could hand to another Capability.
795     if (cap->run_queue_hd == END_TSO_QUEUE) {
796         if (sparkPoolSizeCap(cap) < 2) return;
797     } else {
798         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
799             sparkPoolSizeCap(cap) < 1) return;
800     }
801
802     // First grab as many free Capabilities as we can.
803     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
804         cap0 = &capabilities[i];
805         if (cap != cap0 && tryGrabCapability(cap0,task)) {
806             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
807                 // it already has some work, we just grabbed it at 
808                 // the wrong moment.  Or maybe it's deadlocked!
809                 releaseCapability(cap0);
810             } else {
811                 free_caps[n_free_caps++] = cap0;
812             }
813         }
814     }
815
816     // we now have n_free_caps free capabilities stashed in
817     // free_caps[].  Share our run queue equally with them.  This is
818     // probably the simplest thing we could do; improvements we might
819     // want to do include:
820     //
821     //   - giving high priority to moving relatively new threads, on 
822     //     the gournds that they haven't had time to build up a
823     //     working set in the cache on this CPU/Capability.
824     //
825     //   - giving low priority to moving long-lived threads
826
827     if (n_free_caps > 0) {
828         StgTSO *prev, *t, *next;
829         rtsBool pushed_to_all;
830
831         debugTrace(DEBUG_sched, 
832                    "cap %d: %s and %d free capabilities, sharing...", 
833                    cap->no, 
834                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
835                    "excess threads on run queue":"sparks to share (>=2)",
836                    n_free_caps);
837
838         i = 0;
839         pushed_to_all = rtsFalse;
840
841         if (cap->run_queue_hd != END_TSO_QUEUE) {
842             prev = cap->run_queue_hd;
843             t = prev->_link;
844             prev->_link = END_TSO_QUEUE;
845             for (; t != END_TSO_QUEUE; t = next) {
846                 next = t->_link;
847                 t->_link = END_TSO_QUEUE;
848                 if (t->what_next == ThreadRelocated
849                     || t->bound == task // don't move my bound thread
850                     || tsoLocked(t)) {  // don't move a locked thread
851                     setTSOLink(cap, prev, t);
852                     prev = t;
853                 } else if (i == n_free_caps) {
854                     pushed_to_all = rtsTrue;
855                     i = 0;
856                     // keep one for us
857                     setTSOLink(cap, prev, t);
858                     prev = t;
859                 } else {
860                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
861                     appendToRunQueue(free_caps[i],t);
862
863         postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
864
865                     if (t->bound) { t->bound->cap = free_caps[i]; }
866                     t->cap = free_caps[i];
867                     i++;
868                 }
869             }
870             cap->run_queue_tl = prev;
871         }
872
873 #ifdef SPARK_PUSHING
874         /* JB I left this code in place, it would work but is not necessary */
875
876         // If there are some free capabilities that we didn't push any
877         // threads to, then try to push a spark to each one.
878         if (!pushed_to_all) {
879             StgClosure *spark;
880             // i is the next free capability to push to
881             for (; i < n_free_caps; i++) {
882                 if (emptySparkPoolCap(free_caps[i])) {
883                     spark = tryStealSpark(cap->sparks);
884                     if (spark != NULL) {
885                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
886
887       postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
888
889                         newSpark(&(free_caps[i]->r), spark);
890                     }
891                 }
892             }
893         }
894 #endif /* SPARK_PUSHING */
895
896         // release the capabilities
897         for (i = 0; i < n_free_caps; i++) {
898             task->cap = free_caps[i];
899             releaseAndWakeupCapability(free_caps[i]);
900         }
901     }
902     task->cap = cap; // reset to point to our Capability.
903
904 #endif /* THREADED_RTS */
905
906 }
907
908 /* ----------------------------------------------------------------------------
909  * Start any pending signal handlers
910  * ------------------------------------------------------------------------- */
911
912 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
913 static void
914 scheduleStartSignalHandlers(Capability *cap)
915 {
916     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
917         // safe outside the lock
918         startSignalHandlers(cap);
919     }
920 }
921 #else
922 static void
923 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
924 {
925 }
926 #endif
927
928 /* ----------------------------------------------------------------------------
929  * Check for blocked threads that can be woken up.
930  * ------------------------------------------------------------------------- */
931
932 static void
933 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
934 {
935 #if !defined(THREADED_RTS)
936     //
937     // Check whether any waiting threads need to be woken up.  If the
938     // run queue is empty, and there are no other tasks running, we
939     // can wait indefinitely for something to happen.
940     //
941     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
942     {
943         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
944     }
945 #endif
946 }
947
948
949 /* ----------------------------------------------------------------------------
950  * Check for threads woken up by other Capabilities
951  * ------------------------------------------------------------------------- */
952
953 static void
954 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
955 {
956 #if defined(THREADED_RTS)
957     // Any threads that were woken up by other Capabilities get
958     // appended to our run queue.
959     if (!emptyWakeupQueue(cap)) {
960         ACQUIRE_LOCK(&cap->lock);
961         if (emptyRunQueue(cap)) {
962             cap->run_queue_hd = cap->wakeup_queue_hd;
963             cap->run_queue_tl = cap->wakeup_queue_tl;
964         } else {
965             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
966             cap->run_queue_tl = cap->wakeup_queue_tl;
967         }
968         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
969         RELEASE_LOCK(&cap->lock);
970     }
971 #endif
972 }
973
974 /* ----------------------------------------------------------------------------
975  * Check for threads blocked on BLACKHOLEs that can be woken up
976  * ------------------------------------------------------------------------- */
977 static void
978 scheduleCheckBlackHoles (Capability *cap)
979 {
980     if ( blackholes_need_checking ) // check without the lock first
981     {
982         ACQUIRE_LOCK(&sched_mutex);
983         if ( blackholes_need_checking ) {
984             blackholes_need_checking = rtsFalse;
985             // important that we reset the flag *before* checking the
986             // blackhole queue, otherwise we could get deadlock.  This
987             // happens as follows: we wake up a thread that
988             // immediately runs on another Capability, blocks on a
989             // blackhole, and then we reset the blackholes_need_checking flag.
990             checkBlackHoles(cap);
991         }
992         RELEASE_LOCK(&sched_mutex);
993     }
994 }
995
996 /* ----------------------------------------------------------------------------
997  * Detect deadlock conditions and attempt to resolve them.
998  * ------------------------------------------------------------------------- */
999
1000 static void
1001 scheduleDetectDeadlock (Capability *cap, Task *task)
1002 {
1003
1004 #if defined(PARALLEL_HASKELL)
1005     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
1006     return;
1007 #endif
1008
1009     /* 
1010      * Detect deadlock: when we have no threads to run, there are no
1011      * threads blocked, waiting for I/O, or sleeping, and all the
1012      * other tasks are waiting for work, we must have a deadlock of
1013      * some description.
1014      */
1015     if ( emptyThreadQueues(cap) )
1016     {
1017 #if defined(THREADED_RTS)
1018         /* 
1019          * In the threaded RTS, we only check for deadlock if there
1020          * has been no activity in a complete timeslice.  This means
1021          * we won't eagerly start a full GC just because we don't have
1022          * any threads to run currently.
1023          */
1024         if (recent_activity != ACTIVITY_INACTIVE) return;
1025 #endif
1026
1027         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1028
1029         // Garbage collection can release some new threads due to
1030         // either (a) finalizers or (b) threads resurrected because
1031         // they are unreachable and will therefore be sent an
1032         // exception.  Any threads thus released will be immediately
1033         // runnable.
1034         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1035         // when force_major == rtsTrue. scheduleDoGC sets
1036         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1037         // signal.
1038
1039         if ( !emptyRunQueue(cap) ) return;
1040
1041 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1042         /* If we have user-installed signal handlers, then wait
1043          * for signals to arrive rather then bombing out with a
1044          * deadlock.
1045          */
1046         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1047             debugTrace(DEBUG_sched,
1048                        "still deadlocked, waiting for signals...");
1049
1050             awaitUserSignals();
1051
1052             if (signals_pending()) {
1053                 startSignalHandlers(cap);
1054             }
1055
1056             // either we have threads to run, or we were interrupted:
1057             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1058
1059             return;
1060         }
1061 #endif
1062
1063 #if !defined(THREADED_RTS)
1064         /* Probably a real deadlock.  Send the current main thread the
1065          * Deadlock exception.
1066          */
1067         if (task->tso) {
1068             switch (task->tso->why_blocked) {
1069             case BlockedOnSTM:
1070             case BlockedOnBlackHole:
1071             case BlockedOnException:
1072             case BlockedOnMVar:
1073                 throwToSingleThreaded(cap, task->tso, 
1074                                       (StgClosure *)nonTermination_closure);
1075                 return;
1076             default:
1077                 barf("deadlock: main thread blocked in a strange way");
1078             }
1079         }
1080         return;
1081 #endif
1082     }
1083 }
1084
1085
1086 /* ----------------------------------------------------------------------------
1087  * Send pending messages (PARALLEL_HASKELL only)
1088  * ------------------------------------------------------------------------- */
1089
1090 #if defined(PARALLEL_HASKELL)
1091 static void
1092 scheduleSendPendingMessages(void)
1093 {
1094
1095 # if defined(PAR) // global Mem.Mgmt., omit for now
1096     if (PendingFetches != END_BF_QUEUE) {
1097         processFetches();
1098     }
1099 # endif
1100     
1101     if (RtsFlags.ParFlags.BufferTime) {
1102         // if we use message buffering, we must send away all message
1103         // packets which have become too old...
1104         sendOldBuffers(); 
1105     }
1106 }
1107 #endif
1108
1109 /* ----------------------------------------------------------------------------
1110  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1111  * ------------------------------------------------------------------------- */
1112
1113 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1114 static void
1115 scheduleActivateSpark(Capability *cap)
1116 {
1117     if (anySparks())
1118     {
1119         createSparkThread(cap);
1120         debugTrace(DEBUG_sched, "creating a spark thread");
1121     }
1122 }
1123 #endif // PARALLEL_HASKELL || THREADED_RTS
1124
1125 /* ----------------------------------------------------------------------------
1126  * Get work from a remote node (PARALLEL_HASKELL only)
1127  * ------------------------------------------------------------------------- */
1128     
1129 #if defined(PARALLEL_HASKELL)
1130 static rtsBool /* return value used in PARALLEL_HASKELL only */
1131 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1132 {
1133 #if defined(PARALLEL_HASKELL)
1134   rtsBool receivedFinish = rtsFalse;
1135
1136   // idle() , i.e. send all buffers, wait for work
1137   if (RtsFlags.ParFlags.BufferTime) {
1138         IF_PAR_DEBUG(verbose, 
1139                 debugBelch("...send all pending data,"));
1140         {
1141           nat i;
1142           for (i=1; i<=nPEs; i++)
1143             sendImmediately(i); // send all messages away immediately
1144         }
1145   }
1146
1147   /* this would be the place for fishing in GUM... 
1148
1149      if (no-earlier-fish-around) 
1150           sendFish(choosePe());
1151    */
1152
1153   // Eden:just look for incoming messages (blocking receive)
1154   IF_PAR_DEBUG(verbose, 
1155                debugBelch("...wait for incoming messages...\n"));
1156   processMessages(cap, &receivedFinish); // blocking receive...
1157
1158
1159   return receivedFinish;
1160   // reenter scheduling look after having received something
1161
1162 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1163
1164   return rtsFalse; /* return value unused in THREADED_RTS */
1165
1166 #endif /* PARALLEL_HASKELL */
1167 }
1168 #endif // PARALLEL_HASKELL || THREADED_RTS
1169
1170 /* ----------------------------------------------------------------------------
1171  * After running a thread...
1172  * ------------------------------------------------------------------------- */
1173
1174 static void
1175 schedulePostRunThread (Capability *cap, StgTSO *t)
1176 {
1177     // We have to be able to catch transactions that are in an
1178     // infinite loop as a result of seeing an inconsistent view of
1179     // memory, e.g. 
1180     //
1181     //   atomically $ do
1182     //       [a,b] <- mapM readTVar [ta,tb]
1183     //       when (a == b) loop
1184     //
1185     // and a is never equal to b given a consistent view of memory.
1186     //
1187     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1188         if (!stmValidateNestOfTransactions (t -> trec)) {
1189             debugTrace(DEBUG_sched | DEBUG_stm,
1190                        "trec %p found wasting its time", t);
1191             
1192             // strip the stack back to the
1193             // ATOMICALLY_FRAME, aborting the (nested)
1194             // transaction, and saving the stack of any
1195             // partially-evaluated thunks on the heap.
1196             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1197             
1198             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1199         }
1200     }
1201
1202   /* some statistics gathering in the parallel case */
1203 }
1204
1205 /* -----------------------------------------------------------------------------
1206  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1207  * -------------------------------------------------------------------------- */
1208
1209 static rtsBool
1210 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1211 {
1212     // did the task ask for a large block?
1213     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1214         // if so, get one and push it on the front of the nursery.
1215         bdescr *bd;
1216         lnat blocks;
1217         
1218         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1219         
1220         debugTrace(DEBUG_sched,
1221                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1222                    (long)t->id, whatNext_strs[t->what_next], blocks);
1223     
1224         // don't do this if the nursery is (nearly) full, we'll GC first.
1225         if (cap->r.rCurrentNursery->link != NULL ||
1226             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1227                                                // if the nursery has only one block.
1228             
1229             ACQUIRE_SM_LOCK
1230             bd = allocGroup( blocks );
1231             RELEASE_SM_LOCK
1232             cap->r.rNursery->n_blocks += blocks;
1233             
1234             // link the new group into the list
1235             bd->link = cap->r.rCurrentNursery;
1236             bd->u.back = cap->r.rCurrentNursery->u.back;
1237             if (cap->r.rCurrentNursery->u.back != NULL) {
1238                 cap->r.rCurrentNursery->u.back->link = bd;
1239             } else {
1240 #if !defined(THREADED_RTS)
1241                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1242                        g0s0 == cap->r.rNursery);
1243 #endif
1244                 cap->r.rNursery->blocks = bd;
1245             }             
1246             cap->r.rCurrentNursery->u.back = bd;
1247             
1248             // initialise it as a nursery block.  We initialise the
1249             // step, gen_no, and flags field of *every* sub-block in
1250             // this large block, because this is easier than making
1251             // sure that we always find the block head of a large
1252             // block whenever we call Bdescr() (eg. evacuate() and
1253             // isAlive() in the GC would both have to do this, at
1254             // least).
1255             { 
1256                 bdescr *x;
1257                 for (x = bd; x < bd + blocks; x++) {
1258                     x->step = cap->r.rNursery;
1259                     x->gen_no = 0;
1260                     x->flags = 0;
1261                 }
1262             }
1263             
1264             // This assert can be a killer if the app is doing lots
1265             // of large block allocations.
1266             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1267             
1268             // now update the nursery to point to the new block
1269             cap->r.rCurrentNursery = bd;
1270             
1271             // we might be unlucky and have another thread get on the
1272             // run queue before us and steal the large block, but in that
1273             // case the thread will just end up requesting another large
1274             // block.
1275             pushOnRunQueue(cap,t);
1276             return rtsFalse;  /* not actually GC'ing */
1277         }
1278     }
1279     
1280     debugTrace(DEBUG_sched,
1281                "--<< thread %ld (%s) stopped: HeapOverflow",
1282                (long)t->id, whatNext_strs[t->what_next]);
1283
1284     if (cap->r.rHpLim == NULL || cap->context_switch) {
1285         // Sometimes we miss a context switch, e.g. when calling
1286         // primitives in a tight loop, MAYBE_GC() doesn't check the
1287         // context switch flag, and we end up waiting for a GC.
1288         // See #1984, and concurrent/should_run/1984
1289         cap->context_switch = 0;
1290         addToRunQueue(cap,t);
1291     } else {
1292         pushOnRunQueue(cap,t);
1293     }
1294     return rtsTrue;
1295     /* actual GC is done at the end of the while loop in schedule() */
1296 }
1297
1298 /* -----------------------------------------------------------------------------
1299  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1300  * -------------------------------------------------------------------------- */
1301
1302 static void
1303 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1304 {
1305     debugTrace (DEBUG_sched,
1306                 "--<< thread %ld (%s) stopped, StackOverflow", 
1307                 (long)t->id, whatNext_strs[t->what_next]);
1308
1309     /* just adjust the stack for this thread, then pop it back
1310      * on the run queue.
1311      */
1312     { 
1313         /* enlarge the stack */
1314         StgTSO *new_t = threadStackOverflow(cap, t);
1315         
1316         /* The TSO attached to this Task may have moved, so update the
1317          * pointer to it.
1318          */
1319         if (task->tso == t) {
1320             task->tso = new_t;
1321         }
1322         pushOnRunQueue(cap,new_t);
1323     }
1324 }
1325
1326 /* -----------------------------------------------------------------------------
1327  * Handle a thread that returned to the scheduler with ThreadYielding
1328  * -------------------------------------------------------------------------- */
1329
1330 static rtsBool
1331 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1332 {
1333     // Reset the context switch flag.  We don't do this just before
1334     // running the thread, because that would mean we would lose ticks
1335     // during GC, which can lead to unfair scheduling (a thread hogs
1336     // the CPU because the tick always arrives during GC).  This way
1337     // penalises threads that do a lot of allocation, but that seems
1338     // better than the alternative.
1339     cap->context_switch = 0;
1340     
1341     /* put the thread back on the run queue.  Then, if we're ready to
1342      * GC, check whether this is the last task to stop.  If so, wake
1343      * up the GC thread.  getThread will block during a GC until the
1344      * GC is finished.
1345      */
1346 #ifdef DEBUG
1347     if (t->what_next != prev_what_next) {
1348         debugTrace(DEBUG_sched,
1349                    "--<< thread %ld (%s) stopped to switch evaluators", 
1350                    (long)t->id, whatNext_strs[t->what_next]);
1351     } else {
1352         debugTrace(DEBUG_sched,
1353                    "--<< thread %ld (%s) stopped, yielding",
1354                    (long)t->id, whatNext_strs[t->what_next]);
1355     }
1356 #endif
1357     
1358     IF_DEBUG(sanity,
1359              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1360              checkTSO(t));
1361     ASSERT(t->_link == END_TSO_QUEUE);
1362     
1363     // Shortcut if we're just switching evaluators: don't bother
1364     // doing stack squeezing (which can be expensive), just run the
1365     // thread.
1366     if (t->what_next != prev_what_next) {
1367         return rtsTrue;
1368     }
1369
1370     addToRunQueue(cap,t);
1371
1372     return rtsFalse;
1373 }
1374
1375 /* -----------------------------------------------------------------------------
1376  * Handle a thread that returned to the scheduler with ThreadBlocked
1377  * -------------------------------------------------------------------------- */
1378
1379 static void
1380 scheduleHandleThreadBlocked( StgTSO *t
1381 #if !defined(GRAN) && !defined(DEBUG)
1382     STG_UNUSED
1383 #endif
1384     )
1385 {
1386
1387       // We don't need to do anything.  The thread is blocked, and it
1388       // has tidied up its stack and placed itself on whatever queue
1389       // it needs to be on.
1390
1391     // ASSERT(t->why_blocked != NotBlocked);
1392     // Not true: for example,
1393     //    - in THREADED_RTS, the thread may already have been woken
1394     //      up by another Capability.  This actually happens: try
1395     //      conc023 +RTS -N2.
1396     //    - the thread may have woken itself up already, because
1397     //      threadPaused() might have raised a blocked throwTo
1398     //      exception, see maybePerformBlockedException().
1399
1400 #ifdef DEBUG
1401     if (traceClass(DEBUG_sched)) {
1402         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1403                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1404         printThreadBlockage(t);
1405         debugTraceEnd();
1406     }
1407 #endif
1408 }
1409
1410 /* -----------------------------------------------------------------------------
1411  * Handle a thread that returned to the scheduler with ThreadFinished
1412  * -------------------------------------------------------------------------- */
1413
1414 static rtsBool
1415 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1416 {
1417     /* Need to check whether this was a main thread, and if so,
1418      * return with the return value.
1419      *
1420      * We also end up here if the thread kills itself with an
1421      * uncaught exception, see Exception.cmm.
1422      */
1423     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1424                (unsigned long)t->id, whatNext_strs[t->what_next]);
1425
1426     // blocked exceptions can now complete, even if the thread was in
1427     // blocked mode (see #2910).  This unconditionally calls
1428     // lockTSO(), which ensures that we don't miss any threads that
1429     // are engaged in throwTo() with this thread as a target.
1430     awakenBlockedExceptionQueue (cap, t);
1431
1432       //
1433       // Check whether the thread that just completed was a bound
1434       // thread, and if so return with the result.  
1435       //
1436       // There is an assumption here that all thread completion goes
1437       // through this point; we need to make sure that if a thread
1438       // ends up in the ThreadKilled state, that it stays on the run
1439       // queue so it can be dealt with here.
1440       //
1441
1442       if (t->bound) {
1443
1444           if (t->bound != task) {
1445 #if !defined(THREADED_RTS)
1446               // Must be a bound thread that is not the topmost one.  Leave
1447               // it on the run queue until the stack has unwound to the
1448               // point where we can deal with this.  Leaving it on the run
1449               // queue also ensures that the garbage collector knows about
1450               // this thread and its return value (it gets dropped from the
1451               // step->threads list so there's no other way to find it).
1452               appendToRunQueue(cap,t);
1453               return rtsFalse;
1454 #else
1455               // this cannot happen in the threaded RTS, because a
1456               // bound thread can only be run by the appropriate Task.
1457               barf("finished bound thread that isn't mine");
1458 #endif
1459           }
1460
1461           ASSERT(task->tso == t);
1462
1463           if (t->what_next == ThreadComplete) {
1464               if (task->ret) {
1465                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1466                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1467               }
1468               task->stat = Success;
1469           } else {
1470               if (task->ret) {
1471                   *(task->ret) = NULL;
1472               }
1473               if (sched_state >= SCHED_INTERRUPTING) {
1474                   if (heap_overflow) {
1475                       task->stat = HeapExhausted;
1476                   } else {
1477                       task->stat = Interrupted;
1478                   }
1479               } else {
1480                   task->stat = Killed;
1481               }
1482           }
1483 #ifdef DEBUG
1484           removeThreadLabel((StgWord)task->tso->id);
1485 #endif
1486           return rtsTrue; // tells schedule() to return
1487       }
1488
1489       return rtsFalse;
1490 }
1491
1492 /* -----------------------------------------------------------------------------
1493  * Perform a heap census
1494  * -------------------------------------------------------------------------- */
1495
1496 static rtsBool
1497 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1498 {
1499     // When we have +RTS -i0 and we're heap profiling, do a census at
1500     // every GC.  This lets us get repeatable runs for debugging.
1501     if (performHeapProfile ||
1502         (RtsFlags.ProfFlags.profileInterval==0 &&
1503          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1504         return rtsTrue;
1505     } else {
1506         return rtsFalse;
1507     }
1508 }
1509
1510 /* -----------------------------------------------------------------------------
1511  * Perform a garbage collection if necessary
1512  * -------------------------------------------------------------------------- */
1513
1514 static Capability *
1515 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1516 {
1517     rtsBool heap_census;
1518 #ifdef THREADED_RTS
1519     /* extern static volatile StgWord waiting_for_gc; 
1520        lives inside capability.c */
1521     rtsBool gc_type, prev_pending_gc;
1522     nat i;
1523 #endif
1524
1525     if (sched_state == SCHED_SHUTTING_DOWN) {
1526         // The final GC has already been done, and the system is
1527         // shutting down.  We'll probably deadlock if we try to GC
1528         // now.
1529         return cap;
1530     }
1531
1532 #ifdef THREADED_RTS
1533     if (sched_state < SCHED_INTERRUPTING
1534         && RtsFlags.ParFlags.parGcEnabled
1535         && N >= RtsFlags.ParFlags.parGcGen
1536         && ! oldest_gen->steps[0].mark)
1537     {
1538         gc_type = PENDING_GC_PAR;
1539     } else {
1540         gc_type = PENDING_GC_SEQ;
1541     }
1542
1543     // In order to GC, there must be no threads running Haskell code.
1544     // Therefore, the GC thread needs to hold *all* the capabilities,
1545     // and release them after the GC has completed.  
1546     //
1547     // This seems to be the simplest way: previous attempts involved
1548     // making all the threads with capabilities give up their
1549     // capabilities and sleep except for the *last* one, which
1550     // actually did the GC.  But it's quite hard to arrange for all
1551     // the other tasks to sleep and stay asleep.
1552     //
1553
1554     /*  Other capabilities are prevented from running yet more Haskell
1555         threads if waiting_for_gc is set. Tested inside
1556         yieldCapability() and releaseCapability() in Capability.c */
1557
1558     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1559     if (prev_pending_gc) {
1560         do {
1561             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1562                        prev_pending_gc);
1563             ASSERT(cap);
1564             yieldCapability(&cap,task);
1565         } while (waiting_for_gc);
1566         return cap;  // NOTE: task->cap might have changed here
1567     }
1568
1569     setContextSwitches();
1570
1571     // The final shutdown GC is always single-threaded, because it's
1572     // possible that some of the Capabilities have no worker threads.
1573     
1574     if (gc_type == PENDING_GC_SEQ)
1575     {
1576         postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
1577         // single-threaded GC: grab all the capabilities
1578         for (i=0; i < n_capabilities; i++) {
1579             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1580             if (cap != &capabilities[i]) {
1581                 Capability *pcap = &capabilities[i];
1582                 // we better hope this task doesn't get migrated to
1583                 // another Capability while we're waiting for this one.
1584                 // It won't, because load balancing happens while we have
1585                 // all the Capabilities, but even so it's a slightly
1586                 // unsavoury invariant.
1587                 task->cap = pcap;
1588                 waitForReturnCapability(&pcap, task);
1589                 if (pcap != &capabilities[i]) {
1590                     barf("scheduleDoGC: got the wrong capability");
1591                 }
1592             }
1593         }
1594     }
1595     else
1596     {
1597         // multi-threaded GC: make sure all the Capabilities donate one
1598         // GC thread each.
1599         postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
1600         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1601
1602         waitForGcThreads(cap);
1603     }
1604 #endif
1605
1606     // so this happens periodically:
1607     if (cap) scheduleCheckBlackHoles(cap);
1608     
1609     IF_DEBUG(scheduler, printAllThreads());
1610
1611 delete_threads_and_gc:
1612     /*
1613      * We now have all the capabilities; if we're in an interrupting
1614      * state, then we should take the opportunity to delete all the
1615      * threads in the system.
1616      */
1617     if (sched_state == SCHED_INTERRUPTING) {
1618         deleteAllThreads(cap);
1619         sched_state = SCHED_SHUTTING_DOWN;
1620     }
1621     
1622     heap_census = scheduleNeedHeapProfile(rtsTrue);
1623
1624 #if defined(THREADED_RTS)
1625     postEvent(cap, EVENT_GC_START, 0, 0);
1626     debugTrace(DEBUG_sched, "doing GC");
1627     // reset waiting_for_gc *before* GC, so that when the GC threads
1628     // emerge they don't immediately re-enter the GC.
1629     waiting_for_gc = 0;
1630     GarbageCollect(force_major || heap_census, gc_type, cap);
1631 #else
1632     GarbageCollect(force_major || heap_census, 0, cap);
1633 #endif
1634     postEvent(cap, EVENT_GC_END, 0, 0);
1635
1636     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1637     {
1638         // We are doing a GC because the system has been idle for a
1639         // timeslice and we need to check for deadlock.  Record the
1640         // fact that we've done a GC and turn off the timer signal;
1641         // it will get re-enabled if we run any threads after the GC.
1642         recent_activity = ACTIVITY_DONE_GC;
1643         stopTimer();
1644     }
1645     else
1646     {
1647         // the GC might have taken long enough for the timer to set
1648         // recent_activity = ACTIVITY_INACTIVE, but we aren't
1649         // necessarily deadlocked:
1650         recent_activity = ACTIVITY_YES;
1651     }
1652
1653 #if defined(THREADED_RTS)
1654     if (gc_type == PENDING_GC_PAR)
1655     {
1656         releaseGCThreads(cap);
1657     }
1658 #endif
1659
1660     if (heap_census) {
1661         debugTrace(DEBUG_sched, "performing heap census");
1662         heapCensus();
1663         performHeapProfile = rtsFalse;
1664     }
1665
1666     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1667         // GC set the heap_overflow flag, so we should proceed with
1668         // an orderly shutdown now.  Ultimately we want the main
1669         // thread to return to its caller with HeapExhausted, at which
1670         // point the caller should call hs_exit().  The first step is
1671         // to delete all the threads.
1672         //
1673         // Another way to do this would be to raise an exception in
1674         // the main thread, which we really should do because it gives
1675         // the program a chance to clean up.  But how do we find the
1676         // main thread?  It should presumably be the same one that
1677         // gets ^C exceptions, but that's all done on the Haskell side
1678         // (GHC.TopHandler).
1679         sched_state = SCHED_INTERRUPTING;
1680         goto delete_threads_and_gc;
1681     }
1682
1683 #ifdef SPARKBALANCE
1684     /* JB 
1685        Once we are all together... this would be the place to balance all
1686        spark pools. No concurrent stealing or adding of new sparks can
1687        occur. Should be defined in Sparks.c. */
1688     balanceSparkPoolsCaps(n_capabilities, capabilities);
1689 #endif
1690
1691 #if defined(THREADED_RTS)
1692     if (gc_type == PENDING_GC_SEQ) {
1693         // release our stash of capabilities.
1694         for (i = 0; i < n_capabilities; i++) {
1695             if (cap != &capabilities[i]) {
1696                 task->cap = &capabilities[i];
1697                 releaseCapability(&capabilities[i]);
1698             }
1699         }
1700     }
1701     if (cap) {
1702         task->cap = cap;
1703     } else {
1704         task->cap = NULL;
1705     }
1706 #endif
1707
1708     return cap;
1709 }
1710
1711 /* ---------------------------------------------------------------------------
1712  * Singleton fork(). Do not copy any running threads.
1713  * ------------------------------------------------------------------------- */
1714
1715 pid_t
1716 forkProcess(HsStablePtr *entry
1717 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1718             STG_UNUSED
1719 #endif
1720            )
1721 {
1722 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1723     Task *task;
1724     pid_t pid;
1725     StgTSO* t,*next;
1726     Capability *cap;
1727     nat s;
1728     
1729 #if defined(THREADED_RTS)
1730     if (RtsFlags.ParFlags.nNodes > 1) {
1731         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1732         stg_exit(EXIT_FAILURE);
1733     }
1734 #endif
1735
1736     debugTrace(DEBUG_sched, "forking!");
1737     
1738     // ToDo: for SMP, we should probably acquire *all* the capabilities
1739     cap = rts_lock();
1740     
1741     // no funny business: hold locks while we fork, otherwise if some
1742     // other thread is holding a lock when the fork happens, the data
1743     // structure protected by the lock will forever be in an
1744     // inconsistent state in the child.  See also #1391.
1745     ACQUIRE_LOCK(&sched_mutex);
1746     ACQUIRE_LOCK(&cap->lock);
1747     ACQUIRE_LOCK(&cap->running_task->lock);
1748
1749     pid = fork();
1750     
1751     if (pid) { // parent
1752         
1753         RELEASE_LOCK(&sched_mutex);
1754         RELEASE_LOCK(&cap->lock);
1755         RELEASE_LOCK(&cap->running_task->lock);
1756
1757         // just return the pid
1758         rts_unlock(cap);
1759         return pid;
1760         
1761     } else { // child
1762         
1763 #if defined(THREADED_RTS)
1764         initMutex(&sched_mutex);
1765         initMutex(&cap->lock);
1766         initMutex(&cap->running_task->lock);
1767 #endif
1768
1769         // Now, all OS threads except the thread that forked are
1770         // stopped.  We need to stop all Haskell threads, including
1771         // those involved in foreign calls.  Also we need to delete
1772         // all Tasks, because they correspond to OS threads that are
1773         // now gone.
1774
1775         for (s = 0; s < total_steps; s++) {
1776           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1777             if (t->what_next == ThreadRelocated) {
1778                 next = t->_link;
1779             } else {
1780                 next = t->global_link;
1781                 // don't allow threads to catch the ThreadKilled
1782                 // exception, but we do want to raiseAsync() because these
1783                 // threads may be evaluating thunks that we need later.
1784                 deleteThread_(cap,t);
1785             }
1786           }
1787         }
1788         
1789         // Empty the run queue.  It seems tempting to let all the
1790         // killed threads stay on the run queue as zombies to be
1791         // cleaned up later, but some of them correspond to bound
1792         // threads for which the corresponding Task does not exist.
1793         cap->run_queue_hd = END_TSO_QUEUE;
1794         cap->run_queue_tl = END_TSO_QUEUE;
1795
1796         // Any suspended C-calling Tasks are no more, their OS threads
1797         // don't exist now:
1798         cap->suspended_ccalling_tasks = NULL;
1799
1800         // Empty the threads lists.  Otherwise, the garbage
1801         // collector may attempt to resurrect some of these threads.
1802         for (s = 0; s < total_steps; s++) {
1803             all_steps[s].threads = END_TSO_QUEUE;
1804         }
1805
1806         // Wipe the task list, except the current Task.
1807         ACQUIRE_LOCK(&sched_mutex);
1808         for (task = all_tasks; task != NULL; task=task->all_link) {
1809             if (task != cap->running_task) {
1810 #if defined(THREADED_RTS)
1811                 initMutex(&task->lock); // see #1391
1812 #endif
1813                 discardTask(task);
1814             }
1815         }
1816         RELEASE_LOCK(&sched_mutex);
1817
1818 #if defined(THREADED_RTS)
1819         // Wipe our spare workers list, they no longer exist.  New
1820         // workers will be created if necessary.
1821         cap->spare_workers = NULL;
1822         cap->returning_tasks_hd = NULL;
1823         cap->returning_tasks_tl = NULL;
1824 #endif
1825
1826         // On Unix, all timers are reset in the child, so we need to start
1827         // the timer again.
1828         initTimer();
1829         startTimer();
1830
1831         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1832         rts_checkSchedStatus("forkProcess",cap);
1833         
1834         rts_unlock(cap);
1835         hs_exit();                      // clean up and exit
1836         stg_exit(EXIT_SUCCESS);
1837     }
1838 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1839     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1840     return -1;
1841 #endif
1842 }
1843
1844 /* ---------------------------------------------------------------------------
1845  * Delete all the threads in the system
1846  * ------------------------------------------------------------------------- */
1847    
1848 static void
1849 deleteAllThreads ( Capability *cap )
1850 {
1851     // NOTE: only safe to call if we own all capabilities.
1852
1853     StgTSO* t, *next;
1854     nat s;
1855
1856     debugTrace(DEBUG_sched,"deleting all threads");
1857     for (s = 0; s < total_steps; s++) {
1858       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1859         if (t->what_next == ThreadRelocated) {
1860             next = t->_link;
1861         } else {
1862             next = t->global_link;
1863             deleteThread(cap,t);
1864         }
1865       }
1866     }      
1867
1868     // The run queue now contains a bunch of ThreadKilled threads.  We
1869     // must not throw these away: the main thread(s) will be in there
1870     // somewhere, and the main scheduler loop has to deal with it.
1871     // Also, the run queue is the only thing keeping these threads from
1872     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1873
1874 #if !defined(THREADED_RTS)
1875     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1876     ASSERT(sleeping_queue == END_TSO_QUEUE);
1877 #endif
1878 }
1879
1880 /* -----------------------------------------------------------------------------
1881    Managing the suspended_ccalling_tasks list.
1882    Locks required: sched_mutex
1883    -------------------------------------------------------------------------- */
1884
1885 STATIC_INLINE void
1886 suspendTask (Capability *cap, Task *task)
1887 {
1888     ASSERT(task->next == NULL && task->prev == NULL);
1889     task->next = cap->suspended_ccalling_tasks;
1890     task->prev = NULL;
1891     if (cap->suspended_ccalling_tasks) {
1892         cap->suspended_ccalling_tasks->prev = task;
1893     }
1894     cap->suspended_ccalling_tasks = task;
1895 }
1896
1897 STATIC_INLINE void
1898 recoverSuspendedTask (Capability *cap, Task *task)
1899 {
1900     if (task->prev) {
1901         task->prev->next = task->next;
1902     } else {
1903         ASSERT(cap->suspended_ccalling_tasks == task);
1904         cap->suspended_ccalling_tasks = task->next;
1905     }
1906     if (task->next) {
1907         task->next->prev = task->prev;
1908     }
1909     task->next = task->prev = NULL;
1910 }
1911
1912 /* ---------------------------------------------------------------------------
1913  * Suspending & resuming Haskell threads.
1914  * 
1915  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1916  * its capability before calling the C function.  This allows another
1917  * task to pick up the capability and carry on running Haskell
1918  * threads.  It also means that if the C call blocks, it won't lock
1919  * the whole system.
1920  *
1921  * The Haskell thread making the C call is put to sleep for the
1922  * duration of the call, on the susepended_ccalling_threads queue.  We
1923  * give out a token to the task, which it can use to resume the thread
1924  * on return from the C function.
1925  * ------------------------------------------------------------------------- */
1926    
1927 void *
1928 suspendThread (StgRegTable *reg)
1929 {
1930   Capability *cap;
1931   int saved_errno;
1932   StgTSO *tso;
1933   Task *task;
1934 #if mingw32_HOST_OS
1935   StgWord32 saved_winerror;
1936 #endif
1937
1938   saved_errno = errno;
1939 #if mingw32_HOST_OS
1940   saved_winerror = GetLastError();
1941 #endif
1942
1943   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1944    */
1945   cap = regTableToCapability(reg);
1946
1947   task = cap->running_task;
1948   tso = cap->r.rCurrentTSO;
1949
1950   postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
1951   debugTrace(DEBUG_sched, 
1952              "thread %lu did a safe foreign call", 
1953              (unsigned long)cap->r.rCurrentTSO->id);
1954
1955   // XXX this might not be necessary --SDM
1956   tso->what_next = ThreadRunGHC;
1957
1958   threadPaused(cap,tso);
1959
1960   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1961       tso->why_blocked = BlockedOnCCall;
1962       tso->flags |= TSO_BLOCKEX;
1963       tso->flags &= ~TSO_INTERRUPTIBLE;
1964   } else {
1965       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1966   }
1967
1968   // Hand back capability
1969   task->suspended_tso = tso;
1970
1971   ACQUIRE_LOCK(&cap->lock);
1972
1973   suspendTask(cap,task);
1974   cap->in_haskell = rtsFalse;
1975   releaseCapability_(cap,rtsFalse);
1976   
1977   RELEASE_LOCK(&cap->lock);
1978
1979 #if defined(THREADED_RTS)
1980   /* Preparing to leave the RTS, so ensure there's a native thread/task
1981      waiting to take over.
1982   */
1983   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1984 #endif
1985
1986   errno = saved_errno;
1987 #if mingw32_HOST_OS
1988   SetLastError(saved_winerror);
1989 #endif
1990   return task;
1991 }
1992
1993 StgRegTable *
1994 resumeThread (void *task_)
1995 {
1996     StgTSO *tso;
1997     Capability *cap;
1998     Task *task = task_;
1999     int saved_errno;
2000 #if mingw32_HOST_OS
2001     StgWord32 saved_winerror;
2002 #endif
2003
2004     saved_errno = errno;
2005 #if mingw32_HOST_OS
2006     saved_winerror = GetLastError();
2007 #endif
2008
2009     cap = task->cap;
2010     // Wait for permission to re-enter the RTS with the result.
2011     waitForReturnCapability(&cap,task);
2012     // we might be on a different capability now... but if so, our
2013     // entry on the suspended_ccalling_tasks list will also have been
2014     // migrated.
2015
2016     // Remove the thread from the suspended list
2017     recoverSuspendedTask(cap,task);
2018
2019     tso = task->suspended_tso;
2020     task->suspended_tso = NULL;
2021     tso->_link = END_TSO_QUEUE; // no write barrier reqd
2022
2023     postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
2024     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2025     
2026     if (tso->why_blocked == BlockedOnCCall) {
2027         // avoid locking the TSO if we don't have to
2028         if (tso->blocked_exceptions != END_TSO_QUEUE) {
2029             awakenBlockedExceptionQueue(cap,tso);
2030         }
2031         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2032     }
2033     
2034     /* Reset blocking status */
2035     tso->why_blocked  = NotBlocked;
2036     
2037     cap->r.rCurrentTSO = tso;
2038     cap->in_haskell = rtsTrue;
2039     errno = saved_errno;
2040 #if mingw32_HOST_OS
2041     SetLastError(saved_winerror);
2042 #endif
2043
2044     /* We might have GC'd, mark the TSO dirty again */
2045     dirty_TSO(cap,tso);
2046
2047     IF_DEBUG(sanity, checkTSO(tso));
2048
2049     return &cap->r;
2050 }
2051
2052 /* ---------------------------------------------------------------------------
2053  * scheduleThread()
2054  *
2055  * scheduleThread puts a thread on the end  of the runnable queue.
2056  * This will usually be done immediately after a thread is created.
2057  * The caller of scheduleThread must create the thread using e.g.
2058  * createThread and push an appropriate closure
2059  * on this thread's stack before the scheduler is invoked.
2060  * ------------------------------------------------------------------------ */
2061
2062 void
2063 scheduleThread(Capability *cap, StgTSO *tso)
2064 {
2065     // The thread goes at the *end* of the run-queue, to avoid possible
2066     // starvation of any threads already on the queue.
2067     appendToRunQueue(cap,tso);
2068 }
2069
2070 void
2071 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2072 {
2073 #if defined(THREADED_RTS)
2074     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2075                               // move this thread from now on.
2076     cpu %= RtsFlags.ParFlags.nNodes;
2077     if (cpu == cap->no) {
2078         appendToRunQueue(cap,tso);
2079     } else {
2080         postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
2081         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2082     }
2083 #else
2084     appendToRunQueue(cap,tso);
2085 #endif
2086 }
2087
2088 Capability *
2089 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2090 {
2091     Task *task;
2092
2093     // We already created/initialised the Task
2094     task = cap->running_task;
2095
2096     // This TSO is now a bound thread; make the Task and TSO
2097     // point to each other.
2098     tso->bound = task;
2099     tso->cap = cap;
2100
2101     task->tso = tso;
2102     task->ret = ret;
2103     task->stat = NoStatus;
2104
2105     appendToRunQueue(cap,tso);
2106
2107     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2108
2109     cap = schedule(cap,task);
2110
2111     ASSERT(task->stat != NoStatus);
2112     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2113
2114     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2115     return cap;
2116 }
2117
2118 /* ----------------------------------------------------------------------------
2119  * Starting Tasks
2120  * ------------------------------------------------------------------------- */
2121
2122 #if defined(THREADED_RTS)
2123 void OSThreadProcAttr
2124 workerStart(Task *task)
2125 {
2126     Capability *cap;
2127
2128     // See startWorkerTask().
2129     ACQUIRE_LOCK(&task->lock);
2130     cap = task->cap;
2131     RELEASE_LOCK(&task->lock);
2132
2133     if (RtsFlags.ParFlags.setAffinity) {
2134         setThreadAffinity(cap->no, n_capabilities);
2135     }
2136
2137     // set the thread-local pointer to the Task:
2138     taskEnter(task);
2139
2140     // schedule() runs without a lock.
2141     cap = schedule(cap,task);
2142
2143     // On exit from schedule(), we have a Capability, but possibly not
2144     // the same one we started with.
2145
2146     // During shutdown, the requirement is that after all the
2147     // Capabilities are shut down, all workers that are shutting down
2148     // have finished workerTaskStop().  This is why we hold on to
2149     // cap->lock until we've finished workerTaskStop() below.
2150     //
2151     // There may be workers still involved in foreign calls; those
2152     // will just block in waitForReturnCapability() because the
2153     // Capability has been shut down.
2154     //
2155     ACQUIRE_LOCK(&cap->lock);
2156     releaseCapability_(cap,rtsFalse);
2157     workerTaskStop(task);
2158     RELEASE_LOCK(&cap->lock);
2159 }
2160 #endif
2161
2162 /* ---------------------------------------------------------------------------
2163  * initScheduler()
2164  *
2165  * Initialise the scheduler.  This resets all the queues - if the
2166  * queues contained any threads, they'll be garbage collected at the
2167  * next pass.
2168  *
2169  * ------------------------------------------------------------------------ */
2170
2171 void 
2172 initScheduler(void)
2173 {
2174 #if !defined(THREADED_RTS)
2175   blocked_queue_hd  = END_TSO_QUEUE;
2176   blocked_queue_tl  = END_TSO_QUEUE;
2177   sleeping_queue    = END_TSO_QUEUE;
2178 #endif
2179
2180   blackhole_queue   = END_TSO_QUEUE;
2181
2182   sched_state    = SCHED_RUNNING;
2183   recent_activity = ACTIVITY_YES;
2184
2185 #if defined(THREADED_RTS)
2186   /* Initialise the mutex and condition variables used by
2187    * the scheduler. */
2188   initMutex(&sched_mutex);
2189 #endif
2190   
2191   ACQUIRE_LOCK(&sched_mutex);
2192
2193   /* A capability holds the state a native thread needs in
2194    * order to execute STG code. At least one capability is
2195    * floating around (only THREADED_RTS builds have more than one).
2196    */
2197   initCapabilities();
2198
2199   initTaskManager();
2200
2201 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2202   initSparkPools();
2203 #endif
2204
2205 #if defined(THREADED_RTS)
2206   /*
2207    * Eagerly start one worker to run each Capability, except for
2208    * Capability 0.  The idea is that we're probably going to start a
2209    * bound thread on Capability 0 pretty soon, so we don't want a
2210    * worker task hogging it.
2211    */
2212   { 
2213       nat i;
2214       Capability *cap;
2215       for (i = 1; i < n_capabilities; i++) {
2216           cap = &capabilities[i];
2217           ACQUIRE_LOCK(&cap->lock);
2218           startWorkerTask(cap, workerStart);
2219           RELEASE_LOCK(&cap->lock);
2220       }
2221   }
2222 #endif
2223
2224   RELEASE_LOCK(&sched_mutex);
2225 }
2226
2227 void
2228 exitScheduler(
2229     rtsBool wait_foreign
2230 #if !defined(THREADED_RTS)
2231                          __attribute__((unused))
2232 #endif
2233 )
2234                /* see Capability.c, shutdownCapability() */
2235 {
2236     Task *task = NULL;
2237
2238     ACQUIRE_LOCK(&sched_mutex);
2239     task = newBoundTask();
2240     RELEASE_LOCK(&sched_mutex);
2241
2242     // If we haven't killed all the threads yet, do it now.
2243     if (sched_state < SCHED_SHUTTING_DOWN) {
2244         sched_state = SCHED_INTERRUPTING;
2245         waitForReturnCapability(&task->cap,task);
2246         scheduleDoGC(task->cap,task,rtsFalse);    
2247         releaseCapability(task->cap);
2248     }
2249     sched_state = SCHED_SHUTTING_DOWN;
2250
2251 #if defined(THREADED_RTS)
2252     { 
2253         nat i;
2254         
2255         for (i = 0; i < n_capabilities; i++) {
2256             shutdownCapability(&capabilities[i], task, wait_foreign);
2257         }
2258         boundTaskExiting(task);
2259     }
2260 #endif
2261 }
2262
2263 void
2264 freeScheduler( void )
2265 {
2266     nat still_running;
2267
2268     ACQUIRE_LOCK(&sched_mutex);
2269     still_running = freeTaskManager();
2270     // We can only free the Capabilities if there are no Tasks still
2271     // running.  We might have a Task about to return from a foreign
2272     // call into waitForReturnCapability(), for example (actually,
2273     // this should be the *only* thing that a still-running Task can
2274     // do at this point, and it will block waiting for the
2275     // Capability).
2276     if (still_running == 0) {
2277         freeCapabilities();
2278         if (n_capabilities != 1) {
2279             stgFree(capabilities);
2280         }
2281     }
2282     RELEASE_LOCK(&sched_mutex);
2283 #if defined(THREADED_RTS)
2284     closeMutex(&sched_mutex);
2285 #endif
2286 }
2287
2288 /* -----------------------------------------------------------------------------
2289    performGC
2290
2291    This is the interface to the garbage collector from Haskell land.
2292    We provide this so that external C code can allocate and garbage
2293    collect when called from Haskell via _ccall_GC.
2294    -------------------------------------------------------------------------- */
2295
2296 static void
2297 performGC_(rtsBool force_major)
2298 {
2299     Task *task;
2300
2301     // We must grab a new Task here, because the existing Task may be
2302     // associated with a particular Capability, and chained onto the 
2303     // suspended_ccalling_tasks queue.
2304     ACQUIRE_LOCK(&sched_mutex);
2305     task = newBoundTask();
2306     RELEASE_LOCK(&sched_mutex);
2307
2308     waitForReturnCapability(&task->cap,task);
2309     scheduleDoGC(task->cap,task,force_major);
2310     releaseCapability(task->cap);
2311     boundTaskExiting(task);
2312 }
2313
2314 void
2315 performGC(void)
2316 {
2317     performGC_(rtsFalse);
2318 }
2319
2320 void
2321 performMajorGC(void)
2322 {
2323     performGC_(rtsTrue);
2324 }
2325
2326 /* -----------------------------------------------------------------------------
2327    Stack overflow
2328
2329    If the thread has reached its maximum stack size, then raise the
2330    StackOverflow exception in the offending thread.  Otherwise
2331    relocate the TSO into a larger chunk of memory and adjust its stack
2332    size appropriately.
2333    -------------------------------------------------------------------------- */
2334
2335 static StgTSO *
2336 threadStackOverflow(Capability *cap, StgTSO *tso)
2337 {
2338   nat new_stack_size, stack_words;
2339   lnat new_tso_size;
2340   StgPtr new_sp;
2341   StgTSO *dest;
2342
2343   IF_DEBUG(sanity,checkTSO(tso));
2344
2345   // don't allow throwTo() to modify the blocked_exceptions queue
2346   // while we are moving the TSO:
2347   lockClosure((StgClosure *)tso);
2348
2349   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2350       // NB. never raise a StackOverflow exception if the thread is
2351       // inside Control.Exceptino.block.  It is impractical to protect
2352       // against stack overflow exceptions, since virtually anything
2353       // can raise one (even 'catch'), so this is the only sensible
2354       // thing to do here.  See bug #767.
2355
2356       debugTrace(DEBUG_gc,
2357                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2358                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2359       IF_DEBUG(gc,
2360                /* If we're debugging, just print out the top of the stack */
2361                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2362                                                 tso->sp+64)));
2363
2364       // Send this thread the StackOverflow exception
2365       unlockTSO(tso);
2366       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2367       return tso;
2368   }
2369
2370   /* Try to double the current stack size.  If that takes us over the
2371    * maximum stack size for this thread, then use the maximum instead
2372    * (that is, unless we're already at or over the max size and we
2373    * can't raise the StackOverflow exception (see above), in which
2374    * case just double the size). Finally round up so the TSO ends up as
2375    * a whole number of blocks.
2376    */
2377   if (tso->stack_size >= tso->max_stack_size) {
2378       new_stack_size = tso->stack_size * 2;
2379   } else { 
2380       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2381   }
2382   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2383                                        TSO_STRUCT_SIZE)/sizeof(W_);
2384   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2385   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2386
2387   debugTrace(DEBUG_sched, 
2388              "increasing stack size from %ld words to %d.",
2389              (long)tso->stack_size, new_stack_size);
2390
2391   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2392   TICK_ALLOC_TSO(new_stack_size,0);
2393
2394   /* copy the TSO block and the old stack into the new area */
2395   memcpy(dest,tso,TSO_STRUCT_SIZE);
2396   stack_words = tso->stack + tso->stack_size - tso->sp;
2397   new_sp = (P_)dest + new_tso_size - stack_words;
2398   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2399
2400   /* relocate the stack pointers... */
2401   dest->sp         = new_sp;
2402   dest->stack_size = new_stack_size;
2403         
2404   /* Mark the old TSO as relocated.  We have to check for relocated
2405    * TSOs in the garbage collector and any primops that deal with TSOs.
2406    *
2407    * It's important to set the sp value to just beyond the end
2408    * of the stack, so we don't attempt to scavenge any part of the
2409    * dead TSO's stack.
2410    */
2411   tso->what_next = ThreadRelocated;
2412   setTSOLink(cap,tso,dest);
2413   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2414   tso->why_blocked = NotBlocked;
2415
2416   IF_PAR_DEBUG(verbose,
2417                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2418                      tso->id, tso, tso->stack_size);
2419                /* If we're debugging, just print out the top of the stack */
2420                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2421                                                 tso->sp+64)));
2422   
2423   unlockTSO(dest);
2424   unlockTSO(tso);
2425
2426   IF_DEBUG(sanity,checkTSO(dest));
2427 #if 0
2428   IF_DEBUG(scheduler,printTSO(dest));
2429 #endif
2430
2431   return dest;
2432 }
2433
2434 static StgTSO *
2435 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2436 {
2437     bdescr *bd, *new_bd;
2438     lnat free_w, tso_size_w;
2439     StgTSO *new_tso;
2440
2441     tso_size_w = tso_sizeW(tso);
2442
2443     if (tso_size_w < MBLOCK_SIZE_W || 
2444         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2445     {
2446         return tso;
2447     }
2448
2449     // don't allow throwTo() to modify the blocked_exceptions queue
2450     // while we are moving the TSO:
2451     lockClosure((StgClosure *)tso);
2452
2453     // this is the number of words we'll free
2454     free_w = round_to_mblocks(tso_size_w/2);
2455
2456     bd = Bdescr((StgPtr)tso);
2457     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2458     bd->free = bd->start + TSO_STRUCT_SIZEW;
2459
2460     new_tso = (StgTSO *)new_bd->start;
2461     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2462     new_tso->stack_size = new_bd->free - new_tso->stack;
2463
2464     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2465                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2466
2467     tso->what_next = ThreadRelocated;
2468     tso->_link = new_tso; // no write barrier reqd: same generation
2469
2470     // The TSO attached to this Task may have moved, so update the
2471     // pointer to it.
2472     if (task->tso == tso) {
2473         task->tso = new_tso;
2474     }
2475
2476     unlockTSO(new_tso);
2477     unlockTSO(tso);
2478
2479     IF_DEBUG(sanity,checkTSO(new_tso));
2480
2481     return new_tso;
2482 }
2483
2484 /* ---------------------------------------------------------------------------
2485    Interrupt execution
2486    - usually called inside a signal handler so it mustn't do anything fancy.   
2487    ------------------------------------------------------------------------ */
2488
2489 void
2490 interruptStgRts(void)
2491 {
2492     sched_state = SCHED_INTERRUPTING;
2493     setContextSwitches();
2494     wakeUpRts();
2495 }
2496
2497 /* -----------------------------------------------------------------------------
2498    Wake up the RTS
2499    
2500    This function causes at least one OS thread to wake up and run the
2501    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2502    an external event has arrived that may need servicing (eg. a
2503    keyboard interrupt).
2504
2505    In the single-threaded RTS we don't do anything here; we only have
2506    one thread anyway, and the event that caused us to want to wake up
2507    will have interrupted any blocking system call in progress anyway.
2508    -------------------------------------------------------------------------- */
2509
2510 void
2511 wakeUpRts(void)
2512 {
2513 #if defined(THREADED_RTS)
2514     // This forces the IO Manager thread to wakeup, which will
2515     // in turn ensure that some OS thread wakes up and runs the
2516     // scheduler loop, which will cause a GC and deadlock check.
2517     ioManagerWakeup();
2518 #endif
2519 }
2520
2521 /* -----------------------------------------------------------------------------
2522  * checkBlackHoles()
2523  *
2524  * Check the blackhole_queue for threads that can be woken up.  We do
2525  * this periodically: before every GC, and whenever the run queue is
2526  * empty.
2527  *
2528  * An elegant solution might be to just wake up all the blocked
2529  * threads with awakenBlockedQueue occasionally: they'll go back to
2530  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2531  * doesn't give us a way to tell whether we've actually managed to
2532  * wake up any threads, so we would be busy-waiting.
2533  *
2534  * -------------------------------------------------------------------------- */
2535
2536 static rtsBool
2537 checkBlackHoles (Capability *cap)
2538 {
2539     StgTSO **prev, *t;
2540     rtsBool any_woke_up = rtsFalse;
2541     StgHalfWord type;
2542
2543     // blackhole_queue is global:
2544     ASSERT_LOCK_HELD(&sched_mutex);
2545
2546     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2547
2548     // ASSUMES: sched_mutex
2549     prev = &blackhole_queue;
2550     t = blackhole_queue;
2551     while (t != END_TSO_QUEUE) {
2552         if (t->what_next == ThreadRelocated) {
2553             t = t->_link;
2554             continue;
2555         }
2556         ASSERT(t->why_blocked == BlockedOnBlackHole);
2557         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2558         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2559             IF_DEBUG(sanity,checkTSO(t));
2560             t = unblockOne(cap, t);
2561             *prev = t;
2562             any_woke_up = rtsTrue;
2563         } else {
2564             prev = &t->_link;
2565             t = t->_link;
2566         }
2567     }
2568
2569     return any_woke_up;
2570 }
2571
2572 /* -----------------------------------------------------------------------------
2573    Deleting threads
2574
2575    This is used for interruption (^C) and forking, and corresponds to
2576    raising an exception but without letting the thread catch the
2577    exception.
2578    -------------------------------------------------------------------------- */
2579
2580 static void 
2581 deleteThread (Capability *cap, StgTSO *tso)
2582 {
2583     // NOTE: must only be called on a TSO that we have exclusive
2584     // access to, because we will call throwToSingleThreaded() below.
2585     // The TSO must be on the run queue of the Capability we own, or 
2586     // we must own all Capabilities.
2587
2588     if (tso->why_blocked != BlockedOnCCall &&
2589         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2590         throwToSingleThreaded(cap,tso,NULL);
2591     }
2592 }
2593
2594 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2595 static void 
2596 deleteThread_(Capability *cap, StgTSO *tso)
2597 { // for forkProcess only:
2598   // like deleteThread(), but we delete threads in foreign calls, too.
2599
2600     if (tso->why_blocked == BlockedOnCCall ||
2601         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2602         unblockOne(cap,tso);
2603         tso->what_next = ThreadKilled;
2604     } else {
2605         deleteThread(cap,tso);
2606     }
2607 }
2608 #endif
2609
2610 /* -----------------------------------------------------------------------------
2611    raiseExceptionHelper
2612    
2613    This function is called by the raise# primitve, just so that we can
2614    move some of the tricky bits of raising an exception from C-- into
2615    C.  Who knows, it might be a useful re-useable thing here too.
2616    -------------------------------------------------------------------------- */
2617
2618 StgWord
2619 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2620 {
2621     Capability *cap = regTableToCapability(reg);
2622     StgThunk *raise_closure = NULL;
2623     StgPtr p, next;
2624     StgRetInfoTable *info;
2625     //
2626     // This closure represents the expression 'raise# E' where E
2627     // is the exception raise.  It is used to overwrite all the
2628     // thunks which are currently under evaluataion.
2629     //
2630
2631     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2632     // LDV profiling: stg_raise_info has THUNK as its closure
2633     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2634     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2635     // 1 does not cause any problem unless profiling is performed.
2636     // However, when LDV profiling goes on, we need to linearly scan
2637     // small object pool, where raise_closure is stored, so we should
2638     // use MIN_UPD_SIZE.
2639     //
2640     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2641     //                                 sizeofW(StgClosure)+1);
2642     //
2643
2644     //
2645     // Walk up the stack, looking for the catch frame.  On the way,
2646     // we update any closures pointed to from update frames with the
2647     // raise closure that we just built.
2648     //
2649     p = tso->sp;
2650     while(1) {
2651         info = get_ret_itbl((StgClosure *)p);
2652         next = p + stack_frame_sizeW((StgClosure *)p);
2653         switch (info->i.type) {
2654             
2655         case UPDATE_FRAME:
2656             // Only create raise_closure if we need to.
2657             if (raise_closure == NULL) {
2658                 raise_closure = 
2659                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2660                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2661                 raise_closure->payload[0] = exception;
2662             }
2663             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2664             p = next;
2665             continue;
2666
2667         case ATOMICALLY_FRAME:
2668             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2669             tso->sp = p;
2670             return ATOMICALLY_FRAME;
2671             
2672         case CATCH_FRAME:
2673             tso->sp = p;
2674             return CATCH_FRAME;
2675
2676         case CATCH_STM_FRAME:
2677             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2678             tso->sp = p;
2679             return CATCH_STM_FRAME;
2680             
2681         case STOP_FRAME:
2682             tso->sp = p;
2683             return STOP_FRAME;
2684
2685         case CATCH_RETRY_FRAME:
2686         default:
2687             p = next; 
2688             continue;
2689         }
2690     }
2691 }
2692
2693
2694 /* -----------------------------------------------------------------------------
2695    findRetryFrameHelper
2696
2697    This function is called by the retry# primitive.  It traverses the stack
2698    leaving tso->sp referring to the frame which should handle the retry.  
2699
2700    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2701    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2702
2703    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2704    create) because retries are not considered to be exceptions, despite the
2705    similar implementation.
2706
2707    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2708    not be created within memory transactions.
2709    -------------------------------------------------------------------------- */
2710
2711 StgWord
2712 findRetryFrameHelper (StgTSO *tso)
2713 {
2714   StgPtr           p, next;
2715   StgRetInfoTable *info;
2716
2717   p = tso -> sp;
2718   while (1) {
2719     info = get_ret_itbl((StgClosure *)p);
2720     next = p + stack_frame_sizeW((StgClosure *)p);
2721     switch (info->i.type) {
2722       
2723     case ATOMICALLY_FRAME:
2724         debugTrace(DEBUG_stm,
2725                    "found ATOMICALLY_FRAME at %p during retry", p);
2726         tso->sp = p;
2727         return ATOMICALLY_FRAME;
2728       
2729     case CATCH_RETRY_FRAME:
2730         debugTrace(DEBUG_stm,
2731                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2732         tso->sp = p;
2733         return CATCH_RETRY_FRAME;
2734       
2735     case CATCH_STM_FRAME: {
2736         StgTRecHeader *trec = tso -> trec;
2737         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2738         debugTrace(DEBUG_stm,
2739                    "found CATCH_STM_FRAME at %p during retry", p);
2740         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2741         stmAbortTransaction(tso -> cap, trec);
2742         stmFreeAbortedTRec(tso -> cap, trec);
2743         tso -> trec = outer;
2744         p = next; 
2745         continue;
2746     }
2747       
2748
2749     default:
2750       ASSERT(info->i.type != CATCH_FRAME);
2751       ASSERT(info->i.type != STOP_FRAME);
2752       p = next; 
2753       continue;
2754     }
2755   }
2756 }
2757
2758 /* -----------------------------------------------------------------------------
2759    resurrectThreads is called after garbage collection on the list of
2760    threads found to be garbage.  Each of these threads will be woken
2761    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2762    on an MVar, or NonTermination if the thread was blocked on a Black
2763    Hole.
2764
2765    Locks: assumes we hold *all* the capabilities.
2766    -------------------------------------------------------------------------- */
2767
2768 void
2769 resurrectThreads (StgTSO *threads)
2770 {
2771     StgTSO *tso, *next;
2772     Capability *cap;
2773     step *step;
2774
2775     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2776         next = tso->global_link;
2777
2778         step = Bdescr((P_)tso)->step;
2779         tso->global_link = step->threads;
2780         step->threads = tso;
2781
2782         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2783         
2784         // Wake up the thread on the Capability it was last on
2785         cap = tso->cap;
2786         
2787         switch (tso->why_blocked) {
2788         case BlockedOnMVar:
2789         case BlockedOnException:
2790             /* Called by GC - sched_mutex lock is currently held. */
2791             throwToSingleThreaded(cap, tso,
2792                                   (StgClosure *)blockedOnDeadMVar_closure);
2793             break;
2794         case BlockedOnBlackHole:
2795             throwToSingleThreaded(cap, tso,
2796                                   (StgClosure *)nonTermination_closure);
2797             break;
2798         case BlockedOnSTM:
2799             throwToSingleThreaded(cap, tso,
2800                                   (StgClosure *)blockedIndefinitely_closure);
2801             break;
2802         case NotBlocked:
2803             /* This might happen if the thread was blocked on a black hole
2804              * belonging to a thread that we've just woken up (raiseAsync
2805              * can wake up threads, remember...).
2806              */
2807             continue;
2808         default:
2809             barf("resurrectThreads: thread blocked in a strange way");
2810         }
2811     }
2812 }
2813
2814 /* -----------------------------------------------------------------------------
2815    performPendingThrowTos is called after garbage collection, and
2816    passed a list of threads that were found to have pending throwTos
2817    (tso->blocked_exceptions was not empty), and were blocked.
2818    Normally this doesn't happen, because we would deliver the
2819    exception directly if the target thread is blocked, but there are
2820    small windows where it might occur on a multiprocessor (see
2821    throwTo()).
2822
2823    NB. we must be holding all the capabilities at this point, just
2824    like resurrectThreads().
2825    -------------------------------------------------------------------------- */
2826
2827 void
2828 performPendingThrowTos (StgTSO *threads)
2829 {
2830     StgTSO *tso, *next;
2831     Capability *cap;
2832     step *step;
2833
2834     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2835         next = tso->global_link;
2836
2837         step = Bdescr((P_)tso)->step;
2838         tso->global_link = step->threads;
2839         step->threads = tso;
2840
2841         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2842         
2843         cap = tso->cap;
2844         maybePerformBlockedException(cap, tso);
2845     }
2846 }