don't shrink the stack smaller than the value set by +RTS -k<size>
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #include "GC.h"
35 #include "Weak.h"
36 #include "EventLog.h"
37
38 /* PARALLEL_HASKELL includes go here */
39
40 #include "Sparks.h"
41 #include "Capability.h"
42 #include "Task.h"
43 #include "AwaitEvent.h"
44 #if defined(mingw32_HOST_OS)
45 #include "win32/IOManager.h"
46 #endif
47 #include "Trace.h"
48 #include "RaiseAsync.h"
49 #include "Threads.h"
50 #include "ThrIOManager.h"
51
52 #ifdef HAVE_SYS_TYPES_H
53 #include <sys/types.h>
54 #endif
55 #ifdef HAVE_UNISTD_H
56 #include <unistd.h>
57 #endif
58
59 #include <string.h>
60 #include <stdlib.h>
61 #include <stdarg.h>
62
63 #ifdef HAVE_ERRNO_H
64 #include <errno.h>
65 #endif
66
67 // Turn off inlining when debugging - it obfuscates things
68 #ifdef DEBUG
69 # undef  STATIC_INLINE
70 # define STATIC_INLINE static
71 #endif
72
73 /* -----------------------------------------------------------------------------
74  * Global variables
75  * -------------------------------------------------------------------------- */
76
77 #if !defined(THREADED_RTS)
78 // Blocked/sleeping thrads
79 StgTSO *blocked_queue_hd = NULL;
80 StgTSO *blocked_queue_tl = NULL;
81 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
82 #endif
83
84 /* Threads blocked on blackholes.
85  * LOCK: sched_mutex+capability, or all capabilities
86  */
87 StgTSO *blackhole_queue = NULL;
88
89 /* The blackhole_queue should be checked for threads to wake up.  See
90  * Schedule.h for more thorough comment.
91  * LOCK: none (doesn't matter if we miss an update)
92  */
93 rtsBool blackholes_need_checking = rtsFalse;
94
95 /* Set to true when the latest garbage collection failed to reclaim
96  * enough space, and the runtime should proceed to shut itself down in
97  * an orderly fashion (emitting profiling info etc.)
98  */
99 rtsBool heap_overflow = rtsFalse;
100
101 /* flag that tracks whether we have done any execution in this time slice.
102  * LOCK: currently none, perhaps we should lock (but needs to be
103  * updated in the fast path of the scheduler).
104  *
105  * NB. must be StgWord, we do xchg() on it.
106  */
107 volatile StgWord recent_activity = ACTIVITY_YES;
108
109 /* if this flag is set as well, give up execution
110  * LOCK: none (changes monotonically)
111  */
112 volatile StgWord sched_state = SCHED_RUNNING;
113
114 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
115  *  exists - earlier gccs apparently didn't.
116  *  -= chak
117  */
118 StgTSO dummy_tso;
119
120 /*
121  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
122  * in an MT setting, needed to signal that a worker thread shouldn't hang around
123  * in the scheduler when it is out of work.
124  */
125 rtsBool shutting_down_scheduler = rtsFalse;
126
127 /*
128  * This mutex protects most of the global scheduler data in
129  * the THREADED_RTS runtime.
130  */
131 #if defined(THREADED_RTS)
132 Mutex sched_mutex;
133 #endif
134
135 #if !defined(mingw32_HOST_OS)
136 #define FORKPROCESS_PRIMOP_SUPPORTED
137 #endif
138
139 /* -----------------------------------------------------------------------------
140  * static function prototypes
141  * -------------------------------------------------------------------------- */
142
143 static Capability *schedule (Capability *initialCapability, Task *task);
144
145 //
146 // These function all encapsulate parts of the scheduler loop, and are
147 // abstracted only to make the structure and control flow of the
148 // scheduler clearer.
149 //
150 static void schedulePreLoop (void);
151 static void scheduleFindWork (Capability *cap);
152 #if defined(THREADED_RTS)
153 static void scheduleYield (Capability **pcap, Task *task);
154 #endif
155 static void scheduleStartSignalHandlers (Capability *cap);
156 static void scheduleCheckBlockedThreads (Capability *cap);
157 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
158 static void scheduleCheckBlackHoles (Capability *cap);
159 static void scheduleDetectDeadlock (Capability *cap, Task *task);
160 static void schedulePushWork(Capability *cap, Task *task);
161 #if defined(PARALLEL_HASKELL)
162 static rtsBool scheduleGetRemoteWork(Capability *cap);
163 static void scheduleSendPendingMessages(void);
164 #endif
165 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
166 static void scheduleActivateSpark(Capability *cap);
167 #endif
168 static void schedulePostRunThread(Capability *cap, StgTSO *t);
169 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
170 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
171                                          StgTSO *t);
172 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
173                                     nat prev_what_next );
174 static void scheduleHandleThreadBlocked( StgTSO *t );
175 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
176                                              StgTSO *t );
177 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
178 static Capability *scheduleDoGC(Capability *cap, Task *task,
179                                 rtsBool force_major);
180
181 static rtsBool checkBlackHoles(Capability *cap);
182
183 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
184 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
185
186 static void deleteThread (Capability *cap, StgTSO *tso);
187 static void deleteAllThreads (Capability *cap);
188
189 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
190 static void deleteThread_(Capability *cap, StgTSO *tso);
191 #endif
192
193 #ifdef DEBUG
194 static char *whatNext_strs[] = {
195   "(unknown)",
196   "ThreadRunGHC",
197   "ThreadInterpret",
198   "ThreadKilled",
199   "ThreadRelocated",
200   "ThreadComplete"
201 };
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * Putting a thread on the run queue: different scheduling policies
206  * -------------------------------------------------------------------------- */
207
208 STATIC_INLINE void
209 addToRunQueue( Capability *cap, StgTSO *t )
210 {
211 #if defined(PARALLEL_HASKELL)
212     if (RtsFlags.ParFlags.doFairScheduling) { 
213         // this does round-robin scheduling; good for concurrency
214         appendToRunQueue(cap,t);
215     } else {
216         // this does unfair scheduling; good for parallelism
217         pushOnRunQueue(cap,t);
218     }
219 #else
220     // this does round-robin scheduling; good for concurrency
221     appendToRunQueue(cap,t);
222 #endif
223 }
224
225 /* ---------------------------------------------------------------------------
226    Main scheduling loop.
227
228    We use round-robin scheduling, each thread returning to the
229    scheduler loop when one of these conditions is detected:
230
231       * out of heap space
232       * timer expires (thread yields)
233       * thread blocks
234       * thread ends
235       * stack overflow
236
237    GRAN version:
238      In a GranSim setup this loop iterates over the global event queue.
239      This revolves around the global event queue, which determines what 
240      to do next. Therefore, it's more complicated than either the 
241      concurrent or the parallel (GUM) setup.
242   This version has been entirely removed (JB 2008/08).
243
244    GUM version:
245      GUM iterates over incoming messages.
246      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
247      and sends out a fish whenever it has nothing to do; in-between
248      doing the actual reductions (shared code below) it processes the
249      incoming messages and deals with delayed operations 
250      (see PendingFetches).
251      This is not the ugliest code you could imagine, but it's bloody close.
252
253   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
254   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
255   as well as future GUM versions. This file has been refurbished to
256   only contain valid code, which is however incomplete, refers to
257   invalid includes etc.
258
259    ------------------------------------------------------------------------ */
260
261 static Capability *
262 schedule (Capability *initialCapability, Task *task)
263 {
264   StgTSO *t;
265   Capability *cap;
266   StgThreadReturnCode ret;
267 #if defined(PARALLEL_HASKELL)
268   rtsBool receivedFinish = rtsFalse;
269 #endif
270   nat prev_what_next;
271   rtsBool ready_to_gc;
272 #if defined(THREADED_RTS)
273   rtsBool first = rtsTrue;
274 #endif
275   
276   cap = initialCapability;
277
278   // Pre-condition: this task owns initialCapability.
279   // The sched_mutex is *NOT* held
280   // NB. on return, we still hold a capability.
281
282   debugTrace (DEBUG_sched, 
283               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
284               task, initialCapability);
285
286   if (running_finalizers) {
287       errorBelch("error: a C finalizer called back into Haskell.\n"
288                  "   This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
289                  "   To create finalizers that may call back into Haskll, use\n"
290                  "   Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
291       stg_exit(EXIT_FAILURE);
292   }
293
294   schedulePreLoop();
295
296   // -----------------------------------------------------------
297   // Scheduler loop starts here:
298
299 #if defined(PARALLEL_HASKELL)
300 #define TERMINATION_CONDITION        (!receivedFinish)
301 #else
302 #define TERMINATION_CONDITION        rtsTrue
303 #endif
304
305   while (TERMINATION_CONDITION) {
306
307     // Check whether we have re-entered the RTS from Haskell without
308     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
309     // call).
310     if (cap->in_haskell) {
311           errorBelch("schedule: re-entered unsafely.\n"
312                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
313           stg_exit(EXIT_FAILURE);
314     }
315
316     // The interruption / shutdown sequence.
317     // 
318     // In order to cleanly shut down the runtime, we want to:
319     //   * make sure that all main threads return to their callers
320     //     with the state 'Interrupted'.
321     //   * clean up all OS threads assocated with the runtime
322     //   * free all memory etc.
323     //
324     // So the sequence for ^C goes like this:
325     //
326     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
327     //     arranges for some Capability to wake up
328     //
329     //   * all threads in the system are halted, and the zombies are
330     //     placed on the run queue for cleaning up.  We acquire all
331     //     the capabilities in order to delete the threads, this is
332     //     done by scheduleDoGC() for convenience (because GC already
333     //     needs to acquire all the capabilities).  We can't kill
334     //     threads involved in foreign calls.
335     // 
336     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
337     //
338     //   * sched_state := SCHED_SHUTTING_DOWN
339     //
340     //   * all workers exit when the run queue on their capability
341     //     drains.  All main threads will also exit when their TSO
342     //     reaches the head of the run queue and they can return.
343     //
344     //   * eventually all Capabilities will shut down, and the RTS can
345     //     exit.
346     //
347     //   * We might be left with threads blocked in foreign calls, 
348     //     we should really attempt to kill these somehow (TODO);
349     
350     switch (sched_state) {
351     case SCHED_RUNNING:
352         break;
353     case SCHED_INTERRUPTING:
354         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
355 #if defined(THREADED_RTS)
356         discardSparksCap(cap);
357 #endif
358         /* scheduleDoGC() deletes all the threads */
359         cap = scheduleDoGC(cap,task,rtsFalse);
360
361         // after scheduleDoGC(), we must be shutting down.  Either some
362         // other Capability did the final GC, or we did it above,
363         // either way we can fall through to the SCHED_SHUTTING_DOWN
364         // case now.
365         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
366         // fall through
367
368     case SCHED_SHUTTING_DOWN:
369         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
370         // If we are a worker, just exit.  If we're a bound thread
371         // then we will exit below when we've removed our TSO from
372         // the run queue.
373         if (task->tso == NULL && emptyRunQueue(cap)) {
374             return cap;
375         }
376         break;
377     default:
378         barf("sched_state: %d", sched_state);
379     }
380
381     scheduleFindWork(cap);
382
383     /* work pushing, currently relevant only for THREADED_RTS:
384        (pushes threads, wakes up idle capabilities for stealing) */
385     schedulePushWork(cap,task);
386
387 #if defined(PARALLEL_HASKELL)
388     /* since we perform a blocking receive and continue otherwise,
389        either we never reach here or we definitely have work! */
390     // from here: non-empty run queue
391     ASSERT(!emptyRunQueue(cap));
392
393     if (PacketsWaiting()) {  /* now process incoming messages, if any
394                                 pending...  
395
396                                 CAUTION: scheduleGetRemoteWork called
397                                 above, waits for messages as well! */
398       processMessages(cap, &receivedFinish);
399     }
400 #endif // PARALLEL_HASKELL: non-empty run queue!
401
402     scheduleDetectDeadlock(cap,task);
403
404 #if defined(THREADED_RTS)
405     cap = task->cap;    // reload cap, it might have changed
406 #endif
407
408     // Normally, the only way we can get here with no threads to
409     // run is if a keyboard interrupt received during 
410     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
411     // Additionally, it is not fatal for the
412     // threaded RTS to reach here with no threads to run.
413     //
414     // win32: might be here due to awaitEvent() being abandoned
415     // as a result of a console event having been delivered.
416     
417 #if defined(THREADED_RTS)
418     if (first) 
419     {
420     // XXX: ToDo
421     //     // don't yield the first time, we want a chance to run this
422     //     // thread for a bit, even if there are others banging at the
423     //     // door.
424     //     first = rtsFalse;
425     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
426     }
427
428   yield:
429     scheduleYield(&cap,task);
430     if (emptyRunQueue(cap)) continue; // look for work again
431 #endif
432
433 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
434     if ( emptyRunQueue(cap) ) {
435         ASSERT(sched_state >= SCHED_INTERRUPTING);
436     }
437 #endif
438
439     // 
440     // Get a thread to run
441     //
442     t = popRunQueue(cap);
443
444     // Sanity check the thread we're about to run.  This can be
445     // expensive if there is lots of thread switching going on...
446     IF_DEBUG(sanity,checkTSO(t));
447
448 #if defined(THREADED_RTS)
449     // Check whether we can run this thread in the current task.
450     // If not, we have to pass our capability to the right task.
451     {
452         Task *bound = t->bound;
453       
454         if (bound) {
455             if (bound == task) {
456                 debugTrace(DEBUG_sched,
457                            "### Running thread %lu in bound thread", (unsigned long)t->id);
458                 // yes, the Haskell thread is bound to the current native thread
459             } else {
460                 debugTrace(DEBUG_sched,
461                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
462                 // no, bound to a different Haskell thread: pass to that thread
463                 pushOnRunQueue(cap,t);
464                 continue;
465             }
466         } else {
467             // The thread we want to run is unbound.
468             if (task->tso) { 
469                 debugTrace(DEBUG_sched,
470                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
471                 // no, the current native thread is bound to a different
472                 // Haskell thread, so pass it to any worker thread
473                 pushOnRunQueue(cap,t);
474                 continue; 
475             }
476         }
477     }
478 #endif
479
480     // If we're shutting down, and this thread has not yet been
481     // killed, kill it now.  This sometimes happens when a finalizer
482     // thread is created by the final GC, or a thread previously
483     // in a foreign call returns.
484     if (sched_state >= SCHED_INTERRUPTING &&
485         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
486         deleteThread(cap,t);
487     }
488
489     /* context switches are initiated by the timer signal, unless
490      * the user specified "context switch as often as possible", with
491      * +RTS -C0
492      */
493     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
494         && !emptyThreadQueues(cap)) {
495         cap->context_switch = 1;
496     }
497          
498 run_thread:
499
500     // CurrentTSO is the thread to run.  t might be different if we
501     // loop back to run_thread, so make sure to set CurrentTSO after
502     // that.
503     cap->r.rCurrentTSO = t;
504
505     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
506                               (long)t->id, whatNext_strs[t->what_next]);
507
508     startHeapProfTimer();
509
510     // Check for exceptions blocked on this thread
511     maybePerformBlockedException (cap, t);
512
513     // ----------------------------------------------------------------------
514     // Run the current thread 
515
516     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
517     ASSERT(t->cap == cap);
518     ASSERT(t->bound ? t->bound->cap == cap : 1);
519
520     prev_what_next = t->what_next;
521
522     errno = t->saved_errno;
523 #if mingw32_HOST_OS
524     SetLastError(t->saved_winerror);
525 #endif
526
527     cap->in_haskell = rtsTrue;
528
529     dirty_TSO(cap,t);
530
531 #if defined(THREADED_RTS)
532     if (recent_activity == ACTIVITY_DONE_GC) {
533         // ACTIVITY_DONE_GC means we turned off the timer signal to
534         // conserve power (see #1623).  Re-enable it here.
535         nat prev;
536         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
537         if (prev == ACTIVITY_DONE_GC) {
538             startTimer();
539         }
540     } else {
541         recent_activity = ACTIVITY_YES;
542     }
543 #endif
544
545     postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
546
547     switch (prev_what_next) {
548         
549     case ThreadKilled:
550     case ThreadComplete:
551         /* Thread already finished, return to scheduler. */
552         ret = ThreadFinished;
553         break;
554         
555     case ThreadRunGHC:
556     {
557         StgRegTable *r;
558         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
559         cap = regTableToCapability(r);
560         ret = r->rRet;
561         break;
562     }
563     
564     case ThreadInterpret:
565         cap = interpretBCO(cap);
566         ret = cap->r.rRet;
567         break;
568         
569     default:
570         barf("schedule: invalid what_next field");
571     }
572
573     cap->in_haskell = rtsFalse;
574
575     // The TSO might have moved, eg. if it re-entered the RTS and a GC
576     // happened.  So find the new location:
577     t = cap->r.rCurrentTSO;
578
579     // We have run some Haskell code: there might be blackhole-blocked
580     // threads to wake up now.
581     // Lock-free test here should be ok, we're just setting a flag.
582     if ( blackhole_queue != END_TSO_QUEUE ) {
583         blackholes_need_checking = rtsTrue;
584     }
585     
586     // And save the current errno in this thread.
587     // XXX: possibly bogus for SMP because this thread might already
588     // be running again, see code below.
589     t->saved_errno = errno;
590 #if mingw32_HOST_OS
591     // Similarly for Windows error code
592     t->saved_winerror = GetLastError();
593 #endif
594
595     postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
596
597 #if defined(THREADED_RTS)
598     // If ret is ThreadBlocked, and this Task is bound to the TSO that
599     // blocked, we are in limbo - the TSO is now owned by whatever it
600     // is blocked on, and may in fact already have been woken up,
601     // perhaps even on a different Capability.  It may be the case
602     // that task->cap != cap.  We better yield this Capability
603     // immediately and return to normaility.
604     if (ret == ThreadBlocked) {
605         debugTrace(DEBUG_sched,
606                    "--<< thread %lu (%s) stopped: blocked",
607                    (unsigned long)t->id, whatNext_strs[t->what_next]);
608         goto yield;
609     }
610 #endif
611
612     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613     ASSERT(t->cap == cap);
614
615     // ----------------------------------------------------------------------
616     
617     // Costs for the scheduler are assigned to CCS_SYSTEM
618     stopHeapProfTimer();
619 #if defined(PROFILING)
620     CCCS = CCS_SYSTEM;
621 #endif
622     
623     schedulePostRunThread(cap,t);
624
625     t = threadStackUnderflow(task,t);
626
627     ready_to_gc = rtsFalse;
628
629     switch (ret) {
630     case HeapOverflow:
631         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
632         break;
633
634     case StackOverflow:
635         scheduleHandleStackOverflow(cap,task,t);
636         break;
637
638     case ThreadYielding:
639         if (scheduleHandleYield(cap, t, prev_what_next)) {
640             // shortcut for switching between compiler/interpreter:
641             goto run_thread; 
642         }
643         break;
644
645     case ThreadBlocked:
646         scheduleHandleThreadBlocked(t);
647         break;
648
649     case ThreadFinished:
650         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
651         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
652         break;
653
654     default:
655       barf("schedule: invalid thread return code %d", (int)ret);
656     }
657
658     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
659       cap = scheduleDoGC(cap,task,rtsFalse);
660     }
661   } /* end of while() */
662 }
663
664 /* ----------------------------------------------------------------------------
665  * Setting up the scheduler loop
666  * ------------------------------------------------------------------------- */
667
668 static void
669 schedulePreLoop(void)
670 {
671   // initialisation for scheduler - what cannot go into initScheduler()  
672 }
673
674 /* -----------------------------------------------------------------------------
675  * scheduleFindWork()
676  *
677  * Search for work to do, and handle messages from elsewhere.
678  * -------------------------------------------------------------------------- */
679
680 static void
681 scheduleFindWork (Capability *cap)
682 {
683     scheduleStartSignalHandlers(cap);
684
685     // Only check the black holes here if we've nothing else to do.
686     // During normal execution, the black hole list only gets checked
687     // at GC time, to avoid repeatedly traversing this possibly long
688     // list each time around the scheduler.
689     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
690
691     scheduleCheckWakeupThreads(cap);
692
693     scheduleCheckBlockedThreads(cap);
694
695 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
696     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
697 #endif
698
699 #if defined(PARALLEL_HASKELL)
700     // if messages have been buffered...
701     scheduleSendPendingMessages();
702 #endif
703
704 #if defined(PARALLEL_HASKELL)
705     if (emptyRunQueue(cap)) {
706         receivedFinish = scheduleGetRemoteWork(cap);
707         continue; //  a new round, (hopefully) with new work
708         /* 
709            in GUM, this a) sends out a FISH and returns IF no fish is
710                            out already
711                         b) (blocking) awaits and receives messages
712            
713            in Eden, this is only the blocking receive, as b) in GUM.
714         */
715     }
716 #endif
717 }
718
719 #if defined(THREADED_RTS)
720 STATIC_INLINE rtsBool
721 shouldYieldCapability (Capability *cap, Task *task)
722 {
723     // we need to yield this capability to someone else if..
724     //   - another thread is initiating a GC
725     //   - another Task is returning from a foreign call
726     //   - the thread at the head of the run queue cannot be run
727     //     by this Task (it is bound to another Task, or it is unbound
728     //     and this task it bound).
729     return (waiting_for_gc || 
730             cap->returning_tasks_hd != NULL ||
731             (!emptyRunQueue(cap) && (task->tso == NULL
732                                      ? cap->run_queue_hd->bound != NULL
733                                      : cap->run_queue_hd->bound != task)));
734 }
735
736 // This is the single place where a Task goes to sleep.  There are
737 // two reasons it might need to sleep:
738 //    - there are no threads to run
739 //    - we need to yield this Capability to someone else 
740 //      (see shouldYieldCapability())
741 //
742 // Careful: the scheduler loop is quite delicate.  Make sure you run
743 // the tests in testsuite/concurrent (all ways) after modifying this,
744 // and also check the benchmarks in nofib/parallel for regressions.
745
746 static void
747 scheduleYield (Capability **pcap, Task *task)
748 {
749     Capability *cap = *pcap;
750
751     // if we have work, and we don't need to give up the Capability, continue.
752     if (!shouldYieldCapability(cap,task) && 
753         (!emptyRunQueue(cap) ||
754          !emptyWakeupQueue(cap) ||
755          blackholes_need_checking ||
756          sched_state >= SCHED_INTERRUPTING))
757         return;
758
759     // otherwise yield (sleep), and keep yielding if necessary.
760     do {
761         yieldCapability(&cap,task);
762     } 
763     while (shouldYieldCapability(cap,task));
764
765     // note there may still be no threads on the run queue at this
766     // point, the caller has to check.
767
768     *pcap = cap;
769     return;
770 }
771 #endif
772     
773 /* -----------------------------------------------------------------------------
774  * schedulePushWork()
775  *
776  * Push work to other Capabilities if we have some.
777  * -------------------------------------------------------------------------- */
778
779 static void
780 schedulePushWork(Capability *cap USED_IF_THREADS, 
781                  Task *task      USED_IF_THREADS)
782 {
783   /* following code not for PARALLEL_HASKELL. I kept the call general,
784      future GUM versions might use pushing in a distributed setup */
785 #if defined(THREADED_RTS)
786
787     Capability *free_caps[n_capabilities], *cap0;
788     nat i, n_free_caps;
789
790     // migration can be turned off with +RTS -qg
791     if (!RtsFlags.ParFlags.migrate) return;
792
793     // Check whether we have more threads on our run queue, or sparks
794     // in our pool, that we could hand to another Capability.
795     if (cap->run_queue_hd == END_TSO_QUEUE) {
796         if (sparkPoolSizeCap(cap) < 2) return;
797     } else {
798         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
799             sparkPoolSizeCap(cap) < 1) return;
800     }
801
802     // First grab as many free Capabilities as we can.
803     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
804         cap0 = &capabilities[i];
805         if (cap != cap0 && tryGrabCapability(cap0,task)) {
806             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
807                 // it already has some work, we just grabbed it at 
808                 // the wrong moment.  Or maybe it's deadlocked!
809                 releaseCapability(cap0);
810             } else {
811                 free_caps[n_free_caps++] = cap0;
812             }
813         }
814     }
815
816     // we now have n_free_caps free capabilities stashed in
817     // free_caps[].  Share our run queue equally with them.  This is
818     // probably the simplest thing we could do; improvements we might
819     // want to do include:
820     //
821     //   - giving high priority to moving relatively new threads, on 
822     //     the gournds that they haven't had time to build up a
823     //     working set in the cache on this CPU/Capability.
824     //
825     //   - giving low priority to moving long-lived threads
826
827     if (n_free_caps > 0) {
828         StgTSO *prev, *t, *next;
829         rtsBool pushed_to_all;
830
831         debugTrace(DEBUG_sched, 
832                    "cap %d: %s and %d free capabilities, sharing...", 
833                    cap->no, 
834                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
835                    "excess threads on run queue":"sparks to share (>=2)",
836                    n_free_caps);
837
838         i = 0;
839         pushed_to_all = rtsFalse;
840
841         if (cap->run_queue_hd != END_TSO_QUEUE) {
842             prev = cap->run_queue_hd;
843             t = prev->_link;
844             prev->_link = END_TSO_QUEUE;
845             for (; t != END_TSO_QUEUE; t = next) {
846                 next = t->_link;
847                 t->_link = END_TSO_QUEUE;
848                 if (t->what_next == ThreadRelocated
849                     || t->bound == task // don't move my bound thread
850                     || tsoLocked(t)) {  // don't move a locked thread
851                     setTSOLink(cap, prev, t);
852                     prev = t;
853                 } else if (i == n_free_caps) {
854                     pushed_to_all = rtsTrue;
855                     i = 0;
856                     // keep one for us
857                     setTSOLink(cap, prev, t);
858                     prev = t;
859                 } else {
860                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
861                     appendToRunQueue(free_caps[i],t);
862
863         postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
864
865                     if (t->bound) { t->bound->cap = free_caps[i]; }
866                     t->cap = free_caps[i];
867                     i++;
868                 }
869             }
870             cap->run_queue_tl = prev;
871         }
872
873 #ifdef SPARK_PUSHING
874         /* JB I left this code in place, it would work but is not necessary */
875
876         // If there are some free capabilities that we didn't push any
877         // threads to, then try to push a spark to each one.
878         if (!pushed_to_all) {
879             StgClosure *spark;
880             // i is the next free capability to push to
881             for (; i < n_free_caps; i++) {
882                 if (emptySparkPoolCap(free_caps[i])) {
883                     spark = tryStealSpark(cap->sparks);
884                     if (spark != NULL) {
885                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
886
887       postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
888
889                         newSpark(&(free_caps[i]->r), spark);
890                     }
891                 }
892             }
893         }
894 #endif /* SPARK_PUSHING */
895
896         // release the capabilities
897         for (i = 0; i < n_free_caps; i++) {
898             task->cap = free_caps[i];
899             releaseAndWakeupCapability(free_caps[i]);
900         }
901     }
902     task->cap = cap; // reset to point to our Capability.
903
904 #endif /* THREADED_RTS */
905
906 }
907
908 /* ----------------------------------------------------------------------------
909  * Start any pending signal handlers
910  * ------------------------------------------------------------------------- */
911
912 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
913 static void
914 scheduleStartSignalHandlers(Capability *cap)
915 {
916     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
917         // safe outside the lock
918         startSignalHandlers(cap);
919     }
920 }
921 #else
922 static void
923 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
924 {
925 }
926 #endif
927
928 /* ----------------------------------------------------------------------------
929  * Check for blocked threads that can be woken up.
930  * ------------------------------------------------------------------------- */
931
932 static void
933 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
934 {
935 #if !defined(THREADED_RTS)
936     //
937     // Check whether any waiting threads need to be woken up.  If the
938     // run queue is empty, and there are no other tasks running, we
939     // can wait indefinitely for something to happen.
940     //
941     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
942     {
943         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
944     }
945 #endif
946 }
947
948
949 /* ----------------------------------------------------------------------------
950  * Check for threads woken up by other Capabilities
951  * ------------------------------------------------------------------------- */
952
953 static void
954 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
955 {
956 #if defined(THREADED_RTS)
957     // Any threads that were woken up by other Capabilities get
958     // appended to our run queue.
959     if (!emptyWakeupQueue(cap)) {
960         ACQUIRE_LOCK(&cap->lock);
961         if (emptyRunQueue(cap)) {
962             cap->run_queue_hd = cap->wakeup_queue_hd;
963             cap->run_queue_tl = cap->wakeup_queue_tl;
964         } else {
965             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
966             cap->run_queue_tl = cap->wakeup_queue_tl;
967         }
968         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
969         RELEASE_LOCK(&cap->lock);
970     }
971 #endif
972 }
973
974 /* ----------------------------------------------------------------------------
975  * Check for threads blocked on BLACKHOLEs that can be woken up
976  * ------------------------------------------------------------------------- */
977 static void
978 scheduleCheckBlackHoles (Capability *cap)
979 {
980     if ( blackholes_need_checking ) // check without the lock first
981     {
982         ACQUIRE_LOCK(&sched_mutex);
983         if ( blackholes_need_checking ) {
984             blackholes_need_checking = rtsFalse;
985             // important that we reset the flag *before* checking the
986             // blackhole queue, otherwise we could get deadlock.  This
987             // happens as follows: we wake up a thread that
988             // immediately runs on another Capability, blocks on a
989             // blackhole, and then we reset the blackholes_need_checking flag.
990             checkBlackHoles(cap);
991         }
992         RELEASE_LOCK(&sched_mutex);
993     }
994 }
995
996 /* ----------------------------------------------------------------------------
997  * Detect deadlock conditions and attempt to resolve them.
998  * ------------------------------------------------------------------------- */
999
1000 static void
1001 scheduleDetectDeadlock (Capability *cap, Task *task)
1002 {
1003
1004 #if defined(PARALLEL_HASKELL)
1005     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
1006     return;
1007 #endif
1008
1009     /* 
1010      * Detect deadlock: when we have no threads to run, there are no
1011      * threads blocked, waiting for I/O, or sleeping, and all the
1012      * other tasks are waiting for work, we must have a deadlock of
1013      * some description.
1014      */
1015     if ( emptyThreadQueues(cap) )
1016     {
1017 #if defined(THREADED_RTS)
1018         /* 
1019          * In the threaded RTS, we only check for deadlock if there
1020          * has been no activity in a complete timeslice.  This means
1021          * we won't eagerly start a full GC just because we don't have
1022          * any threads to run currently.
1023          */
1024         if (recent_activity != ACTIVITY_INACTIVE) return;
1025 #endif
1026
1027         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1028
1029         // Garbage collection can release some new threads due to
1030         // either (a) finalizers or (b) threads resurrected because
1031         // they are unreachable and will therefore be sent an
1032         // exception.  Any threads thus released will be immediately
1033         // runnable.
1034         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1035         // when force_major == rtsTrue. scheduleDoGC sets
1036         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1037         // signal.
1038
1039         if ( !emptyRunQueue(cap) ) return;
1040
1041 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1042         /* If we have user-installed signal handlers, then wait
1043          * for signals to arrive rather then bombing out with a
1044          * deadlock.
1045          */
1046         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1047             debugTrace(DEBUG_sched,
1048                        "still deadlocked, waiting for signals...");
1049
1050             awaitUserSignals();
1051
1052             if (signals_pending()) {
1053                 startSignalHandlers(cap);
1054             }
1055
1056             // either we have threads to run, or we were interrupted:
1057             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1058
1059             return;
1060         }
1061 #endif
1062
1063 #if !defined(THREADED_RTS)
1064         /* Probably a real deadlock.  Send the current main thread the
1065          * Deadlock exception.
1066          */
1067         if (task->tso) {
1068             switch (task->tso->why_blocked) {
1069             case BlockedOnSTM:
1070             case BlockedOnBlackHole:
1071             case BlockedOnException:
1072             case BlockedOnMVar:
1073                 throwToSingleThreaded(cap, task->tso, 
1074                                       (StgClosure *)nonTermination_closure);
1075                 return;
1076             default:
1077                 barf("deadlock: main thread blocked in a strange way");
1078             }
1079         }
1080         return;
1081 #endif
1082     }
1083 }
1084
1085
1086 /* ----------------------------------------------------------------------------
1087  * Send pending messages (PARALLEL_HASKELL only)
1088  * ------------------------------------------------------------------------- */
1089
1090 #if defined(PARALLEL_HASKELL)
1091 static void
1092 scheduleSendPendingMessages(void)
1093 {
1094
1095 # if defined(PAR) // global Mem.Mgmt., omit for now
1096     if (PendingFetches != END_BF_QUEUE) {
1097         processFetches();
1098     }
1099 # endif
1100     
1101     if (RtsFlags.ParFlags.BufferTime) {
1102         // if we use message buffering, we must send away all message
1103         // packets which have become too old...
1104         sendOldBuffers(); 
1105     }
1106 }
1107 #endif
1108
1109 /* ----------------------------------------------------------------------------
1110  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1111  * ------------------------------------------------------------------------- */
1112
1113 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1114 static void
1115 scheduleActivateSpark(Capability *cap)
1116 {
1117     if (anySparks())
1118     {
1119         createSparkThread(cap);
1120         debugTrace(DEBUG_sched, "creating a spark thread");
1121     }
1122 }
1123 #endif // PARALLEL_HASKELL || THREADED_RTS
1124
1125 /* ----------------------------------------------------------------------------
1126  * Get work from a remote node (PARALLEL_HASKELL only)
1127  * ------------------------------------------------------------------------- */
1128     
1129 #if defined(PARALLEL_HASKELL)
1130 static rtsBool /* return value used in PARALLEL_HASKELL only */
1131 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1132 {
1133 #if defined(PARALLEL_HASKELL)
1134   rtsBool receivedFinish = rtsFalse;
1135
1136   // idle() , i.e. send all buffers, wait for work
1137   if (RtsFlags.ParFlags.BufferTime) {
1138         IF_PAR_DEBUG(verbose, 
1139                 debugBelch("...send all pending data,"));
1140         {
1141           nat i;
1142           for (i=1; i<=nPEs; i++)
1143             sendImmediately(i); // send all messages away immediately
1144         }
1145   }
1146
1147   /* this would be the place for fishing in GUM... 
1148
1149      if (no-earlier-fish-around) 
1150           sendFish(choosePe());
1151    */
1152
1153   // Eden:just look for incoming messages (blocking receive)
1154   IF_PAR_DEBUG(verbose, 
1155                debugBelch("...wait for incoming messages...\n"));
1156   processMessages(cap, &receivedFinish); // blocking receive...
1157
1158
1159   return receivedFinish;
1160   // reenter scheduling look after having received something
1161
1162 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1163
1164   return rtsFalse; /* return value unused in THREADED_RTS */
1165
1166 #endif /* PARALLEL_HASKELL */
1167 }
1168 #endif // PARALLEL_HASKELL || THREADED_RTS
1169
1170 /* ----------------------------------------------------------------------------
1171  * After running a thread...
1172  * ------------------------------------------------------------------------- */
1173
1174 static void
1175 schedulePostRunThread (Capability *cap, StgTSO *t)
1176 {
1177     // We have to be able to catch transactions that are in an
1178     // infinite loop as a result of seeing an inconsistent view of
1179     // memory, e.g. 
1180     //
1181     //   atomically $ do
1182     //       [a,b] <- mapM readTVar [ta,tb]
1183     //       when (a == b) loop
1184     //
1185     // and a is never equal to b given a consistent view of memory.
1186     //
1187     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1188         if (!stmValidateNestOfTransactions (t -> trec)) {
1189             debugTrace(DEBUG_sched | DEBUG_stm,
1190                        "trec %p found wasting its time", t);
1191             
1192             // strip the stack back to the
1193             // ATOMICALLY_FRAME, aborting the (nested)
1194             // transaction, and saving the stack of any
1195             // partially-evaluated thunks on the heap.
1196             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1197             
1198             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1199         }
1200     }
1201
1202   /* some statistics gathering in the parallel case */
1203 }
1204
1205 /* -----------------------------------------------------------------------------
1206  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1207  * -------------------------------------------------------------------------- */
1208
1209 static rtsBool
1210 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1211 {
1212     // did the task ask for a large block?
1213     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1214         // if so, get one and push it on the front of the nursery.
1215         bdescr *bd;
1216         lnat blocks;
1217         
1218         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1219         
1220         debugTrace(DEBUG_sched,
1221                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1222                    (long)t->id, whatNext_strs[t->what_next], blocks);
1223     
1224         // don't do this if the nursery is (nearly) full, we'll GC first.
1225         if (cap->r.rCurrentNursery->link != NULL ||
1226             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1227                                                // if the nursery has only one block.
1228             
1229             ACQUIRE_SM_LOCK
1230             bd = allocGroup( blocks );
1231             RELEASE_SM_LOCK
1232             cap->r.rNursery->n_blocks += blocks;
1233             
1234             // link the new group into the list
1235             bd->link = cap->r.rCurrentNursery;
1236             bd->u.back = cap->r.rCurrentNursery->u.back;
1237             if (cap->r.rCurrentNursery->u.back != NULL) {
1238                 cap->r.rCurrentNursery->u.back->link = bd;
1239             } else {
1240 #if !defined(THREADED_RTS)
1241                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1242                        g0s0 == cap->r.rNursery);
1243 #endif
1244                 cap->r.rNursery->blocks = bd;
1245             }             
1246             cap->r.rCurrentNursery->u.back = bd;
1247             
1248             // initialise it as a nursery block.  We initialise the
1249             // step, gen_no, and flags field of *every* sub-block in
1250             // this large block, because this is easier than making
1251             // sure that we always find the block head of a large
1252             // block whenever we call Bdescr() (eg. evacuate() and
1253             // isAlive() in the GC would both have to do this, at
1254             // least).
1255             { 
1256                 bdescr *x;
1257                 for (x = bd; x < bd + blocks; x++) {
1258                     x->step = cap->r.rNursery;
1259                     x->gen_no = 0;
1260                     x->flags = 0;
1261                 }
1262             }
1263             
1264             // This assert can be a killer if the app is doing lots
1265             // of large block allocations.
1266             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1267             
1268             // now update the nursery to point to the new block
1269             cap->r.rCurrentNursery = bd;
1270             
1271             // we might be unlucky and have another thread get on the
1272             // run queue before us and steal the large block, but in that
1273             // case the thread will just end up requesting another large
1274             // block.
1275             pushOnRunQueue(cap,t);
1276             return rtsFalse;  /* not actually GC'ing */
1277         }
1278     }
1279     
1280     debugTrace(DEBUG_sched,
1281                "--<< thread %ld (%s) stopped: HeapOverflow",
1282                (long)t->id, whatNext_strs[t->what_next]);
1283
1284     if (cap->r.rHpLim == NULL || cap->context_switch) {
1285         // Sometimes we miss a context switch, e.g. when calling
1286         // primitives in a tight loop, MAYBE_GC() doesn't check the
1287         // context switch flag, and we end up waiting for a GC.
1288         // See #1984, and concurrent/should_run/1984
1289         cap->context_switch = 0;
1290         addToRunQueue(cap,t);
1291     } else {
1292         pushOnRunQueue(cap,t);
1293     }
1294     return rtsTrue;
1295     /* actual GC is done at the end of the while loop in schedule() */
1296 }
1297
1298 /* -----------------------------------------------------------------------------
1299  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1300  * -------------------------------------------------------------------------- */
1301
1302 static void
1303 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1304 {
1305     debugTrace (DEBUG_sched,
1306                 "--<< thread %ld (%s) stopped, StackOverflow", 
1307                 (long)t->id, whatNext_strs[t->what_next]);
1308
1309     /* just adjust the stack for this thread, then pop it back
1310      * on the run queue.
1311      */
1312     { 
1313         /* enlarge the stack */
1314         StgTSO *new_t = threadStackOverflow(cap, t);
1315         
1316         /* The TSO attached to this Task may have moved, so update the
1317          * pointer to it.
1318          */
1319         if (task->tso == t) {
1320             task->tso = new_t;
1321         }
1322         pushOnRunQueue(cap,new_t);
1323     }
1324 }
1325
1326 /* -----------------------------------------------------------------------------
1327  * Handle a thread that returned to the scheduler with ThreadYielding
1328  * -------------------------------------------------------------------------- */
1329
1330 static rtsBool
1331 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1332 {
1333     // Reset the context switch flag.  We don't do this just before
1334     // running the thread, because that would mean we would lose ticks
1335     // during GC, which can lead to unfair scheduling (a thread hogs
1336     // the CPU because the tick always arrives during GC).  This way
1337     // penalises threads that do a lot of allocation, but that seems
1338     // better than the alternative.
1339     cap->context_switch = 0;
1340     
1341     /* put the thread back on the run queue.  Then, if we're ready to
1342      * GC, check whether this is the last task to stop.  If so, wake
1343      * up the GC thread.  getThread will block during a GC until the
1344      * GC is finished.
1345      */
1346 #ifdef DEBUG
1347     if (t->what_next != prev_what_next) {
1348         debugTrace(DEBUG_sched,
1349                    "--<< thread %ld (%s) stopped to switch evaluators", 
1350                    (long)t->id, whatNext_strs[t->what_next]);
1351     } else {
1352         debugTrace(DEBUG_sched,
1353                    "--<< thread %ld (%s) stopped, yielding",
1354                    (long)t->id, whatNext_strs[t->what_next]);
1355     }
1356 #endif
1357     
1358     IF_DEBUG(sanity,
1359              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1360              checkTSO(t));
1361     ASSERT(t->_link == END_TSO_QUEUE);
1362     
1363     // Shortcut if we're just switching evaluators: don't bother
1364     // doing stack squeezing (which can be expensive), just run the
1365     // thread.
1366     if (t->what_next != prev_what_next) {
1367         return rtsTrue;
1368     }
1369
1370     addToRunQueue(cap,t);
1371
1372     return rtsFalse;
1373 }
1374
1375 /* -----------------------------------------------------------------------------
1376  * Handle a thread that returned to the scheduler with ThreadBlocked
1377  * -------------------------------------------------------------------------- */
1378
1379 static void
1380 scheduleHandleThreadBlocked( StgTSO *t
1381 #if !defined(GRAN) && !defined(DEBUG)
1382     STG_UNUSED
1383 #endif
1384     )
1385 {
1386
1387       // We don't need to do anything.  The thread is blocked, and it
1388       // has tidied up its stack and placed itself on whatever queue
1389       // it needs to be on.
1390
1391     // ASSERT(t->why_blocked != NotBlocked);
1392     // Not true: for example,
1393     //    - in THREADED_RTS, the thread may already have been woken
1394     //      up by another Capability.  This actually happens: try
1395     //      conc023 +RTS -N2.
1396     //    - the thread may have woken itself up already, because
1397     //      threadPaused() might have raised a blocked throwTo
1398     //      exception, see maybePerformBlockedException().
1399
1400 #ifdef DEBUG
1401     if (traceClass(DEBUG_sched)) {
1402         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1403                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1404         printThreadBlockage(t);
1405         debugTraceEnd();
1406     }
1407 #endif
1408 }
1409
1410 /* -----------------------------------------------------------------------------
1411  * Handle a thread that returned to the scheduler with ThreadFinished
1412  * -------------------------------------------------------------------------- */
1413
1414 static rtsBool
1415 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1416 {
1417     /* Need to check whether this was a main thread, and if so,
1418      * return with the return value.
1419      *
1420      * We also end up here if the thread kills itself with an
1421      * uncaught exception, see Exception.cmm.
1422      */
1423     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1424                (unsigned long)t->id, whatNext_strs[t->what_next]);
1425
1426     // blocked exceptions can now complete, even if the thread was in
1427     // blocked mode (see #2910).  This unconditionally calls
1428     // lockTSO(), which ensures that we don't miss any threads that
1429     // are engaged in throwTo() with this thread as a target.
1430     awakenBlockedExceptionQueue (cap, t);
1431
1432       //
1433       // Check whether the thread that just completed was a bound
1434       // thread, and if so return with the result.  
1435       //
1436       // There is an assumption here that all thread completion goes
1437       // through this point; we need to make sure that if a thread
1438       // ends up in the ThreadKilled state, that it stays on the run
1439       // queue so it can be dealt with here.
1440       //
1441
1442       if (t->bound) {
1443
1444           if (t->bound != task) {
1445 #if !defined(THREADED_RTS)
1446               // Must be a bound thread that is not the topmost one.  Leave
1447               // it on the run queue until the stack has unwound to the
1448               // point where we can deal with this.  Leaving it on the run
1449               // queue also ensures that the garbage collector knows about
1450               // this thread and its return value (it gets dropped from the
1451               // step->threads list so there's no other way to find it).
1452               appendToRunQueue(cap,t);
1453               return rtsFalse;
1454 #else
1455               // this cannot happen in the threaded RTS, because a
1456               // bound thread can only be run by the appropriate Task.
1457               barf("finished bound thread that isn't mine");
1458 #endif
1459           }
1460
1461           ASSERT(task->tso == t);
1462
1463           if (t->what_next == ThreadComplete) {
1464               if (task->ret) {
1465                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1466                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1467               }
1468               task->stat = Success;
1469           } else {
1470               if (task->ret) {
1471                   *(task->ret) = NULL;
1472               }
1473               if (sched_state >= SCHED_INTERRUPTING) {
1474                   if (heap_overflow) {
1475                       task->stat = HeapExhausted;
1476                   } else {
1477                       task->stat = Interrupted;
1478                   }
1479               } else {
1480                   task->stat = Killed;
1481               }
1482           }
1483 #ifdef DEBUG
1484           removeThreadLabel((StgWord)task->tso->id);
1485 #endif
1486           return rtsTrue; // tells schedule() to return
1487       }
1488
1489       return rtsFalse;
1490 }
1491
1492 /* -----------------------------------------------------------------------------
1493  * Perform a heap census
1494  * -------------------------------------------------------------------------- */
1495
1496 static rtsBool
1497 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1498 {
1499     // When we have +RTS -i0 and we're heap profiling, do a census at
1500     // every GC.  This lets us get repeatable runs for debugging.
1501     if (performHeapProfile ||
1502         (RtsFlags.ProfFlags.profileInterval==0 &&
1503          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1504         return rtsTrue;
1505     } else {
1506         return rtsFalse;
1507     }
1508 }
1509
1510 /* -----------------------------------------------------------------------------
1511  * Perform a garbage collection if necessary
1512  * -------------------------------------------------------------------------- */
1513
1514 static Capability *
1515 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1516 {
1517     rtsBool heap_census;
1518 #ifdef THREADED_RTS
1519     /* extern static volatile StgWord waiting_for_gc; 
1520        lives inside capability.c */
1521     rtsBool gc_type, prev_pending_gc;
1522     nat i;
1523 #endif
1524
1525     if (sched_state == SCHED_SHUTTING_DOWN) {
1526         // The final GC has already been done, and the system is
1527         // shutting down.  We'll probably deadlock if we try to GC
1528         // now.
1529         return cap;
1530     }
1531
1532 #ifdef THREADED_RTS
1533     if (sched_state < SCHED_INTERRUPTING
1534         && RtsFlags.ParFlags.parGcEnabled
1535         && N >= RtsFlags.ParFlags.parGcGen
1536         && ! oldest_gen->steps[0].mark)
1537     {
1538         gc_type = PENDING_GC_PAR;
1539     } else {
1540         gc_type = PENDING_GC_SEQ;
1541     }
1542
1543     // In order to GC, there must be no threads running Haskell code.
1544     // Therefore, the GC thread needs to hold *all* the capabilities,
1545     // and release them after the GC has completed.  
1546     //
1547     // This seems to be the simplest way: previous attempts involved
1548     // making all the threads with capabilities give up their
1549     // capabilities and sleep except for the *last* one, which
1550     // actually did the GC.  But it's quite hard to arrange for all
1551     // the other tasks to sleep and stay asleep.
1552     //
1553
1554     /*  Other capabilities are prevented from running yet more Haskell
1555         threads if waiting_for_gc is set. Tested inside
1556         yieldCapability() and releaseCapability() in Capability.c */
1557
1558     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1559     if (prev_pending_gc) {
1560         do {
1561             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1562                        prev_pending_gc);
1563             ASSERT(cap);
1564             yieldCapability(&cap,task);
1565         } while (waiting_for_gc);
1566         return cap;  // NOTE: task->cap might have changed here
1567     }
1568
1569     setContextSwitches();
1570
1571     // The final shutdown GC is always single-threaded, because it's
1572     // possible that some of the Capabilities have no worker threads.
1573     
1574     if (gc_type == PENDING_GC_SEQ)
1575     {
1576         postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
1577         // single-threaded GC: grab all the capabilities
1578         for (i=0; i < n_capabilities; i++) {
1579             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1580             if (cap != &capabilities[i]) {
1581                 Capability *pcap = &capabilities[i];
1582                 // we better hope this task doesn't get migrated to
1583                 // another Capability while we're waiting for this one.
1584                 // It won't, because load balancing happens while we have
1585                 // all the Capabilities, but even so it's a slightly
1586                 // unsavoury invariant.
1587                 task->cap = pcap;
1588                 waitForReturnCapability(&pcap, task);
1589                 if (pcap != &capabilities[i]) {
1590                     barf("scheduleDoGC: got the wrong capability");
1591                 }
1592             }
1593         }
1594     }
1595     else
1596     {
1597         // multi-threaded GC: make sure all the Capabilities donate one
1598         // GC thread each.
1599         postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
1600         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1601
1602         waitForGcThreads(cap);
1603     }
1604 #endif
1605
1606     // so this happens periodically:
1607     if (cap) scheduleCheckBlackHoles(cap);
1608     
1609     IF_DEBUG(scheduler, printAllThreads());
1610
1611 delete_threads_and_gc:
1612     /*
1613      * We now have all the capabilities; if we're in an interrupting
1614      * state, then we should take the opportunity to delete all the
1615      * threads in the system.
1616      */
1617     if (sched_state == SCHED_INTERRUPTING) {
1618         deleteAllThreads(cap);
1619         sched_state = SCHED_SHUTTING_DOWN;
1620     }
1621     
1622     heap_census = scheduleNeedHeapProfile(rtsTrue);
1623
1624 #if defined(THREADED_RTS)
1625     postEvent(cap, EVENT_GC_START, 0, 0);
1626     debugTrace(DEBUG_sched, "doing GC");
1627     // reset waiting_for_gc *before* GC, so that when the GC threads
1628     // emerge they don't immediately re-enter the GC.
1629     waiting_for_gc = 0;
1630     GarbageCollect(force_major || heap_census, gc_type, cap);
1631 #else
1632     GarbageCollect(force_major || heap_census, 0, cap);
1633 #endif
1634     postEvent(cap, EVENT_GC_END, 0, 0);
1635
1636     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1637     {
1638         // We are doing a GC because the system has been idle for a
1639         // timeslice and we need to check for deadlock.  Record the
1640         // fact that we've done a GC and turn off the timer signal;
1641         // it will get re-enabled if we run any threads after the GC.
1642         recent_activity = ACTIVITY_DONE_GC;
1643         stopTimer();
1644     }
1645     else
1646     {
1647         // the GC might have taken long enough for the timer to set
1648         // recent_activity = ACTIVITY_INACTIVE, but we aren't
1649         // necessarily deadlocked:
1650         recent_activity = ACTIVITY_YES;
1651     }
1652
1653 #if defined(THREADED_RTS)
1654     if (gc_type == PENDING_GC_PAR)
1655     {
1656         releaseGCThreads(cap);
1657     }
1658 #endif
1659
1660     if (heap_census) {
1661         debugTrace(DEBUG_sched, "performing heap census");
1662         heapCensus();
1663         performHeapProfile = rtsFalse;
1664     }
1665
1666     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1667         // GC set the heap_overflow flag, so we should proceed with
1668         // an orderly shutdown now.  Ultimately we want the main
1669         // thread to return to its caller with HeapExhausted, at which
1670         // point the caller should call hs_exit().  The first step is
1671         // to delete all the threads.
1672         //
1673         // Another way to do this would be to raise an exception in
1674         // the main thread, which we really should do because it gives
1675         // the program a chance to clean up.  But how do we find the
1676         // main thread?  It should presumably be the same one that
1677         // gets ^C exceptions, but that's all done on the Haskell side
1678         // (GHC.TopHandler).
1679         sched_state = SCHED_INTERRUPTING;
1680         goto delete_threads_and_gc;
1681     }
1682
1683 #ifdef SPARKBALANCE
1684     /* JB 
1685        Once we are all together... this would be the place to balance all
1686        spark pools. No concurrent stealing or adding of new sparks can
1687        occur. Should be defined in Sparks.c. */
1688     balanceSparkPoolsCaps(n_capabilities, capabilities);
1689 #endif
1690
1691 #if defined(THREADED_RTS)
1692     if (gc_type == PENDING_GC_SEQ) {
1693         // release our stash of capabilities.
1694         for (i = 0; i < n_capabilities; i++) {
1695             if (cap != &capabilities[i]) {
1696                 task->cap = &capabilities[i];
1697                 releaseCapability(&capabilities[i]);
1698             }
1699         }
1700     }
1701     if (cap) {
1702         task->cap = cap;
1703     } else {
1704         task->cap = NULL;
1705     }
1706 #endif
1707
1708     return cap;
1709 }
1710
1711 /* ---------------------------------------------------------------------------
1712  * Singleton fork(). Do not copy any running threads.
1713  * ------------------------------------------------------------------------- */
1714
1715 pid_t
1716 forkProcess(HsStablePtr *entry
1717 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1718             STG_UNUSED
1719 #endif
1720            )
1721 {
1722 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1723     Task *task;
1724     pid_t pid;
1725     StgTSO* t,*next;
1726     Capability *cap;
1727     nat s;
1728     
1729 #if defined(THREADED_RTS)
1730     if (RtsFlags.ParFlags.nNodes > 1) {
1731         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1732         stg_exit(EXIT_FAILURE);
1733     }
1734 #endif
1735
1736     debugTrace(DEBUG_sched, "forking!");
1737     
1738     // ToDo: for SMP, we should probably acquire *all* the capabilities
1739     cap = rts_lock();
1740     
1741     // no funny business: hold locks while we fork, otherwise if some
1742     // other thread is holding a lock when the fork happens, the data
1743     // structure protected by the lock will forever be in an
1744     // inconsistent state in the child.  See also #1391.
1745     ACQUIRE_LOCK(&sched_mutex);
1746     ACQUIRE_LOCK(&cap->lock);
1747     ACQUIRE_LOCK(&cap->running_task->lock);
1748
1749     pid = fork();
1750     
1751     if (pid) { // parent
1752         
1753         RELEASE_LOCK(&sched_mutex);
1754         RELEASE_LOCK(&cap->lock);
1755         RELEASE_LOCK(&cap->running_task->lock);
1756
1757         // just return the pid
1758         rts_unlock(cap);
1759         return pid;
1760         
1761     } else { // child
1762         
1763 #if defined(THREADED_RTS)
1764         initMutex(&sched_mutex);
1765         initMutex(&cap->lock);
1766         initMutex(&cap->running_task->lock);
1767 #endif
1768
1769         // Now, all OS threads except the thread that forked are
1770         // stopped.  We need to stop all Haskell threads, including
1771         // those involved in foreign calls.  Also we need to delete
1772         // all Tasks, because they correspond to OS threads that are
1773         // now gone.
1774
1775         for (s = 0; s < total_steps; s++) {
1776           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1777             if (t->what_next == ThreadRelocated) {
1778                 next = t->_link;
1779             } else {
1780                 next = t->global_link;
1781                 // don't allow threads to catch the ThreadKilled
1782                 // exception, but we do want to raiseAsync() because these
1783                 // threads may be evaluating thunks that we need later.
1784                 deleteThread_(cap,t);
1785             }
1786           }
1787         }
1788         
1789         // Empty the run queue.  It seems tempting to let all the
1790         // killed threads stay on the run queue as zombies to be
1791         // cleaned up later, but some of them correspond to bound
1792         // threads for which the corresponding Task does not exist.
1793         cap->run_queue_hd = END_TSO_QUEUE;
1794         cap->run_queue_tl = END_TSO_QUEUE;
1795
1796         // Any suspended C-calling Tasks are no more, their OS threads
1797         // don't exist now:
1798         cap->suspended_ccalling_tasks = NULL;
1799
1800         // Empty the threads lists.  Otherwise, the garbage
1801         // collector may attempt to resurrect some of these threads.
1802         for (s = 0; s < total_steps; s++) {
1803             all_steps[s].threads = END_TSO_QUEUE;
1804         }
1805
1806         // Wipe the task list, except the current Task.
1807         ACQUIRE_LOCK(&sched_mutex);
1808         for (task = all_tasks; task != NULL; task=task->all_link) {
1809             if (task != cap->running_task) {
1810 #if defined(THREADED_RTS)
1811                 initMutex(&task->lock); // see #1391
1812 #endif
1813                 discardTask(task);
1814             }
1815         }
1816         RELEASE_LOCK(&sched_mutex);
1817
1818 #if defined(THREADED_RTS)
1819         // Wipe our spare workers list, they no longer exist.  New
1820         // workers will be created if necessary.
1821         cap->spare_workers = NULL;
1822         cap->returning_tasks_hd = NULL;
1823         cap->returning_tasks_tl = NULL;
1824 #endif
1825
1826         // On Unix, all timers are reset in the child, so we need to start
1827         // the timer again.
1828         initTimer();
1829         startTimer();
1830
1831         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1832         rts_checkSchedStatus("forkProcess",cap);
1833         
1834         rts_unlock(cap);
1835         hs_exit();                      // clean up and exit
1836         stg_exit(EXIT_SUCCESS);
1837     }
1838 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1839     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1840     return -1;
1841 #endif
1842 }
1843
1844 /* ---------------------------------------------------------------------------
1845  * Delete all the threads in the system
1846  * ------------------------------------------------------------------------- */
1847    
1848 static void
1849 deleteAllThreads ( Capability *cap )
1850 {
1851     // NOTE: only safe to call if we own all capabilities.
1852
1853     StgTSO* t, *next;
1854     nat s;
1855
1856     debugTrace(DEBUG_sched,"deleting all threads");
1857     for (s = 0; s < total_steps; s++) {
1858       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1859         if (t->what_next == ThreadRelocated) {
1860             next = t->_link;
1861         } else {
1862             next = t->global_link;
1863             deleteThread(cap,t);
1864         }
1865       }
1866     }      
1867
1868     // The run queue now contains a bunch of ThreadKilled threads.  We
1869     // must not throw these away: the main thread(s) will be in there
1870     // somewhere, and the main scheduler loop has to deal with it.
1871     // Also, the run queue is the only thing keeping these threads from
1872     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1873
1874 #if !defined(THREADED_RTS)
1875     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1876     ASSERT(sleeping_queue == END_TSO_QUEUE);
1877 #endif
1878 }
1879
1880 /* -----------------------------------------------------------------------------
1881    Managing the suspended_ccalling_tasks list.
1882    Locks required: sched_mutex
1883    -------------------------------------------------------------------------- */
1884
1885 STATIC_INLINE void
1886 suspendTask (Capability *cap, Task *task)
1887 {
1888     ASSERT(task->next == NULL && task->prev == NULL);
1889     task->next = cap->suspended_ccalling_tasks;
1890     task->prev = NULL;
1891     if (cap->suspended_ccalling_tasks) {
1892         cap->suspended_ccalling_tasks->prev = task;
1893     }
1894     cap->suspended_ccalling_tasks = task;
1895 }
1896
1897 STATIC_INLINE void
1898 recoverSuspendedTask (Capability *cap, Task *task)
1899 {
1900     if (task->prev) {
1901         task->prev->next = task->next;
1902     } else {
1903         ASSERT(cap->suspended_ccalling_tasks == task);
1904         cap->suspended_ccalling_tasks = task->next;
1905     }
1906     if (task->next) {
1907         task->next->prev = task->prev;
1908     }
1909     task->next = task->prev = NULL;
1910 }
1911
1912 /* ---------------------------------------------------------------------------
1913  * Suspending & resuming Haskell threads.
1914  * 
1915  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1916  * its capability before calling the C function.  This allows another
1917  * task to pick up the capability and carry on running Haskell
1918  * threads.  It also means that if the C call blocks, it won't lock
1919  * the whole system.
1920  *
1921  * The Haskell thread making the C call is put to sleep for the
1922  * duration of the call, on the susepended_ccalling_threads queue.  We
1923  * give out a token to the task, which it can use to resume the thread
1924  * on return from the C function.
1925  * ------------------------------------------------------------------------- */
1926    
1927 void *
1928 suspendThread (StgRegTable *reg)
1929 {
1930   Capability *cap;
1931   int saved_errno;
1932   StgTSO *tso;
1933   Task *task;
1934 #if mingw32_HOST_OS
1935   StgWord32 saved_winerror;
1936 #endif
1937
1938   saved_errno = errno;
1939 #if mingw32_HOST_OS
1940   saved_winerror = GetLastError();
1941 #endif
1942
1943   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1944    */
1945   cap = regTableToCapability(reg);
1946
1947   task = cap->running_task;
1948   tso = cap->r.rCurrentTSO;
1949
1950   postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
1951   debugTrace(DEBUG_sched, 
1952              "thread %lu did a safe foreign call", 
1953              (unsigned long)cap->r.rCurrentTSO->id);
1954
1955   // XXX this might not be necessary --SDM
1956   tso->what_next = ThreadRunGHC;
1957
1958   threadPaused(cap,tso);
1959
1960   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1961       tso->why_blocked = BlockedOnCCall;
1962       tso->flags |= TSO_BLOCKEX;
1963       tso->flags &= ~TSO_INTERRUPTIBLE;
1964   } else {
1965       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1966   }
1967
1968   // Hand back capability
1969   task->suspended_tso = tso;
1970
1971   ACQUIRE_LOCK(&cap->lock);
1972
1973   suspendTask(cap,task);
1974   cap->in_haskell = rtsFalse;
1975   releaseCapability_(cap,rtsFalse);
1976   
1977   RELEASE_LOCK(&cap->lock);
1978
1979 #if defined(THREADED_RTS)
1980   /* Preparing to leave the RTS, so ensure there's a native thread/task
1981      waiting to take over.
1982   */
1983   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1984 #endif
1985
1986   errno = saved_errno;
1987 #if mingw32_HOST_OS
1988   SetLastError(saved_winerror);
1989 #endif
1990   return task;
1991 }
1992
1993 StgRegTable *
1994 resumeThread (void *task_)
1995 {
1996     StgTSO *tso;
1997     Capability *cap;
1998     Task *task = task_;
1999     int saved_errno;
2000 #if mingw32_HOST_OS
2001     StgWord32 saved_winerror;
2002 #endif
2003
2004     saved_errno = errno;
2005 #if mingw32_HOST_OS
2006     saved_winerror = GetLastError();
2007 #endif
2008
2009     cap = task->cap;
2010     // Wait for permission to re-enter the RTS with the result.
2011     waitForReturnCapability(&cap,task);
2012     // we might be on a different capability now... but if so, our
2013     // entry on the suspended_ccalling_tasks list will also have been
2014     // migrated.
2015
2016     // Remove the thread from the suspended list
2017     recoverSuspendedTask(cap,task);
2018
2019     tso = task->suspended_tso;
2020     task->suspended_tso = NULL;
2021     tso->_link = END_TSO_QUEUE; // no write barrier reqd
2022
2023     postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
2024     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2025     
2026     if (tso->why_blocked == BlockedOnCCall) {
2027         // avoid locking the TSO if we don't have to
2028         if (tso->blocked_exceptions != END_TSO_QUEUE) {
2029             awakenBlockedExceptionQueue(cap,tso);
2030         }
2031         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2032     }
2033     
2034     /* Reset blocking status */
2035     tso->why_blocked  = NotBlocked;
2036     
2037     cap->r.rCurrentTSO = tso;
2038     cap->in_haskell = rtsTrue;
2039     errno = saved_errno;
2040 #if mingw32_HOST_OS
2041     SetLastError(saved_winerror);
2042 #endif
2043
2044     /* We might have GC'd, mark the TSO dirty again */
2045     dirty_TSO(cap,tso);
2046
2047     IF_DEBUG(sanity, checkTSO(tso));
2048
2049     return &cap->r;
2050 }
2051
2052 /* ---------------------------------------------------------------------------
2053  * scheduleThread()
2054  *
2055  * scheduleThread puts a thread on the end  of the runnable queue.
2056  * This will usually be done immediately after a thread is created.
2057  * The caller of scheduleThread must create the thread using e.g.
2058  * createThread and push an appropriate closure
2059  * on this thread's stack before the scheduler is invoked.
2060  * ------------------------------------------------------------------------ */
2061
2062 void
2063 scheduleThread(Capability *cap, StgTSO *tso)
2064 {
2065     // The thread goes at the *end* of the run-queue, to avoid possible
2066     // starvation of any threads already on the queue.
2067     appendToRunQueue(cap,tso);
2068 }
2069
2070 void
2071 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2072 {
2073 #if defined(THREADED_RTS)
2074     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2075                               // move this thread from now on.
2076     cpu %= RtsFlags.ParFlags.nNodes;
2077     if (cpu == cap->no) {
2078         appendToRunQueue(cap,tso);
2079     } else {
2080         postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
2081         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2082     }
2083 #else
2084     appendToRunQueue(cap,tso);
2085 #endif
2086 }
2087
2088 Capability *
2089 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2090 {
2091     Task *task;
2092
2093     // We already created/initialised the Task
2094     task = cap->running_task;
2095
2096     // This TSO is now a bound thread; make the Task and TSO
2097     // point to each other.
2098     tso->bound = task;
2099     tso->cap = cap;
2100
2101     task->tso = tso;
2102     task->ret = ret;
2103     task->stat = NoStatus;
2104
2105     appendToRunQueue(cap,tso);
2106
2107     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2108
2109     cap = schedule(cap,task);
2110
2111     ASSERT(task->stat != NoStatus);
2112     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2113
2114     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2115     return cap;
2116 }
2117
2118 /* ----------------------------------------------------------------------------
2119  * Starting Tasks
2120  * ------------------------------------------------------------------------- */
2121
2122 #if defined(THREADED_RTS)
2123 void OSThreadProcAttr
2124 workerStart(Task *task)
2125 {
2126     Capability *cap;
2127
2128     // See startWorkerTask().
2129     ACQUIRE_LOCK(&task->lock);
2130     cap = task->cap;
2131     RELEASE_LOCK(&task->lock);
2132
2133     if (RtsFlags.ParFlags.setAffinity) {
2134         setThreadAffinity(cap->no, n_capabilities);
2135     }
2136
2137     // set the thread-local pointer to the Task:
2138     taskEnter(task);
2139
2140     // schedule() runs without a lock.
2141     cap = schedule(cap,task);
2142
2143     // On exit from schedule(), we have a Capability, but possibly not
2144     // the same one we started with.
2145
2146     // During shutdown, the requirement is that after all the
2147     // Capabilities are shut down, all workers that are shutting down
2148     // have finished workerTaskStop().  This is why we hold on to
2149     // cap->lock until we've finished workerTaskStop() below.
2150     //
2151     // There may be workers still involved in foreign calls; those
2152     // will just block in waitForReturnCapability() because the
2153     // Capability has been shut down.
2154     //
2155     ACQUIRE_LOCK(&cap->lock);
2156     releaseCapability_(cap,rtsFalse);
2157     workerTaskStop(task);
2158     RELEASE_LOCK(&cap->lock);
2159 }
2160 #endif
2161
2162 /* ---------------------------------------------------------------------------
2163  * initScheduler()
2164  *
2165  * Initialise the scheduler.  This resets all the queues - if the
2166  * queues contained any threads, they'll be garbage collected at the
2167  * next pass.
2168  *
2169  * ------------------------------------------------------------------------ */
2170
2171 void 
2172 initScheduler(void)
2173 {
2174 #if !defined(THREADED_RTS)
2175   blocked_queue_hd  = END_TSO_QUEUE;
2176   blocked_queue_tl  = END_TSO_QUEUE;
2177   sleeping_queue    = END_TSO_QUEUE;
2178 #endif
2179
2180   blackhole_queue   = END_TSO_QUEUE;
2181
2182   sched_state    = SCHED_RUNNING;
2183   recent_activity = ACTIVITY_YES;
2184
2185 #if defined(THREADED_RTS)
2186   /* Initialise the mutex and condition variables used by
2187    * the scheduler. */
2188   initMutex(&sched_mutex);
2189 #endif
2190   
2191   ACQUIRE_LOCK(&sched_mutex);
2192
2193   /* A capability holds the state a native thread needs in
2194    * order to execute STG code. At least one capability is
2195    * floating around (only THREADED_RTS builds have more than one).
2196    */
2197   initCapabilities();
2198
2199   initTaskManager();
2200
2201 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2202   initSparkPools();
2203 #endif
2204
2205 #if defined(THREADED_RTS)
2206   /*
2207    * Eagerly start one worker to run each Capability, except for
2208    * Capability 0.  The idea is that we're probably going to start a
2209    * bound thread on Capability 0 pretty soon, so we don't want a
2210    * worker task hogging it.
2211    */
2212   { 
2213       nat i;
2214       Capability *cap;
2215       for (i = 1; i < n_capabilities; i++) {
2216           cap = &capabilities[i];
2217           ACQUIRE_LOCK(&cap->lock);
2218           startWorkerTask(cap, workerStart);
2219           RELEASE_LOCK(&cap->lock);
2220       }
2221   }
2222 #endif
2223
2224   RELEASE_LOCK(&sched_mutex);
2225 }
2226
2227 void
2228 exitScheduler(
2229     rtsBool wait_foreign
2230 #if !defined(THREADED_RTS)
2231                          __attribute__((unused))
2232 #endif
2233 )
2234                /* see Capability.c, shutdownCapability() */
2235 {
2236     Task *task = NULL;
2237
2238     task = newBoundTask();
2239
2240     // If we haven't killed all the threads yet, do it now.
2241     if (sched_state < SCHED_SHUTTING_DOWN) {
2242         sched_state = SCHED_INTERRUPTING;
2243         waitForReturnCapability(&task->cap,task);
2244         scheduleDoGC(task->cap,task,rtsFalse);    
2245         releaseCapability(task->cap);
2246     }
2247     sched_state = SCHED_SHUTTING_DOWN;
2248
2249 #if defined(THREADED_RTS)
2250     { 
2251         nat i;
2252         
2253         for (i = 0; i < n_capabilities; i++) {
2254             shutdownCapability(&capabilities[i], task, wait_foreign);
2255         }
2256         boundTaskExiting(task);
2257     }
2258 #endif
2259 }
2260
2261 void
2262 freeScheduler( void )
2263 {
2264     nat still_running;
2265
2266     ACQUIRE_LOCK(&sched_mutex);
2267     still_running = freeTaskManager();
2268     // We can only free the Capabilities if there are no Tasks still
2269     // running.  We might have a Task about to return from a foreign
2270     // call into waitForReturnCapability(), for example (actually,
2271     // this should be the *only* thing that a still-running Task can
2272     // do at this point, and it will block waiting for the
2273     // Capability).
2274     if (still_running == 0) {
2275         freeCapabilities();
2276         if (n_capabilities != 1) {
2277             stgFree(capabilities);
2278         }
2279     }
2280     RELEASE_LOCK(&sched_mutex);
2281 #if defined(THREADED_RTS)
2282     closeMutex(&sched_mutex);
2283 #endif
2284 }
2285
2286 /* -----------------------------------------------------------------------------
2287    performGC
2288
2289    This is the interface to the garbage collector from Haskell land.
2290    We provide this so that external C code can allocate and garbage
2291    collect when called from Haskell via _ccall_GC.
2292    -------------------------------------------------------------------------- */
2293
2294 static void
2295 performGC_(rtsBool force_major)
2296 {
2297     Task *task;
2298
2299     // We must grab a new Task here, because the existing Task may be
2300     // associated with a particular Capability, and chained onto the 
2301     // suspended_ccalling_tasks queue.
2302     task = newBoundTask();
2303
2304     waitForReturnCapability(&task->cap,task);
2305     scheduleDoGC(task->cap,task,force_major);
2306     releaseCapability(task->cap);
2307     boundTaskExiting(task);
2308 }
2309
2310 void
2311 performGC(void)
2312 {
2313     performGC_(rtsFalse);
2314 }
2315
2316 void
2317 performMajorGC(void)
2318 {
2319     performGC_(rtsTrue);
2320 }
2321
2322 /* -----------------------------------------------------------------------------
2323    Stack overflow
2324
2325    If the thread has reached its maximum stack size, then raise the
2326    StackOverflow exception in the offending thread.  Otherwise
2327    relocate the TSO into a larger chunk of memory and adjust its stack
2328    size appropriately.
2329    -------------------------------------------------------------------------- */
2330
2331 static StgTSO *
2332 threadStackOverflow(Capability *cap, StgTSO *tso)
2333 {
2334   nat new_stack_size, stack_words;
2335   lnat new_tso_size;
2336   StgPtr new_sp;
2337   StgTSO *dest;
2338
2339   IF_DEBUG(sanity,checkTSO(tso));
2340
2341   // don't allow throwTo() to modify the blocked_exceptions queue
2342   // while we are moving the TSO:
2343   lockClosure((StgClosure *)tso);
2344
2345   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2346       // NB. never raise a StackOverflow exception if the thread is
2347       // inside Control.Exceptino.block.  It is impractical to protect
2348       // against stack overflow exceptions, since virtually anything
2349       // can raise one (even 'catch'), so this is the only sensible
2350       // thing to do here.  See bug #767.
2351
2352       debugTrace(DEBUG_gc,
2353                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2354                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2355       IF_DEBUG(gc,
2356                /* If we're debugging, just print out the top of the stack */
2357                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2358                                                 tso->sp+64)));
2359
2360       // Send this thread the StackOverflow exception
2361       unlockTSO(tso);
2362       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2363       return tso;
2364   }
2365
2366   /* Try to double the current stack size.  If that takes us over the
2367    * maximum stack size for this thread, then use the maximum instead
2368    * (that is, unless we're already at or over the max size and we
2369    * can't raise the StackOverflow exception (see above), in which
2370    * case just double the size). Finally round up so the TSO ends up as
2371    * a whole number of blocks.
2372    */
2373   if (tso->stack_size >= tso->max_stack_size) {
2374       new_stack_size = tso->stack_size * 2;
2375   } else { 
2376       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2377   }
2378   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2379                                        TSO_STRUCT_SIZE)/sizeof(W_);
2380   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2381   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2382
2383   debugTrace(DEBUG_sched, 
2384              "increasing stack size from %ld words to %d.",
2385              (long)tso->stack_size, new_stack_size);
2386
2387   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2388   TICK_ALLOC_TSO(new_stack_size,0);
2389
2390   /* copy the TSO block and the old stack into the new area */
2391   memcpy(dest,tso,TSO_STRUCT_SIZE);
2392   stack_words = tso->stack + tso->stack_size - tso->sp;
2393   new_sp = (P_)dest + new_tso_size - stack_words;
2394   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2395
2396   /* relocate the stack pointers... */
2397   dest->sp         = new_sp;
2398   dest->stack_size = new_stack_size;
2399         
2400   /* Mark the old TSO as relocated.  We have to check for relocated
2401    * TSOs in the garbage collector and any primops that deal with TSOs.
2402    *
2403    * It's important to set the sp value to just beyond the end
2404    * of the stack, so we don't attempt to scavenge any part of the
2405    * dead TSO's stack.
2406    */
2407   tso->what_next = ThreadRelocated;
2408   setTSOLink(cap,tso,dest);
2409   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2410   tso->why_blocked = NotBlocked;
2411
2412   IF_PAR_DEBUG(verbose,
2413                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2414                      tso->id, tso, tso->stack_size);
2415                /* If we're debugging, just print out the top of the stack */
2416                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2417                                                 tso->sp+64)));
2418   
2419   unlockTSO(dest);
2420   unlockTSO(tso);
2421
2422   IF_DEBUG(sanity,checkTSO(dest));
2423 #if 0
2424   IF_DEBUG(scheduler,printTSO(dest));
2425 #endif
2426
2427   return dest;
2428 }
2429
2430 static StgTSO *
2431 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2432 {
2433     bdescr *bd, *new_bd;
2434     lnat free_w, tso_size_w;
2435     StgTSO *new_tso;
2436
2437     tso_size_w = tso_sizeW(tso);
2438
2439     if (tso_size_w < MBLOCK_SIZE_W ||
2440           // TSO is less than 2 mblocks (since the first mblock is
2441           // shorter than MBLOCK_SIZE_W)
2442         (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
2443           // or TSO is not a whole number of megablocks (ensuring
2444           // precondition of splitLargeBlock() below)
2445         (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
2446           // or TSO is smaller than the minimum stack size (rounded up)
2447         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2448           // or stack is using more than 1/4 of the available space
2449     {
2450         // then do nothing
2451         return tso;
2452     }
2453
2454     // don't allow throwTo() to modify the blocked_exceptions queue
2455     // while we are moving the TSO:
2456     lockClosure((StgClosure *)tso);
2457
2458     // this is the number of words we'll free
2459     free_w = round_to_mblocks(tso_size_w/2);
2460
2461     bd = Bdescr((StgPtr)tso);
2462     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2463     bd->free = bd->start + TSO_STRUCT_SIZEW;
2464
2465     new_tso = (StgTSO *)new_bd->start;
2466     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2467     new_tso->stack_size = new_bd->free - new_tso->stack;
2468
2469     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2470                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2471
2472     tso->what_next = ThreadRelocated;
2473     tso->_link = new_tso; // no write barrier reqd: same generation
2474
2475     // The TSO attached to this Task may have moved, so update the
2476     // pointer to it.
2477     if (task->tso == tso) {
2478         task->tso = new_tso;
2479     }
2480
2481     unlockTSO(new_tso);
2482     unlockTSO(tso);
2483
2484     IF_DEBUG(sanity,checkTSO(new_tso));
2485
2486     return new_tso;
2487 }
2488
2489 /* ---------------------------------------------------------------------------
2490    Interrupt execution
2491    - usually called inside a signal handler so it mustn't do anything fancy.   
2492    ------------------------------------------------------------------------ */
2493
2494 void
2495 interruptStgRts(void)
2496 {
2497     sched_state = SCHED_INTERRUPTING;
2498     setContextSwitches();
2499     wakeUpRts();
2500 }
2501
2502 /* -----------------------------------------------------------------------------
2503    Wake up the RTS
2504    
2505    This function causes at least one OS thread to wake up and run the
2506    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2507    an external event has arrived that may need servicing (eg. a
2508    keyboard interrupt).
2509
2510    In the single-threaded RTS we don't do anything here; we only have
2511    one thread anyway, and the event that caused us to want to wake up
2512    will have interrupted any blocking system call in progress anyway.
2513    -------------------------------------------------------------------------- */
2514
2515 void
2516 wakeUpRts(void)
2517 {
2518 #if defined(THREADED_RTS)
2519     // This forces the IO Manager thread to wakeup, which will
2520     // in turn ensure that some OS thread wakes up and runs the
2521     // scheduler loop, which will cause a GC and deadlock check.
2522     ioManagerWakeup();
2523 #endif
2524 }
2525
2526 /* -----------------------------------------------------------------------------
2527  * checkBlackHoles()
2528  *
2529  * Check the blackhole_queue for threads that can be woken up.  We do
2530  * this periodically: before every GC, and whenever the run queue is
2531  * empty.
2532  *
2533  * An elegant solution might be to just wake up all the blocked
2534  * threads with awakenBlockedQueue occasionally: they'll go back to
2535  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2536  * doesn't give us a way to tell whether we've actually managed to
2537  * wake up any threads, so we would be busy-waiting.
2538  *
2539  * -------------------------------------------------------------------------- */
2540
2541 static rtsBool
2542 checkBlackHoles (Capability *cap)
2543 {
2544     StgTSO **prev, *t;
2545     rtsBool any_woke_up = rtsFalse;
2546     StgHalfWord type;
2547
2548     // blackhole_queue is global:
2549     ASSERT_LOCK_HELD(&sched_mutex);
2550
2551     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2552
2553     // ASSUMES: sched_mutex
2554     prev = &blackhole_queue;
2555     t = blackhole_queue;
2556     while (t != END_TSO_QUEUE) {
2557         if (t->what_next == ThreadRelocated) {
2558             t = t->_link;
2559             continue;
2560         }
2561         ASSERT(t->why_blocked == BlockedOnBlackHole);
2562         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2563         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2564             IF_DEBUG(sanity,checkTSO(t));
2565             t = unblockOne(cap, t);
2566             *prev = t;
2567             any_woke_up = rtsTrue;
2568         } else {
2569             prev = &t->_link;
2570             t = t->_link;
2571         }
2572     }
2573
2574     return any_woke_up;
2575 }
2576
2577 /* -----------------------------------------------------------------------------
2578    Deleting threads
2579
2580    This is used for interruption (^C) and forking, and corresponds to
2581    raising an exception but without letting the thread catch the
2582    exception.
2583    -------------------------------------------------------------------------- */
2584
2585 static void 
2586 deleteThread (Capability *cap, StgTSO *tso)
2587 {
2588     // NOTE: must only be called on a TSO that we have exclusive
2589     // access to, because we will call throwToSingleThreaded() below.
2590     // The TSO must be on the run queue of the Capability we own, or 
2591     // we must own all Capabilities.
2592
2593     if (tso->why_blocked != BlockedOnCCall &&
2594         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2595         throwToSingleThreaded(cap,tso,NULL);
2596     }
2597 }
2598
2599 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2600 static void 
2601 deleteThread_(Capability *cap, StgTSO *tso)
2602 { // for forkProcess only:
2603   // like deleteThread(), but we delete threads in foreign calls, too.
2604
2605     if (tso->why_blocked == BlockedOnCCall ||
2606         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2607         unblockOne(cap,tso);
2608         tso->what_next = ThreadKilled;
2609     } else {
2610         deleteThread(cap,tso);
2611     }
2612 }
2613 #endif
2614
2615 /* -----------------------------------------------------------------------------
2616    raiseExceptionHelper
2617    
2618    This function is called by the raise# primitve, just so that we can
2619    move some of the tricky bits of raising an exception from C-- into
2620    C.  Who knows, it might be a useful re-useable thing here too.
2621    -------------------------------------------------------------------------- */
2622
2623 StgWord
2624 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2625 {
2626     Capability *cap = regTableToCapability(reg);
2627     StgThunk *raise_closure = NULL;
2628     StgPtr p, next;
2629     StgRetInfoTable *info;
2630     //
2631     // This closure represents the expression 'raise# E' where E
2632     // is the exception raise.  It is used to overwrite all the
2633     // thunks which are currently under evaluataion.
2634     //
2635
2636     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2637     // LDV profiling: stg_raise_info has THUNK as its closure
2638     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2639     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2640     // 1 does not cause any problem unless profiling is performed.
2641     // However, when LDV profiling goes on, we need to linearly scan
2642     // small object pool, where raise_closure is stored, so we should
2643     // use MIN_UPD_SIZE.
2644     //
2645     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2646     //                                 sizeofW(StgClosure)+1);
2647     //
2648
2649     //
2650     // Walk up the stack, looking for the catch frame.  On the way,
2651     // we update any closures pointed to from update frames with the
2652     // raise closure that we just built.
2653     //
2654     p = tso->sp;
2655     while(1) {
2656         info = get_ret_itbl((StgClosure *)p);
2657         next = p + stack_frame_sizeW((StgClosure *)p);
2658         switch (info->i.type) {
2659             
2660         case UPDATE_FRAME:
2661             // Only create raise_closure if we need to.
2662             if (raise_closure == NULL) {
2663                 raise_closure = 
2664                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2665                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2666                 raise_closure->payload[0] = exception;
2667             }
2668             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2669             p = next;
2670             continue;
2671
2672         case ATOMICALLY_FRAME:
2673             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2674             tso->sp = p;
2675             return ATOMICALLY_FRAME;
2676             
2677         case CATCH_FRAME:
2678             tso->sp = p;
2679             return CATCH_FRAME;
2680
2681         case CATCH_STM_FRAME:
2682             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2683             tso->sp = p;
2684             return CATCH_STM_FRAME;
2685             
2686         case STOP_FRAME:
2687             tso->sp = p;
2688             return STOP_FRAME;
2689
2690         case CATCH_RETRY_FRAME:
2691         default:
2692             p = next; 
2693             continue;
2694         }
2695     }
2696 }
2697
2698
2699 /* -----------------------------------------------------------------------------
2700    findRetryFrameHelper
2701
2702    This function is called by the retry# primitive.  It traverses the stack
2703    leaving tso->sp referring to the frame which should handle the retry.  
2704
2705    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2706    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2707
2708    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2709    create) because retries are not considered to be exceptions, despite the
2710    similar implementation.
2711
2712    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2713    not be created within memory transactions.
2714    -------------------------------------------------------------------------- */
2715
2716 StgWord
2717 findRetryFrameHelper (StgTSO *tso)
2718 {
2719   StgPtr           p, next;
2720   StgRetInfoTable *info;
2721
2722   p = tso -> sp;
2723   while (1) {
2724     info = get_ret_itbl((StgClosure *)p);
2725     next = p + stack_frame_sizeW((StgClosure *)p);
2726     switch (info->i.type) {
2727       
2728     case ATOMICALLY_FRAME:
2729         debugTrace(DEBUG_stm,
2730                    "found ATOMICALLY_FRAME at %p during retry", p);
2731         tso->sp = p;
2732         return ATOMICALLY_FRAME;
2733       
2734     case CATCH_RETRY_FRAME:
2735         debugTrace(DEBUG_stm,
2736                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2737         tso->sp = p;
2738         return CATCH_RETRY_FRAME;
2739       
2740     case CATCH_STM_FRAME: {
2741         StgTRecHeader *trec = tso -> trec;
2742         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2743         debugTrace(DEBUG_stm,
2744                    "found CATCH_STM_FRAME at %p during retry", p);
2745         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2746         stmAbortTransaction(tso -> cap, trec);
2747         stmFreeAbortedTRec(tso -> cap, trec);
2748         tso -> trec = outer;
2749         p = next; 
2750         continue;
2751     }
2752       
2753
2754     default:
2755       ASSERT(info->i.type != CATCH_FRAME);
2756       ASSERT(info->i.type != STOP_FRAME);
2757       p = next; 
2758       continue;
2759     }
2760   }
2761 }
2762
2763 /* -----------------------------------------------------------------------------
2764    resurrectThreads is called after garbage collection on the list of
2765    threads found to be garbage.  Each of these threads will be woken
2766    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2767    on an MVar, or NonTermination if the thread was blocked on a Black
2768    Hole.
2769
2770    Locks: assumes we hold *all* the capabilities.
2771    -------------------------------------------------------------------------- */
2772
2773 void
2774 resurrectThreads (StgTSO *threads)
2775 {
2776     StgTSO *tso, *next;
2777     Capability *cap;
2778     step *step;
2779
2780     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2781         next = tso->global_link;
2782
2783         step = Bdescr((P_)tso)->step;
2784         tso->global_link = step->threads;
2785         step->threads = tso;
2786
2787         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2788         
2789         // Wake up the thread on the Capability it was last on
2790         cap = tso->cap;
2791         
2792         switch (tso->why_blocked) {
2793         case BlockedOnMVar:
2794         case BlockedOnException:
2795             /* Called by GC - sched_mutex lock is currently held. */
2796             throwToSingleThreaded(cap, tso,
2797                                   (StgClosure *)blockedOnDeadMVar_closure);
2798             break;
2799         case BlockedOnBlackHole:
2800             throwToSingleThreaded(cap, tso,
2801                                   (StgClosure *)nonTermination_closure);
2802             break;
2803         case BlockedOnSTM:
2804             throwToSingleThreaded(cap, tso,
2805                                   (StgClosure *)blockedIndefinitely_closure);
2806             break;
2807         case NotBlocked:
2808             /* This might happen if the thread was blocked on a black hole
2809              * belonging to a thread that we've just woken up (raiseAsync
2810              * can wake up threads, remember...).
2811              */
2812             continue;
2813         default:
2814             barf("resurrectThreads: thread blocked in a strange way");
2815         }
2816     }
2817 }
2818
2819 /* -----------------------------------------------------------------------------
2820    performPendingThrowTos is called after garbage collection, and
2821    passed a list of threads that were found to have pending throwTos
2822    (tso->blocked_exceptions was not empty), and were blocked.
2823    Normally this doesn't happen, because we would deliver the
2824    exception directly if the target thread is blocked, but there are
2825    small windows where it might occur on a multiprocessor (see
2826    throwTo()).
2827
2828    NB. we must be holding all the capabilities at this point, just
2829    like resurrectThreads().
2830    -------------------------------------------------------------------------- */
2831
2832 void
2833 performPendingThrowTos (StgTSO *threads)
2834 {
2835     StgTSO *tso, *next;
2836     Capability *cap;
2837     step *step;
2838
2839     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2840         next = tso->global_link;
2841
2842         step = Bdescr((P_)tso)->step;
2843         tso->global_link = step->threads;
2844         step->threads = tso;
2845
2846         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2847         
2848         cap = tso->cap;
2849         maybePerformBlockedException(cap, tso);
2850     }
2851 }