don't yield if the system is shutting down
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
153 #endif
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
156 #endif
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
160                                          StgTSO *t);
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
162                                     nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
165                                              StgTSO *t );
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168                                 rtsBool force_major);
169
170 static rtsBool checkBlackHoles(Capability *cap);
171
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
174
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
177
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
180 #endif
181
182 #ifdef DEBUG
183 static char *whatNext_strs[] = {
184   "(unknown)",
185   "ThreadRunGHC",
186   "ThreadInterpret",
187   "ThreadKilled",
188   "ThreadRelocated",
189   "ThreadComplete"
190 };
191 #endif
192
193 /* -----------------------------------------------------------------------------
194  * Putting a thread on the run queue: different scheduling policies
195  * -------------------------------------------------------------------------- */
196
197 STATIC_INLINE void
198 addToRunQueue( Capability *cap, StgTSO *t )
199 {
200 #if defined(PARALLEL_HASKELL)
201     if (RtsFlags.ParFlags.doFairScheduling) { 
202         // this does round-robin scheduling; good for concurrency
203         appendToRunQueue(cap,t);
204     } else {
205         // this does unfair scheduling; good for parallelism
206         pushOnRunQueue(cap,t);
207     }
208 #else
209     // this does round-robin scheduling; good for concurrency
210     appendToRunQueue(cap,t);
211 #endif
212 }
213
214 /* ---------------------------------------------------------------------------
215    Main scheduling loop.
216
217    We use round-robin scheduling, each thread returning to the
218    scheduler loop when one of these conditions is detected:
219
220       * out of heap space
221       * timer expires (thread yields)
222       * thread blocks
223       * thread ends
224       * stack overflow
225
226    GRAN version:
227      In a GranSim setup this loop iterates over the global event queue.
228      This revolves around the global event queue, which determines what 
229      to do next. Therefore, it's more complicated than either the 
230      concurrent or the parallel (GUM) setup.
231   This version has been entirely removed (JB 2008/08).
232
233    GUM version:
234      GUM iterates over incoming messages.
235      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236      and sends out a fish whenever it has nothing to do; in-between
237      doing the actual reductions (shared code below) it processes the
238      incoming messages and deals with delayed operations 
239      (see PendingFetches).
240      This is not the ugliest code you could imagine, but it's bloody close.
241
242   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244   as well as future GUM versions. This file has been refurbished to
245   only contain valid code, which is however incomplete, refers to
246   invalid includes etc.
247
248    ------------------------------------------------------------------------ */
249
250 static Capability *
251 schedule (Capability *initialCapability, Task *task)
252 {
253   StgTSO *t;
254   Capability *cap;
255   StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257   rtsBool receivedFinish = rtsFalse;
258 #endif
259   nat prev_what_next;
260   rtsBool ready_to_gc;
261 #if defined(THREADED_RTS)
262   rtsBool first = rtsTrue;
263 #endif
264   
265   cap = initialCapability;
266
267   // Pre-condition: this task owns initialCapability.
268   // The sched_mutex is *NOT* held
269   // NB. on return, we still hold a capability.
270
271   debugTrace (DEBUG_sched, 
272               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273               task, initialCapability);
274
275   schedulePreLoop();
276
277   // -----------------------------------------------------------
278   // Scheduler loop starts here:
279
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION        (!receivedFinish)
282 #else
283 #define TERMINATION_CONDITION        rtsTrue
284 #endif
285
286   while (TERMINATION_CONDITION) {
287
288     // Check whether we have re-entered the RTS from Haskell without
289     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
290     // call).
291     if (cap->in_haskell) {
292           errorBelch("schedule: re-entered unsafely.\n"
293                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
294           stg_exit(EXIT_FAILURE);
295     }
296
297     // The interruption / shutdown sequence.
298     // 
299     // In order to cleanly shut down the runtime, we want to:
300     //   * make sure that all main threads return to their callers
301     //     with the state 'Interrupted'.
302     //   * clean up all OS threads assocated with the runtime
303     //   * free all memory etc.
304     //
305     // So the sequence for ^C goes like this:
306     //
307     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
308     //     arranges for some Capability to wake up
309     //
310     //   * all threads in the system are halted, and the zombies are
311     //     placed on the run queue for cleaning up.  We acquire all
312     //     the capabilities in order to delete the threads, this is
313     //     done by scheduleDoGC() for convenience (because GC already
314     //     needs to acquire all the capabilities).  We can't kill
315     //     threads involved in foreign calls.
316     // 
317     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
318     //
319     //   * sched_state := SCHED_SHUTTING_DOWN
320     //
321     //   * all workers exit when the run queue on their capability
322     //     drains.  All main threads will also exit when their TSO
323     //     reaches the head of the run queue and they can return.
324     //
325     //   * eventually all Capabilities will shut down, and the RTS can
326     //     exit.
327     //
328     //   * We might be left with threads blocked in foreign calls, 
329     //     we should really attempt to kill these somehow (TODO);
330     
331     switch (sched_state) {
332     case SCHED_RUNNING:
333         break;
334     case SCHED_INTERRUPTING:
335         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337         discardSparksCap(cap);
338 #endif
339         /* scheduleDoGC() deletes all the threads */
340         cap = scheduleDoGC(cap,task,rtsFalse);
341         break;
342     case SCHED_SHUTTING_DOWN:
343         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344         // If we are a worker, just exit.  If we're a bound thread
345         // then we will exit below when we've removed our TSO from
346         // the run queue.
347         if (task->tso == NULL && emptyRunQueue(cap)) {
348             return cap;
349         }
350         break;
351     default:
352         barf("sched_state: %d", sched_state);
353     }
354
355     scheduleFindWork(cap);
356
357     /* work pushing, currently relevant only for THREADED_RTS:
358        (pushes threads, wakes up idle capabilities for stealing) */
359     schedulePushWork(cap,task);
360
361 #if defined(PARALLEL_HASKELL)
362     /* since we perform a blocking receive and continue otherwise,
363        either we never reach here or we definitely have work! */
364     // from here: non-empty run queue
365     ASSERT(!emptyRunQueue(cap));
366
367     if (PacketsWaiting()) {  /* now process incoming messages, if any
368                                 pending...  
369
370                                 CAUTION: scheduleGetRemoteWork called
371                                 above, waits for messages as well! */
372       processMessages(cap, &receivedFinish);
373     }
374 #endif // PARALLEL_HASKELL: non-empty run queue!
375
376     scheduleDetectDeadlock(cap,task);
377
378 #if defined(THREADED_RTS)
379     cap = task->cap;    // reload cap, it might have changed
380 #endif
381
382     // Normally, the only way we can get here with no threads to
383     // run is if a keyboard interrupt received during 
384     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385     // Additionally, it is not fatal for the
386     // threaded RTS to reach here with no threads to run.
387     //
388     // win32: might be here due to awaitEvent() being abandoned
389     // as a result of a console event having been delivered.
390     
391 #if defined(THREADED_RTS)
392     if (first) 
393     {
394     // XXX: ToDo
395     //     // don't yield the first time, we want a chance to run this
396     //     // thread for a bit, even if there are others banging at the
397     //     // door.
398     //     first = rtsFalse;
399     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
400     }
401
402   yield:
403     scheduleYield(&cap,task);
404     if (emptyRunQueue(cap)) continue; // look for work again
405 #endif
406
407 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
408     if ( emptyRunQueue(cap) ) {
409         ASSERT(sched_state >= SCHED_INTERRUPTING);
410     }
411 #endif
412
413     // 
414     // Get a thread to run
415     //
416     t = popRunQueue(cap);
417
418     // Sanity check the thread we're about to run.  This can be
419     // expensive if there is lots of thread switching going on...
420     IF_DEBUG(sanity,checkTSO(t));
421
422 #if defined(THREADED_RTS)
423     // Check whether we can run this thread in the current task.
424     // If not, we have to pass our capability to the right task.
425     {
426         Task *bound = t->bound;
427       
428         if (bound) {
429             if (bound == task) {
430                 debugTrace(DEBUG_sched,
431                            "### Running thread %lu in bound thread", (unsigned long)t->id);
432                 // yes, the Haskell thread is bound to the current native thread
433             } else {
434                 debugTrace(DEBUG_sched,
435                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
436                 // no, bound to a different Haskell thread: pass to that thread
437                 pushOnRunQueue(cap,t);
438                 continue;
439             }
440         } else {
441             // The thread we want to run is unbound.
442             if (task->tso) { 
443                 debugTrace(DEBUG_sched,
444                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
445                 // no, the current native thread is bound to a different
446                 // Haskell thread, so pass it to any worker thread
447                 pushOnRunQueue(cap,t);
448                 continue; 
449             }
450         }
451     }
452 #endif
453
454     /* context switches are initiated by the timer signal, unless
455      * the user specified "context switch as often as possible", with
456      * +RTS -C0
457      */
458     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
459         && !emptyThreadQueues(cap)) {
460         cap->context_switch = 1;
461     }
462          
463 run_thread:
464
465     // CurrentTSO is the thread to run.  t might be different if we
466     // loop back to run_thread, so make sure to set CurrentTSO after
467     // that.
468     cap->r.rCurrentTSO = t;
469
470     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
471                               (long)t->id, whatNext_strs[t->what_next]);
472
473     startHeapProfTimer();
474
475     // Check for exceptions blocked on this thread
476     maybePerformBlockedException (cap, t);
477
478     // ----------------------------------------------------------------------
479     // Run the current thread 
480
481     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
482     ASSERT(t->cap == cap);
483     ASSERT(t->bound ? t->bound->cap == cap : 1);
484
485     prev_what_next = t->what_next;
486
487     errno = t->saved_errno;
488 #if mingw32_HOST_OS
489     SetLastError(t->saved_winerror);
490 #endif
491
492     cap->in_haskell = rtsTrue;
493
494     dirty_TSO(cap,t);
495
496 #if defined(THREADED_RTS)
497     if (recent_activity == ACTIVITY_DONE_GC) {
498         // ACTIVITY_DONE_GC means we turned off the timer signal to
499         // conserve power (see #1623).  Re-enable it here.
500         nat prev;
501         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
502         if (prev == ACTIVITY_DONE_GC) {
503             startTimer();
504         }
505     } else {
506         recent_activity = ACTIVITY_YES;
507     }
508 #endif
509
510     switch (prev_what_next) {
511         
512     case ThreadKilled:
513     case ThreadComplete:
514         /* Thread already finished, return to scheduler. */
515         ret = ThreadFinished;
516         break;
517         
518     case ThreadRunGHC:
519     {
520         StgRegTable *r;
521         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
522         cap = regTableToCapability(r);
523         ret = r->rRet;
524         break;
525     }
526     
527     case ThreadInterpret:
528         cap = interpretBCO(cap);
529         ret = cap->r.rRet;
530         break;
531         
532     default:
533         barf("schedule: invalid what_next field");
534     }
535
536     cap->in_haskell = rtsFalse;
537
538     // The TSO might have moved, eg. if it re-entered the RTS and a GC
539     // happened.  So find the new location:
540     t = cap->r.rCurrentTSO;
541
542     // We have run some Haskell code: there might be blackhole-blocked
543     // threads to wake up now.
544     // Lock-free test here should be ok, we're just setting a flag.
545     if ( blackhole_queue != END_TSO_QUEUE ) {
546         blackholes_need_checking = rtsTrue;
547     }
548     
549     // And save the current errno in this thread.
550     // XXX: possibly bogus for SMP because this thread might already
551     // be running again, see code below.
552     t->saved_errno = errno;
553 #if mingw32_HOST_OS
554     // Similarly for Windows error code
555     t->saved_winerror = GetLastError();
556 #endif
557
558 #if defined(THREADED_RTS)
559     // If ret is ThreadBlocked, and this Task is bound to the TSO that
560     // blocked, we are in limbo - the TSO is now owned by whatever it
561     // is blocked on, and may in fact already have been woken up,
562     // perhaps even on a different Capability.  It may be the case
563     // that task->cap != cap.  We better yield this Capability
564     // immediately and return to normaility.
565     if (ret == ThreadBlocked) {
566         debugTrace(DEBUG_sched,
567                    "--<< thread %lu (%s) stopped: blocked",
568                    (unsigned long)t->id, whatNext_strs[t->what_next]);
569         goto yield;
570     }
571 #endif
572
573     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
574     ASSERT(t->cap == cap);
575
576     // ----------------------------------------------------------------------
577     
578     // Costs for the scheduler are assigned to CCS_SYSTEM
579     stopHeapProfTimer();
580 #if defined(PROFILING)
581     CCCS = CCS_SYSTEM;
582 #endif
583     
584     schedulePostRunThread(cap,t);
585
586     t = threadStackUnderflow(task,t);
587
588     ready_to_gc = rtsFalse;
589
590     switch (ret) {
591     case HeapOverflow:
592         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
593         break;
594
595     case StackOverflow:
596         scheduleHandleStackOverflow(cap,task,t);
597         break;
598
599     case ThreadYielding:
600         if (scheduleHandleYield(cap, t, prev_what_next)) {
601             // shortcut for switching between compiler/interpreter:
602             goto run_thread; 
603         }
604         break;
605
606     case ThreadBlocked:
607         scheduleHandleThreadBlocked(t);
608         break;
609
610     case ThreadFinished:
611         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
612         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613         break;
614
615     default:
616       barf("schedule: invalid thread return code %d", (int)ret);
617     }
618
619     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
620       cap = scheduleDoGC(cap,task,rtsFalse);
621     }
622   } /* end of while() */
623 }
624
625 /* ----------------------------------------------------------------------------
626  * Setting up the scheduler loop
627  * ------------------------------------------------------------------------- */
628
629 static void
630 schedulePreLoop(void)
631 {
632   // initialisation for scheduler - what cannot go into initScheduler()  
633 }
634
635 /* -----------------------------------------------------------------------------
636  * scheduleFindWork()
637  *
638  * Search for work to do, and handle messages from elsewhere.
639  * -------------------------------------------------------------------------- */
640
641 static void
642 scheduleFindWork (Capability *cap)
643 {
644     scheduleStartSignalHandlers(cap);
645
646     // Only check the black holes here if we've nothing else to do.
647     // During normal execution, the black hole list only gets checked
648     // at GC time, to avoid repeatedly traversing this possibly long
649     // list each time around the scheduler.
650     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
651
652     scheduleCheckWakeupThreads(cap);
653
654     scheduleCheckBlockedThreads(cap);
655
656 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
657     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
658 #endif
659
660 #if defined(PARALLEL_HASKELL)
661     // if messages have been buffered...
662     scheduleSendPendingMessages();
663 #endif
664
665 #if defined(PARALLEL_HASKELL)
666     if (emptyRunQueue(cap)) {
667         receivedFinish = scheduleGetRemoteWork(cap);
668         continue; //  a new round, (hopefully) with new work
669         /* 
670            in GUM, this a) sends out a FISH and returns IF no fish is
671                            out already
672                         b) (blocking) awaits and receives messages
673            
674            in Eden, this is only the blocking receive, as b) in GUM.
675         */
676     }
677 #endif
678 }
679
680 #if defined(THREADED_RTS)
681 STATIC_INLINE rtsBool
682 shouldYieldCapability (Capability *cap, Task *task)
683 {
684     // we need to yield this capability to someone else if..
685     //   - another thread is initiating a GC
686     //   - another Task is returning from a foreign call
687     //   - the thread at the head of the run queue cannot be run
688     //     by this Task (it is bound to another Task, or it is unbound
689     //     and this task it bound).
690     return (waiting_for_gc || 
691             cap->returning_tasks_hd != NULL ||
692             (!emptyRunQueue(cap) && (task->tso == NULL
693                                      ? cap->run_queue_hd->bound != NULL
694                                      : cap->run_queue_hd->bound != task)));
695 }
696
697 // This is the single place where a Task goes to sleep.  There are
698 // two reasons it might need to sleep:
699 //    - there are no threads to run
700 //    - we need to yield this Capability to someone else 
701 //      (see shouldYieldCapability())
702 //
703 // Careful: the scheduler loop is quite delicate.  Make sure you run
704 // the tests in testsuite/concurrent (all ways) after modifying this,
705 // and also check the benchmarks in nofib/parallel for regressions.
706
707 static void
708 scheduleYield (Capability **pcap, Task *task)
709 {
710     Capability *cap = *pcap;
711
712     // if we have work, and we don't need to give up the Capability, continue.
713     if (!shouldYieldCapability(cap,task) && 
714         (!emptyRunQueue(cap) ||
715          blackholes_need_checking ||
716          sched_state >= SCHED_INTERRUPTING))
717         return;
718
719     // otherwise yield (sleep), and keep yielding if necessary.
720     do {
721         yieldCapability(&cap,task);
722     } 
723     while (shouldYieldCapability(cap,task));
724
725     // note there may still be no threads on the run queue at this
726     // point, the caller has to check.
727
728     *pcap = cap;
729     return;
730 }
731 #endif
732     
733 /* -----------------------------------------------------------------------------
734  * schedulePushWork()
735  *
736  * Push work to other Capabilities if we have some.
737  * -------------------------------------------------------------------------- */
738
739 static void
740 schedulePushWork(Capability *cap USED_IF_THREADS, 
741                  Task *task      USED_IF_THREADS)
742 {
743   /* following code not for PARALLEL_HASKELL. I kept the call general,
744      future GUM versions might use pushing in a distributed setup */
745 #if defined(THREADED_RTS)
746
747     Capability *free_caps[n_capabilities], *cap0;
748     nat i, n_free_caps;
749
750     // migration can be turned off with +RTS -qg
751     if (!RtsFlags.ParFlags.migrate) return;
752
753     // Check whether we have more threads on our run queue, or sparks
754     // in our pool, that we could hand to another Capability.
755     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
756         && sparkPoolSizeCap(cap) < 2) {
757         return;
758     }
759
760     // First grab as many free Capabilities as we can.
761     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
762         cap0 = &capabilities[i];
763         if (cap != cap0 && tryGrabCapability(cap0,task)) {
764             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
765                 // it already has some work, we just grabbed it at 
766                 // the wrong moment.  Or maybe it's deadlocked!
767                 releaseCapability(cap0);
768             } else {
769                 free_caps[n_free_caps++] = cap0;
770             }
771         }
772     }
773
774     // we now have n_free_caps free capabilities stashed in
775     // free_caps[].  Share our run queue equally with them.  This is
776     // probably the simplest thing we could do; improvements we might
777     // want to do include:
778     //
779     //   - giving high priority to moving relatively new threads, on 
780     //     the gournds that they haven't had time to build up a
781     //     working set in the cache on this CPU/Capability.
782     //
783     //   - giving low priority to moving long-lived threads
784
785     if (n_free_caps > 0) {
786         StgTSO *prev, *t, *next;
787         rtsBool pushed_to_all;
788
789         debugTrace(DEBUG_sched, 
790                    "cap %d: %s and %d free capabilities, sharing...", 
791                    cap->no, 
792                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
793                    "excess threads on run queue":"sparks to share (>=2)",
794                    n_free_caps);
795
796         i = 0;
797         pushed_to_all = rtsFalse;
798
799         if (cap->run_queue_hd != END_TSO_QUEUE) {
800             prev = cap->run_queue_hd;
801             t = prev->_link;
802             prev->_link = END_TSO_QUEUE;
803             for (; t != END_TSO_QUEUE; t = next) {
804                 next = t->_link;
805                 t->_link = END_TSO_QUEUE;
806                 if (t->what_next == ThreadRelocated
807                     || t->bound == task // don't move my bound thread
808                     || tsoLocked(t)) {  // don't move a locked thread
809                     setTSOLink(cap, prev, t);
810                     prev = t;
811                 } else if (i == n_free_caps) {
812                     pushed_to_all = rtsTrue;
813                     i = 0;
814                     // keep one for us
815                     setTSOLink(cap, prev, t);
816                     prev = t;
817                 } else {
818                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
819                     appendToRunQueue(free_caps[i],t);
820                     if (t->bound) { t->bound->cap = free_caps[i]; }
821                     t->cap = free_caps[i];
822                     i++;
823                 }
824             }
825             cap->run_queue_tl = prev;
826         }
827
828 #ifdef SPARK_PUSHING
829         /* JB I left this code in place, it would work but is not necessary */
830
831         // If there are some free capabilities that we didn't push any
832         // threads to, then try to push a spark to each one.
833         if (!pushed_to_all) {
834             StgClosure *spark;
835             // i is the next free capability to push to
836             for (; i < n_free_caps; i++) {
837                 if (emptySparkPoolCap(free_caps[i])) {
838                     spark = tryStealSpark(cap->sparks);
839                     if (spark != NULL) {
840                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
841                         newSpark(&(free_caps[i]->r), spark);
842                     }
843                 }
844             }
845         }
846 #endif /* SPARK_PUSHING */
847
848         // release the capabilities
849         for (i = 0; i < n_free_caps; i++) {
850             task->cap = free_caps[i];
851             releaseAndWakeupCapability(free_caps[i]);
852         }
853     }
854     task->cap = cap; // reset to point to our Capability.
855
856 #endif /* THREADED_RTS */
857
858 }
859
860 /* ----------------------------------------------------------------------------
861  * Start any pending signal handlers
862  * ------------------------------------------------------------------------- */
863
864 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
865 static void
866 scheduleStartSignalHandlers(Capability *cap)
867 {
868     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
869         // safe outside the lock
870         startSignalHandlers(cap);
871     }
872 }
873 #else
874 static void
875 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
876 {
877 }
878 #endif
879
880 /* ----------------------------------------------------------------------------
881  * Check for blocked threads that can be woken up.
882  * ------------------------------------------------------------------------- */
883
884 static void
885 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
886 {
887 #if !defined(THREADED_RTS)
888     //
889     // Check whether any waiting threads need to be woken up.  If the
890     // run queue is empty, and there are no other tasks running, we
891     // can wait indefinitely for something to happen.
892     //
893     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
894     {
895         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
896     }
897 #endif
898 }
899
900
901 /* ----------------------------------------------------------------------------
902  * Check for threads woken up by other Capabilities
903  * ------------------------------------------------------------------------- */
904
905 static void
906 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
907 {
908 #if defined(THREADED_RTS)
909     // Any threads that were woken up by other Capabilities get
910     // appended to our run queue.
911     if (!emptyWakeupQueue(cap)) {
912         ACQUIRE_LOCK(&cap->lock);
913         if (emptyRunQueue(cap)) {
914             cap->run_queue_hd = cap->wakeup_queue_hd;
915             cap->run_queue_tl = cap->wakeup_queue_tl;
916         } else {
917             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
918             cap->run_queue_tl = cap->wakeup_queue_tl;
919         }
920         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
921         RELEASE_LOCK(&cap->lock);
922     }
923 #endif
924 }
925
926 /* ----------------------------------------------------------------------------
927  * Check for threads blocked on BLACKHOLEs that can be woken up
928  * ------------------------------------------------------------------------- */
929 static void
930 scheduleCheckBlackHoles (Capability *cap)
931 {
932     if ( blackholes_need_checking ) // check without the lock first
933     {
934         ACQUIRE_LOCK(&sched_mutex);
935         if ( blackholes_need_checking ) {
936             blackholes_need_checking = rtsFalse;
937             // important that we reset the flag *before* checking the
938             // blackhole queue, otherwise we could get deadlock.  This
939             // happens as follows: we wake up a thread that
940             // immediately runs on another Capability, blocks on a
941             // blackhole, and then we reset the blackholes_need_checking flag.
942             checkBlackHoles(cap);
943         }
944         RELEASE_LOCK(&sched_mutex);
945     }
946 }
947
948 /* ----------------------------------------------------------------------------
949  * Detect deadlock conditions and attempt to resolve them.
950  * ------------------------------------------------------------------------- */
951
952 static void
953 scheduleDetectDeadlock (Capability *cap, Task *task)
954 {
955
956 #if defined(PARALLEL_HASKELL)
957     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
958     return;
959 #endif
960
961     /* 
962      * Detect deadlock: when we have no threads to run, there are no
963      * threads blocked, waiting for I/O, or sleeping, and all the
964      * other tasks are waiting for work, we must have a deadlock of
965      * some description.
966      */
967     if ( emptyThreadQueues(cap) )
968     {
969 #if defined(THREADED_RTS)
970         /* 
971          * In the threaded RTS, we only check for deadlock if there
972          * has been no activity in a complete timeslice.  This means
973          * we won't eagerly start a full GC just because we don't have
974          * any threads to run currently.
975          */
976         if (recent_activity != ACTIVITY_INACTIVE) return;
977 #endif
978
979         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
980
981         // Garbage collection can release some new threads due to
982         // either (a) finalizers or (b) threads resurrected because
983         // they are unreachable and will therefore be sent an
984         // exception.  Any threads thus released will be immediately
985         // runnable.
986         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
987
988         recent_activity = ACTIVITY_DONE_GC;
989         // disable timer signals (see #1623)
990         stopTimer();
991         
992         if ( !emptyRunQueue(cap) ) return;
993
994 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
995         /* If we have user-installed signal handlers, then wait
996          * for signals to arrive rather then bombing out with a
997          * deadlock.
998          */
999         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1000             debugTrace(DEBUG_sched,
1001                        "still deadlocked, waiting for signals...");
1002
1003             awaitUserSignals();
1004
1005             if (signals_pending()) {
1006                 startSignalHandlers(cap);
1007             }
1008
1009             // either we have threads to run, or we were interrupted:
1010             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1011
1012             return;
1013         }
1014 #endif
1015
1016 #if !defined(THREADED_RTS)
1017         /* Probably a real deadlock.  Send the current main thread the
1018          * Deadlock exception.
1019          */
1020         if (task->tso) {
1021             switch (task->tso->why_blocked) {
1022             case BlockedOnSTM:
1023             case BlockedOnBlackHole:
1024             case BlockedOnException:
1025             case BlockedOnMVar:
1026                 throwToSingleThreaded(cap, task->tso, 
1027                                       (StgClosure *)nonTermination_closure);
1028                 return;
1029             default:
1030                 barf("deadlock: main thread blocked in a strange way");
1031             }
1032         }
1033         return;
1034 #endif
1035     }
1036 }
1037
1038
1039 /* ----------------------------------------------------------------------------
1040  * Send pending messages (PARALLEL_HASKELL only)
1041  * ------------------------------------------------------------------------- */
1042
1043 #if defined(PARALLEL_HASKELL)
1044 static void
1045 scheduleSendPendingMessages(void)
1046 {
1047
1048 # if defined(PAR) // global Mem.Mgmt., omit for now
1049     if (PendingFetches != END_BF_QUEUE) {
1050         processFetches();
1051     }
1052 # endif
1053     
1054     if (RtsFlags.ParFlags.BufferTime) {
1055         // if we use message buffering, we must send away all message
1056         // packets which have become too old...
1057         sendOldBuffers(); 
1058     }
1059 }
1060 #endif
1061
1062 /* ----------------------------------------------------------------------------
1063  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1064  * ------------------------------------------------------------------------- */
1065
1066 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1067 static void
1068 scheduleActivateSpark(Capability *cap)
1069 {
1070     if (anySparks())
1071     {
1072         createSparkThread(cap);
1073         debugTrace(DEBUG_sched, "creating a spark thread");
1074     }
1075 }
1076 #endif // PARALLEL_HASKELL || THREADED_RTS
1077
1078 /* ----------------------------------------------------------------------------
1079  * Get work from a remote node (PARALLEL_HASKELL only)
1080  * ------------------------------------------------------------------------- */
1081     
1082 #if defined(PARALLEL_HASKELL)
1083 static rtsBool /* return value used in PARALLEL_HASKELL only */
1084 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1085 {
1086 #if defined(PARALLEL_HASKELL)
1087   rtsBool receivedFinish = rtsFalse;
1088
1089   // idle() , i.e. send all buffers, wait for work
1090   if (RtsFlags.ParFlags.BufferTime) {
1091         IF_PAR_DEBUG(verbose, 
1092                 debugBelch("...send all pending data,"));
1093         {
1094           nat i;
1095           for (i=1; i<=nPEs; i++)
1096             sendImmediately(i); // send all messages away immediately
1097         }
1098   }
1099
1100   /* this would be the place for fishing in GUM... 
1101
1102      if (no-earlier-fish-around) 
1103           sendFish(choosePe());
1104    */
1105
1106   // Eden:just look for incoming messages (blocking receive)
1107   IF_PAR_DEBUG(verbose, 
1108                debugBelch("...wait for incoming messages...\n"));
1109   processMessages(cap, &receivedFinish); // blocking receive...
1110
1111
1112   return receivedFinish;
1113   // reenter scheduling look after having received something
1114
1115 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1116
1117   return rtsFalse; /* return value unused in THREADED_RTS */
1118
1119 #endif /* PARALLEL_HASKELL */
1120 }
1121 #endif // PARALLEL_HASKELL || THREADED_RTS
1122
1123 /* ----------------------------------------------------------------------------
1124  * After running a thread...
1125  * ------------------------------------------------------------------------- */
1126
1127 static void
1128 schedulePostRunThread (Capability *cap, StgTSO *t)
1129 {
1130     // We have to be able to catch transactions that are in an
1131     // infinite loop as a result of seeing an inconsistent view of
1132     // memory, e.g. 
1133     //
1134     //   atomically $ do
1135     //       [a,b] <- mapM readTVar [ta,tb]
1136     //       when (a == b) loop
1137     //
1138     // and a is never equal to b given a consistent view of memory.
1139     //
1140     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1141         if (!stmValidateNestOfTransactions (t -> trec)) {
1142             debugTrace(DEBUG_sched | DEBUG_stm,
1143                        "trec %p found wasting its time", t);
1144             
1145             // strip the stack back to the
1146             // ATOMICALLY_FRAME, aborting the (nested)
1147             // transaction, and saving the stack of any
1148             // partially-evaluated thunks on the heap.
1149             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1150             
1151             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1152         }
1153     }
1154
1155   /* some statistics gathering in the parallel case */
1156 }
1157
1158 /* -----------------------------------------------------------------------------
1159  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1160  * -------------------------------------------------------------------------- */
1161
1162 static rtsBool
1163 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1164 {
1165     // did the task ask for a large block?
1166     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1167         // if so, get one and push it on the front of the nursery.
1168         bdescr *bd;
1169         lnat blocks;
1170         
1171         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1172         
1173         debugTrace(DEBUG_sched,
1174                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1175                    (long)t->id, whatNext_strs[t->what_next], blocks);
1176     
1177         // don't do this if the nursery is (nearly) full, we'll GC first.
1178         if (cap->r.rCurrentNursery->link != NULL ||
1179             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1180                                                // if the nursery has only one block.
1181             
1182             ACQUIRE_SM_LOCK
1183             bd = allocGroup( blocks );
1184             RELEASE_SM_LOCK
1185             cap->r.rNursery->n_blocks += blocks;
1186             
1187             // link the new group into the list
1188             bd->link = cap->r.rCurrentNursery;
1189             bd->u.back = cap->r.rCurrentNursery->u.back;
1190             if (cap->r.rCurrentNursery->u.back != NULL) {
1191                 cap->r.rCurrentNursery->u.back->link = bd;
1192             } else {
1193 #if !defined(THREADED_RTS)
1194                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1195                        g0s0 == cap->r.rNursery);
1196 #endif
1197                 cap->r.rNursery->blocks = bd;
1198             }             
1199             cap->r.rCurrentNursery->u.back = bd;
1200             
1201             // initialise it as a nursery block.  We initialise the
1202             // step, gen_no, and flags field of *every* sub-block in
1203             // this large block, because this is easier than making
1204             // sure that we always find the block head of a large
1205             // block whenever we call Bdescr() (eg. evacuate() and
1206             // isAlive() in the GC would both have to do this, at
1207             // least).
1208             { 
1209                 bdescr *x;
1210                 for (x = bd; x < bd + blocks; x++) {
1211                     x->step = cap->r.rNursery;
1212                     x->gen_no = 0;
1213                     x->flags = 0;
1214                 }
1215             }
1216             
1217             // This assert can be a killer if the app is doing lots
1218             // of large block allocations.
1219             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1220             
1221             // now update the nursery to point to the new block
1222             cap->r.rCurrentNursery = bd;
1223             
1224             // we might be unlucky and have another thread get on the
1225             // run queue before us and steal the large block, but in that
1226             // case the thread will just end up requesting another large
1227             // block.
1228             pushOnRunQueue(cap,t);
1229             return rtsFalse;  /* not actually GC'ing */
1230         }
1231     }
1232     
1233     debugTrace(DEBUG_sched,
1234                "--<< thread %ld (%s) stopped: HeapOverflow",
1235                (long)t->id, whatNext_strs[t->what_next]);
1236
1237     if (cap->context_switch) {
1238         // Sometimes we miss a context switch, e.g. when calling
1239         // primitives in a tight loop, MAYBE_GC() doesn't check the
1240         // context switch flag, and we end up waiting for a GC.
1241         // See #1984, and concurrent/should_run/1984
1242         cap->context_switch = 0;
1243         addToRunQueue(cap,t);
1244     } else {
1245         pushOnRunQueue(cap,t);
1246     }
1247     return rtsTrue;
1248     /* actual GC is done at the end of the while loop in schedule() */
1249 }
1250
1251 /* -----------------------------------------------------------------------------
1252  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1253  * -------------------------------------------------------------------------- */
1254
1255 static void
1256 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1257 {
1258     debugTrace (DEBUG_sched,
1259                 "--<< thread %ld (%s) stopped, StackOverflow", 
1260                 (long)t->id, whatNext_strs[t->what_next]);
1261
1262     /* just adjust the stack for this thread, then pop it back
1263      * on the run queue.
1264      */
1265     { 
1266         /* enlarge the stack */
1267         StgTSO *new_t = threadStackOverflow(cap, t);
1268         
1269         /* The TSO attached to this Task may have moved, so update the
1270          * pointer to it.
1271          */
1272         if (task->tso == t) {
1273             task->tso = new_t;
1274         }
1275         pushOnRunQueue(cap,new_t);
1276     }
1277 }
1278
1279 /* -----------------------------------------------------------------------------
1280  * Handle a thread that returned to the scheduler with ThreadYielding
1281  * -------------------------------------------------------------------------- */
1282
1283 static rtsBool
1284 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1285 {
1286     // Reset the context switch flag.  We don't do this just before
1287     // running the thread, because that would mean we would lose ticks
1288     // during GC, which can lead to unfair scheduling (a thread hogs
1289     // the CPU because the tick always arrives during GC).  This way
1290     // penalises threads that do a lot of allocation, but that seems
1291     // better than the alternative.
1292     cap->context_switch = 0;
1293     
1294     /* put the thread back on the run queue.  Then, if we're ready to
1295      * GC, check whether this is the last task to stop.  If so, wake
1296      * up the GC thread.  getThread will block during a GC until the
1297      * GC is finished.
1298      */
1299 #ifdef DEBUG
1300     if (t->what_next != prev_what_next) {
1301         debugTrace(DEBUG_sched,
1302                    "--<< thread %ld (%s) stopped to switch evaluators", 
1303                    (long)t->id, whatNext_strs[t->what_next]);
1304     } else {
1305         debugTrace(DEBUG_sched,
1306                    "--<< thread %ld (%s) stopped, yielding",
1307                    (long)t->id, whatNext_strs[t->what_next]);
1308     }
1309 #endif
1310     
1311     IF_DEBUG(sanity,
1312              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1313              checkTSO(t));
1314     ASSERT(t->_link == END_TSO_QUEUE);
1315     
1316     // Shortcut if we're just switching evaluators: don't bother
1317     // doing stack squeezing (which can be expensive), just run the
1318     // thread.
1319     if (t->what_next != prev_what_next) {
1320         return rtsTrue;
1321     }
1322
1323     addToRunQueue(cap,t);
1324
1325     return rtsFalse;
1326 }
1327
1328 /* -----------------------------------------------------------------------------
1329  * Handle a thread that returned to the scheduler with ThreadBlocked
1330  * -------------------------------------------------------------------------- */
1331
1332 static void
1333 scheduleHandleThreadBlocked( StgTSO *t
1334 #if !defined(GRAN) && !defined(DEBUG)
1335     STG_UNUSED
1336 #endif
1337     )
1338 {
1339
1340       // We don't need to do anything.  The thread is blocked, and it
1341       // has tidied up its stack and placed itself on whatever queue
1342       // it needs to be on.
1343
1344     // ASSERT(t->why_blocked != NotBlocked);
1345     // Not true: for example,
1346     //    - in THREADED_RTS, the thread may already have been woken
1347     //      up by another Capability.  This actually happens: try
1348     //      conc023 +RTS -N2.
1349     //    - the thread may have woken itself up already, because
1350     //      threadPaused() might have raised a blocked throwTo
1351     //      exception, see maybePerformBlockedException().
1352
1353 #ifdef DEBUG
1354     if (traceClass(DEBUG_sched)) {
1355         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1356                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1357         printThreadBlockage(t);
1358         debugTraceEnd();
1359     }
1360 #endif
1361 }
1362
1363 /* -----------------------------------------------------------------------------
1364  * Handle a thread that returned to the scheduler with ThreadFinished
1365  * -------------------------------------------------------------------------- */
1366
1367 static rtsBool
1368 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1369 {
1370     /* Need to check whether this was a main thread, and if so,
1371      * return with the return value.
1372      *
1373      * We also end up here if the thread kills itself with an
1374      * uncaught exception, see Exception.cmm.
1375      */
1376     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1377                (unsigned long)t->id, whatNext_strs[t->what_next]);
1378
1379       //
1380       // Check whether the thread that just completed was a bound
1381       // thread, and if so return with the result.  
1382       //
1383       // There is an assumption here that all thread completion goes
1384       // through this point; we need to make sure that if a thread
1385       // ends up in the ThreadKilled state, that it stays on the run
1386       // queue so it can be dealt with here.
1387       //
1388
1389       if (t->bound) {
1390
1391           if (t->bound != task) {
1392 #if !defined(THREADED_RTS)
1393               // Must be a bound thread that is not the topmost one.  Leave
1394               // it on the run queue until the stack has unwound to the
1395               // point where we can deal with this.  Leaving it on the run
1396               // queue also ensures that the garbage collector knows about
1397               // this thread and its return value (it gets dropped from the
1398               // step->threads list so there's no other way to find it).
1399               appendToRunQueue(cap,t);
1400               return rtsFalse;
1401 #else
1402               // this cannot happen in the threaded RTS, because a
1403               // bound thread can only be run by the appropriate Task.
1404               barf("finished bound thread that isn't mine");
1405 #endif
1406           }
1407
1408           ASSERT(task->tso == t);
1409
1410           if (t->what_next == ThreadComplete) {
1411               if (task->ret) {
1412                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1413                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1414               }
1415               task->stat = Success;
1416           } else {
1417               if (task->ret) {
1418                   *(task->ret) = NULL;
1419               }
1420               if (sched_state >= SCHED_INTERRUPTING) {
1421                   task->stat = Interrupted;
1422               } else {
1423                   task->stat = Killed;
1424               }
1425           }
1426 #ifdef DEBUG
1427           removeThreadLabel((StgWord)task->tso->id);
1428 #endif
1429           return rtsTrue; // tells schedule() to return
1430       }
1431
1432       return rtsFalse;
1433 }
1434
1435 /* -----------------------------------------------------------------------------
1436  * Perform a heap census
1437  * -------------------------------------------------------------------------- */
1438
1439 static rtsBool
1440 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1441 {
1442     // When we have +RTS -i0 and we're heap profiling, do a census at
1443     // every GC.  This lets us get repeatable runs for debugging.
1444     if (performHeapProfile ||
1445         (RtsFlags.ProfFlags.profileInterval==0 &&
1446          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1447         return rtsTrue;
1448     } else {
1449         return rtsFalse;
1450     }
1451 }
1452
1453 /* -----------------------------------------------------------------------------
1454  * Perform a garbage collection if necessary
1455  * -------------------------------------------------------------------------- */
1456
1457 static Capability *
1458 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1459 {
1460     rtsBool heap_census;
1461 #ifdef THREADED_RTS
1462     /* extern static volatile StgWord waiting_for_gc; 
1463        lives inside capability.c */
1464     rtsBool was_waiting;
1465     nat i;
1466 #endif
1467
1468 #ifdef THREADED_RTS
1469     // In order to GC, there must be no threads running Haskell code.
1470     // Therefore, the GC thread needs to hold *all* the capabilities,
1471     // and release them after the GC has completed.  
1472     //
1473     // This seems to be the simplest way: previous attempts involved
1474     // making all the threads with capabilities give up their
1475     // capabilities and sleep except for the *last* one, which
1476     // actually did the GC.  But it's quite hard to arrange for all
1477     // the other tasks to sleep and stay asleep.
1478     //
1479         
1480     /*  Other capabilities are prevented from running yet more Haskell
1481         threads if waiting_for_gc is set. Tested inside
1482         yieldCapability() and releaseCapability() in Capability.c */
1483
1484     was_waiting = cas(&waiting_for_gc, 0, 1);
1485     if (was_waiting) {
1486         do {
1487             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1488             if (cap) yieldCapability(&cap,task);
1489         } while (waiting_for_gc);
1490         return cap;  // NOTE: task->cap might have changed here
1491     }
1492
1493     setContextSwitches();
1494     for (i=0; i < n_capabilities; i++) {
1495         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1496         if (cap != &capabilities[i]) {
1497             Capability *pcap = &capabilities[i];
1498             // we better hope this task doesn't get migrated to
1499             // another Capability while we're waiting for this one.
1500             // It won't, because load balancing happens while we have
1501             // all the Capabilities, but even so it's a slightly
1502             // unsavoury invariant.
1503             task->cap = pcap;
1504             waitForReturnCapability(&pcap, task);
1505             if (pcap != &capabilities[i]) {
1506                 barf("scheduleDoGC: got the wrong capability");
1507             }
1508         }
1509     }
1510
1511     waiting_for_gc = rtsFalse;
1512 #endif
1513
1514     // so this happens periodically:
1515     if (cap) scheduleCheckBlackHoles(cap);
1516     
1517     IF_DEBUG(scheduler, printAllThreads());
1518
1519     /*
1520      * We now have all the capabilities; if we're in an interrupting
1521      * state, then we should take the opportunity to delete all the
1522      * threads in the system.
1523      */
1524     if (sched_state >= SCHED_INTERRUPTING) {
1525         deleteAllThreads(&capabilities[0]);
1526         sched_state = SCHED_SHUTTING_DOWN;
1527     }
1528     
1529     heap_census = scheduleNeedHeapProfile(rtsTrue);
1530
1531     /* everybody back, start the GC.
1532      * Could do it in this thread, or signal a condition var
1533      * to do it in another thread.  Either way, we need to
1534      * broadcast on gc_pending_cond afterward.
1535      */
1536 #if defined(THREADED_RTS)
1537     debugTrace(DEBUG_sched, "doing GC");
1538 #endif
1539     GarbageCollect(force_major || heap_census);
1540     
1541     if (heap_census) {
1542         debugTrace(DEBUG_sched, "performing heap census");
1543         heapCensus();
1544         performHeapProfile = rtsFalse;
1545     }
1546
1547 #ifdef SPARKBALANCE
1548     /* JB 
1549        Once we are all together... this would be the place to balance all
1550        spark pools. No concurrent stealing or adding of new sparks can
1551        occur. Should be defined in Sparks.c. */
1552     balanceSparkPoolsCaps(n_capabilities, capabilities);
1553 #endif
1554
1555 #if defined(THREADED_RTS)
1556     // release our stash of capabilities.
1557     for (i = 0; i < n_capabilities; i++) {
1558         if (cap != &capabilities[i]) {
1559             task->cap = &capabilities[i];
1560             releaseCapability(&capabilities[i]);
1561         }
1562     }
1563     if (cap) {
1564         task->cap = cap;
1565     } else {
1566         task->cap = NULL;
1567     }
1568 #endif
1569
1570     return cap;
1571 }
1572
1573 /* ---------------------------------------------------------------------------
1574  * Singleton fork(). Do not copy any running threads.
1575  * ------------------------------------------------------------------------- */
1576
1577 pid_t
1578 forkProcess(HsStablePtr *entry
1579 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1580             STG_UNUSED
1581 #endif
1582            )
1583 {
1584 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1585     Task *task;
1586     pid_t pid;
1587     StgTSO* t,*next;
1588     Capability *cap;
1589     nat s;
1590     
1591 #if defined(THREADED_RTS)
1592     if (RtsFlags.ParFlags.nNodes > 1) {
1593         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1594         stg_exit(EXIT_FAILURE);
1595     }
1596 #endif
1597
1598     debugTrace(DEBUG_sched, "forking!");
1599     
1600     // ToDo: for SMP, we should probably acquire *all* the capabilities
1601     cap = rts_lock();
1602     
1603     // no funny business: hold locks while we fork, otherwise if some
1604     // other thread is holding a lock when the fork happens, the data
1605     // structure protected by the lock will forever be in an
1606     // inconsistent state in the child.  See also #1391.
1607     ACQUIRE_LOCK(&sched_mutex);
1608     ACQUIRE_LOCK(&cap->lock);
1609     ACQUIRE_LOCK(&cap->running_task->lock);
1610
1611     pid = fork();
1612     
1613     if (pid) { // parent
1614         
1615         RELEASE_LOCK(&sched_mutex);
1616         RELEASE_LOCK(&cap->lock);
1617         RELEASE_LOCK(&cap->running_task->lock);
1618
1619         // just return the pid
1620         rts_unlock(cap);
1621         return pid;
1622         
1623     } else { // child
1624         
1625 #if defined(THREADED_RTS)
1626         initMutex(&sched_mutex);
1627         initMutex(&cap->lock);
1628         initMutex(&cap->running_task->lock);
1629 #endif
1630
1631         // Now, all OS threads except the thread that forked are
1632         // stopped.  We need to stop all Haskell threads, including
1633         // those involved in foreign calls.  Also we need to delete
1634         // all Tasks, because they correspond to OS threads that are
1635         // now gone.
1636
1637         for (s = 0; s < total_steps; s++) {
1638           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1639             if (t->what_next == ThreadRelocated) {
1640                 next = t->_link;
1641             } else {
1642                 next = t->global_link;
1643                 // don't allow threads to catch the ThreadKilled
1644                 // exception, but we do want to raiseAsync() because these
1645                 // threads may be evaluating thunks that we need later.
1646                 deleteThread_(cap,t);
1647             }
1648           }
1649         }
1650         
1651         // Empty the run queue.  It seems tempting to let all the
1652         // killed threads stay on the run queue as zombies to be
1653         // cleaned up later, but some of them correspond to bound
1654         // threads for which the corresponding Task does not exist.
1655         cap->run_queue_hd = END_TSO_QUEUE;
1656         cap->run_queue_tl = END_TSO_QUEUE;
1657
1658         // Any suspended C-calling Tasks are no more, their OS threads
1659         // don't exist now:
1660         cap->suspended_ccalling_tasks = NULL;
1661
1662         // Empty the threads lists.  Otherwise, the garbage
1663         // collector may attempt to resurrect some of these threads.
1664         for (s = 0; s < total_steps; s++) {
1665             all_steps[s].threads = END_TSO_QUEUE;
1666         }
1667
1668         // Wipe the task list, except the current Task.
1669         ACQUIRE_LOCK(&sched_mutex);
1670         for (task = all_tasks; task != NULL; task=task->all_link) {
1671             if (task != cap->running_task) {
1672 #if defined(THREADED_RTS)
1673                 initMutex(&task->lock); // see #1391
1674 #endif
1675                 discardTask(task);
1676             }
1677         }
1678         RELEASE_LOCK(&sched_mutex);
1679
1680 #if defined(THREADED_RTS)
1681         // Wipe our spare workers list, they no longer exist.  New
1682         // workers will be created if necessary.
1683         cap->spare_workers = NULL;
1684         cap->returning_tasks_hd = NULL;
1685         cap->returning_tasks_tl = NULL;
1686 #endif
1687
1688         // On Unix, all timers are reset in the child, so we need to start
1689         // the timer again.
1690         initTimer();
1691         startTimer();
1692
1693         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1694         rts_checkSchedStatus("forkProcess",cap);
1695         
1696         rts_unlock(cap);
1697         hs_exit();                      // clean up and exit
1698         stg_exit(EXIT_SUCCESS);
1699     }
1700 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1701     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1702     return -1;
1703 #endif
1704 }
1705
1706 /* ---------------------------------------------------------------------------
1707  * Delete all the threads in the system
1708  * ------------------------------------------------------------------------- */
1709    
1710 static void
1711 deleteAllThreads ( Capability *cap )
1712 {
1713     // NOTE: only safe to call if we own all capabilities.
1714
1715     StgTSO* t, *next;
1716     nat s;
1717
1718     debugTrace(DEBUG_sched,"deleting all threads");
1719     for (s = 0; s < total_steps; s++) {
1720       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1721         if (t->what_next == ThreadRelocated) {
1722             next = t->_link;
1723         } else {
1724             next = t->global_link;
1725             deleteThread(cap,t);
1726         }
1727       }
1728     }      
1729
1730     // The run queue now contains a bunch of ThreadKilled threads.  We
1731     // must not throw these away: the main thread(s) will be in there
1732     // somewhere, and the main scheduler loop has to deal with it.
1733     // Also, the run queue is the only thing keeping these threads from
1734     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1735
1736 #if !defined(THREADED_RTS)
1737     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1738     ASSERT(sleeping_queue == END_TSO_QUEUE);
1739 #endif
1740 }
1741
1742 /* -----------------------------------------------------------------------------
1743    Managing the suspended_ccalling_tasks list.
1744    Locks required: sched_mutex
1745    -------------------------------------------------------------------------- */
1746
1747 STATIC_INLINE void
1748 suspendTask (Capability *cap, Task *task)
1749 {
1750     ASSERT(task->next == NULL && task->prev == NULL);
1751     task->next = cap->suspended_ccalling_tasks;
1752     task->prev = NULL;
1753     if (cap->suspended_ccalling_tasks) {
1754         cap->suspended_ccalling_tasks->prev = task;
1755     }
1756     cap->suspended_ccalling_tasks = task;
1757 }
1758
1759 STATIC_INLINE void
1760 recoverSuspendedTask (Capability *cap, Task *task)
1761 {
1762     if (task->prev) {
1763         task->prev->next = task->next;
1764     } else {
1765         ASSERT(cap->suspended_ccalling_tasks == task);
1766         cap->suspended_ccalling_tasks = task->next;
1767     }
1768     if (task->next) {
1769         task->next->prev = task->prev;
1770     }
1771     task->next = task->prev = NULL;
1772 }
1773
1774 /* ---------------------------------------------------------------------------
1775  * Suspending & resuming Haskell threads.
1776  * 
1777  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1778  * its capability before calling the C function.  This allows another
1779  * task to pick up the capability and carry on running Haskell
1780  * threads.  It also means that if the C call blocks, it won't lock
1781  * the whole system.
1782  *
1783  * The Haskell thread making the C call is put to sleep for the
1784  * duration of the call, on the susepended_ccalling_threads queue.  We
1785  * give out a token to the task, which it can use to resume the thread
1786  * on return from the C function.
1787  * ------------------------------------------------------------------------- */
1788    
1789 void *
1790 suspendThread (StgRegTable *reg)
1791 {
1792   Capability *cap;
1793   int saved_errno;
1794   StgTSO *tso;
1795   Task *task;
1796 #if mingw32_HOST_OS
1797   StgWord32 saved_winerror;
1798 #endif
1799
1800   saved_errno = errno;
1801 #if mingw32_HOST_OS
1802   saved_winerror = GetLastError();
1803 #endif
1804
1805   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1806    */
1807   cap = regTableToCapability(reg);
1808
1809   task = cap->running_task;
1810   tso = cap->r.rCurrentTSO;
1811
1812   debugTrace(DEBUG_sched, 
1813              "thread %lu did a safe foreign call", 
1814              (unsigned long)cap->r.rCurrentTSO->id);
1815
1816   // XXX this might not be necessary --SDM
1817   tso->what_next = ThreadRunGHC;
1818
1819   threadPaused(cap,tso);
1820
1821   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1822       tso->why_blocked = BlockedOnCCall;
1823       tso->flags |= TSO_BLOCKEX;
1824       tso->flags &= ~TSO_INTERRUPTIBLE;
1825   } else {
1826       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1827   }
1828
1829   // Hand back capability
1830   task->suspended_tso = tso;
1831
1832   ACQUIRE_LOCK(&cap->lock);
1833
1834   suspendTask(cap,task);
1835   cap->in_haskell = rtsFalse;
1836   releaseCapability_(cap,rtsFalse);
1837   
1838   RELEASE_LOCK(&cap->lock);
1839
1840 #if defined(THREADED_RTS)
1841   /* Preparing to leave the RTS, so ensure there's a native thread/task
1842      waiting to take over.
1843   */
1844   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1845 #endif
1846
1847   errno = saved_errno;
1848 #if mingw32_HOST_OS
1849   SetLastError(saved_winerror);
1850 #endif
1851   return task;
1852 }
1853
1854 StgRegTable *
1855 resumeThread (void *task_)
1856 {
1857     StgTSO *tso;
1858     Capability *cap;
1859     Task *task = task_;
1860     int saved_errno;
1861 #if mingw32_HOST_OS
1862     StgWord32 saved_winerror;
1863 #endif
1864
1865     saved_errno = errno;
1866 #if mingw32_HOST_OS
1867     saved_winerror = GetLastError();
1868 #endif
1869
1870     cap = task->cap;
1871     // Wait for permission to re-enter the RTS with the result.
1872     waitForReturnCapability(&cap,task);
1873     // we might be on a different capability now... but if so, our
1874     // entry on the suspended_ccalling_tasks list will also have been
1875     // migrated.
1876
1877     // Remove the thread from the suspended list
1878     recoverSuspendedTask(cap,task);
1879
1880     tso = task->suspended_tso;
1881     task->suspended_tso = NULL;
1882     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1883     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1884     
1885     if (tso->why_blocked == BlockedOnCCall) {
1886         awakenBlockedExceptionQueue(cap,tso);
1887         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1888     }
1889     
1890     /* Reset blocking status */
1891     tso->why_blocked  = NotBlocked;
1892     
1893     cap->r.rCurrentTSO = tso;
1894     cap->in_haskell = rtsTrue;
1895     errno = saved_errno;
1896 #if mingw32_HOST_OS
1897     SetLastError(saved_winerror);
1898 #endif
1899
1900     /* We might have GC'd, mark the TSO dirty again */
1901     dirty_TSO(cap,tso);
1902
1903     IF_DEBUG(sanity, checkTSO(tso));
1904
1905     return &cap->r;
1906 }
1907
1908 /* ---------------------------------------------------------------------------
1909  * scheduleThread()
1910  *
1911  * scheduleThread puts a thread on the end  of the runnable queue.
1912  * This will usually be done immediately after a thread is created.
1913  * The caller of scheduleThread must create the thread using e.g.
1914  * createThread and push an appropriate closure
1915  * on this thread's stack before the scheduler is invoked.
1916  * ------------------------------------------------------------------------ */
1917
1918 void
1919 scheduleThread(Capability *cap, StgTSO *tso)
1920 {
1921     // The thread goes at the *end* of the run-queue, to avoid possible
1922     // starvation of any threads already on the queue.
1923     appendToRunQueue(cap,tso);
1924 }
1925
1926 void
1927 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1928 {
1929 #if defined(THREADED_RTS)
1930     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1931                               // move this thread from now on.
1932     cpu %= RtsFlags.ParFlags.nNodes;
1933     if (cpu == cap->no) {
1934         appendToRunQueue(cap,tso);
1935     } else {
1936         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1937     }
1938 #else
1939     appendToRunQueue(cap,tso);
1940 #endif
1941 }
1942
1943 Capability *
1944 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1945 {
1946     Task *task;
1947
1948     // We already created/initialised the Task
1949     task = cap->running_task;
1950
1951     // This TSO is now a bound thread; make the Task and TSO
1952     // point to each other.
1953     tso->bound = task;
1954     tso->cap = cap;
1955
1956     task->tso = tso;
1957     task->ret = ret;
1958     task->stat = NoStatus;
1959
1960     appendToRunQueue(cap,tso);
1961
1962     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1963
1964     cap = schedule(cap,task);
1965
1966     ASSERT(task->stat != NoStatus);
1967     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1968
1969     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1970     return cap;
1971 }
1972
1973 /* ----------------------------------------------------------------------------
1974  * Starting Tasks
1975  * ------------------------------------------------------------------------- */
1976
1977 #if defined(THREADED_RTS)
1978 void OSThreadProcAttr
1979 workerStart(Task *task)
1980 {
1981     Capability *cap;
1982
1983     // See startWorkerTask().
1984     ACQUIRE_LOCK(&task->lock);
1985     cap = task->cap;
1986     RELEASE_LOCK(&task->lock);
1987
1988     // set the thread-local pointer to the Task:
1989     taskEnter(task);
1990
1991     // schedule() runs without a lock.
1992     cap = schedule(cap,task);
1993
1994     // On exit from schedule(), we have a Capability.
1995     releaseCapability(cap);
1996     workerTaskStop(task);
1997 }
1998 #endif
1999
2000 /* ---------------------------------------------------------------------------
2001  * initScheduler()
2002  *
2003  * Initialise the scheduler.  This resets all the queues - if the
2004  * queues contained any threads, they'll be garbage collected at the
2005  * next pass.
2006  *
2007  * ------------------------------------------------------------------------ */
2008
2009 void 
2010 initScheduler(void)
2011 {
2012 #if !defined(THREADED_RTS)
2013   blocked_queue_hd  = END_TSO_QUEUE;
2014   blocked_queue_tl  = END_TSO_QUEUE;
2015   sleeping_queue    = END_TSO_QUEUE;
2016 #endif
2017
2018   blackhole_queue   = END_TSO_QUEUE;
2019
2020   sched_state    = SCHED_RUNNING;
2021   recent_activity = ACTIVITY_YES;
2022
2023 #if defined(THREADED_RTS)
2024   /* Initialise the mutex and condition variables used by
2025    * the scheduler. */
2026   initMutex(&sched_mutex);
2027 #endif
2028   
2029   ACQUIRE_LOCK(&sched_mutex);
2030
2031   /* A capability holds the state a native thread needs in
2032    * order to execute STG code. At least one capability is
2033    * floating around (only THREADED_RTS builds have more than one).
2034    */
2035   initCapabilities();
2036
2037   initTaskManager();
2038
2039 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2040   initSparkPools();
2041 #endif
2042
2043 #if defined(THREADED_RTS)
2044   /*
2045    * Eagerly start one worker to run each Capability, except for
2046    * Capability 0.  The idea is that we're probably going to start a
2047    * bound thread on Capability 0 pretty soon, so we don't want a
2048    * worker task hogging it.
2049    */
2050   { 
2051       nat i;
2052       Capability *cap;
2053       for (i = 1; i < n_capabilities; i++) {
2054           cap = &capabilities[i];
2055           ACQUIRE_LOCK(&cap->lock);
2056           startWorkerTask(cap, workerStart);
2057           RELEASE_LOCK(&cap->lock);
2058       }
2059   }
2060 #endif
2061
2062   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2063
2064   RELEASE_LOCK(&sched_mutex);
2065 }
2066
2067 void
2068 exitScheduler(
2069     rtsBool wait_foreign
2070 #if !defined(THREADED_RTS)
2071                          __attribute__((unused))
2072 #endif
2073 )
2074                /* see Capability.c, shutdownCapability() */
2075 {
2076     Task *task = NULL;
2077
2078 #if defined(THREADED_RTS)
2079     ACQUIRE_LOCK(&sched_mutex);
2080     task = newBoundTask();
2081     RELEASE_LOCK(&sched_mutex);
2082 #endif
2083
2084     // If we haven't killed all the threads yet, do it now.
2085     if (sched_state < SCHED_SHUTTING_DOWN) {
2086         sched_state = SCHED_INTERRUPTING;
2087         scheduleDoGC(NULL,task,rtsFalse);    
2088     }
2089     sched_state = SCHED_SHUTTING_DOWN;
2090
2091 #if defined(THREADED_RTS)
2092     { 
2093         nat i;
2094         
2095         for (i = 0; i < n_capabilities; i++) {
2096             shutdownCapability(&capabilities[i], task, wait_foreign);
2097         }
2098         boundTaskExiting(task);
2099         stopTaskManager();
2100     }
2101 #endif
2102 }
2103
2104 void
2105 freeScheduler( void )
2106 {
2107     freeCapabilities();
2108     freeTaskManager();
2109     if (n_capabilities != 1) {
2110         stgFree(capabilities);
2111     }
2112 #if defined(THREADED_RTS)
2113     closeMutex(&sched_mutex);
2114 #endif
2115 }
2116
2117 /* -----------------------------------------------------------------------------
2118    performGC
2119
2120    This is the interface to the garbage collector from Haskell land.
2121    We provide this so that external C code can allocate and garbage
2122    collect when called from Haskell via _ccall_GC.
2123    -------------------------------------------------------------------------- */
2124
2125 static void
2126 performGC_(rtsBool force_major)
2127 {
2128     Task *task;
2129     // We must grab a new Task here, because the existing Task may be
2130     // associated with a particular Capability, and chained onto the 
2131     // suspended_ccalling_tasks queue.
2132     ACQUIRE_LOCK(&sched_mutex);
2133     task = newBoundTask();
2134     RELEASE_LOCK(&sched_mutex);
2135     scheduleDoGC(NULL,task,force_major);
2136     boundTaskExiting(task);
2137 }
2138
2139 void
2140 performGC(void)
2141 {
2142     performGC_(rtsFalse);
2143 }
2144
2145 void
2146 performMajorGC(void)
2147 {
2148     performGC_(rtsTrue);
2149 }
2150
2151 /* -----------------------------------------------------------------------------
2152    Stack overflow
2153
2154    If the thread has reached its maximum stack size, then raise the
2155    StackOverflow exception in the offending thread.  Otherwise
2156    relocate the TSO into a larger chunk of memory and adjust its stack
2157    size appropriately.
2158    -------------------------------------------------------------------------- */
2159
2160 static StgTSO *
2161 threadStackOverflow(Capability *cap, StgTSO *tso)
2162 {
2163   nat new_stack_size, stack_words;
2164   lnat new_tso_size;
2165   StgPtr new_sp;
2166   StgTSO *dest;
2167
2168   IF_DEBUG(sanity,checkTSO(tso));
2169
2170   // don't allow throwTo() to modify the blocked_exceptions queue
2171   // while we are moving the TSO:
2172   lockClosure((StgClosure *)tso);
2173
2174   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2175       // NB. never raise a StackOverflow exception if the thread is
2176       // inside Control.Exceptino.block.  It is impractical to protect
2177       // against stack overflow exceptions, since virtually anything
2178       // can raise one (even 'catch'), so this is the only sensible
2179       // thing to do here.  See bug #767.
2180
2181       debugTrace(DEBUG_gc,
2182                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2183                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2184       IF_DEBUG(gc,
2185                /* If we're debugging, just print out the top of the stack */
2186                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2187                                                 tso->sp+64)));
2188
2189       // Send this thread the StackOverflow exception
2190       unlockTSO(tso);
2191       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2192       return tso;
2193   }
2194
2195   /* Try to double the current stack size.  If that takes us over the
2196    * maximum stack size for this thread, then use the maximum instead
2197    * (that is, unless we're already at or over the max size and we
2198    * can't raise the StackOverflow exception (see above), in which
2199    * case just double the size). Finally round up so the TSO ends up as
2200    * a whole number of blocks.
2201    */
2202   if (tso->stack_size >= tso->max_stack_size) {
2203       new_stack_size = tso->stack_size * 2;
2204   } else { 
2205       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2206   }
2207   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2208                                        TSO_STRUCT_SIZE)/sizeof(W_);
2209   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2210   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2211
2212   debugTrace(DEBUG_sched, 
2213              "increasing stack size from %ld words to %d.",
2214              (long)tso->stack_size, new_stack_size);
2215
2216   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2217   TICK_ALLOC_TSO(new_stack_size,0);
2218
2219   /* copy the TSO block and the old stack into the new area */
2220   memcpy(dest,tso,TSO_STRUCT_SIZE);
2221   stack_words = tso->stack + tso->stack_size - tso->sp;
2222   new_sp = (P_)dest + new_tso_size - stack_words;
2223   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2224
2225   /* relocate the stack pointers... */
2226   dest->sp         = new_sp;
2227   dest->stack_size = new_stack_size;
2228         
2229   /* Mark the old TSO as relocated.  We have to check for relocated
2230    * TSOs in the garbage collector and any primops that deal with TSOs.
2231    *
2232    * It's important to set the sp value to just beyond the end
2233    * of the stack, so we don't attempt to scavenge any part of the
2234    * dead TSO's stack.
2235    */
2236   tso->what_next = ThreadRelocated;
2237   setTSOLink(cap,tso,dest);
2238   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2239   tso->why_blocked = NotBlocked;
2240
2241   IF_PAR_DEBUG(verbose,
2242                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2243                      tso->id, tso, tso->stack_size);
2244                /* If we're debugging, just print out the top of the stack */
2245                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2246                                                 tso->sp+64)));
2247   
2248   unlockTSO(dest);
2249   unlockTSO(tso);
2250
2251   IF_DEBUG(sanity,checkTSO(dest));
2252 #if 0
2253   IF_DEBUG(scheduler,printTSO(dest));
2254 #endif
2255
2256   return dest;
2257 }
2258
2259 static StgTSO *
2260 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2261 {
2262     bdescr *bd, *new_bd;
2263     lnat free_w, tso_size_w;
2264     StgTSO *new_tso;
2265
2266     tso_size_w = tso_sizeW(tso);
2267
2268     if (tso_size_w < MBLOCK_SIZE_W || 
2269         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2270     {
2271         return tso;
2272     }
2273
2274     // don't allow throwTo() to modify the blocked_exceptions queue
2275     // while we are moving the TSO:
2276     lockClosure((StgClosure *)tso);
2277
2278     // this is the number of words we'll free
2279     free_w = round_to_mblocks(tso_size_w/2);
2280
2281     bd = Bdescr((StgPtr)tso);
2282     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2283     bd->free = bd->start + TSO_STRUCT_SIZEW;
2284
2285     new_tso = (StgTSO *)new_bd->start;
2286     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2287     new_tso->stack_size = new_bd->free - new_tso->stack;
2288
2289     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2290                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2291
2292     tso->what_next = ThreadRelocated;
2293     tso->_link = new_tso; // no write barrier reqd: same generation
2294
2295     // The TSO attached to this Task may have moved, so update the
2296     // pointer to it.
2297     if (task->tso == tso) {
2298         task->tso = new_tso;
2299     }
2300
2301     unlockTSO(new_tso);
2302     unlockTSO(tso);
2303
2304     IF_DEBUG(sanity,checkTSO(new_tso));
2305
2306     return new_tso;
2307 }
2308
2309 /* ---------------------------------------------------------------------------
2310    Interrupt execution
2311    - usually called inside a signal handler so it mustn't do anything fancy.   
2312    ------------------------------------------------------------------------ */
2313
2314 void
2315 interruptStgRts(void)
2316 {
2317     sched_state = SCHED_INTERRUPTING;
2318     setContextSwitches();
2319     wakeUpRts();
2320 }
2321
2322 /* -----------------------------------------------------------------------------
2323    Wake up the RTS
2324    
2325    This function causes at least one OS thread to wake up and run the
2326    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2327    an external event has arrived that may need servicing (eg. a
2328    keyboard interrupt).
2329
2330    In the single-threaded RTS we don't do anything here; we only have
2331    one thread anyway, and the event that caused us to want to wake up
2332    will have interrupted any blocking system call in progress anyway.
2333    -------------------------------------------------------------------------- */
2334
2335 void
2336 wakeUpRts(void)
2337 {
2338 #if defined(THREADED_RTS)
2339     // This forces the IO Manager thread to wakeup, which will
2340     // in turn ensure that some OS thread wakes up and runs the
2341     // scheduler loop, which will cause a GC and deadlock check.
2342     ioManagerWakeup();
2343 #endif
2344 }
2345
2346 /* -----------------------------------------------------------------------------
2347  * checkBlackHoles()
2348  *
2349  * Check the blackhole_queue for threads that can be woken up.  We do
2350  * this periodically: before every GC, and whenever the run queue is
2351  * empty.
2352  *
2353  * An elegant solution might be to just wake up all the blocked
2354  * threads with awakenBlockedQueue occasionally: they'll go back to
2355  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2356  * doesn't give us a way to tell whether we've actually managed to
2357  * wake up any threads, so we would be busy-waiting.
2358  *
2359  * -------------------------------------------------------------------------- */
2360
2361 static rtsBool
2362 checkBlackHoles (Capability *cap)
2363 {
2364     StgTSO **prev, *t;
2365     rtsBool any_woke_up = rtsFalse;
2366     StgHalfWord type;
2367
2368     // blackhole_queue is global:
2369     ASSERT_LOCK_HELD(&sched_mutex);
2370
2371     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2372
2373     // ASSUMES: sched_mutex
2374     prev = &blackhole_queue;
2375     t = blackhole_queue;
2376     while (t != END_TSO_QUEUE) {
2377         ASSERT(t->why_blocked == BlockedOnBlackHole);
2378         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2379         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2380             IF_DEBUG(sanity,checkTSO(t));
2381             t = unblockOne(cap, t);
2382             *prev = t;
2383             any_woke_up = rtsTrue;
2384         } else {
2385             prev = &t->_link;
2386             t = t->_link;
2387         }
2388     }
2389
2390     return any_woke_up;
2391 }
2392
2393 /* -----------------------------------------------------------------------------
2394    Deleting threads
2395
2396    This is used for interruption (^C) and forking, and corresponds to
2397    raising an exception but without letting the thread catch the
2398    exception.
2399    -------------------------------------------------------------------------- */
2400
2401 static void 
2402 deleteThread (Capability *cap, StgTSO *tso)
2403 {
2404     // NOTE: must only be called on a TSO that we have exclusive
2405     // access to, because we will call throwToSingleThreaded() below.
2406     // The TSO must be on the run queue of the Capability we own, or 
2407     // we must own all Capabilities.
2408
2409     if (tso->why_blocked != BlockedOnCCall &&
2410         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2411         throwToSingleThreaded(cap,tso,NULL);
2412     }
2413 }
2414
2415 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2416 static void 
2417 deleteThread_(Capability *cap, StgTSO *tso)
2418 { // for forkProcess only:
2419   // like deleteThread(), but we delete threads in foreign calls, too.
2420
2421     if (tso->why_blocked == BlockedOnCCall ||
2422         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2423         unblockOne(cap,tso);
2424         tso->what_next = ThreadKilled;
2425     } else {
2426         deleteThread(cap,tso);
2427     }
2428 }
2429 #endif
2430
2431 /* -----------------------------------------------------------------------------
2432    raiseExceptionHelper
2433    
2434    This function is called by the raise# primitve, just so that we can
2435    move some of the tricky bits of raising an exception from C-- into
2436    C.  Who knows, it might be a useful re-useable thing here too.
2437    -------------------------------------------------------------------------- */
2438
2439 StgWord
2440 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2441 {
2442     Capability *cap = regTableToCapability(reg);
2443     StgThunk *raise_closure = NULL;
2444     StgPtr p, next;
2445     StgRetInfoTable *info;
2446     //
2447     // This closure represents the expression 'raise# E' where E
2448     // is the exception raise.  It is used to overwrite all the
2449     // thunks which are currently under evaluataion.
2450     //
2451
2452     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2453     // LDV profiling: stg_raise_info has THUNK as its closure
2454     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2455     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2456     // 1 does not cause any problem unless profiling is performed.
2457     // However, when LDV profiling goes on, we need to linearly scan
2458     // small object pool, where raise_closure is stored, so we should
2459     // use MIN_UPD_SIZE.
2460     //
2461     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2462     //                                 sizeofW(StgClosure)+1);
2463     //
2464
2465     //
2466     // Walk up the stack, looking for the catch frame.  On the way,
2467     // we update any closures pointed to from update frames with the
2468     // raise closure that we just built.
2469     //
2470     p = tso->sp;
2471     while(1) {
2472         info = get_ret_itbl((StgClosure *)p);
2473         next = p + stack_frame_sizeW((StgClosure *)p);
2474         switch (info->i.type) {
2475             
2476         case UPDATE_FRAME:
2477             // Only create raise_closure if we need to.
2478             if (raise_closure == NULL) {
2479                 raise_closure = 
2480                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2481                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2482                 raise_closure->payload[0] = exception;
2483             }
2484             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2485             p = next;
2486             continue;
2487
2488         case ATOMICALLY_FRAME:
2489             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2490             tso->sp = p;
2491             return ATOMICALLY_FRAME;
2492             
2493         case CATCH_FRAME:
2494             tso->sp = p;
2495             return CATCH_FRAME;
2496
2497         case CATCH_STM_FRAME:
2498             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2499             tso->sp = p;
2500             return CATCH_STM_FRAME;
2501             
2502         case STOP_FRAME:
2503             tso->sp = p;
2504             return STOP_FRAME;
2505
2506         case CATCH_RETRY_FRAME:
2507         default:
2508             p = next; 
2509             continue;
2510         }
2511     }
2512 }
2513
2514
2515 /* -----------------------------------------------------------------------------
2516    findRetryFrameHelper
2517
2518    This function is called by the retry# primitive.  It traverses the stack
2519    leaving tso->sp referring to the frame which should handle the retry.  
2520
2521    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2522    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2523
2524    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2525    create) because retries are not considered to be exceptions, despite the
2526    similar implementation.
2527
2528    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2529    not be created within memory transactions.
2530    -------------------------------------------------------------------------- */
2531
2532 StgWord
2533 findRetryFrameHelper (StgTSO *tso)
2534 {
2535   StgPtr           p, next;
2536   StgRetInfoTable *info;
2537
2538   p = tso -> sp;
2539   while (1) {
2540     info = get_ret_itbl((StgClosure *)p);
2541     next = p + stack_frame_sizeW((StgClosure *)p);
2542     switch (info->i.type) {
2543       
2544     case ATOMICALLY_FRAME:
2545         debugTrace(DEBUG_stm,
2546                    "found ATOMICALLY_FRAME at %p during retry", p);
2547         tso->sp = p;
2548         return ATOMICALLY_FRAME;
2549       
2550     case CATCH_RETRY_FRAME:
2551         debugTrace(DEBUG_stm,
2552                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2553         tso->sp = p;
2554         return CATCH_RETRY_FRAME;
2555       
2556     case CATCH_STM_FRAME: {
2557         StgTRecHeader *trec = tso -> trec;
2558         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2559         debugTrace(DEBUG_stm,
2560                    "found CATCH_STM_FRAME at %p during retry", p);
2561         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2562         stmAbortTransaction(tso -> cap, trec);
2563         stmFreeAbortedTRec(tso -> cap, trec);
2564         tso -> trec = outer;
2565         p = next; 
2566         continue;
2567     }
2568       
2569
2570     default:
2571       ASSERT(info->i.type != CATCH_FRAME);
2572       ASSERT(info->i.type != STOP_FRAME);
2573       p = next; 
2574       continue;
2575     }
2576   }
2577 }
2578
2579 /* -----------------------------------------------------------------------------
2580    resurrectThreads is called after garbage collection on the list of
2581    threads found to be garbage.  Each of these threads will be woken
2582    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2583    on an MVar, or NonTermination if the thread was blocked on a Black
2584    Hole.
2585
2586    Locks: assumes we hold *all* the capabilities.
2587    -------------------------------------------------------------------------- */
2588
2589 void
2590 resurrectThreads (StgTSO *threads)
2591 {
2592     StgTSO *tso, *next;
2593     Capability *cap;
2594     step *step;
2595
2596     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2597         next = tso->global_link;
2598
2599         step = Bdescr((P_)tso)->step;
2600         tso->global_link = step->threads;
2601         step->threads = tso;
2602
2603         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2604         
2605         // Wake up the thread on the Capability it was last on
2606         cap = tso->cap;
2607         
2608         switch (tso->why_blocked) {
2609         case BlockedOnMVar:
2610         case BlockedOnException:
2611             /* Called by GC - sched_mutex lock is currently held. */
2612             throwToSingleThreaded(cap, tso,
2613                                   (StgClosure *)blockedOnDeadMVar_closure);
2614             break;
2615         case BlockedOnBlackHole:
2616             throwToSingleThreaded(cap, tso,
2617                                   (StgClosure *)nonTermination_closure);
2618             break;
2619         case BlockedOnSTM:
2620             throwToSingleThreaded(cap, tso,
2621                                   (StgClosure *)blockedIndefinitely_closure);
2622             break;
2623         case NotBlocked:
2624             /* This might happen if the thread was blocked on a black hole
2625              * belonging to a thread that we've just woken up (raiseAsync
2626              * can wake up threads, remember...).
2627              */
2628             continue;
2629         default:
2630             barf("resurrectThreads: thread blocked in a strange way");
2631         }
2632     }
2633 }
2634
2635 /* -----------------------------------------------------------------------------
2636    performPendingThrowTos is called after garbage collection, and
2637    passed a list of threads that were found to have pending throwTos
2638    (tso->blocked_exceptions was not empty), and were blocked.
2639    Normally this doesn't happen, because we would deliver the
2640    exception directly if the target thread is blocked, but there are
2641    small windows where it might occur on a multiprocessor (see
2642    throwTo()).
2643
2644    NB. we must be holding all the capabilities at this point, just
2645    like resurrectThreads().
2646    -------------------------------------------------------------------------- */
2647
2648 void
2649 performPendingThrowTos (StgTSO *threads)
2650 {
2651     StgTSO *tso, *next;
2652     Capability *cap;
2653     step *step;
2654
2655     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2656         next = tso->global_link;
2657
2658         step = Bdescr((P_)tso)->step;
2659         tso->global_link = step->threads;
2660         step->threads = tso;
2661
2662         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2663         
2664         cap = tso->cap;
2665         maybePerformBlockedException(cap, tso);
2666     }
2667 }