499cf77e9bd8dff0e89121103c7c1d68451a18ee
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  *
96  * NB. must be StgWord, we do xchg() on it.
97  */
98 volatile StgWord recent_activity = ACTIVITY_YES;
99
100 /* if this flag is set as well, give up execution
101  * LOCK: none (changes monotonically)
102  */
103 volatile StgWord sched_state = SCHED_RUNNING;
104
105 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
106  *  exists - earlier gccs apparently didn't.
107  *  -= chak
108  */
109 StgTSO dummy_tso;
110
111 /*
112  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
113  * in an MT setting, needed to signal that a worker thread shouldn't hang around
114  * in the scheduler when it is out of work.
115  */
116 rtsBool shutting_down_scheduler = rtsFalse;
117
118 /*
119  * This mutex protects most of the global scheduler data in
120  * the THREADED_RTS runtime.
121  */
122 #if defined(THREADED_RTS)
123 Mutex sched_mutex;
124 #endif
125
126 #if !defined(mingw32_HOST_OS)
127 #define FORKPROCESS_PRIMOP_SUPPORTED
128 #endif
129
130 /* -----------------------------------------------------------------------------
131  * static function prototypes
132  * -------------------------------------------------------------------------- */
133
134 static Capability *schedule (Capability *initialCapability, Task *task);
135
136 //
137 // These function all encapsulate parts of the scheduler loop, and are
138 // abstracted only to make the structure and control flow of the
139 // scheduler clearer.
140 //
141 static void schedulePreLoop (void);
142 static void scheduleFindWork (Capability *cap);
143 #if defined(THREADED_RTS)
144 static void scheduleYield (Capability **pcap, Task *task);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
149 static void scheduleCheckBlackHoles (Capability *cap);
150 static void scheduleDetectDeadlock (Capability *cap, Task *task);
151 static void schedulePushWork(Capability *cap, Task *task);
152 #if defined(PARALLEL_HASKELL)
153 static rtsBool scheduleGetRemoteWork(Capability *cap);
154 static void scheduleSendPendingMessages(void);
155 #endif
156 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
157 static void scheduleActivateSpark(Capability *cap);
158 #endif
159 static void schedulePostRunThread(Capability *cap, StgTSO *t);
160 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
161 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
162                                          StgTSO *t);
163 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
164                                     nat prev_what_next );
165 static void scheduleHandleThreadBlocked( StgTSO *t );
166 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
167                                              StgTSO *t );
168 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
169 static Capability *scheduleDoGC(Capability *cap, Task *task,
170                                 rtsBool force_major);
171
172 static rtsBool checkBlackHoles(Capability *cap);
173
174 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
175 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
176
177 static void deleteThread (Capability *cap, StgTSO *tso);
178 static void deleteAllThreads (Capability *cap);
179
180 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
181 static void deleteThread_(Capability *cap, StgTSO *tso);
182 #endif
183
184 #ifdef DEBUG
185 static char *whatNext_strs[] = {
186   "(unknown)",
187   "ThreadRunGHC",
188   "ThreadInterpret",
189   "ThreadKilled",
190   "ThreadRelocated",
191   "ThreadComplete"
192 };
193 #endif
194
195 /* -----------------------------------------------------------------------------
196  * Putting a thread on the run queue: different scheduling policies
197  * -------------------------------------------------------------------------- */
198
199 STATIC_INLINE void
200 addToRunQueue( Capability *cap, StgTSO *t )
201 {
202 #if defined(PARALLEL_HASKELL)
203     if (RtsFlags.ParFlags.doFairScheduling) { 
204         // this does round-robin scheduling; good for concurrency
205         appendToRunQueue(cap,t);
206     } else {
207         // this does unfair scheduling; good for parallelism
208         pushOnRunQueue(cap,t);
209     }
210 #else
211     // this does round-robin scheduling; good for concurrency
212     appendToRunQueue(cap,t);
213 #endif
214 }
215
216 /* ---------------------------------------------------------------------------
217    Main scheduling loop.
218
219    We use round-robin scheduling, each thread returning to the
220    scheduler loop when one of these conditions is detected:
221
222       * out of heap space
223       * timer expires (thread yields)
224       * thread blocks
225       * thread ends
226       * stack overflow
227
228    GRAN version:
229      In a GranSim setup this loop iterates over the global event queue.
230      This revolves around the global event queue, which determines what 
231      to do next. Therefore, it's more complicated than either the 
232      concurrent or the parallel (GUM) setup.
233   This version has been entirely removed (JB 2008/08).
234
235    GUM version:
236      GUM iterates over incoming messages.
237      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
238      and sends out a fish whenever it has nothing to do; in-between
239      doing the actual reductions (shared code below) it processes the
240      incoming messages and deals with delayed operations 
241      (see PendingFetches).
242      This is not the ugliest code you could imagine, but it's bloody close.
243
244   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
245   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
246   as well as future GUM versions. This file has been refurbished to
247   only contain valid code, which is however incomplete, refers to
248   invalid includes etc.
249
250    ------------------------------------------------------------------------ */
251
252 static Capability *
253 schedule (Capability *initialCapability, Task *task)
254 {
255   StgTSO *t;
256   Capability *cap;
257   StgThreadReturnCode ret;
258 #if defined(PARALLEL_HASKELL)
259   rtsBool receivedFinish = rtsFalse;
260 #endif
261   nat prev_what_next;
262   rtsBool ready_to_gc;
263 #if defined(THREADED_RTS)
264   rtsBool first = rtsTrue;
265 #endif
266   
267   cap = initialCapability;
268
269   // Pre-condition: this task owns initialCapability.
270   // The sched_mutex is *NOT* held
271   // NB. on return, we still hold a capability.
272
273   debugTrace (DEBUG_sched, 
274               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
275               task, initialCapability);
276
277   schedulePreLoop();
278
279   // -----------------------------------------------------------
280   // Scheduler loop starts here:
281
282 #if defined(PARALLEL_HASKELL)
283 #define TERMINATION_CONDITION        (!receivedFinish)
284 #else
285 #define TERMINATION_CONDITION        rtsTrue
286 #endif
287
288   while (TERMINATION_CONDITION) {
289
290     // Check whether we have re-entered the RTS from Haskell without
291     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
292     // call).
293     if (cap->in_haskell) {
294           errorBelch("schedule: re-entered unsafely.\n"
295                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
296           stg_exit(EXIT_FAILURE);
297     }
298
299     // The interruption / shutdown sequence.
300     // 
301     // In order to cleanly shut down the runtime, we want to:
302     //   * make sure that all main threads return to their callers
303     //     with the state 'Interrupted'.
304     //   * clean up all OS threads assocated with the runtime
305     //   * free all memory etc.
306     //
307     // So the sequence for ^C goes like this:
308     //
309     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
310     //     arranges for some Capability to wake up
311     //
312     //   * all threads in the system are halted, and the zombies are
313     //     placed on the run queue for cleaning up.  We acquire all
314     //     the capabilities in order to delete the threads, this is
315     //     done by scheduleDoGC() for convenience (because GC already
316     //     needs to acquire all the capabilities).  We can't kill
317     //     threads involved in foreign calls.
318     // 
319     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
320     //
321     //   * sched_state := SCHED_SHUTTING_DOWN
322     //
323     //   * all workers exit when the run queue on their capability
324     //     drains.  All main threads will also exit when their TSO
325     //     reaches the head of the run queue and they can return.
326     //
327     //   * eventually all Capabilities will shut down, and the RTS can
328     //     exit.
329     //
330     //   * We might be left with threads blocked in foreign calls, 
331     //     we should really attempt to kill these somehow (TODO);
332     
333     switch (sched_state) {
334     case SCHED_RUNNING:
335         break;
336     case SCHED_INTERRUPTING:
337         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
338 #if defined(THREADED_RTS)
339         discardSparksCap(cap);
340 #endif
341         /* scheduleDoGC() deletes all the threads */
342         cap = scheduleDoGC(cap,task,rtsFalse);
343         break;
344     case SCHED_SHUTTING_DOWN:
345         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
346         // If we are a worker, just exit.  If we're a bound thread
347         // then we will exit below when we've removed our TSO from
348         // the run queue.
349         if (task->tso == NULL && emptyRunQueue(cap)) {
350             return cap;
351         }
352         break;
353     default:
354         barf("sched_state: %d", sched_state);
355     }
356
357     scheduleFindWork(cap);
358
359     /* work pushing, currently relevant only for THREADED_RTS:
360        (pushes threads, wakes up idle capabilities for stealing) */
361     schedulePushWork(cap,task);
362
363 #if defined(PARALLEL_HASKELL)
364     /* since we perform a blocking receive and continue otherwise,
365        either we never reach here or we definitely have work! */
366     // from here: non-empty run queue
367     ASSERT(!emptyRunQueue(cap));
368
369     if (PacketsWaiting()) {  /* now process incoming messages, if any
370                                 pending...  
371
372                                 CAUTION: scheduleGetRemoteWork called
373                                 above, waits for messages as well! */
374       processMessages(cap, &receivedFinish);
375     }
376 #endif // PARALLEL_HASKELL: non-empty run queue!
377
378     scheduleDetectDeadlock(cap,task);
379
380 #if defined(THREADED_RTS)
381     cap = task->cap;    // reload cap, it might have changed
382 #endif
383
384     // Normally, the only way we can get here with no threads to
385     // run is if a keyboard interrupt received during 
386     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
387     // Additionally, it is not fatal for the
388     // threaded RTS to reach here with no threads to run.
389     //
390     // win32: might be here due to awaitEvent() being abandoned
391     // as a result of a console event having been delivered.
392     
393 #if defined(THREADED_RTS)
394     if (first) 
395     {
396     // XXX: ToDo
397     //     // don't yield the first time, we want a chance to run this
398     //     // thread for a bit, even if there are others banging at the
399     //     // door.
400     //     first = rtsFalse;
401     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
402     }
403
404   yield:
405     scheduleYield(&cap,task);
406     if (emptyRunQueue(cap)) continue; // look for work again
407 #endif
408
409 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
410     if ( emptyRunQueue(cap) ) {
411         ASSERT(sched_state >= SCHED_INTERRUPTING);
412     }
413 #endif
414
415     // 
416     // Get a thread to run
417     //
418     t = popRunQueue(cap);
419
420     // Sanity check the thread we're about to run.  This can be
421     // expensive if there is lots of thread switching going on...
422     IF_DEBUG(sanity,checkTSO(t));
423
424 #if defined(THREADED_RTS)
425     // Check whether we can run this thread in the current task.
426     // If not, we have to pass our capability to the right task.
427     {
428         Task *bound = t->bound;
429       
430         if (bound) {
431             if (bound == task) {
432                 debugTrace(DEBUG_sched,
433                            "### Running thread %lu in bound thread", (unsigned long)t->id);
434                 // yes, the Haskell thread is bound to the current native thread
435             } else {
436                 debugTrace(DEBUG_sched,
437                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
438                 // no, bound to a different Haskell thread: pass to that thread
439                 pushOnRunQueue(cap,t);
440                 continue;
441             }
442         } else {
443             // The thread we want to run is unbound.
444             if (task->tso) { 
445                 debugTrace(DEBUG_sched,
446                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
447                 // no, the current native thread is bound to a different
448                 // Haskell thread, so pass it to any worker thread
449                 pushOnRunQueue(cap,t);
450                 continue; 
451             }
452         }
453     }
454 #endif
455
456     /* context switches are initiated by the timer signal, unless
457      * the user specified "context switch as often as possible", with
458      * +RTS -C0
459      */
460     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
461         && !emptyThreadQueues(cap)) {
462         cap->context_switch = 1;
463     }
464          
465 run_thread:
466
467     // CurrentTSO is the thread to run.  t might be different if we
468     // loop back to run_thread, so make sure to set CurrentTSO after
469     // that.
470     cap->r.rCurrentTSO = t;
471
472     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
473                               (long)t->id, whatNext_strs[t->what_next]);
474
475     startHeapProfTimer();
476
477     // Check for exceptions blocked on this thread
478     maybePerformBlockedException (cap, t);
479
480     // ----------------------------------------------------------------------
481     // Run the current thread 
482
483     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
484     ASSERT(t->cap == cap);
485     ASSERT(t->bound ? t->bound->cap == cap : 1);
486
487     prev_what_next = t->what_next;
488
489     errno = t->saved_errno;
490 #if mingw32_HOST_OS
491     SetLastError(t->saved_winerror);
492 #endif
493
494     cap->in_haskell = rtsTrue;
495
496     dirty_TSO(cap,t);
497
498 #if defined(THREADED_RTS)
499     if (recent_activity == ACTIVITY_DONE_GC) {
500         // ACTIVITY_DONE_GC means we turned off the timer signal to
501         // conserve power (see #1623).  Re-enable it here.
502         nat prev;
503         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
504         if (prev == ACTIVITY_DONE_GC) {
505             startTimer();
506         }
507     } else {
508         recent_activity = ACTIVITY_YES;
509     }
510 #endif
511
512     switch (prev_what_next) {
513         
514     case ThreadKilled:
515     case ThreadComplete:
516         /* Thread already finished, return to scheduler. */
517         ret = ThreadFinished;
518         break;
519         
520     case ThreadRunGHC:
521     {
522         StgRegTable *r;
523         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
524         cap = regTableToCapability(r);
525         ret = r->rRet;
526         break;
527     }
528     
529     case ThreadInterpret:
530         cap = interpretBCO(cap);
531         ret = cap->r.rRet;
532         break;
533         
534     default:
535         barf("schedule: invalid what_next field");
536     }
537
538     cap->in_haskell = rtsFalse;
539
540     // The TSO might have moved, eg. if it re-entered the RTS and a GC
541     // happened.  So find the new location:
542     t = cap->r.rCurrentTSO;
543
544     // We have run some Haskell code: there might be blackhole-blocked
545     // threads to wake up now.
546     // Lock-free test here should be ok, we're just setting a flag.
547     if ( blackhole_queue != END_TSO_QUEUE ) {
548         blackholes_need_checking = rtsTrue;
549     }
550     
551     // And save the current errno in this thread.
552     // XXX: possibly bogus for SMP because this thread might already
553     // be running again, see code below.
554     t->saved_errno = errno;
555 #if mingw32_HOST_OS
556     // Similarly for Windows error code
557     t->saved_winerror = GetLastError();
558 #endif
559
560 #if defined(THREADED_RTS)
561     // If ret is ThreadBlocked, and this Task is bound to the TSO that
562     // blocked, we are in limbo - the TSO is now owned by whatever it
563     // is blocked on, and may in fact already have been woken up,
564     // perhaps even on a different Capability.  It may be the case
565     // that task->cap != cap.  We better yield this Capability
566     // immediately and return to normaility.
567     if (ret == ThreadBlocked) {
568         debugTrace(DEBUG_sched,
569                    "--<< thread %lu (%s) stopped: blocked",
570                    (unsigned long)t->id, whatNext_strs[t->what_next]);
571         goto yield;
572     }
573 #endif
574
575     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
576     ASSERT(t->cap == cap);
577
578     // ----------------------------------------------------------------------
579     
580     // Costs for the scheduler are assigned to CCS_SYSTEM
581     stopHeapProfTimer();
582 #if defined(PROFILING)
583     CCCS = CCS_SYSTEM;
584 #endif
585     
586     schedulePostRunThread(cap,t);
587
588     t = threadStackUnderflow(task,t);
589
590     ready_to_gc = rtsFalse;
591
592     switch (ret) {
593     case HeapOverflow:
594         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
595         break;
596
597     case StackOverflow:
598         scheduleHandleStackOverflow(cap,task,t);
599         break;
600
601     case ThreadYielding:
602         if (scheduleHandleYield(cap, t, prev_what_next)) {
603             // shortcut for switching between compiler/interpreter:
604             goto run_thread; 
605         }
606         break;
607
608     case ThreadBlocked:
609         scheduleHandleThreadBlocked(t);
610         break;
611
612     case ThreadFinished:
613         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
614         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
615         break;
616
617     default:
618       barf("schedule: invalid thread return code %d", (int)ret);
619     }
620
621     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
622       cap = scheduleDoGC(cap,task,rtsFalse);
623     }
624   } /* end of while() */
625 }
626
627 /* ----------------------------------------------------------------------------
628  * Setting up the scheduler loop
629  * ------------------------------------------------------------------------- */
630
631 static void
632 schedulePreLoop(void)
633 {
634   // initialisation for scheduler - what cannot go into initScheduler()  
635 }
636
637 /* -----------------------------------------------------------------------------
638  * scheduleFindWork()
639  *
640  * Search for work to do, and handle messages from elsewhere.
641  * -------------------------------------------------------------------------- */
642
643 static void
644 scheduleFindWork (Capability *cap)
645 {
646     scheduleStartSignalHandlers(cap);
647
648     // Only check the black holes here if we've nothing else to do.
649     // During normal execution, the black hole list only gets checked
650     // at GC time, to avoid repeatedly traversing this possibly long
651     // list each time around the scheduler.
652     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
653
654     scheduleCheckWakeupThreads(cap);
655
656     scheduleCheckBlockedThreads(cap);
657
658 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
659     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
660 #endif
661
662 #if defined(PARALLEL_HASKELL)
663     // if messages have been buffered...
664     scheduleSendPendingMessages();
665 #endif
666
667 #if defined(PARALLEL_HASKELL)
668     if (emptyRunQueue(cap)) {
669         receivedFinish = scheduleGetRemoteWork(cap);
670         continue; //  a new round, (hopefully) with new work
671         /* 
672            in GUM, this a) sends out a FISH and returns IF no fish is
673                            out already
674                         b) (blocking) awaits and receives messages
675            
676            in Eden, this is only the blocking receive, as b) in GUM.
677         */
678     }
679 #endif
680 }
681
682 #if defined(THREADED_RTS)
683 STATIC_INLINE rtsBool
684 shouldYieldCapability (Capability *cap, Task *task)
685 {
686     // we need to yield this capability to someone else if..
687     //   - another thread is initiating a GC
688     //   - another Task is returning from a foreign call
689     //   - the thread at the head of the run queue cannot be run
690     //     by this Task (it is bound to another Task, or it is unbound
691     //     and this task it bound).
692     return (waiting_for_gc || 
693             cap->returning_tasks_hd != NULL ||
694             (!emptyRunQueue(cap) && (task->tso == NULL
695                                      ? cap->run_queue_hd->bound != NULL
696                                      : cap->run_queue_hd->bound != task)));
697 }
698
699 // This is the single place where a Task goes to sleep.  There are
700 // two reasons it might need to sleep:
701 //    - there are no threads to run
702 //    - we need to yield this Capability to someone else 
703 //      (see shouldYieldCapability())
704 //
705 // Careful: the scheduler loop is quite delicate.  Make sure you run
706 // the tests in testsuite/concurrent (all ways) after modifying this,
707 // and also check the benchmarks in nofib/parallel for regressions.
708
709 static void
710 scheduleYield (Capability **pcap, Task *task)
711 {
712     Capability *cap = *pcap;
713
714     // if we have work, and we don't need to give up the Capability, continue.
715     if (!shouldYieldCapability(cap,task) && 
716         (!emptyRunQueue(cap) ||
717          blackholes_need_checking ||
718          sched_state >= SCHED_INTERRUPTING))
719         return;
720
721     // otherwise yield (sleep), and keep yielding if necessary.
722     do {
723         yieldCapability(&cap,task);
724     } 
725     while (shouldYieldCapability(cap,task));
726
727     // note there may still be no threads on the run queue at this
728     // point, the caller has to check.
729
730     *pcap = cap;
731     return;
732 }
733 #endif
734     
735 /* -----------------------------------------------------------------------------
736  * schedulePushWork()
737  *
738  * Push work to other Capabilities if we have some.
739  * -------------------------------------------------------------------------- */
740
741 static void
742 schedulePushWork(Capability *cap USED_IF_THREADS, 
743                  Task *task      USED_IF_THREADS)
744 {
745   /* following code not for PARALLEL_HASKELL. I kept the call general,
746      future GUM versions might use pushing in a distributed setup */
747 #if defined(THREADED_RTS)
748
749     Capability *free_caps[n_capabilities], *cap0;
750     nat i, n_free_caps;
751
752     // migration can be turned off with +RTS -qg
753     if (!RtsFlags.ParFlags.migrate) return;
754
755     // Check whether we have more threads on our run queue, or sparks
756     // in our pool, that we could hand to another Capability.
757     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
758         && sparkPoolSizeCap(cap) < 2) {
759         return;
760     }
761
762     // First grab as many free Capabilities as we can.
763     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
764         cap0 = &capabilities[i];
765         if (cap != cap0 && tryGrabCapability(cap0,task)) {
766             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
767                 // it already has some work, we just grabbed it at 
768                 // the wrong moment.  Or maybe it's deadlocked!
769                 releaseCapability(cap0);
770             } else {
771                 free_caps[n_free_caps++] = cap0;
772             }
773         }
774     }
775
776     // we now have n_free_caps free capabilities stashed in
777     // free_caps[].  Share our run queue equally with them.  This is
778     // probably the simplest thing we could do; improvements we might
779     // want to do include:
780     //
781     //   - giving high priority to moving relatively new threads, on 
782     //     the gournds that they haven't had time to build up a
783     //     working set in the cache on this CPU/Capability.
784     //
785     //   - giving low priority to moving long-lived threads
786
787     if (n_free_caps > 0) {
788         StgTSO *prev, *t, *next;
789         rtsBool pushed_to_all;
790
791         debugTrace(DEBUG_sched, 
792                    "cap %d: %s and %d free capabilities, sharing...", 
793                    cap->no, 
794                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
795                    "excess threads on run queue":"sparks to share (>=2)",
796                    n_free_caps);
797
798         i = 0;
799         pushed_to_all = rtsFalse;
800
801         if (cap->run_queue_hd != END_TSO_QUEUE) {
802             prev = cap->run_queue_hd;
803             t = prev->_link;
804             prev->_link = END_TSO_QUEUE;
805             for (; t != END_TSO_QUEUE; t = next) {
806                 next = t->_link;
807                 t->_link = END_TSO_QUEUE;
808                 if (t->what_next == ThreadRelocated
809                     || t->bound == task // don't move my bound thread
810                     || tsoLocked(t)) {  // don't move a locked thread
811                     setTSOLink(cap, prev, t);
812                     prev = t;
813                 } else if (i == n_free_caps) {
814                     pushed_to_all = rtsTrue;
815                     i = 0;
816                     // keep one for us
817                     setTSOLink(cap, prev, t);
818                     prev = t;
819                 } else {
820                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
821                     appendToRunQueue(free_caps[i],t);
822                     if (t->bound) { t->bound->cap = free_caps[i]; }
823                     t->cap = free_caps[i];
824                     i++;
825                 }
826             }
827             cap->run_queue_tl = prev;
828         }
829
830 #ifdef SPARK_PUSHING
831         /* JB I left this code in place, it would work but is not necessary */
832
833         // If there are some free capabilities that we didn't push any
834         // threads to, then try to push a spark to each one.
835         if (!pushed_to_all) {
836             StgClosure *spark;
837             // i is the next free capability to push to
838             for (; i < n_free_caps; i++) {
839                 if (emptySparkPoolCap(free_caps[i])) {
840                     spark = tryStealSpark(cap->sparks);
841                     if (spark != NULL) {
842                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
843                         newSpark(&(free_caps[i]->r), spark);
844                     }
845                 }
846             }
847         }
848 #endif /* SPARK_PUSHING */
849
850         // release the capabilities
851         for (i = 0; i < n_free_caps; i++) {
852             task->cap = free_caps[i];
853             releaseAndWakeupCapability(free_caps[i]);
854         }
855     }
856     task->cap = cap; // reset to point to our Capability.
857
858 #endif /* THREADED_RTS */
859
860 }
861
862 /* ----------------------------------------------------------------------------
863  * Start any pending signal handlers
864  * ------------------------------------------------------------------------- */
865
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 static void
868 scheduleStartSignalHandlers(Capability *cap)
869 {
870     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871         // safe outside the lock
872         startSignalHandlers(cap);
873     }
874 }
875 #else
876 static void
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
878 {
879 }
880 #endif
881
882 /* ----------------------------------------------------------------------------
883  * Check for blocked threads that can be woken up.
884  * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 {
889 #if !defined(THREADED_RTS)
890     //
891     // Check whether any waiting threads need to be woken up.  If the
892     // run queue is empty, and there are no other tasks running, we
893     // can wait indefinitely for something to happen.
894     //
895     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896     {
897         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
898     }
899 #endif
900 }
901
902
903 /* ----------------------------------------------------------------------------
904  * Check for threads woken up by other Capabilities
905  * ------------------------------------------------------------------------- */
906
907 static void
908 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
909 {
910 #if defined(THREADED_RTS)
911     // Any threads that were woken up by other Capabilities get
912     // appended to our run queue.
913     if (!emptyWakeupQueue(cap)) {
914         ACQUIRE_LOCK(&cap->lock);
915         if (emptyRunQueue(cap)) {
916             cap->run_queue_hd = cap->wakeup_queue_hd;
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         } else {
919             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         }
922         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
923         RELEASE_LOCK(&cap->lock);
924     }
925 #endif
926 }
927
928 /* ----------------------------------------------------------------------------
929  * Check for threads blocked on BLACKHOLEs that can be woken up
930  * ------------------------------------------------------------------------- */
931 static void
932 scheduleCheckBlackHoles (Capability *cap)
933 {
934     if ( blackholes_need_checking ) // check without the lock first
935     {
936         ACQUIRE_LOCK(&sched_mutex);
937         if ( blackholes_need_checking ) {
938             blackholes_need_checking = rtsFalse;
939             // important that we reset the flag *before* checking the
940             // blackhole queue, otherwise we could get deadlock.  This
941             // happens as follows: we wake up a thread that
942             // immediately runs on another Capability, blocks on a
943             // blackhole, and then we reset the blackholes_need_checking flag.
944             checkBlackHoles(cap);
945         }
946         RELEASE_LOCK(&sched_mutex);
947     }
948 }
949
950 /* ----------------------------------------------------------------------------
951  * Detect deadlock conditions and attempt to resolve them.
952  * ------------------------------------------------------------------------- */
953
954 static void
955 scheduleDetectDeadlock (Capability *cap, Task *task)
956 {
957
958 #if defined(PARALLEL_HASKELL)
959     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
960     return;
961 #endif
962
963     /* 
964      * Detect deadlock: when we have no threads to run, there are no
965      * threads blocked, waiting for I/O, or sleeping, and all the
966      * other tasks are waiting for work, we must have a deadlock of
967      * some description.
968      */
969     if ( emptyThreadQueues(cap) )
970     {
971 #if defined(THREADED_RTS)
972         /* 
973          * In the threaded RTS, we only check for deadlock if there
974          * has been no activity in a complete timeslice.  This means
975          * we won't eagerly start a full GC just because we don't have
976          * any threads to run currently.
977          */
978         if (recent_activity != ACTIVITY_INACTIVE) return;
979 #endif
980
981         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
982
983         // Garbage collection can release some new threads due to
984         // either (a) finalizers or (b) threads resurrected because
985         // they are unreachable and will therefore be sent an
986         // exception.  Any threads thus released will be immediately
987         // runnable.
988         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
989
990         recent_activity = ACTIVITY_DONE_GC;
991         // disable timer signals (see #1623)
992         stopTimer();
993         
994         if ( !emptyRunQueue(cap) ) return;
995
996 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
997         /* If we have user-installed signal handlers, then wait
998          * for signals to arrive rather then bombing out with a
999          * deadlock.
1000          */
1001         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1002             debugTrace(DEBUG_sched,
1003                        "still deadlocked, waiting for signals...");
1004
1005             awaitUserSignals();
1006
1007             if (signals_pending()) {
1008                 startSignalHandlers(cap);
1009             }
1010
1011             // either we have threads to run, or we were interrupted:
1012             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1013
1014             return;
1015         }
1016 #endif
1017
1018 #if !defined(THREADED_RTS)
1019         /* Probably a real deadlock.  Send the current main thread the
1020          * Deadlock exception.
1021          */
1022         if (task->tso) {
1023             switch (task->tso->why_blocked) {
1024             case BlockedOnSTM:
1025             case BlockedOnBlackHole:
1026             case BlockedOnException:
1027             case BlockedOnMVar:
1028                 throwToSingleThreaded(cap, task->tso, 
1029                                       (StgClosure *)nonTermination_closure);
1030                 return;
1031             default:
1032                 barf("deadlock: main thread blocked in a strange way");
1033             }
1034         }
1035         return;
1036 #endif
1037     }
1038 }
1039
1040
1041 /* ----------------------------------------------------------------------------
1042  * Send pending messages (PARALLEL_HASKELL only)
1043  * ------------------------------------------------------------------------- */
1044
1045 #if defined(PARALLEL_HASKELL)
1046 static void
1047 scheduleSendPendingMessages(void)
1048 {
1049
1050 # if defined(PAR) // global Mem.Mgmt., omit for now
1051     if (PendingFetches != END_BF_QUEUE) {
1052         processFetches();
1053     }
1054 # endif
1055     
1056     if (RtsFlags.ParFlags.BufferTime) {
1057         // if we use message buffering, we must send away all message
1058         // packets which have become too old...
1059         sendOldBuffers(); 
1060     }
1061 }
1062 #endif
1063
1064 /* ----------------------------------------------------------------------------
1065  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1066  * ------------------------------------------------------------------------- */
1067
1068 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1069 static void
1070 scheduleActivateSpark(Capability *cap)
1071 {
1072     if (anySparks())
1073     {
1074         createSparkThread(cap);
1075         debugTrace(DEBUG_sched, "creating a spark thread");
1076     }
1077 }
1078 #endif // PARALLEL_HASKELL || THREADED_RTS
1079
1080 /* ----------------------------------------------------------------------------
1081  * Get work from a remote node (PARALLEL_HASKELL only)
1082  * ------------------------------------------------------------------------- */
1083     
1084 #if defined(PARALLEL_HASKELL)
1085 static rtsBool /* return value used in PARALLEL_HASKELL only */
1086 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1087 {
1088 #if defined(PARALLEL_HASKELL)
1089   rtsBool receivedFinish = rtsFalse;
1090
1091   // idle() , i.e. send all buffers, wait for work
1092   if (RtsFlags.ParFlags.BufferTime) {
1093         IF_PAR_DEBUG(verbose, 
1094                 debugBelch("...send all pending data,"));
1095         {
1096           nat i;
1097           for (i=1; i<=nPEs; i++)
1098             sendImmediately(i); // send all messages away immediately
1099         }
1100   }
1101
1102   /* this would be the place for fishing in GUM... 
1103
1104      if (no-earlier-fish-around) 
1105           sendFish(choosePe());
1106    */
1107
1108   // Eden:just look for incoming messages (blocking receive)
1109   IF_PAR_DEBUG(verbose, 
1110                debugBelch("...wait for incoming messages...\n"));
1111   processMessages(cap, &receivedFinish); // blocking receive...
1112
1113
1114   return receivedFinish;
1115   // reenter scheduling look after having received something
1116
1117 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1118
1119   return rtsFalse; /* return value unused in THREADED_RTS */
1120
1121 #endif /* PARALLEL_HASKELL */
1122 }
1123 #endif // PARALLEL_HASKELL || THREADED_RTS
1124
1125 /* ----------------------------------------------------------------------------
1126  * After running a thread...
1127  * ------------------------------------------------------------------------- */
1128
1129 static void
1130 schedulePostRunThread (Capability *cap, StgTSO *t)
1131 {
1132     // We have to be able to catch transactions that are in an
1133     // infinite loop as a result of seeing an inconsistent view of
1134     // memory, e.g. 
1135     //
1136     //   atomically $ do
1137     //       [a,b] <- mapM readTVar [ta,tb]
1138     //       when (a == b) loop
1139     //
1140     // and a is never equal to b given a consistent view of memory.
1141     //
1142     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1143         if (!stmValidateNestOfTransactions (t -> trec)) {
1144             debugTrace(DEBUG_sched | DEBUG_stm,
1145                        "trec %p found wasting its time", t);
1146             
1147             // strip the stack back to the
1148             // ATOMICALLY_FRAME, aborting the (nested)
1149             // transaction, and saving the stack of any
1150             // partially-evaluated thunks on the heap.
1151             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1152             
1153             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1154         }
1155     }
1156
1157   /* some statistics gathering in the parallel case */
1158 }
1159
1160 /* -----------------------------------------------------------------------------
1161  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1162  * -------------------------------------------------------------------------- */
1163
1164 static rtsBool
1165 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1166 {
1167     // did the task ask for a large block?
1168     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1169         // if so, get one and push it on the front of the nursery.
1170         bdescr *bd;
1171         lnat blocks;
1172         
1173         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1174         
1175         debugTrace(DEBUG_sched,
1176                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1177                    (long)t->id, whatNext_strs[t->what_next], blocks);
1178     
1179         // don't do this if the nursery is (nearly) full, we'll GC first.
1180         if (cap->r.rCurrentNursery->link != NULL ||
1181             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1182                                                // if the nursery has only one block.
1183             
1184             ACQUIRE_SM_LOCK
1185             bd = allocGroup( blocks );
1186             RELEASE_SM_LOCK
1187             cap->r.rNursery->n_blocks += blocks;
1188             
1189             // link the new group into the list
1190             bd->link = cap->r.rCurrentNursery;
1191             bd->u.back = cap->r.rCurrentNursery->u.back;
1192             if (cap->r.rCurrentNursery->u.back != NULL) {
1193                 cap->r.rCurrentNursery->u.back->link = bd;
1194             } else {
1195 #if !defined(THREADED_RTS)
1196                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1197                        g0s0 == cap->r.rNursery);
1198 #endif
1199                 cap->r.rNursery->blocks = bd;
1200             }             
1201             cap->r.rCurrentNursery->u.back = bd;
1202             
1203             // initialise it as a nursery block.  We initialise the
1204             // step, gen_no, and flags field of *every* sub-block in
1205             // this large block, because this is easier than making
1206             // sure that we always find the block head of a large
1207             // block whenever we call Bdescr() (eg. evacuate() and
1208             // isAlive() in the GC would both have to do this, at
1209             // least).
1210             { 
1211                 bdescr *x;
1212                 for (x = bd; x < bd + blocks; x++) {
1213                     x->step = cap->r.rNursery;
1214                     x->gen_no = 0;
1215                     x->flags = 0;
1216                 }
1217             }
1218             
1219             // This assert can be a killer if the app is doing lots
1220             // of large block allocations.
1221             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1222             
1223             // now update the nursery to point to the new block
1224             cap->r.rCurrentNursery = bd;
1225             
1226             // we might be unlucky and have another thread get on the
1227             // run queue before us and steal the large block, but in that
1228             // case the thread will just end up requesting another large
1229             // block.
1230             pushOnRunQueue(cap,t);
1231             return rtsFalse;  /* not actually GC'ing */
1232         }
1233     }
1234     
1235     debugTrace(DEBUG_sched,
1236                "--<< thread %ld (%s) stopped: HeapOverflow",
1237                (long)t->id, whatNext_strs[t->what_next]);
1238
1239     if (cap->context_switch) {
1240         // Sometimes we miss a context switch, e.g. when calling
1241         // primitives in a tight loop, MAYBE_GC() doesn't check the
1242         // context switch flag, and we end up waiting for a GC.
1243         // See #1984, and concurrent/should_run/1984
1244         cap->context_switch = 0;
1245         addToRunQueue(cap,t);
1246     } else {
1247         pushOnRunQueue(cap,t);
1248     }
1249     return rtsTrue;
1250     /* actual GC is done at the end of the while loop in schedule() */
1251 }
1252
1253 /* -----------------------------------------------------------------------------
1254  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1255  * -------------------------------------------------------------------------- */
1256
1257 static void
1258 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1259 {
1260     debugTrace (DEBUG_sched,
1261                 "--<< thread %ld (%s) stopped, StackOverflow", 
1262                 (long)t->id, whatNext_strs[t->what_next]);
1263
1264     /* just adjust the stack for this thread, then pop it back
1265      * on the run queue.
1266      */
1267     { 
1268         /* enlarge the stack */
1269         StgTSO *new_t = threadStackOverflow(cap, t);
1270         
1271         /* The TSO attached to this Task may have moved, so update the
1272          * pointer to it.
1273          */
1274         if (task->tso == t) {
1275             task->tso = new_t;
1276         }
1277         pushOnRunQueue(cap,new_t);
1278     }
1279 }
1280
1281 /* -----------------------------------------------------------------------------
1282  * Handle a thread that returned to the scheduler with ThreadYielding
1283  * -------------------------------------------------------------------------- */
1284
1285 static rtsBool
1286 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1287 {
1288     // Reset the context switch flag.  We don't do this just before
1289     // running the thread, because that would mean we would lose ticks
1290     // during GC, which can lead to unfair scheduling (a thread hogs
1291     // the CPU because the tick always arrives during GC).  This way
1292     // penalises threads that do a lot of allocation, but that seems
1293     // better than the alternative.
1294     cap->context_switch = 0;
1295     
1296     /* put the thread back on the run queue.  Then, if we're ready to
1297      * GC, check whether this is the last task to stop.  If so, wake
1298      * up the GC thread.  getThread will block during a GC until the
1299      * GC is finished.
1300      */
1301 #ifdef DEBUG
1302     if (t->what_next != prev_what_next) {
1303         debugTrace(DEBUG_sched,
1304                    "--<< thread %ld (%s) stopped to switch evaluators", 
1305                    (long)t->id, whatNext_strs[t->what_next]);
1306     } else {
1307         debugTrace(DEBUG_sched,
1308                    "--<< thread %ld (%s) stopped, yielding",
1309                    (long)t->id, whatNext_strs[t->what_next]);
1310     }
1311 #endif
1312     
1313     IF_DEBUG(sanity,
1314              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1315              checkTSO(t));
1316     ASSERT(t->_link == END_TSO_QUEUE);
1317     
1318     // Shortcut if we're just switching evaluators: don't bother
1319     // doing stack squeezing (which can be expensive), just run the
1320     // thread.
1321     if (t->what_next != prev_what_next) {
1322         return rtsTrue;
1323     }
1324
1325     addToRunQueue(cap,t);
1326
1327     return rtsFalse;
1328 }
1329
1330 /* -----------------------------------------------------------------------------
1331  * Handle a thread that returned to the scheduler with ThreadBlocked
1332  * -------------------------------------------------------------------------- */
1333
1334 static void
1335 scheduleHandleThreadBlocked( StgTSO *t
1336 #if !defined(GRAN) && !defined(DEBUG)
1337     STG_UNUSED
1338 #endif
1339     )
1340 {
1341
1342       // We don't need to do anything.  The thread is blocked, and it
1343       // has tidied up its stack and placed itself on whatever queue
1344       // it needs to be on.
1345
1346     // ASSERT(t->why_blocked != NotBlocked);
1347     // Not true: for example,
1348     //    - in THREADED_RTS, the thread may already have been woken
1349     //      up by another Capability.  This actually happens: try
1350     //      conc023 +RTS -N2.
1351     //    - the thread may have woken itself up already, because
1352     //      threadPaused() might have raised a blocked throwTo
1353     //      exception, see maybePerformBlockedException().
1354
1355 #ifdef DEBUG
1356     if (traceClass(DEBUG_sched)) {
1357         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1358                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1359         printThreadBlockage(t);
1360         debugTraceEnd();
1361     }
1362 #endif
1363 }
1364
1365 /* -----------------------------------------------------------------------------
1366  * Handle a thread that returned to the scheduler with ThreadFinished
1367  * -------------------------------------------------------------------------- */
1368
1369 static rtsBool
1370 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1371 {
1372     /* Need to check whether this was a main thread, and if so,
1373      * return with the return value.
1374      *
1375      * We also end up here if the thread kills itself with an
1376      * uncaught exception, see Exception.cmm.
1377      */
1378     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1379                (unsigned long)t->id, whatNext_strs[t->what_next]);
1380
1381       //
1382       // Check whether the thread that just completed was a bound
1383       // thread, and if so return with the result.  
1384       //
1385       // There is an assumption here that all thread completion goes
1386       // through this point; we need to make sure that if a thread
1387       // ends up in the ThreadKilled state, that it stays on the run
1388       // queue so it can be dealt with here.
1389       //
1390
1391       if (t->bound) {
1392
1393           if (t->bound != task) {
1394 #if !defined(THREADED_RTS)
1395               // Must be a bound thread that is not the topmost one.  Leave
1396               // it on the run queue until the stack has unwound to the
1397               // point where we can deal with this.  Leaving it on the run
1398               // queue also ensures that the garbage collector knows about
1399               // this thread and its return value (it gets dropped from the
1400               // step->threads list so there's no other way to find it).
1401               appendToRunQueue(cap,t);
1402               return rtsFalse;
1403 #else
1404               // this cannot happen in the threaded RTS, because a
1405               // bound thread can only be run by the appropriate Task.
1406               barf("finished bound thread that isn't mine");
1407 #endif
1408           }
1409
1410           ASSERT(task->tso == t);
1411
1412           if (t->what_next == ThreadComplete) {
1413               if (task->ret) {
1414                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1415                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1416               }
1417               task->stat = Success;
1418           } else {
1419               if (task->ret) {
1420                   *(task->ret) = NULL;
1421               }
1422               if (sched_state >= SCHED_INTERRUPTING) {
1423                   task->stat = Interrupted;
1424               } else {
1425                   task->stat = Killed;
1426               }
1427           }
1428 #ifdef DEBUG
1429           removeThreadLabel((StgWord)task->tso->id);
1430 #endif
1431           return rtsTrue; // tells schedule() to return
1432       }
1433
1434       return rtsFalse;
1435 }
1436
1437 /* -----------------------------------------------------------------------------
1438  * Perform a heap census
1439  * -------------------------------------------------------------------------- */
1440
1441 static rtsBool
1442 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1443 {
1444     // When we have +RTS -i0 and we're heap profiling, do a census at
1445     // every GC.  This lets us get repeatable runs for debugging.
1446     if (performHeapProfile ||
1447         (RtsFlags.ProfFlags.profileInterval==0 &&
1448          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1449         return rtsTrue;
1450     } else {
1451         return rtsFalse;
1452     }
1453 }
1454
1455 /* -----------------------------------------------------------------------------
1456  * Perform a garbage collection if necessary
1457  * -------------------------------------------------------------------------- */
1458
1459 static Capability *
1460 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1461 {
1462     rtsBool heap_census;
1463 #ifdef THREADED_RTS
1464     /* extern static volatile StgWord waiting_for_gc; 
1465        lives inside capability.c */
1466     rtsBool was_waiting;
1467     nat i;
1468 #endif
1469
1470 #ifdef THREADED_RTS
1471     // In order to GC, there must be no threads running Haskell code.
1472     // Therefore, the GC thread needs to hold *all* the capabilities,
1473     // and release them after the GC has completed.  
1474     //
1475     // This seems to be the simplest way: previous attempts involved
1476     // making all the threads with capabilities give up their
1477     // capabilities and sleep except for the *last* one, which
1478     // actually did the GC.  But it's quite hard to arrange for all
1479     // the other tasks to sleep and stay asleep.
1480     //
1481         
1482     /*  Other capabilities are prevented from running yet more Haskell
1483         threads if waiting_for_gc is set. Tested inside
1484         yieldCapability() and releaseCapability() in Capability.c */
1485
1486     was_waiting = cas(&waiting_for_gc, 0, 1);
1487     if (was_waiting) {
1488         do {
1489             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1490             if (cap) yieldCapability(&cap,task);
1491         } while (waiting_for_gc);
1492         return cap;  // NOTE: task->cap might have changed here
1493     }
1494
1495     setContextSwitches();
1496     for (i=0; i < n_capabilities; i++) {
1497         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1498         if (cap != &capabilities[i]) {
1499             Capability *pcap = &capabilities[i];
1500             // we better hope this task doesn't get migrated to
1501             // another Capability while we're waiting for this one.
1502             // It won't, because load balancing happens while we have
1503             // all the Capabilities, but even so it's a slightly
1504             // unsavoury invariant.
1505             task->cap = pcap;
1506             waitForReturnCapability(&pcap, task);
1507             if (pcap != &capabilities[i]) {
1508                 barf("scheduleDoGC: got the wrong capability");
1509             }
1510         }
1511     }
1512
1513     waiting_for_gc = rtsFalse;
1514 #endif
1515
1516     // so this happens periodically:
1517     if (cap) scheduleCheckBlackHoles(cap);
1518     
1519     IF_DEBUG(scheduler, printAllThreads());
1520
1521     /*
1522      * We now have all the capabilities; if we're in an interrupting
1523      * state, then we should take the opportunity to delete all the
1524      * threads in the system.
1525      */
1526     if (sched_state >= SCHED_INTERRUPTING) {
1527         deleteAllThreads(&capabilities[0]);
1528         sched_state = SCHED_SHUTTING_DOWN;
1529     }
1530     
1531     heap_census = scheduleNeedHeapProfile(rtsTrue);
1532
1533     /* everybody back, start the GC.
1534      * Could do it in this thread, or signal a condition var
1535      * to do it in another thread.  Either way, we need to
1536      * broadcast on gc_pending_cond afterward.
1537      */
1538 #if defined(THREADED_RTS)
1539     debugTrace(DEBUG_sched, "doing GC");
1540 #endif
1541     GarbageCollect(force_major || heap_census);
1542     
1543     if (heap_census) {
1544         debugTrace(DEBUG_sched, "performing heap census");
1545         heapCensus();
1546         performHeapProfile = rtsFalse;
1547     }
1548
1549 #ifdef SPARKBALANCE
1550     /* JB 
1551        Once we are all together... this would be the place to balance all
1552        spark pools. No concurrent stealing or adding of new sparks can
1553        occur. Should be defined in Sparks.c. */
1554     balanceSparkPoolsCaps(n_capabilities, capabilities);
1555 #endif
1556
1557 #if defined(THREADED_RTS)
1558     // release our stash of capabilities.
1559     for (i = 0; i < n_capabilities; i++) {
1560         if (cap != &capabilities[i]) {
1561             task->cap = &capabilities[i];
1562             releaseCapability(&capabilities[i]);
1563         }
1564     }
1565     if (cap) {
1566         task->cap = cap;
1567     } else {
1568         task->cap = NULL;
1569     }
1570 #endif
1571
1572     return cap;
1573 }
1574
1575 /* ---------------------------------------------------------------------------
1576  * Singleton fork(). Do not copy any running threads.
1577  * ------------------------------------------------------------------------- */
1578
1579 pid_t
1580 forkProcess(HsStablePtr *entry
1581 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1582             STG_UNUSED
1583 #endif
1584            )
1585 {
1586 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1587     Task *task;
1588     pid_t pid;
1589     StgTSO* t,*next;
1590     Capability *cap;
1591     nat s;
1592     
1593 #if defined(THREADED_RTS)
1594     if (RtsFlags.ParFlags.nNodes > 1) {
1595         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1596         stg_exit(EXIT_FAILURE);
1597     }
1598 #endif
1599
1600     debugTrace(DEBUG_sched, "forking!");
1601     
1602     // ToDo: for SMP, we should probably acquire *all* the capabilities
1603     cap = rts_lock();
1604     
1605     // no funny business: hold locks while we fork, otherwise if some
1606     // other thread is holding a lock when the fork happens, the data
1607     // structure protected by the lock will forever be in an
1608     // inconsistent state in the child.  See also #1391.
1609     ACQUIRE_LOCK(&sched_mutex);
1610     ACQUIRE_LOCK(&cap->lock);
1611     ACQUIRE_LOCK(&cap->running_task->lock);
1612
1613     pid = fork();
1614     
1615     if (pid) { // parent
1616         
1617         RELEASE_LOCK(&sched_mutex);
1618         RELEASE_LOCK(&cap->lock);
1619         RELEASE_LOCK(&cap->running_task->lock);
1620
1621         // just return the pid
1622         rts_unlock(cap);
1623         return pid;
1624         
1625     } else { // child
1626         
1627 #if defined(THREADED_RTS)
1628         initMutex(&sched_mutex);
1629         initMutex(&cap->lock);
1630         initMutex(&cap->running_task->lock);
1631 #endif
1632
1633         // Now, all OS threads except the thread that forked are
1634         // stopped.  We need to stop all Haskell threads, including
1635         // those involved in foreign calls.  Also we need to delete
1636         // all Tasks, because they correspond to OS threads that are
1637         // now gone.
1638
1639         for (s = 0; s < total_steps; s++) {
1640           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1641             if (t->what_next == ThreadRelocated) {
1642                 next = t->_link;
1643             } else {
1644                 next = t->global_link;
1645                 // don't allow threads to catch the ThreadKilled
1646                 // exception, but we do want to raiseAsync() because these
1647                 // threads may be evaluating thunks that we need later.
1648                 deleteThread_(cap,t);
1649             }
1650           }
1651         }
1652         
1653         // Empty the run queue.  It seems tempting to let all the
1654         // killed threads stay on the run queue as zombies to be
1655         // cleaned up later, but some of them correspond to bound
1656         // threads for which the corresponding Task does not exist.
1657         cap->run_queue_hd = END_TSO_QUEUE;
1658         cap->run_queue_tl = END_TSO_QUEUE;
1659
1660         // Any suspended C-calling Tasks are no more, their OS threads
1661         // don't exist now:
1662         cap->suspended_ccalling_tasks = NULL;
1663
1664         // Empty the threads lists.  Otherwise, the garbage
1665         // collector may attempt to resurrect some of these threads.
1666         for (s = 0; s < total_steps; s++) {
1667             all_steps[s].threads = END_TSO_QUEUE;
1668         }
1669
1670         // Wipe the task list, except the current Task.
1671         ACQUIRE_LOCK(&sched_mutex);
1672         for (task = all_tasks; task != NULL; task=task->all_link) {
1673             if (task != cap->running_task) {
1674 #if defined(THREADED_RTS)
1675                 initMutex(&task->lock); // see #1391
1676 #endif
1677                 discardTask(task);
1678             }
1679         }
1680         RELEASE_LOCK(&sched_mutex);
1681
1682 #if defined(THREADED_RTS)
1683         // Wipe our spare workers list, they no longer exist.  New
1684         // workers will be created if necessary.
1685         cap->spare_workers = NULL;
1686         cap->returning_tasks_hd = NULL;
1687         cap->returning_tasks_tl = NULL;
1688 #endif
1689
1690         // On Unix, all timers are reset in the child, so we need to start
1691         // the timer again.
1692         initTimer();
1693         startTimer();
1694
1695         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1696         rts_checkSchedStatus("forkProcess",cap);
1697         
1698         rts_unlock(cap);
1699         hs_exit();                      // clean up and exit
1700         stg_exit(EXIT_SUCCESS);
1701     }
1702 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1703     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1704     return -1;
1705 #endif
1706 }
1707
1708 /* ---------------------------------------------------------------------------
1709  * Delete all the threads in the system
1710  * ------------------------------------------------------------------------- */
1711    
1712 static void
1713 deleteAllThreads ( Capability *cap )
1714 {
1715     // NOTE: only safe to call if we own all capabilities.
1716
1717     StgTSO* t, *next;
1718     nat s;
1719
1720     debugTrace(DEBUG_sched,"deleting all threads");
1721     for (s = 0; s < total_steps; s++) {
1722       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1723         if (t->what_next == ThreadRelocated) {
1724             next = t->_link;
1725         } else {
1726             next = t->global_link;
1727             deleteThread(cap,t);
1728         }
1729       }
1730     }      
1731
1732     // The run queue now contains a bunch of ThreadKilled threads.  We
1733     // must not throw these away: the main thread(s) will be in there
1734     // somewhere, and the main scheduler loop has to deal with it.
1735     // Also, the run queue is the only thing keeping these threads from
1736     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1737
1738 #if !defined(THREADED_RTS)
1739     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1740     ASSERT(sleeping_queue == END_TSO_QUEUE);
1741 #endif
1742 }
1743
1744 /* -----------------------------------------------------------------------------
1745    Managing the suspended_ccalling_tasks list.
1746    Locks required: sched_mutex
1747    -------------------------------------------------------------------------- */
1748
1749 STATIC_INLINE void
1750 suspendTask (Capability *cap, Task *task)
1751 {
1752     ASSERT(task->next == NULL && task->prev == NULL);
1753     task->next = cap->suspended_ccalling_tasks;
1754     task->prev = NULL;
1755     if (cap->suspended_ccalling_tasks) {
1756         cap->suspended_ccalling_tasks->prev = task;
1757     }
1758     cap->suspended_ccalling_tasks = task;
1759 }
1760
1761 STATIC_INLINE void
1762 recoverSuspendedTask (Capability *cap, Task *task)
1763 {
1764     if (task->prev) {
1765         task->prev->next = task->next;
1766     } else {
1767         ASSERT(cap->suspended_ccalling_tasks == task);
1768         cap->suspended_ccalling_tasks = task->next;
1769     }
1770     if (task->next) {
1771         task->next->prev = task->prev;
1772     }
1773     task->next = task->prev = NULL;
1774 }
1775
1776 /* ---------------------------------------------------------------------------
1777  * Suspending & resuming Haskell threads.
1778  * 
1779  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1780  * its capability before calling the C function.  This allows another
1781  * task to pick up the capability and carry on running Haskell
1782  * threads.  It also means that if the C call blocks, it won't lock
1783  * the whole system.
1784  *
1785  * The Haskell thread making the C call is put to sleep for the
1786  * duration of the call, on the susepended_ccalling_threads queue.  We
1787  * give out a token to the task, which it can use to resume the thread
1788  * on return from the C function.
1789  * ------------------------------------------------------------------------- */
1790    
1791 void *
1792 suspendThread (StgRegTable *reg)
1793 {
1794   Capability *cap;
1795   int saved_errno;
1796   StgTSO *tso;
1797   Task *task;
1798 #if mingw32_HOST_OS
1799   StgWord32 saved_winerror;
1800 #endif
1801
1802   saved_errno = errno;
1803 #if mingw32_HOST_OS
1804   saved_winerror = GetLastError();
1805 #endif
1806
1807   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1808    */
1809   cap = regTableToCapability(reg);
1810
1811   task = cap->running_task;
1812   tso = cap->r.rCurrentTSO;
1813
1814   debugTrace(DEBUG_sched, 
1815              "thread %lu did a safe foreign call", 
1816              (unsigned long)cap->r.rCurrentTSO->id);
1817
1818   // XXX this might not be necessary --SDM
1819   tso->what_next = ThreadRunGHC;
1820
1821   threadPaused(cap,tso);
1822
1823   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1824       tso->why_blocked = BlockedOnCCall;
1825       tso->flags |= TSO_BLOCKEX;
1826       tso->flags &= ~TSO_INTERRUPTIBLE;
1827   } else {
1828       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1829   }
1830
1831   // Hand back capability
1832   task->suspended_tso = tso;
1833
1834   ACQUIRE_LOCK(&cap->lock);
1835
1836   suspendTask(cap,task);
1837   cap->in_haskell = rtsFalse;
1838   releaseCapability_(cap,rtsFalse);
1839   
1840   RELEASE_LOCK(&cap->lock);
1841
1842 #if defined(THREADED_RTS)
1843   /* Preparing to leave the RTS, so ensure there's a native thread/task
1844      waiting to take over.
1845   */
1846   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1847 #endif
1848
1849   errno = saved_errno;
1850 #if mingw32_HOST_OS
1851   SetLastError(saved_winerror);
1852 #endif
1853   return task;
1854 }
1855
1856 StgRegTable *
1857 resumeThread (void *task_)
1858 {
1859     StgTSO *tso;
1860     Capability *cap;
1861     Task *task = task_;
1862     int saved_errno;
1863 #if mingw32_HOST_OS
1864     StgWord32 saved_winerror;
1865 #endif
1866
1867     saved_errno = errno;
1868 #if mingw32_HOST_OS
1869     saved_winerror = GetLastError();
1870 #endif
1871
1872     cap = task->cap;
1873     // Wait for permission to re-enter the RTS with the result.
1874     waitForReturnCapability(&cap,task);
1875     // we might be on a different capability now... but if so, our
1876     // entry on the suspended_ccalling_tasks list will also have been
1877     // migrated.
1878
1879     // Remove the thread from the suspended list
1880     recoverSuspendedTask(cap,task);
1881
1882     tso = task->suspended_tso;
1883     task->suspended_tso = NULL;
1884     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1885     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1886     
1887     if (tso->why_blocked == BlockedOnCCall) {
1888         awakenBlockedExceptionQueue(cap,tso);
1889         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1890     }
1891     
1892     /* Reset blocking status */
1893     tso->why_blocked  = NotBlocked;
1894     
1895     cap->r.rCurrentTSO = tso;
1896     cap->in_haskell = rtsTrue;
1897     errno = saved_errno;
1898 #if mingw32_HOST_OS
1899     SetLastError(saved_winerror);
1900 #endif
1901
1902     /* We might have GC'd, mark the TSO dirty again */
1903     dirty_TSO(cap,tso);
1904
1905     IF_DEBUG(sanity, checkTSO(tso));
1906
1907     return &cap->r;
1908 }
1909
1910 /* ---------------------------------------------------------------------------
1911  * scheduleThread()
1912  *
1913  * scheduleThread puts a thread on the end  of the runnable queue.
1914  * This will usually be done immediately after a thread is created.
1915  * The caller of scheduleThread must create the thread using e.g.
1916  * createThread and push an appropriate closure
1917  * on this thread's stack before the scheduler is invoked.
1918  * ------------------------------------------------------------------------ */
1919
1920 void
1921 scheduleThread(Capability *cap, StgTSO *tso)
1922 {
1923     // The thread goes at the *end* of the run-queue, to avoid possible
1924     // starvation of any threads already on the queue.
1925     appendToRunQueue(cap,tso);
1926 }
1927
1928 void
1929 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1930 {
1931 #if defined(THREADED_RTS)
1932     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1933                               // move this thread from now on.
1934     cpu %= RtsFlags.ParFlags.nNodes;
1935     if (cpu == cap->no) {
1936         appendToRunQueue(cap,tso);
1937     } else {
1938         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1939     }
1940 #else
1941     appendToRunQueue(cap,tso);
1942 #endif
1943 }
1944
1945 Capability *
1946 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1947 {
1948     Task *task;
1949
1950     // We already created/initialised the Task
1951     task = cap->running_task;
1952
1953     // This TSO is now a bound thread; make the Task and TSO
1954     // point to each other.
1955     tso->bound = task;
1956     tso->cap = cap;
1957
1958     task->tso = tso;
1959     task->ret = ret;
1960     task->stat = NoStatus;
1961
1962     appendToRunQueue(cap,tso);
1963
1964     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1965
1966     cap = schedule(cap,task);
1967
1968     ASSERT(task->stat != NoStatus);
1969     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1970
1971     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1972     return cap;
1973 }
1974
1975 /* ----------------------------------------------------------------------------
1976  * Starting Tasks
1977  * ------------------------------------------------------------------------- */
1978
1979 #if defined(THREADED_RTS)
1980 void OSThreadProcAttr
1981 workerStart(Task *task)
1982 {
1983     Capability *cap;
1984
1985     // See startWorkerTask().
1986     ACQUIRE_LOCK(&task->lock);
1987     cap = task->cap;
1988     RELEASE_LOCK(&task->lock);
1989
1990     // set the thread-local pointer to the Task:
1991     taskEnter(task);
1992
1993     // schedule() runs without a lock.
1994     cap = schedule(cap,task);
1995
1996     // On exit from schedule(), we have a Capability.
1997     releaseCapability(cap);
1998     workerTaskStop(task);
1999 }
2000 #endif
2001
2002 /* ---------------------------------------------------------------------------
2003  * initScheduler()
2004  *
2005  * Initialise the scheduler.  This resets all the queues - if the
2006  * queues contained any threads, they'll be garbage collected at the
2007  * next pass.
2008  *
2009  * ------------------------------------------------------------------------ */
2010
2011 void 
2012 initScheduler(void)
2013 {
2014 #if !defined(THREADED_RTS)
2015   blocked_queue_hd  = END_TSO_QUEUE;
2016   blocked_queue_tl  = END_TSO_QUEUE;
2017   sleeping_queue    = END_TSO_QUEUE;
2018 #endif
2019
2020   blackhole_queue   = END_TSO_QUEUE;
2021
2022   sched_state    = SCHED_RUNNING;
2023   recent_activity = ACTIVITY_YES;
2024
2025 #if defined(THREADED_RTS)
2026   /* Initialise the mutex and condition variables used by
2027    * the scheduler. */
2028   initMutex(&sched_mutex);
2029 #endif
2030   
2031   ACQUIRE_LOCK(&sched_mutex);
2032
2033   /* A capability holds the state a native thread needs in
2034    * order to execute STG code. At least one capability is
2035    * floating around (only THREADED_RTS builds have more than one).
2036    */
2037   initCapabilities();
2038
2039   initTaskManager();
2040
2041 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2042   initSparkPools();
2043 #endif
2044
2045 #if defined(THREADED_RTS)
2046   /*
2047    * Eagerly start one worker to run each Capability, except for
2048    * Capability 0.  The idea is that we're probably going to start a
2049    * bound thread on Capability 0 pretty soon, so we don't want a
2050    * worker task hogging it.
2051    */
2052   { 
2053       nat i;
2054       Capability *cap;
2055       for (i = 1; i < n_capabilities; i++) {
2056           cap = &capabilities[i];
2057           ACQUIRE_LOCK(&cap->lock);
2058           startWorkerTask(cap, workerStart);
2059           RELEASE_LOCK(&cap->lock);
2060       }
2061   }
2062 #endif
2063
2064   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2065
2066   RELEASE_LOCK(&sched_mutex);
2067 }
2068
2069 void
2070 exitScheduler(
2071     rtsBool wait_foreign
2072 #if !defined(THREADED_RTS)
2073                          __attribute__((unused))
2074 #endif
2075 )
2076                /* see Capability.c, shutdownCapability() */
2077 {
2078     Task *task = NULL;
2079
2080 #if defined(THREADED_RTS)
2081     ACQUIRE_LOCK(&sched_mutex);
2082     task = newBoundTask();
2083     RELEASE_LOCK(&sched_mutex);
2084 #endif
2085
2086     // If we haven't killed all the threads yet, do it now.
2087     if (sched_state < SCHED_SHUTTING_DOWN) {
2088         sched_state = SCHED_INTERRUPTING;
2089         scheduleDoGC(NULL,task,rtsFalse);    
2090     }
2091     sched_state = SCHED_SHUTTING_DOWN;
2092
2093 #if defined(THREADED_RTS)
2094     { 
2095         nat i;
2096         
2097         for (i = 0; i < n_capabilities; i++) {
2098             shutdownCapability(&capabilities[i], task, wait_foreign);
2099         }
2100         boundTaskExiting(task);
2101         stopTaskManager();
2102     }
2103 #endif
2104 }
2105
2106 void
2107 freeScheduler( void )
2108 {
2109     freeCapabilities();
2110     freeTaskManager();
2111     if (n_capabilities != 1) {
2112         stgFree(capabilities);
2113     }
2114 #if defined(THREADED_RTS)
2115     closeMutex(&sched_mutex);
2116 #endif
2117 }
2118
2119 /* -----------------------------------------------------------------------------
2120    performGC
2121
2122    This is the interface to the garbage collector from Haskell land.
2123    We provide this so that external C code can allocate and garbage
2124    collect when called from Haskell via _ccall_GC.
2125    -------------------------------------------------------------------------- */
2126
2127 static void
2128 performGC_(rtsBool force_major)
2129 {
2130     Task *task;
2131     // We must grab a new Task here, because the existing Task may be
2132     // associated with a particular Capability, and chained onto the 
2133     // suspended_ccalling_tasks queue.
2134     ACQUIRE_LOCK(&sched_mutex);
2135     task = newBoundTask();
2136     RELEASE_LOCK(&sched_mutex);
2137     scheduleDoGC(NULL,task,force_major);
2138     boundTaskExiting(task);
2139 }
2140
2141 void
2142 performGC(void)
2143 {
2144     performGC_(rtsFalse);
2145 }
2146
2147 void
2148 performMajorGC(void)
2149 {
2150     performGC_(rtsTrue);
2151 }
2152
2153 /* -----------------------------------------------------------------------------
2154    Stack overflow
2155
2156    If the thread has reached its maximum stack size, then raise the
2157    StackOverflow exception in the offending thread.  Otherwise
2158    relocate the TSO into a larger chunk of memory and adjust its stack
2159    size appropriately.
2160    -------------------------------------------------------------------------- */
2161
2162 static StgTSO *
2163 threadStackOverflow(Capability *cap, StgTSO *tso)
2164 {
2165   nat new_stack_size, stack_words;
2166   lnat new_tso_size;
2167   StgPtr new_sp;
2168   StgTSO *dest;
2169
2170   IF_DEBUG(sanity,checkTSO(tso));
2171
2172   // don't allow throwTo() to modify the blocked_exceptions queue
2173   // while we are moving the TSO:
2174   lockClosure((StgClosure *)tso);
2175
2176   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2177       // NB. never raise a StackOverflow exception if the thread is
2178       // inside Control.Exceptino.block.  It is impractical to protect
2179       // against stack overflow exceptions, since virtually anything
2180       // can raise one (even 'catch'), so this is the only sensible
2181       // thing to do here.  See bug #767.
2182
2183       debugTrace(DEBUG_gc,
2184                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2185                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2186       IF_DEBUG(gc,
2187                /* If we're debugging, just print out the top of the stack */
2188                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2189                                                 tso->sp+64)));
2190
2191       // Send this thread the StackOverflow exception
2192       unlockTSO(tso);
2193       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2194       return tso;
2195   }
2196
2197   /* Try to double the current stack size.  If that takes us over the
2198    * maximum stack size for this thread, then use the maximum instead
2199    * (that is, unless we're already at or over the max size and we
2200    * can't raise the StackOverflow exception (see above), in which
2201    * case just double the size). Finally round up so the TSO ends up as
2202    * a whole number of blocks.
2203    */
2204   if (tso->stack_size >= tso->max_stack_size) {
2205       new_stack_size = tso->stack_size * 2;
2206   } else { 
2207       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2208   }
2209   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2210                                        TSO_STRUCT_SIZE)/sizeof(W_);
2211   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2212   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2213
2214   debugTrace(DEBUG_sched, 
2215              "increasing stack size from %ld words to %d.",
2216              (long)tso->stack_size, new_stack_size);
2217
2218   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2219   TICK_ALLOC_TSO(new_stack_size,0);
2220
2221   /* copy the TSO block and the old stack into the new area */
2222   memcpy(dest,tso,TSO_STRUCT_SIZE);
2223   stack_words = tso->stack + tso->stack_size - tso->sp;
2224   new_sp = (P_)dest + new_tso_size - stack_words;
2225   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2226
2227   /* relocate the stack pointers... */
2228   dest->sp         = new_sp;
2229   dest->stack_size = new_stack_size;
2230         
2231   /* Mark the old TSO as relocated.  We have to check for relocated
2232    * TSOs in the garbage collector and any primops that deal with TSOs.
2233    *
2234    * It's important to set the sp value to just beyond the end
2235    * of the stack, so we don't attempt to scavenge any part of the
2236    * dead TSO's stack.
2237    */
2238   tso->what_next = ThreadRelocated;
2239   setTSOLink(cap,tso,dest);
2240   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2241   tso->why_blocked = NotBlocked;
2242
2243   IF_PAR_DEBUG(verbose,
2244                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2245                      tso->id, tso, tso->stack_size);
2246                /* If we're debugging, just print out the top of the stack */
2247                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2248                                                 tso->sp+64)));
2249   
2250   unlockTSO(dest);
2251   unlockTSO(tso);
2252
2253   IF_DEBUG(sanity,checkTSO(dest));
2254 #if 0
2255   IF_DEBUG(scheduler,printTSO(dest));
2256 #endif
2257
2258   return dest;
2259 }
2260
2261 static StgTSO *
2262 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2263 {
2264     bdescr *bd, *new_bd;
2265     lnat free_w, tso_size_w;
2266     StgTSO *new_tso;
2267
2268     tso_size_w = tso_sizeW(tso);
2269
2270     if (tso_size_w < MBLOCK_SIZE_W || 
2271         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2272     {
2273         return tso;
2274     }
2275
2276     // don't allow throwTo() to modify the blocked_exceptions queue
2277     // while we are moving the TSO:
2278     lockClosure((StgClosure *)tso);
2279
2280     // this is the number of words we'll free
2281     free_w = round_to_mblocks(tso_size_w/2);
2282
2283     bd = Bdescr((StgPtr)tso);
2284     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2285     bd->free = bd->start + TSO_STRUCT_SIZEW;
2286
2287     new_tso = (StgTSO *)new_bd->start;
2288     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2289     new_tso->stack_size = new_bd->free - new_tso->stack;
2290
2291     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2292                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2293
2294     tso->what_next = ThreadRelocated;
2295     tso->_link = new_tso; // no write barrier reqd: same generation
2296
2297     // The TSO attached to this Task may have moved, so update the
2298     // pointer to it.
2299     if (task->tso == tso) {
2300         task->tso = new_tso;
2301     }
2302
2303     unlockTSO(new_tso);
2304     unlockTSO(tso);
2305
2306     IF_DEBUG(sanity,checkTSO(new_tso));
2307
2308     return new_tso;
2309 }
2310
2311 /* ---------------------------------------------------------------------------
2312    Interrupt execution
2313    - usually called inside a signal handler so it mustn't do anything fancy.   
2314    ------------------------------------------------------------------------ */
2315
2316 void
2317 interruptStgRts(void)
2318 {
2319     sched_state = SCHED_INTERRUPTING;
2320     setContextSwitches();
2321     wakeUpRts();
2322 }
2323
2324 /* -----------------------------------------------------------------------------
2325    Wake up the RTS
2326    
2327    This function causes at least one OS thread to wake up and run the
2328    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2329    an external event has arrived that may need servicing (eg. a
2330    keyboard interrupt).
2331
2332    In the single-threaded RTS we don't do anything here; we only have
2333    one thread anyway, and the event that caused us to want to wake up
2334    will have interrupted any blocking system call in progress anyway.
2335    -------------------------------------------------------------------------- */
2336
2337 void
2338 wakeUpRts(void)
2339 {
2340 #if defined(THREADED_RTS)
2341     // This forces the IO Manager thread to wakeup, which will
2342     // in turn ensure that some OS thread wakes up and runs the
2343     // scheduler loop, which will cause a GC and deadlock check.
2344     ioManagerWakeup();
2345 #endif
2346 }
2347
2348 /* -----------------------------------------------------------------------------
2349  * checkBlackHoles()
2350  *
2351  * Check the blackhole_queue for threads that can be woken up.  We do
2352  * this periodically: before every GC, and whenever the run queue is
2353  * empty.
2354  *
2355  * An elegant solution might be to just wake up all the blocked
2356  * threads with awakenBlockedQueue occasionally: they'll go back to
2357  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2358  * doesn't give us a way to tell whether we've actually managed to
2359  * wake up any threads, so we would be busy-waiting.
2360  *
2361  * -------------------------------------------------------------------------- */
2362
2363 static rtsBool
2364 checkBlackHoles (Capability *cap)
2365 {
2366     StgTSO **prev, *t;
2367     rtsBool any_woke_up = rtsFalse;
2368     StgHalfWord type;
2369
2370     // blackhole_queue is global:
2371     ASSERT_LOCK_HELD(&sched_mutex);
2372
2373     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2374
2375     // ASSUMES: sched_mutex
2376     prev = &blackhole_queue;
2377     t = blackhole_queue;
2378     while (t != END_TSO_QUEUE) {
2379         ASSERT(t->why_blocked == BlockedOnBlackHole);
2380         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2381         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2382             IF_DEBUG(sanity,checkTSO(t));
2383             t = unblockOne(cap, t);
2384             *prev = t;
2385             any_woke_up = rtsTrue;
2386         } else {
2387             prev = &t->_link;
2388             t = t->_link;
2389         }
2390     }
2391
2392     return any_woke_up;
2393 }
2394
2395 /* -----------------------------------------------------------------------------
2396    Deleting threads
2397
2398    This is used for interruption (^C) and forking, and corresponds to
2399    raising an exception but without letting the thread catch the
2400    exception.
2401    -------------------------------------------------------------------------- */
2402
2403 static void 
2404 deleteThread (Capability *cap, StgTSO *tso)
2405 {
2406     // NOTE: must only be called on a TSO that we have exclusive
2407     // access to, because we will call throwToSingleThreaded() below.
2408     // The TSO must be on the run queue of the Capability we own, or 
2409     // we must own all Capabilities.
2410
2411     if (tso->why_blocked != BlockedOnCCall &&
2412         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2413         throwToSingleThreaded(cap,tso,NULL);
2414     }
2415 }
2416
2417 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2418 static void 
2419 deleteThread_(Capability *cap, StgTSO *tso)
2420 { // for forkProcess only:
2421   // like deleteThread(), but we delete threads in foreign calls, too.
2422
2423     if (tso->why_blocked == BlockedOnCCall ||
2424         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2425         unblockOne(cap,tso);
2426         tso->what_next = ThreadKilled;
2427     } else {
2428         deleteThread(cap,tso);
2429     }
2430 }
2431 #endif
2432
2433 /* -----------------------------------------------------------------------------
2434    raiseExceptionHelper
2435    
2436    This function is called by the raise# primitve, just so that we can
2437    move some of the tricky bits of raising an exception from C-- into
2438    C.  Who knows, it might be a useful re-useable thing here too.
2439    -------------------------------------------------------------------------- */
2440
2441 StgWord
2442 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2443 {
2444     Capability *cap = regTableToCapability(reg);
2445     StgThunk *raise_closure = NULL;
2446     StgPtr p, next;
2447     StgRetInfoTable *info;
2448     //
2449     // This closure represents the expression 'raise# E' where E
2450     // is the exception raise.  It is used to overwrite all the
2451     // thunks which are currently under evaluataion.
2452     //
2453
2454     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2455     // LDV profiling: stg_raise_info has THUNK as its closure
2456     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2457     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2458     // 1 does not cause any problem unless profiling is performed.
2459     // However, when LDV profiling goes on, we need to linearly scan
2460     // small object pool, where raise_closure is stored, so we should
2461     // use MIN_UPD_SIZE.
2462     //
2463     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2464     //                                 sizeofW(StgClosure)+1);
2465     //
2466
2467     //
2468     // Walk up the stack, looking for the catch frame.  On the way,
2469     // we update any closures pointed to from update frames with the
2470     // raise closure that we just built.
2471     //
2472     p = tso->sp;
2473     while(1) {
2474         info = get_ret_itbl((StgClosure *)p);
2475         next = p + stack_frame_sizeW((StgClosure *)p);
2476         switch (info->i.type) {
2477             
2478         case UPDATE_FRAME:
2479             // Only create raise_closure if we need to.
2480             if (raise_closure == NULL) {
2481                 raise_closure = 
2482                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2483                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2484                 raise_closure->payload[0] = exception;
2485             }
2486             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2487             p = next;
2488             continue;
2489
2490         case ATOMICALLY_FRAME:
2491             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2492             tso->sp = p;
2493             return ATOMICALLY_FRAME;
2494             
2495         case CATCH_FRAME:
2496             tso->sp = p;
2497             return CATCH_FRAME;
2498
2499         case CATCH_STM_FRAME:
2500             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2501             tso->sp = p;
2502             return CATCH_STM_FRAME;
2503             
2504         case STOP_FRAME:
2505             tso->sp = p;
2506             return STOP_FRAME;
2507
2508         case CATCH_RETRY_FRAME:
2509         default:
2510             p = next; 
2511             continue;
2512         }
2513     }
2514 }
2515
2516
2517 /* -----------------------------------------------------------------------------
2518    findRetryFrameHelper
2519
2520    This function is called by the retry# primitive.  It traverses the stack
2521    leaving tso->sp referring to the frame which should handle the retry.  
2522
2523    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2524    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2525
2526    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2527    create) because retries are not considered to be exceptions, despite the
2528    similar implementation.
2529
2530    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2531    not be created within memory transactions.
2532    -------------------------------------------------------------------------- */
2533
2534 StgWord
2535 findRetryFrameHelper (StgTSO *tso)
2536 {
2537   StgPtr           p, next;
2538   StgRetInfoTable *info;
2539
2540   p = tso -> sp;
2541   while (1) {
2542     info = get_ret_itbl((StgClosure *)p);
2543     next = p + stack_frame_sizeW((StgClosure *)p);
2544     switch (info->i.type) {
2545       
2546     case ATOMICALLY_FRAME:
2547         debugTrace(DEBUG_stm,
2548                    "found ATOMICALLY_FRAME at %p during retry", p);
2549         tso->sp = p;
2550         return ATOMICALLY_FRAME;
2551       
2552     case CATCH_RETRY_FRAME:
2553         debugTrace(DEBUG_stm,
2554                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2555         tso->sp = p;
2556         return CATCH_RETRY_FRAME;
2557       
2558     case CATCH_STM_FRAME: {
2559         StgTRecHeader *trec = tso -> trec;
2560         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2561         debugTrace(DEBUG_stm,
2562                    "found CATCH_STM_FRAME at %p during retry", p);
2563         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2564         stmAbortTransaction(tso -> cap, trec);
2565         stmFreeAbortedTRec(tso -> cap, trec);
2566         tso -> trec = outer;
2567         p = next; 
2568         continue;
2569     }
2570       
2571
2572     default:
2573       ASSERT(info->i.type != CATCH_FRAME);
2574       ASSERT(info->i.type != STOP_FRAME);
2575       p = next; 
2576       continue;
2577     }
2578   }
2579 }
2580
2581 /* -----------------------------------------------------------------------------
2582    resurrectThreads is called after garbage collection on the list of
2583    threads found to be garbage.  Each of these threads will be woken
2584    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2585    on an MVar, or NonTermination if the thread was blocked on a Black
2586    Hole.
2587
2588    Locks: assumes we hold *all* the capabilities.
2589    -------------------------------------------------------------------------- */
2590
2591 void
2592 resurrectThreads (StgTSO *threads)
2593 {
2594     StgTSO *tso, *next;
2595     Capability *cap;
2596     step *step;
2597
2598     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2599         next = tso->global_link;
2600
2601         step = Bdescr((P_)tso)->step;
2602         tso->global_link = step->threads;
2603         step->threads = tso;
2604
2605         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2606         
2607         // Wake up the thread on the Capability it was last on
2608         cap = tso->cap;
2609         
2610         switch (tso->why_blocked) {
2611         case BlockedOnMVar:
2612         case BlockedOnException:
2613             /* Called by GC - sched_mutex lock is currently held. */
2614             throwToSingleThreaded(cap, tso,
2615                                   (StgClosure *)blockedOnDeadMVar_closure);
2616             break;
2617         case BlockedOnBlackHole:
2618             throwToSingleThreaded(cap, tso,
2619                                   (StgClosure *)nonTermination_closure);
2620             break;
2621         case BlockedOnSTM:
2622             throwToSingleThreaded(cap, tso,
2623                                   (StgClosure *)blockedIndefinitely_closure);
2624             break;
2625         case NotBlocked:
2626             /* This might happen if the thread was blocked on a black hole
2627              * belonging to a thread that we've just woken up (raiseAsync
2628              * can wake up threads, remember...).
2629              */
2630             continue;
2631         default:
2632             barf("resurrectThreads: thread blocked in a strange way");
2633         }
2634     }
2635 }
2636
2637 /* -----------------------------------------------------------------------------
2638    performPendingThrowTos is called after garbage collection, and
2639    passed a list of threads that were found to have pending throwTos
2640    (tso->blocked_exceptions was not empty), and were blocked.
2641    Normally this doesn't happen, because we would deliver the
2642    exception directly if the target thread is blocked, but there are
2643    small windows where it might occur on a multiprocessor (see
2644    throwTo()).
2645
2646    NB. we must be holding all the capabilities at this point, just
2647    like resurrectThreads().
2648    -------------------------------------------------------------------------- */
2649
2650 void
2651 performPendingThrowTos (StgTSO *threads)
2652 {
2653     StgTSO *tso, *next;
2654     Capability *cap;
2655     step *step;
2656
2657     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2658         next = tso->global_link;
2659
2660         step = Bdescr((P_)tso)->step;
2661         tso->global_link = step->threads;
2662         step->threads = tso;
2663
2664         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2665         
2666         cap = tso->cap;
2667         maybePerformBlockedException(cap, tso);
2668     }
2669 }