Fix another subtle shutdown deadlock
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  *
96  * NB. must be StgWord, we do xchg() on it.
97  */
98 volatile StgWord recent_activity = ACTIVITY_YES;
99
100 /* if this flag is set as well, give up execution
101  * LOCK: none (changes monotonically)
102  */
103 volatile StgWord sched_state = SCHED_RUNNING;
104
105 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
106  *  exists - earlier gccs apparently didn't.
107  *  -= chak
108  */
109 StgTSO dummy_tso;
110
111 /*
112  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
113  * in an MT setting, needed to signal that a worker thread shouldn't hang around
114  * in the scheduler when it is out of work.
115  */
116 rtsBool shutting_down_scheduler = rtsFalse;
117
118 /*
119  * This mutex protects most of the global scheduler data in
120  * the THREADED_RTS runtime.
121  */
122 #if defined(THREADED_RTS)
123 Mutex sched_mutex;
124 #endif
125
126 #if !defined(mingw32_HOST_OS)
127 #define FORKPROCESS_PRIMOP_SUPPORTED
128 #endif
129
130 /* -----------------------------------------------------------------------------
131  * static function prototypes
132  * -------------------------------------------------------------------------- */
133
134 static Capability *schedule (Capability *initialCapability, Task *task);
135
136 //
137 // These function all encapsulate parts of the scheduler loop, and are
138 // abstracted only to make the structure and control flow of the
139 // scheduler clearer.
140 //
141 static void schedulePreLoop (void);
142 static void scheduleFindWork (Capability *cap);
143 #if defined(THREADED_RTS)
144 static void scheduleYield (Capability **pcap, Task *task);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
149 static void scheduleCheckBlackHoles (Capability *cap);
150 static void scheduleDetectDeadlock (Capability *cap, Task *task);
151 static void schedulePushWork(Capability *cap, Task *task);
152 #if defined(PARALLEL_HASKELL)
153 static rtsBool scheduleGetRemoteWork(Capability *cap);
154 static void scheduleSendPendingMessages(void);
155 #endif
156 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
157 static void scheduleActivateSpark(Capability *cap);
158 #endif
159 static void schedulePostRunThread(Capability *cap, StgTSO *t);
160 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
161 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
162                                          StgTSO *t);
163 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
164                                     nat prev_what_next );
165 static void scheduleHandleThreadBlocked( StgTSO *t );
166 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
167                                              StgTSO *t );
168 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
169 static Capability *scheduleDoGC(Capability *cap, Task *task,
170                                 rtsBool force_major);
171
172 static rtsBool checkBlackHoles(Capability *cap);
173
174 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
175 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
176
177 static void deleteThread (Capability *cap, StgTSO *tso);
178 static void deleteAllThreads (Capability *cap);
179
180 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
181 static void deleteThread_(Capability *cap, StgTSO *tso);
182 #endif
183
184 #ifdef DEBUG
185 static char *whatNext_strs[] = {
186   "(unknown)",
187   "ThreadRunGHC",
188   "ThreadInterpret",
189   "ThreadKilled",
190   "ThreadRelocated",
191   "ThreadComplete"
192 };
193 #endif
194
195 /* -----------------------------------------------------------------------------
196  * Putting a thread on the run queue: different scheduling policies
197  * -------------------------------------------------------------------------- */
198
199 STATIC_INLINE void
200 addToRunQueue( Capability *cap, StgTSO *t )
201 {
202 #if defined(PARALLEL_HASKELL)
203     if (RtsFlags.ParFlags.doFairScheduling) { 
204         // this does round-robin scheduling; good for concurrency
205         appendToRunQueue(cap,t);
206     } else {
207         // this does unfair scheduling; good for parallelism
208         pushOnRunQueue(cap,t);
209     }
210 #else
211     // this does round-robin scheduling; good for concurrency
212     appendToRunQueue(cap,t);
213 #endif
214 }
215
216 /* ---------------------------------------------------------------------------
217    Main scheduling loop.
218
219    We use round-robin scheduling, each thread returning to the
220    scheduler loop when one of these conditions is detected:
221
222       * out of heap space
223       * timer expires (thread yields)
224       * thread blocks
225       * thread ends
226       * stack overflow
227
228    GRAN version:
229      In a GranSim setup this loop iterates over the global event queue.
230      This revolves around the global event queue, which determines what 
231      to do next. Therefore, it's more complicated than either the 
232      concurrent or the parallel (GUM) setup.
233   This version has been entirely removed (JB 2008/08).
234
235    GUM version:
236      GUM iterates over incoming messages.
237      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
238      and sends out a fish whenever it has nothing to do; in-between
239      doing the actual reductions (shared code below) it processes the
240      incoming messages and deals with delayed operations 
241      (see PendingFetches).
242      This is not the ugliest code you could imagine, but it's bloody close.
243
244   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
245   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
246   as well as future GUM versions. This file has been refurbished to
247   only contain valid code, which is however incomplete, refers to
248   invalid includes etc.
249
250    ------------------------------------------------------------------------ */
251
252 static Capability *
253 schedule (Capability *initialCapability, Task *task)
254 {
255   StgTSO *t;
256   Capability *cap;
257   StgThreadReturnCode ret;
258 #if defined(PARALLEL_HASKELL)
259   rtsBool receivedFinish = rtsFalse;
260 #endif
261   nat prev_what_next;
262   rtsBool ready_to_gc;
263 #if defined(THREADED_RTS)
264   rtsBool first = rtsTrue;
265 #endif
266   
267   cap = initialCapability;
268
269   // Pre-condition: this task owns initialCapability.
270   // The sched_mutex is *NOT* held
271   // NB. on return, we still hold a capability.
272
273   debugTrace (DEBUG_sched, 
274               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
275               task, initialCapability);
276
277   schedulePreLoop();
278
279   // -----------------------------------------------------------
280   // Scheduler loop starts here:
281
282 #if defined(PARALLEL_HASKELL)
283 #define TERMINATION_CONDITION        (!receivedFinish)
284 #else
285 #define TERMINATION_CONDITION        rtsTrue
286 #endif
287
288   while (TERMINATION_CONDITION) {
289
290     // Check whether we have re-entered the RTS from Haskell without
291     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
292     // call).
293     if (cap->in_haskell) {
294           errorBelch("schedule: re-entered unsafely.\n"
295                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
296           stg_exit(EXIT_FAILURE);
297     }
298
299     // The interruption / shutdown sequence.
300     // 
301     // In order to cleanly shut down the runtime, we want to:
302     //   * make sure that all main threads return to their callers
303     //     with the state 'Interrupted'.
304     //   * clean up all OS threads assocated with the runtime
305     //   * free all memory etc.
306     //
307     // So the sequence for ^C goes like this:
308     //
309     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
310     //     arranges for some Capability to wake up
311     //
312     //   * all threads in the system are halted, and the zombies are
313     //     placed on the run queue for cleaning up.  We acquire all
314     //     the capabilities in order to delete the threads, this is
315     //     done by scheduleDoGC() for convenience (because GC already
316     //     needs to acquire all the capabilities).  We can't kill
317     //     threads involved in foreign calls.
318     // 
319     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
320     //
321     //   * sched_state := SCHED_SHUTTING_DOWN
322     //
323     //   * all workers exit when the run queue on their capability
324     //     drains.  All main threads will also exit when their TSO
325     //     reaches the head of the run queue and they can return.
326     //
327     //   * eventually all Capabilities will shut down, and the RTS can
328     //     exit.
329     //
330     //   * We might be left with threads blocked in foreign calls, 
331     //     we should really attempt to kill these somehow (TODO);
332     
333     switch (sched_state) {
334     case SCHED_RUNNING:
335         break;
336     case SCHED_INTERRUPTING:
337         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
338 #if defined(THREADED_RTS)
339         discardSparksCap(cap);
340 #endif
341         /* scheduleDoGC() deletes all the threads */
342         cap = scheduleDoGC(cap,task,rtsFalse);
343
344         // after scheduleDoGC(), we must be shutting down.  Either some
345         // other Capability did the final GC, or we did it above,
346         // either way we can fall through to the SCHED_SHUTTING_DOWN
347         // case now.
348         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
349         // fall through
350
351     case SCHED_SHUTTING_DOWN:
352         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
353         // If we are a worker, just exit.  If we're a bound thread
354         // then we will exit below when we've removed our TSO from
355         // the run queue.
356         if (task->tso == NULL && emptyRunQueue(cap)) {
357             return cap;
358         }
359         break;
360     default:
361         barf("sched_state: %d", sched_state);
362     }
363
364     scheduleFindWork(cap);
365
366     /* work pushing, currently relevant only for THREADED_RTS:
367        (pushes threads, wakes up idle capabilities for stealing) */
368     schedulePushWork(cap,task);
369
370 #if defined(PARALLEL_HASKELL)
371     /* since we perform a blocking receive and continue otherwise,
372        either we never reach here or we definitely have work! */
373     // from here: non-empty run queue
374     ASSERT(!emptyRunQueue(cap));
375
376     if (PacketsWaiting()) {  /* now process incoming messages, if any
377                                 pending...  
378
379                                 CAUTION: scheduleGetRemoteWork called
380                                 above, waits for messages as well! */
381       processMessages(cap, &receivedFinish);
382     }
383 #endif // PARALLEL_HASKELL: non-empty run queue!
384
385     scheduleDetectDeadlock(cap,task);
386
387 #if defined(THREADED_RTS)
388     cap = task->cap;    // reload cap, it might have changed
389 #endif
390
391     // Normally, the only way we can get here with no threads to
392     // run is if a keyboard interrupt received during 
393     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
394     // Additionally, it is not fatal for the
395     // threaded RTS to reach here with no threads to run.
396     //
397     // win32: might be here due to awaitEvent() being abandoned
398     // as a result of a console event having been delivered.
399     
400 #if defined(THREADED_RTS)
401     if (first) 
402     {
403     // XXX: ToDo
404     //     // don't yield the first time, we want a chance to run this
405     //     // thread for a bit, even if there are others banging at the
406     //     // door.
407     //     first = rtsFalse;
408     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
409     }
410
411   yield:
412     scheduleYield(&cap,task);
413     if (emptyRunQueue(cap)) continue; // look for work again
414 #endif
415
416 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
417     if ( emptyRunQueue(cap) ) {
418         ASSERT(sched_state >= SCHED_INTERRUPTING);
419     }
420 #endif
421
422     // 
423     // Get a thread to run
424     //
425     t = popRunQueue(cap);
426
427     // Sanity check the thread we're about to run.  This can be
428     // expensive if there is lots of thread switching going on...
429     IF_DEBUG(sanity,checkTSO(t));
430
431 #if defined(THREADED_RTS)
432     // Check whether we can run this thread in the current task.
433     // If not, we have to pass our capability to the right task.
434     {
435         Task *bound = t->bound;
436       
437         if (bound) {
438             if (bound == task) {
439                 debugTrace(DEBUG_sched,
440                            "### Running thread %lu in bound thread", (unsigned long)t->id);
441                 // yes, the Haskell thread is bound to the current native thread
442             } else {
443                 debugTrace(DEBUG_sched,
444                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
445                 // no, bound to a different Haskell thread: pass to that thread
446                 pushOnRunQueue(cap,t);
447                 continue;
448             }
449         } else {
450             // The thread we want to run is unbound.
451             if (task->tso) { 
452                 debugTrace(DEBUG_sched,
453                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
454                 // no, the current native thread is bound to a different
455                 // Haskell thread, so pass it to any worker thread
456                 pushOnRunQueue(cap,t);
457                 continue; 
458             }
459         }
460     }
461 #endif
462
463     /* context switches are initiated by the timer signal, unless
464      * the user specified "context switch as often as possible", with
465      * +RTS -C0
466      */
467     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
468         && !emptyThreadQueues(cap)) {
469         cap->context_switch = 1;
470     }
471          
472 run_thread:
473
474     // CurrentTSO is the thread to run.  t might be different if we
475     // loop back to run_thread, so make sure to set CurrentTSO after
476     // that.
477     cap->r.rCurrentTSO = t;
478
479     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
480                               (long)t->id, whatNext_strs[t->what_next]);
481
482     startHeapProfTimer();
483
484     // Check for exceptions blocked on this thread
485     maybePerformBlockedException (cap, t);
486
487     // ----------------------------------------------------------------------
488     // Run the current thread 
489
490     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
491     ASSERT(t->cap == cap);
492     ASSERT(t->bound ? t->bound->cap == cap : 1);
493
494     prev_what_next = t->what_next;
495
496     errno = t->saved_errno;
497 #if mingw32_HOST_OS
498     SetLastError(t->saved_winerror);
499 #endif
500
501     cap->in_haskell = rtsTrue;
502
503     dirty_TSO(cap,t);
504
505 #if defined(THREADED_RTS)
506     if (recent_activity == ACTIVITY_DONE_GC) {
507         // ACTIVITY_DONE_GC means we turned off the timer signal to
508         // conserve power (see #1623).  Re-enable it here.
509         nat prev;
510         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
511         if (prev == ACTIVITY_DONE_GC) {
512             startTimer();
513         }
514     } else {
515         recent_activity = ACTIVITY_YES;
516     }
517 #endif
518
519     switch (prev_what_next) {
520         
521     case ThreadKilled:
522     case ThreadComplete:
523         /* Thread already finished, return to scheduler. */
524         ret = ThreadFinished;
525         break;
526         
527     case ThreadRunGHC:
528     {
529         StgRegTable *r;
530         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
531         cap = regTableToCapability(r);
532         ret = r->rRet;
533         break;
534     }
535     
536     case ThreadInterpret:
537         cap = interpretBCO(cap);
538         ret = cap->r.rRet;
539         break;
540         
541     default:
542         barf("schedule: invalid what_next field");
543     }
544
545     cap->in_haskell = rtsFalse;
546
547     // The TSO might have moved, eg. if it re-entered the RTS and a GC
548     // happened.  So find the new location:
549     t = cap->r.rCurrentTSO;
550
551     // We have run some Haskell code: there might be blackhole-blocked
552     // threads to wake up now.
553     // Lock-free test here should be ok, we're just setting a flag.
554     if ( blackhole_queue != END_TSO_QUEUE ) {
555         blackholes_need_checking = rtsTrue;
556     }
557     
558     // And save the current errno in this thread.
559     // XXX: possibly bogus for SMP because this thread might already
560     // be running again, see code below.
561     t->saved_errno = errno;
562 #if mingw32_HOST_OS
563     // Similarly for Windows error code
564     t->saved_winerror = GetLastError();
565 #endif
566
567 #if defined(THREADED_RTS)
568     // If ret is ThreadBlocked, and this Task is bound to the TSO that
569     // blocked, we are in limbo - the TSO is now owned by whatever it
570     // is blocked on, and may in fact already have been woken up,
571     // perhaps even on a different Capability.  It may be the case
572     // that task->cap != cap.  We better yield this Capability
573     // immediately and return to normaility.
574     if (ret == ThreadBlocked) {
575         debugTrace(DEBUG_sched,
576                    "--<< thread %lu (%s) stopped: blocked",
577                    (unsigned long)t->id, whatNext_strs[t->what_next]);
578         goto yield;
579     }
580 #endif
581
582     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
583     ASSERT(t->cap == cap);
584
585     // ----------------------------------------------------------------------
586     
587     // Costs for the scheduler are assigned to CCS_SYSTEM
588     stopHeapProfTimer();
589 #if defined(PROFILING)
590     CCCS = CCS_SYSTEM;
591 #endif
592     
593     schedulePostRunThread(cap,t);
594
595     t = threadStackUnderflow(task,t);
596
597     ready_to_gc = rtsFalse;
598
599     switch (ret) {
600     case HeapOverflow:
601         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
602         break;
603
604     case StackOverflow:
605         scheduleHandleStackOverflow(cap,task,t);
606         break;
607
608     case ThreadYielding:
609         if (scheduleHandleYield(cap, t, prev_what_next)) {
610             // shortcut for switching between compiler/interpreter:
611             goto run_thread; 
612         }
613         break;
614
615     case ThreadBlocked:
616         scheduleHandleThreadBlocked(t);
617         break;
618
619     case ThreadFinished:
620         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
621         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
622         break;
623
624     default:
625       barf("schedule: invalid thread return code %d", (int)ret);
626     }
627
628     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
629       cap = scheduleDoGC(cap,task,rtsFalse);
630     }
631   } /* end of while() */
632 }
633
634 /* ----------------------------------------------------------------------------
635  * Setting up the scheduler loop
636  * ------------------------------------------------------------------------- */
637
638 static void
639 schedulePreLoop(void)
640 {
641   // initialisation for scheduler - what cannot go into initScheduler()  
642 }
643
644 /* -----------------------------------------------------------------------------
645  * scheduleFindWork()
646  *
647  * Search for work to do, and handle messages from elsewhere.
648  * -------------------------------------------------------------------------- */
649
650 static void
651 scheduleFindWork (Capability *cap)
652 {
653     scheduleStartSignalHandlers(cap);
654
655     // Only check the black holes here if we've nothing else to do.
656     // During normal execution, the black hole list only gets checked
657     // at GC time, to avoid repeatedly traversing this possibly long
658     // list each time around the scheduler.
659     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
660
661     scheduleCheckWakeupThreads(cap);
662
663     scheduleCheckBlockedThreads(cap);
664
665 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
666     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
667 #endif
668
669 #if defined(PARALLEL_HASKELL)
670     // if messages have been buffered...
671     scheduleSendPendingMessages();
672 #endif
673
674 #if defined(PARALLEL_HASKELL)
675     if (emptyRunQueue(cap)) {
676         receivedFinish = scheduleGetRemoteWork(cap);
677         continue; //  a new round, (hopefully) with new work
678         /* 
679            in GUM, this a) sends out a FISH and returns IF no fish is
680                            out already
681                         b) (blocking) awaits and receives messages
682            
683            in Eden, this is only the blocking receive, as b) in GUM.
684         */
685     }
686 #endif
687 }
688
689 #if defined(THREADED_RTS)
690 STATIC_INLINE rtsBool
691 shouldYieldCapability (Capability *cap, Task *task)
692 {
693     // we need to yield this capability to someone else if..
694     //   - another thread is initiating a GC
695     //   - another Task is returning from a foreign call
696     //   - the thread at the head of the run queue cannot be run
697     //     by this Task (it is bound to another Task, or it is unbound
698     //     and this task it bound).
699     return (waiting_for_gc || 
700             cap->returning_tasks_hd != NULL ||
701             (!emptyRunQueue(cap) && (task->tso == NULL
702                                      ? cap->run_queue_hd->bound != NULL
703                                      : cap->run_queue_hd->bound != task)));
704 }
705
706 // This is the single place where a Task goes to sleep.  There are
707 // two reasons it might need to sleep:
708 //    - there are no threads to run
709 //    - we need to yield this Capability to someone else 
710 //      (see shouldYieldCapability())
711 //
712 // Careful: the scheduler loop is quite delicate.  Make sure you run
713 // the tests in testsuite/concurrent (all ways) after modifying this,
714 // and also check the benchmarks in nofib/parallel for regressions.
715
716 static void
717 scheduleYield (Capability **pcap, Task *task)
718 {
719     Capability *cap = *pcap;
720
721     // if we have work, and we don't need to give up the Capability, continue.
722     if (!shouldYieldCapability(cap,task) && 
723         (!emptyRunQueue(cap) ||
724          blackholes_need_checking ||
725          sched_state >= SCHED_INTERRUPTING))
726         return;
727
728     // otherwise yield (sleep), and keep yielding if necessary.
729     do {
730         yieldCapability(&cap,task);
731     } 
732     while (shouldYieldCapability(cap,task));
733
734     // note there may still be no threads on the run queue at this
735     // point, the caller has to check.
736
737     *pcap = cap;
738     return;
739 }
740 #endif
741     
742 /* -----------------------------------------------------------------------------
743  * schedulePushWork()
744  *
745  * Push work to other Capabilities if we have some.
746  * -------------------------------------------------------------------------- */
747
748 static void
749 schedulePushWork(Capability *cap USED_IF_THREADS, 
750                  Task *task      USED_IF_THREADS)
751 {
752   /* following code not for PARALLEL_HASKELL. I kept the call general,
753      future GUM versions might use pushing in a distributed setup */
754 #if defined(THREADED_RTS)
755
756     Capability *free_caps[n_capabilities], *cap0;
757     nat i, n_free_caps;
758
759     // migration can be turned off with +RTS -qg
760     if (!RtsFlags.ParFlags.migrate) return;
761
762     // Check whether we have more threads on our run queue, or sparks
763     // in our pool, that we could hand to another Capability.
764     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
765         && sparkPoolSizeCap(cap) < 2) {
766         return;
767     }
768
769     // First grab as many free Capabilities as we can.
770     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
771         cap0 = &capabilities[i];
772         if (cap != cap0 && tryGrabCapability(cap0,task)) {
773             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
774                 // it already has some work, we just grabbed it at 
775                 // the wrong moment.  Or maybe it's deadlocked!
776                 releaseCapability(cap0);
777             } else {
778                 free_caps[n_free_caps++] = cap0;
779             }
780         }
781     }
782
783     // we now have n_free_caps free capabilities stashed in
784     // free_caps[].  Share our run queue equally with them.  This is
785     // probably the simplest thing we could do; improvements we might
786     // want to do include:
787     //
788     //   - giving high priority to moving relatively new threads, on 
789     //     the gournds that they haven't had time to build up a
790     //     working set in the cache on this CPU/Capability.
791     //
792     //   - giving low priority to moving long-lived threads
793
794     if (n_free_caps > 0) {
795         StgTSO *prev, *t, *next;
796         rtsBool pushed_to_all;
797
798         debugTrace(DEBUG_sched, 
799                    "cap %d: %s and %d free capabilities, sharing...", 
800                    cap->no, 
801                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
802                    "excess threads on run queue":"sparks to share (>=2)",
803                    n_free_caps);
804
805         i = 0;
806         pushed_to_all = rtsFalse;
807
808         if (cap->run_queue_hd != END_TSO_QUEUE) {
809             prev = cap->run_queue_hd;
810             t = prev->_link;
811             prev->_link = END_TSO_QUEUE;
812             for (; t != END_TSO_QUEUE; t = next) {
813                 next = t->_link;
814                 t->_link = END_TSO_QUEUE;
815                 if (t->what_next == ThreadRelocated
816                     || t->bound == task // don't move my bound thread
817                     || tsoLocked(t)) {  // don't move a locked thread
818                     setTSOLink(cap, prev, t);
819                     prev = t;
820                 } else if (i == n_free_caps) {
821                     pushed_to_all = rtsTrue;
822                     i = 0;
823                     // keep one for us
824                     setTSOLink(cap, prev, t);
825                     prev = t;
826                 } else {
827                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
828                     appendToRunQueue(free_caps[i],t);
829                     if (t->bound) { t->bound->cap = free_caps[i]; }
830                     t->cap = free_caps[i];
831                     i++;
832                 }
833             }
834             cap->run_queue_tl = prev;
835         }
836
837 #ifdef SPARK_PUSHING
838         /* JB I left this code in place, it would work but is not necessary */
839
840         // If there are some free capabilities that we didn't push any
841         // threads to, then try to push a spark to each one.
842         if (!pushed_to_all) {
843             StgClosure *spark;
844             // i is the next free capability to push to
845             for (; i < n_free_caps; i++) {
846                 if (emptySparkPoolCap(free_caps[i])) {
847                     spark = tryStealSpark(cap->sparks);
848                     if (spark != NULL) {
849                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
850                         newSpark(&(free_caps[i]->r), spark);
851                     }
852                 }
853             }
854         }
855 #endif /* SPARK_PUSHING */
856
857         // release the capabilities
858         for (i = 0; i < n_free_caps; i++) {
859             task->cap = free_caps[i];
860             releaseAndWakeupCapability(free_caps[i]);
861         }
862     }
863     task->cap = cap; // reset to point to our Capability.
864
865 #endif /* THREADED_RTS */
866
867 }
868
869 /* ----------------------------------------------------------------------------
870  * Start any pending signal handlers
871  * ------------------------------------------------------------------------- */
872
873 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
874 static void
875 scheduleStartSignalHandlers(Capability *cap)
876 {
877     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
878         // safe outside the lock
879         startSignalHandlers(cap);
880     }
881 }
882 #else
883 static void
884 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
885 {
886 }
887 #endif
888
889 /* ----------------------------------------------------------------------------
890  * Check for blocked threads that can be woken up.
891  * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
895 {
896 #if !defined(THREADED_RTS)
897     //
898     // Check whether any waiting threads need to be woken up.  If the
899     // run queue is empty, and there are no other tasks running, we
900     // can wait indefinitely for something to happen.
901     //
902     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
903     {
904         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
905     }
906 #endif
907 }
908
909
910 /* ----------------------------------------------------------------------------
911  * Check for threads woken up by other Capabilities
912  * ------------------------------------------------------------------------- */
913
914 static void
915 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
916 {
917 #if defined(THREADED_RTS)
918     // Any threads that were woken up by other Capabilities get
919     // appended to our run queue.
920     if (!emptyWakeupQueue(cap)) {
921         ACQUIRE_LOCK(&cap->lock);
922         if (emptyRunQueue(cap)) {
923             cap->run_queue_hd = cap->wakeup_queue_hd;
924             cap->run_queue_tl = cap->wakeup_queue_tl;
925         } else {
926             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
927             cap->run_queue_tl = cap->wakeup_queue_tl;
928         }
929         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
930         RELEASE_LOCK(&cap->lock);
931     }
932 #endif
933 }
934
935 /* ----------------------------------------------------------------------------
936  * Check for threads blocked on BLACKHOLEs that can be woken up
937  * ------------------------------------------------------------------------- */
938 static void
939 scheduleCheckBlackHoles (Capability *cap)
940 {
941     if ( blackholes_need_checking ) // check without the lock first
942     {
943         ACQUIRE_LOCK(&sched_mutex);
944         if ( blackholes_need_checking ) {
945             blackholes_need_checking = rtsFalse;
946             // important that we reset the flag *before* checking the
947             // blackhole queue, otherwise we could get deadlock.  This
948             // happens as follows: we wake up a thread that
949             // immediately runs on another Capability, blocks on a
950             // blackhole, and then we reset the blackholes_need_checking flag.
951             checkBlackHoles(cap);
952         }
953         RELEASE_LOCK(&sched_mutex);
954     }
955 }
956
957 /* ----------------------------------------------------------------------------
958  * Detect deadlock conditions and attempt to resolve them.
959  * ------------------------------------------------------------------------- */
960
961 static void
962 scheduleDetectDeadlock (Capability *cap, Task *task)
963 {
964
965 #if defined(PARALLEL_HASKELL)
966     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
967     return;
968 #endif
969
970     /* 
971      * Detect deadlock: when we have no threads to run, there are no
972      * threads blocked, waiting for I/O, or sleeping, and all the
973      * other tasks are waiting for work, we must have a deadlock of
974      * some description.
975      */
976     if ( emptyThreadQueues(cap) )
977     {
978 #if defined(THREADED_RTS)
979         /* 
980          * In the threaded RTS, we only check for deadlock if there
981          * has been no activity in a complete timeslice.  This means
982          * we won't eagerly start a full GC just because we don't have
983          * any threads to run currently.
984          */
985         if (recent_activity != ACTIVITY_INACTIVE) return;
986 #endif
987
988         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
989
990         // Garbage collection can release some new threads due to
991         // either (a) finalizers or (b) threads resurrected because
992         // they are unreachable and will therefore be sent an
993         // exception.  Any threads thus released will be immediately
994         // runnable.
995         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
996
997         recent_activity = ACTIVITY_DONE_GC;
998         // disable timer signals (see #1623)
999         stopTimer();
1000         
1001         if ( !emptyRunQueue(cap) ) return;
1002
1003 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1004         /* If we have user-installed signal handlers, then wait
1005          * for signals to arrive rather then bombing out with a
1006          * deadlock.
1007          */
1008         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1009             debugTrace(DEBUG_sched,
1010                        "still deadlocked, waiting for signals...");
1011
1012             awaitUserSignals();
1013
1014             if (signals_pending()) {
1015                 startSignalHandlers(cap);
1016             }
1017
1018             // either we have threads to run, or we were interrupted:
1019             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1020
1021             return;
1022         }
1023 #endif
1024
1025 #if !defined(THREADED_RTS)
1026         /* Probably a real deadlock.  Send the current main thread the
1027          * Deadlock exception.
1028          */
1029         if (task->tso) {
1030             switch (task->tso->why_blocked) {
1031             case BlockedOnSTM:
1032             case BlockedOnBlackHole:
1033             case BlockedOnException:
1034             case BlockedOnMVar:
1035                 throwToSingleThreaded(cap, task->tso, 
1036                                       (StgClosure *)nonTermination_closure);
1037                 return;
1038             default:
1039                 barf("deadlock: main thread blocked in a strange way");
1040             }
1041         }
1042         return;
1043 #endif
1044     }
1045 }
1046
1047
1048 /* ----------------------------------------------------------------------------
1049  * Send pending messages (PARALLEL_HASKELL only)
1050  * ------------------------------------------------------------------------- */
1051
1052 #if defined(PARALLEL_HASKELL)
1053 static void
1054 scheduleSendPendingMessages(void)
1055 {
1056
1057 # if defined(PAR) // global Mem.Mgmt., omit for now
1058     if (PendingFetches != END_BF_QUEUE) {
1059         processFetches();
1060     }
1061 # endif
1062     
1063     if (RtsFlags.ParFlags.BufferTime) {
1064         // if we use message buffering, we must send away all message
1065         // packets which have become too old...
1066         sendOldBuffers(); 
1067     }
1068 }
1069 #endif
1070
1071 /* ----------------------------------------------------------------------------
1072  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1073  * ------------------------------------------------------------------------- */
1074
1075 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1076 static void
1077 scheduleActivateSpark(Capability *cap)
1078 {
1079     if (anySparks())
1080     {
1081         createSparkThread(cap);
1082         debugTrace(DEBUG_sched, "creating a spark thread");
1083     }
1084 }
1085 #endif // PARALLEL_HASKELL || THREADED_RTS
1086
1087 /* ----------------------------------------------------------------------------
1088  * Get work from a remote node (PARALLEL_HASKELL only)
1089  * ------------------------------------------------------------------------- */
1090     
1091 #if defined(PARALLEL_HASKELL)
1092 static rtsBool /* return value used in PARALLEL_HASKELL only */
1093 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1094 {
1095 #if defined(PARALLEL_HASKELL)
1096   rtsBool receivedFinish = rtsFalse;
1097
1098   // idle() , i.e. send all buffers, wait for work
1099   if (RtsFlags.ParFlags.BufferTime) {
1100         IF_PAR_DEBUG(verbose, 
1101                 debugBelch("...send all pending data,"));
1102         {
1103           nat i;
1104           for (i=1; i<=nPEs; i++)
1105             sendImmediately(i); // send all messages away immediately
1106         }
1107   }
1108
1109   /* this would be the place for fishing in GUM... 
1110
1111      if (no-earlier-fish-around) 
1112           sendFish(choosePe());
1113    */
1114
1115   // Eden:just look for incoming messages (blocking receive)
1116   IF_PAR_DEBUG(verbose, 
1117                debugBelch("...wait for incoming messages...\n"));
1118   processMessages(cap, &receivedFinish); // blocking receive...
1119
1120
1121   return receivedFinish;
1122   // reenter scheduling look after having received something
1123
1124 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1125
1126   return rtsFalse; /* return value unused in THREADED_RTS */
1127
1128 #endif /* PARALLEL_HASKELL */
1129 }
1130 #endif // PARALLEL_HASKELL || THREADED_RTS
1131
1132 /* ----------------------------------------------------------------------------
1133  * After running a thread...
1134  * ------------------------------------------------------------------------- */
1135
1136 static void
1137 schedulePostRunThread (Capability *cap, StgTSO *t)
1138 {
1139     // We have to be able to catch transactions that are in an
1140     // infinite loop as a result of seeing an inconsistent view of
1141     // memory, e.g. 
1142     //
1143     //   atomically $ do
1144     //       [a,b] <- mapM readTVar [ta,tb]
1145     //       when (a == b) loop
1146     //
1147     // and a is never equal to b given a consistent view of memory.
1148     //
1149     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1150         if (!stmValidateNestOfTransactions (t -> trec)) {
1151             debugTrace(DEBUG_sched | DEBUG_stm,
1152                        "trec %p found wasting its time", t);
1153             
1154             // strip the stack back to the
1155             // ATOMICALLY_FRAME, aborting the (nested)
1156             // transaction, and saving the stack of any
1157             // partially-evaluated thunks on the heap.
1158             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1159             
1160             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1161         }
1162     }
1163
1164   /* some statistics gathering in the parallel case */
1165 }
1166
1167 /* -----------------------------------------------------------------------------
1168  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1169  * -------------------------------------------------------------------------- */
1170
1171 static rtsBool
1172 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1173 {
1174     // did the task ask for a large block?
1175     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1176         // if so, get one and push it on the front of the nursery.
1177         bdescr *bd;
1178         lnat blocks;
1179         
1180         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1181         
1182         debugTrace(DEBUG_sched,
1183                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1184                    (long)t->id, whatNext_strs[t->what_next], blocks);
1185     
1186         // don't do this if the nursery is (nearly) full, we'll GC first.
1187         if (cap->r.rCurrentNursery->link != NULL ||
1188             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1189                                                // if the nursery has only one block.
1190             
1191             ACQUIRE_SM_LOCK
1192             bd = allocGroup( blocks );
1193             RELEASE_SM_LOCK
1194             cap->r.rNursery->n_blocks += blocks;
1195             
1196             // link the new group into the list
1197             bd->link = cap->r.rCurrentNursery;
1198             bd->u.back = cap->r.rCurrentNursery->u.back;
1199             if (cap->r.rCurrentNursery->u.back != NULL) {
1200                 cap->r.rCurrentNursery->u.back->link = bd;
1201             } else {
1202 #if !defined(THREADED_RTS)
1203                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1204                        g0s0 == cap->r.rNursery);
1205 #endif
1206                 cap->r.rNursery->blocks = bd;
1207             }             
1208             cap->r.rCurrentNursery->u.back = bd;
1209             
1210             // initialise it as a nursery block.  We initialise the
1211             // step, gen_no, and flags field of *every* sub-block in
1212             // this large block, because this is easier than making
1213             // sure that we always find the block head of a large
1214             // block whenever we call Bdescr() (eg. evacuate() and
1215             // isAlive() in the GC would both have to do this, at
1216             // least).
1217             { 
1218                 bdescr *x;
1219                 for (x = bd; x < bd + blocks; x++) {
1220                     x->step = cap->r.rNursery;
1221                     x->gen_no = 0;
1222                     x->flags = 0;
1223                 }
1224             }
1225             
1226             // This assert can be a killer if the app is doing lots
1227             // of large block allocations.
1228             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1229             
1230             // now update the nursery to point to the new block
1231             cap->r.rCurrentNursery = bd;
1232             
1233             // we might be unlucky and have another thread get on the
1234             // run queue before us and steal the large block, but in that
1235             // case the thread will just end up requesting another large
1236             // block.
1237             pushOnRunQueue(cap,t);
1238             return rtsFalse;  /* not actually GC'ing */
1239         }
1240     }
1241     
1242     debugTrace(DEBUG_sched,
1243                "--<< thread %ld (%s) stopped: HeapOverflow",
1244                (long)t->id, whatNext_strs[t->what_next]);
1245
1246     if (cap->context_switch) {
1247         // Sometimes we miss a context switch, e.g. when calling
1248         // primitives in a tight loop, MAYBE_GC() doesn't check the
1249         // context switch flag, and we end up waiting for a GC.
1250         // See #1984, and concurrent/should_run/1984
1251         cap->context_switch = 0;
1252         addToRunQueue(cap,t);
1253     } else {
1254         pushOnRunQueue(cap,t);
1255     }
1256     return rtsTrue;
1257     /* actual GC is done at the end of the while loop in schedule() */
1258 }
1259
1260 /* -----------------------------------------------------------------------------
1261  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1262  * -------------------------------------------------------------------------- */
1263
1264 static void
1265 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1266 {
1267     debugTrace (DEBUG_sched,
1268                 "--<< thread %ld (%s) stopped, StackOverflow", 
1269                 (long)t->id, whatNext_strs[t->what_next]);
1270
1271     /* just adjust the stack for this thread, then pop it back
1272      * on the run queue.
1273      */
1274     { 
1275         /* enlarge the stack */
1276         StgTSO *new_t = threadStackOverflow(cap, t);
1277         
1278         /* The TSO attached to this Task may have moved, so update the
1279          * pointer to it.
1280          */
1281         if (task->tso == t) {
1282             task->tso = new_t;
1283         }
1284         pushOnRunQueue(cap,new_t);
1285     }
1286 }
1287
1288 /* -----------------------------------------------------------------------------
1289  * Handle a thread that returned to the scheduler with ThreadYielding
1290  * -------------------------------------------------------------------------- */
1291
1292 static rtsBool
1293 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1294 {
1295     // Reset the context switch flag.  We don't do this just before
1296     // running the thread, because that would mean we would lose ticks
1297     // during GC, which can lead to unfair scheduling (a thread hogs
1298     // the CPU because the tick always arrives during GC).  This way
1299     // penalises threads that do a lot of allocation, but that seems
1300     // better than the alternative.
1301     cap->context_switch = 0;
1302     
1303     /* put the thread back on the run queue.  Then, if we're ready to
1304      * GC, check whether this is the last task to stop.  If so, wake
1305      * up the GC thread.  getThread will block during a GC until the
1306      * GC is finished.
1307      */
1308 #ifdef DEBUG
1309     if (t->what_next != prev_what_next) {
1310         debugTrace(DEBUG_sched,
1311                    "--<< thread %ld (%s) stopped to switch evaluators", 
1312                    (long)t->id, whatNext_strs[t->what_next]);
1313     } else {
1314         debugTrace(DEBUG_sched,
1315                    "--<< thread %ld (%s) stopped, yielding",
1316                    (long)t->id, whatNext_strs[t->what_next]);
1317     }
1318 #endif
1319     
1320     IF_DEBUG(sanity,
1321              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1322              checkTSO(t));
1323     ASSERT(t->_link == END_TSO_QUEUE);
1324     
1325     // Shortcut if we're just switching evaluators: don't bother
1326     // doing stack squeezing (which can be expensive), just run the
1327     // thread.
1328     if (t->what_next != prev_what_next) {
1329         return rtsTrue;
1330     }
1331
1332     addToRunQueue(cap,t);
1333
1334     return rtsFalse;
1335 }
1336
1337 /* -----------------------------------------------------------------------------
1338  * Handle a thread that returned to the scheduler with ThreadBlocked
1339  * -------------------------------------------------------------------------- */
1340
1341 static void
1342 scheduleHandleThreadBlocked( StgTSO *t
1343 #if !defined(GRAN) && !defined(DEBUG)
1344     STG_UNUSED
1345 #endif
1346     )
1347 {
1348
1349       // We don't need to do anything.  The thread is blocked, and it
1350       // has tidied up its stack and placed itself on whatever queue
1351       // it needs to be on.
1352
1353     // ASSERT(t->why_blocked != NotBlocked);
1354     // Not true: for example,
1355     //    - in THREADED_RTS, the thread may already have been woken
1356     //      up by another Capability.  This actually happens: try
1357     //      conc023 +RTS -N2.
1358     //    - the thread may have woken itself up already, because
1359     //      threadPaused() might have raised a blocked throwTo
1360     //      exception, see maybePerformBlockedException().
1361
1362 #ifdef DEBUG
1363     if (traceClass(DEBUG_sched)) {
1364         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1365                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1366         printThreadBlockage(t);
1367         debugTraceEnd();
1368     }
1369 #endif
1370 }
1371
1372 /* -----------------------------------------------------------------------------
1373  * Handle a thread that returned to the scheduler with ThreadFinished
1374  * -------------------------------------------------------------------------- */
1375
1376 static rtsBool
1377 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1378 {
1379     /* Need to check whether this was a main thread, and if so,
1380      * return with the return value.
1381      *
1382      * We also end up here if the thread kills itself with an
1383      * uncaught exception, see Exception.cmm.
1384      */
1385     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1386                (unsigned long)t->id, whatNext_strs[t->what_next]);
1387
1388       //
1389       // Check whether the thread that just completed was a bound
1390       // thread, and if so return with the result.  
1391       //
1392       // There is an assumption here that all thread completion goes
1393       // through this point; we need to make sure that if a thread
1394       // ends up in the ThreadKilled state, that it stays on the run
1395       // queue so it can be dealt with here.
1396       //
1397
1398       if (t->bound) {
1399
1400           if (t->bound != task) {
1401 #if !defined(THREADED_RTS)
1402               // Must be a bound thread that is not the topmost one.  Leave
1403               // it on the run queue until the stack has unwound to the
1404               // point where we can deal with this.  Leaving it on the run
1405               // queue also ensures that the garbage collector knows about
1406               // this thread and its return value (it gets dropped from the
1407               // step->threads list so there's no other way to find it).
1408               appendToRunQueue(cap,t);
1409               return rtsFalse;
1410 #else
1411               // this cannot happen in the threaded RTS, because a
1412               // bound thread can only be run by the appropriate Task.
1413               barf("finished bound thread that isn't mine");
1414 #endif
1415           }
1416
1417           ASSERT(task->tso == t);
1418
1419           if (t->what_next == ThreadComplete) {
1420               if (task->ret) {
1421                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1422                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1423               }
1424               task->stat = Success;
1425           } else {
1426               if (task->ret) {
1427                   *(task->ret) = NULL;
1428               }
1429               if (sched_state >= SCHED_INTERRUPTING) {
1430                   task->stat = Interrupted;
1431               } else {
1432                   task->stat = Killed;
1433               }
1434           }
1435 #ifdef DEBUG
1436           removeThreadLabel((StgWord)task->tso->id);
1437 #endif
1438           return rtsTrue; // tells schedule() to return
1439       }
1440
1441       return rtsFalse;
1442 }
1443
1444 /* -----------------------------------------------------------------------------
1445  * Perform a heap census
1446  * -------------------------------------------------------------------------- */
1447
1448 static rtsBool
1449 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1450 {
1451     // When we have +RTS -i0 and we're heap profiling, do a census at
1452     // every GC.  This lets us get repeatable runs for debugging.
1453     if (performHeapProfile ||
1454         (RtsFlags.ProfFlags.profileInterval==0 &&
1455          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1456         return rtsTrue;
1457     } else {
1458         return rtsFalse;
1459     }
1460 }
1461
1462 /* -----------------------------------------------------------------------------
1463  * Perform a garbage collection if necessary
1464  * -------------------------------------------------------------------------- */
1465
1466 static Capability *
1467 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1468 {
1469     rtsBool heap_census;
1470 #ifdef THREADED_RTS
1471     /* extern static volatile StgWord waiting_for_gc; 
1472        lives inside capability.c */
1473     rtsBool was_waiting;
1474     nat i;
1475 #endif
1476
1477     if (sched_state == SCHED_SHUTTING_DOWN) {
1478         // The final GC has already been done, and the system is
1479         // shutting down.  We'll probably deadlock if we try to GC
1480         // now.
1481         return cap;
1482     }
1483
1484 #ifdef THREADED_RTS
1485     // In order to GC, there must be no threads running Haskell code.
1486     // Therefore, the GC thread needs to hold *all* the capabilities,
1487     // and release them after the GC has completed.  
1488     //
1489     // This seems to be the simplest way: previous attempts involved
1490     // making all the threads with capabilities give up their
1491     // capabilities and sleep except for the *last* one, which
1492     // actually did the GC.  But it's quite hard to arrange for all
1493     // the other tasks to sleep and stay asleep.
1494     //
1495         
1496     /*  Other capabilities are prevented from running yet more Haskell
1497         threads if waiting_for_gc is set. Tested inside
1498         yieldCapability() and releaseCapability() in Capability.c */
1499
1500     was_waiting = cas(&waiting_for_gc, 0, 1);
1501     if (was_waiting) {
1502         do {
1503             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1504             if (cap) yieldCapability(&cap,task);
1505         } while (waiting_for_gc);
1506         return cap;  // NOTE: task->cap might have changed here
1507     }
1508
1509     setContextSwitches();
1510     for (i=0; i < n_capabilities; i++) {
1511         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1512         if (cap != &capabilities[i]) {
1513             Capability *pcap = &capabilities[i];
1514             // we better hope this task doesn't get migrated to
1515             // another Capability while we're waiting for this one.
1516             // It won't, because load balancing happens while we have
1517             // all the Capabilities, but even so it's a slightly
1518             // unsavoury invariant.
1519             task->cap = pcap;
1520             waitForReturnCapability(&pcap, task);
1521             if (pcap != &capabilities[i]) {
1522                 barf("scheduleDoGC: got the wrong capability");
1523             }
1524         }
1525     }
1526
1527     waiting_for_gc = rtsFalse;
1528 #endif
1529
1530     // so this happens periodically:
1531     if (cap) scheduleCheckBlackHoles(cap);
1532     
1533     IF_DEBUG(scheduler, printAllThreads());
1534
1535     /*
1536      * We now have all the capabilities; if we're in an interrupting
1537      * state, then we should take the opportunity to delete all the
1538      * threads in the system.
1539      */
1540     if (sched_state >= SCHED_INTERRUPTING) {
1541         deleteAllThreads(&capabilities[0]);
1542         sched_state = SCHED_SHUTTING_DOWN;
1543     }
1544     
1545     heap_census = scheduleNeedHeapProfile(rtsTrue);
1546
1547     /* everybody back, start the GC.
1548      * Could do it in this thread, or signal a condition var
1549      * to do it in another thread.  Either way, we need to
1550      * broadcast on gc_pending_cond afterward.
1551      */
1552 #if defined(THREADED_RTS)
1553     debugTrace(DEBUG_sched, "doing GC");
1554 #endif
1555     GarbageCollect(force_major || heap_census);
1556     
1557     if (heap_census) {
1558         debugTrace(DEBUG_sched, "performing heap census");
1559         heapCensus();
1560         performHeapProfile = rtsFalse;
1561     }
1562
1563 #ifdef SPARKBALANCE
1564     /* JB 
1565        Once we are all together... this would be the place to balance all
1566        spark pools. No concurrent stealing or adding of new sparks can
1567        occur. Should be defined in Sparks.c. */
1568     balanceSparkPoolsCaps(n_capabilities, capabilities);
1569 #endif
1570
1571 #if defined(THREADED_RTS)
1572     // release our stash of capabilities.
1573     for (i = 0; i < n_capabilities; i++) {
1574         if (cap != &capabilities[i]) {
1575             task->cap = &capabilities[i];
1576             releaseCapability(&capabilities[i]);
1577         }
1578     }
1579     if (cap) {
1580         task->cap = cap;
1581     } else {
1582         task->cap = NULL;
1583     }
1584 #endif
1585
1586     return cap;
1587 }
1588
1589 /* ---------------------------------------------------------------------------
1590  * Singleton fork(). Do not copy any running threads.
1591  * ------------------------------------------------------------------------- */
1592
1593 pid_t
1594 forkProcess(HsStablePtr *entry
1595 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1596             STG_UNUSED
1597 #endif
1598            )
1599 {
1600 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1601     Task *task;
1602     pid_t pid;
1603     StgTSO* t,*next;
1604     Capability *cap;
1605     nat s;
1606     
1607 #if defined(THREADED_RTS)
1608     if (RtsFlags.ParFlags.nNodes > 1) {
1609         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1610         stg_exit(EXIT_FAILURE);
1611     }
1612 #endif
1613
1614     debugTrace(DEBUG_sched, "forking!");
1615     
1616     // ToDo: for SMP, we should probably acquire *all* the capabilities
1617     cap = rts_lock();
1618     
1619     // no funny business: hold locks while we fork, otherwise if some
1620     // other thread is holding a lock when the fork happens, the data
1621     // structure protected by the lock will forever be in an
1622     // inconsistent state in the child.  See also #1391.
1623     ACQUIRE_LOCK(&sched_mutex);
1624     ACQUIRE_LOCK(&cap->lock);
1625     ACQUIRE_LOCK(&cap->running_task->lock);
1626
1627     pid = fork();
1628     
1629     if (pid) { // parent
1630         
1631         RELEASE_LOCK(&sched_mutex);
1632         RELEASE_LOCK(&cap->lock);
1633         RELEASE_LOCK(&cap->running_task->lock);
1634
1635         // just return the pid
1636         rts_unlock(cap);
1637         return pid;
1638         
1639     } else { // child
1640         
1641 #if defined(THREADED_RTS)
1642         initMutex(&sched_mutex);
1643         initMutex(&cap->lock);
1644         initMutex(&cap->running_task->lock);
1645 #endif
1646
1647         // Now, all OS threads except the thread that forked are
1648         // stopped.  We need to stop all Haskell threads, including
1649         // those involved in foreign calls.  Also we need to delete
1650         // all Tasks, because they correspond to OS threads that are
1651         // now gone.
1652
1653         for (s = 0; s < total_steps; s++) {
1654           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1655             if (t->what_next == ThreadRelocated) {
1656                 next = t->_link;
1657             } else {
1658                 next = t->global_link;
1659                 // don't allow threads to catch the ThreadKilled
1660                 // exception, but we do want to raiseAsync() because these
1661                 // threads may be evaluating thunks that we need later.
1662                 deleteThread_(cap,t);
1663             }
1664           }
1665         }
1666         
1667         // Empty the run queue.  It seems tempting to let all the
1668         // killed threads stay on the run queue as zombies to be
1669         // cleaned up later, but some of them correspond to bound
1670         // threads for which the corresponding Task does not exist.
1671         cap->run_queue_hd = END_TSO_QUEUE;
1672         cap->run_queue_tl = END_TSO_QUEUE;
1673
1674         // Any suspended C-calling Tasks are no more, their OS threads
1675         // don't exist now:
1676         cap->suspended_ccalling_tasks = NULL;
1677
1678         // Empty the threads lists.  Otherwise, the garbage
1679         // collector may attempt to resurrect some of these threads.
1680         for (s = 0; s < total_steps; s++) {
1681             all_steps[s].threads = END_TSO_QUEUE;
1682         }
1683
1684         // Wipe the task list, except the current Task.
1685         ACQUIRE_LOCK(&sched_mutex);
1686         for (task = all_tasks; task != NULL; task=task->all_link) {
1687             if (task != cap->running_task) {
1688 #if defined(THREADED_RTS)
1689                 initMutex(&task->lock); // see #1391
1690 #endif
1691                 discardTask(task);
1692             }
1693         }
1694         RELEASE_LOCK(&sched_mutex);
1695
1696 #if defined(THREADED_RTS)
1697         // Wipe our spare workers list, they no longer exist.  New
1698         // workers will be created if necessary.
1699         cap->spare_workers = NULL;
1700         cap->returning_tasks_hd = NULL;
1701         cap->returning_tasks_tl = NULL;
1702 #endif
1703
1704         // On Unix, all timers are reset in the child, so we need to start
1705         // the timer again.
1706         initTimer();
1707         startTimer();
1708
1709         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1710         rts_checkSchedStatus("forkProcess",cap);
1711         
1712         rts_unlock(cap);
1713         hs_exit();                      // clean up and exit
1714         stg_exit(EXIT_SUCCESS);
1715     }
1716 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1717     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1718     return -1;
1719 #endif
1720 }
1721
1722 /* ---------------------------------------------------------------------------
1723  * Delete all the threads in the system
1724  * ------------------------------------------------------------------------- */
1725    
1726 static void
1727 deleteAllThreads ( Capability *cap )
1728 {
1729     // NOTE: only safe to call if we own all capabilities.
1730
1731     StgTSO* t, *next;
1732     nat s;
1733
1734     debugTrace(DEBUG_sched,"deleting all threads");
1735     for (s = 0; s < total_steps; s++) {
1736       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1737         if (t->what_next == ThreadRelocated) {
1738             next = t->_link;
1739         } else {
1740             next = t->global_link;
1741             deleteThread(cap,t);
1742         }
1743       }
1744     }      
1745
1746     // The run queue now contains a bunch of ThreadKilled threads.  We
1747     // must not throw these away: the main thread(s) will be in there
1748     // somewhere, and the main scheduler loop has to deal with it.
1749     // Also, the run queue is the only thing keeping these threads from
1750     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1751
1752 #if !defined(THREADED_RTS)
1753     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1754     ASSERT(sleeping_queue == END_TSO_QUEUE);
1755 #endif
1756 }
1757
1758 /* -----------------------------------------------------------------------------
1759    Managing the suspended_ccalling_tasks list.
1760    Locks required: sched_mutex
1761    -------------------------------------------------------------------------- */
1762
1763 STATIC_INLINE void
1764 suspendTask (Capability *cap, Task *task)
1765 {
1766     ASSERT(task->next == NULL && task->prev == NULL);
1767     task->next = cap->suspended_ccalling_tasks;
1768     task->prev = NULL;
1769     if (cap->suspended_ccalling_tasks) {
1770         cap->suspended_ccalling_tasks->prev = task;
1771     }
1772     cap->suspended_ccalling_tasks = task;
1773 }
1774
1775 STATIC_INLINE void
1776 recoverSuspendedTask (Capability *cap, Task *task)
1777 {
1778     if (task->prev) {
1779         task->prev->next = task->next;
1780     } else {
1781         ASSERT(cap->suspended_ccalling_tasks == task);
1782         cap->suspended_ccalling_tasks = task->next;
1783     }
1784     if (task->next) {
1785         task->next->prev = task->prev;
1786     }
1787     task->next = task->prev = NULL;
1788 }
1789
1790 /* ---------------------------------------------------------------------------
1791  * Suspending & resuming Haskell threads.
1792  * 
1793  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1794  * its capability before calling the C function.  This allows another
1795  * task to pick up the capability and carry on running Haskell
1796  * threads.  It also means that if the C call blocks, it won't lock
1797  * the whole system.
1798  *
1799  * The Haskell thread making the C call is put to sleep for the
1800  * duration of the call, on the susepended_ccalling_threads queue.  We
1801  * give out a token to the task, which it can use to resume the thread
1802  * on return from the C function.
1803  * ------------------------------------------------------------------------- */
1804    
1805 void *
1806 suspendThread (StgRegTable *reg)
1807 {
1808   Capability *cap;
1809   int saved_errno;
1810   StgTSO *tso;
1811   Task *task;
1812 #if mingw32_HOST_OS
1813   StgWord32 saved_winerror;
1814 #endif
1815
1816   saved_errno = errno;
1817 #if mingw32_HOST_OS
1818   saved_winerror = GetLastError();
1819 #endif
1820
1821   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1822    */
1823   cap = regTableToCapability(reg);
1824
1825   task = cap->running_task;
1826   tso = cap->r.rCurrentTSO;
1827
1828   debugTrace(DEBUG_sched, 
1829              "thread %lu did a safe foreign call", 
1830              (unsigned long)cap->r.rCurrentTSO->id);
1831
1832   // XXX this might not be necessary --SDM
1833   tso->what_next = ThreadRunGHC;
1834
1835   threadPaused(cap,tso);
1836
1837   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1838       tso->why_blocked = BlockedOnCCall;
1839       tso->flags |= TSO_BLOCKEX;
1840       tso->flags &= ~TSO_INTERRUPTIBLE;
1841   } else {
1842       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1843   }
1844
1845   // Hand back capability
1846   task->suspended_tso = tso;
1847
1848   ACQUIRE_LOCK(&cap->lock);
1849
1850   suspendTask(cap,task);
1851   cap->in_haskell = rtsFalse;
1852   releaseCapability_(cap,rtsFalse);
1853   
1854   RELEASE_LOCK(&cap->lock);
1855
1856 #if defined(THREADED_RTS)
1857   /* Preparing to leave the RTS, so ensure there's a native thread/task
1858      waiting to take over.
1859   */
1860   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1861 #endif
1862
1863   errno = saved_errno;
1864 #if mingw32_HOST_OS
1865   SetLastError(saved_winerror);
1866 #endif
1867   return task;
1868 }
1869
1870 StgRegTable *
1871 resumeThread (void *task_)
1872 {
1873     StgTSO *tso;
1874     Capability *cap;
1875     Task *task = task_;
1876     int saved_errno;
1877 #if mingw32_HOST_OS
1878     StgWord32 saved_winerror;
1879 #endif
1880
1881     saved_errno = errno;
1882 #if mingw32_HOST_OS
1883     saved_winerror = GetLastError();
1884 #endif
1885
1886     cap = task->cap;
1887     // Wait for permission to re-enter the RTS with the result.
1888     waitForReturnCapability(&cap,task);
1889     // we might be on a different capability now... but if so, our
1890     // entry on the suspended_ccalling_tasks list will also have been
1891     // migrated.
1892
1893     // Remove the thread from the suspended list
1894     recoverSuspendedTask(cap,task);
1895
1896     tso = task->suspended_tso;
1897     task->suspended_tso = NULL;
1898     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1899     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1900     
1901     if (tso->why_blocked == BlockedOnCCall) {
1902         awakenBlockedExceptionQueue(cap,tso);
1903         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1904     }
1905     
1906     /* Reset blocking status */
1907     tso->why_blocked  = NotBlocked;
1908     
1909     cap->r.rCurrentTSO = tso;
1910     cap->in_haskell = rtsTrue;
1911     errno = saved_errno;
1912 #if mingw32_HOST_OS
1913     SetLastError(saved_winerror);
1914 #endif
1915
1916     /* We might have GC'd, mark the TSO dirty again */
1917     dirty_TSO(cap,tso);
1918
1919     IF_DEBUG(sanity, checkTSO(tso));
1920
1921     return &cap->r;
1922 }
1923
1924 /* ---------------------------------------------------------------------------
1925  * scheduleThread()
1926  *
1927  * scheduleThread puts a thread on the end  of the runnable queue.
1928  * This will usually be done immediately after a thread is created.
1929  * The caller of scheduleThread must create the thread using e.g.
1930  * createThread and push an appropriate closure
1931  * on this thread's stack before the scheduler is invoked.
1932  * ------------------------------------------------------------------------ */
1933
1934 void
1935 scheduleThread(Capability *cap, StgTSO *tso)
1936 {
1937     // The thread goes at the *end* of the run-queue, to avoid possible
1938     // starvation of any threads already on the queue.
1939     appendToRunQueue(cap,tso);
1940 }
1941
1942 void
1943 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1944 {
1945 #if defined(THREADED_RTS)
1946     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1947                               // move this thread from now on.
1948     cpu %= RtsFlags.ParFlags.nNodes;
1949     if (cpu == cap->no) {
1950         appendToRunQueue(cap,tso);
1951     } else {
1952         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1953     }
1954 #else
1955     appendToRunQueue(cap,tso);
1956 #endif
1957 }
1958
1959 Capability *
1960 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1961 {
1962     Task *task;
1963
1964     // We already created/initialised the Task
1965     task = cap->running_task;
1966
1967     // This TSO is now a bound thread; make the Task and TSO
1968     // point to each other.
1969     tso->bound = task;
1970     tso->cap = cap;
1971
1972     task->tso = tso;
1973     task->ret = ret;
1974     task->stat = NoStatus;
1975
1976     appendToRunQueue(cap,tso);
1977
1978     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1979
1980     cap = schedule(cap,task);
1981
1982     ASSERT(task->stat != NoStatus);
1983     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1984
1985     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1986     return cap;
1987 }
1988
1989 /* ----------------------------------------------------------------------------
1990  * Starting Tasks
1991  * ------------------------------------------------------------------------- */
1992
1993 #if defined(THREADED_RTS)
1994 void OSThreadProcAttr
1995 workerStart(Task *task)
1996 {
1997     Capability *cap;
1998
1999     // See startWorkerTask().
2000     ACQUIRE_LOCK(&task->lock);
2001     cap = task->cap;
2002     RELEASE_LOCK(&task->lock);
2003
2004     // set the thread-local pointer to the Task:
2005     taskEnter(task);
2006
2007     // schedule() runs without a lock.
2008     cap = schedule(cap,task);
2009
2010     // On exit from schedule(), we have a Capability.
2011     releaseCapability(cap);
2012     workerTaskStop(task);
2013 }
2014 #endif
2015
2016 /* ---------------------------------------------------------------------------
2017  * initScheduler()
2018  *
2019  * Initialise the scheduler.  This resets all the queues - if the
2020  * queues contained any threads, they'll be garbage collected at the
2021  * next pass.
2022  *
2023  * ------------------------------------------------------------------------ */
2024
2025 void 
2026 initScheduler(void)
2027 {
2028 #if !defined(THREADED_RTS)
2029   blocked_queue_hd  = END_TSO_QUEUE;
2030   blocked_queue_tl  = END_TSO_QUEUE;
2031   sleeping_queue    = END_TSO_QUEUE;
2032 #endif
2033
2034   blackhole_queue   = END_TSO_QUEUE;
2035
2036   sched_state    = SCHED_RUNNING;
2037   recent_activity = ACTIVITY_YES;
2038
2039 #if defined(THREADED_RTS)
2040   /* Initialise the mutex and condition variables used by
2041    * the scheduler. */
2042   initMutex(&sched_mutex);
2043 #endif
2044   
2045   ACQUIRE_LOCK(&sched_mutex);
2046
2047   /* A capability holds the state a native thread needs in
2048    * order to execute STG code. At least one capability is
2049    * floating around (only THREADED_RTS builds have more than one).
2050    */
2051   initCapabilities();
2052
2053   initTaskManager();
2054
2055 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2056   initSparkPools();
2057 #endif
2058
2059 #if defined(THREADED_RTS)
2060   /*
2061    * Eagerly start one worker to run each Capability, except for
2062    * Capability 0.  The idea is that we're probably going to start a
2063    * bound thread on Capability 0 pretty soon, so we don't want a
2064    * worker task hogging it.
2065    */
2066   { 
2067       nat i;
2068       Capability *cap;
2069       for (i = 1; i < n_capabilities; i++) {
2070           cap = &capabilities[i];
2071           ACQUIRE_LOCK(&cap->lock);
2072           startWorkerTask(cap, workerStart);
2073           RELEASE_LOCK(&cap->lock);
2074       }
2075   }
2076 #endif
2077
2078   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2079
2080   RELEASE_LOCK(&sched_mutex);
2081 }
2082
2083 void
2084 exitScheduler(
2085     rtsBool wait_foreign
2086 #if !defined(THREADED_RTS)
2087                          __attribute__((unused))
2088 #endif
2089 )
2090                /* see Capability.c, shutdownCapability() */
2091 {
2092     Task *task = NULL;
2093
2094 #if defined(THREADED_RTS)
2095     ACQUIRE_LOCK(&sched_mutex);
2096     task = newBoundTask();
2097     RELEASE_LOCK(&sched_mutex);
2098 #endif
2099
2100     // If we haven't killed all the threads yet, do it now.
2101     if (sched_state < SCHED_SHUTTING_DOWN) {
2102         sched_state = SCHED_INTERRUPTING;
2103         scheduleDoGC(NULL,task,rtsFalse);    
2104     }
2105     sched_state = SCHED_SHUTTING_DOWN;
2106
2107 #if defined(THREADED_RTS)
2108     { 
2109         nat i;
2110         
2111         for (i = 0; i < n_capabilities; i++) {
2112             shutdownCapability(&capabilities[i], task, wait_foreign);
2113         }
2114         boundTaskExiting(task);
2115         stopTaskManager();
2116     }
2117 #endif
2118 }
2119
2120 void
2121 freeScheduler( void )
2122 {
2123     freeCapabilities();
2124     freeTaskManager();
2125     if (n_capabilities != 1) {
2126         stgFree(capabilities);
2127     }
2128 #if defined(THREADED_RTS)
2129     closeMutex(&sched_mutex);
2130 #endif
2131 }
2132
2133 /* -----------------------------------------------------------------------------
2134    performGC
2135
2136    This is the interface to the garbage collector from Haskell land.
2137    We provide this so that external C code can allocate and garbage
2138    collect when called from Haskell via _ccall_GC.
2139    -------------------------------------------------------------------------- */
2140
2141 static void
2142 performGC_(rtsBool force_major)
2143 {
2144     Task *task;
2145     // We must grab a new Task here, because the existing Task may be
2146     // associated with a particular Capability, and chained onto the 
2147     // suspended_ccalling_tasks queue.
2148     ACQUIRE_LOCK(&sched_mutex);
2149     task = newBoundTask();
2150     RELEASE_LOCK(&sched_mutex);
2151     scheduleDoGC(NULL,task,force_major);
2152     boundTaskExiting(task);
2153 }
2154
2155 void
2156 performGC(void)
2157 {
2158     performGC_(rtsFalse);
2159 }
2160
2161 void
2162 performMajorGC(void)
2163 {
2164     performGC_(rtsTrue);
2165 }
2166
2167 /* -----------------------------------------------------------------------------
2168    Stack overflow
2169
2170    If the thread has reached its maximum stack size, then raise the
2171    StackOverflow exception in the offending thread.  Otherwise
2172    relocate the TSO into a larger chunk of memory and adjust its stack
2173    size appropriately.
2174    -------------------------------------------------------------------------- */
2175
2176 static StgTSO *
2177 threadStackOverflow(Capability *cap, StgTSO *tso)
2178 {
2179   nat new_stack_size, stack_words;
2180   lnat new_tso_size;
2181   StgPtr new_sp;
2182   StgTSO *dest;
2183
2184   IF_DEBUG(sanity,checkTSO(tso));
2185
2186   // don't allow throwTo() to modify the blocked_exceptions queue
2187   // while we are moving the TSO:
2188   lockClosure((StgClosure *)tso);
2189
2190   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2191       // NB. never raise a StackOverflow exception if the thread is
2192       // inside Control.Exceptino.block.  It is impractical to protect
2193       // against stack overflow exceptions, since virtually anything
2194       // can raise one (even 'catch'), so this is the only sensible
2195       // thing to do here.  See bug #767.
2196
2197       debugTrace(DEBUG_gc,
2198                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2199                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2200       IF_DEBUG(gc,
2201                /* If we're debugging, just print out the top of the stack */
2202                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2203                                                 tso->sp+64)));
2204
2205       // Send this thread the StackOverflow exception
2206       unlockTSO(tso);
2207       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2208       return tso;
2209   }
2210
2211   /* Try to double the current stack size.  If that takes us over the
2212    * maximum stack size for this thread, then use the maximum instead
2213    * (that is, unless we're already at or over the max size and we
2214    * can't raise the StackOverflow exception (see above), in which
2215    * case just double the size). Finally round up so the TSO ends up as
2216    * a whole number of blocks.
2217    */
2218   if (tso->stack_size >= tso->max_stack_size) {
2219       new_stack_size = tso->stack_size * 2;
2220   } else { 
2221       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2222   }
2223   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2224                                        TSO_STRUCT_SIZE)/sizeof(W_);
2225   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2226   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2227
2228   debugTrace(DEBUG_sched, 
2229              "increasing stack size from %ld words to %d.",
2230              (long)tso->stack_size, new_stack_size);
2231
2232   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2233   TICK_ALLOC_TSO(new_stack_size,0);
2234
2235   /* copy the TSO block and the old stack into the new area */
2236   memcpy(dest,tso,TSO_STRUCT_SIZE);
2237   stack_words = tso->stack + tso->stack_size - tso->sp;
2238   new_sp = (P_)dest + new_tso_size - stack_words;
2239   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2240
2241   /* relocate the stack pointers... */
2242   dest->sp         = new_sp;
2243   dest->stack_size = new_stack_size;
2244         
2245   /* Mark the old TSO as relocated.  We have to check for relocated
2246    * TSOs in the garbage collector and any primops that deal with TSOs.
2247    *
2248    * It's important to set the sp value to just beyond the end
2249    * of the stack, so we don't attempt to scavenge any part of the
2250    * dead TSO's stack.
2251    */
2252   tso->what_next = ThreadRelocated;
2253   setTSOLink(cap,tso,dest);
2254   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2255   tso->why_blocked = NotBlocked;
2256
2257   IF_PAR_DEBUG(verbose,
2258                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2259                      tso->id, tso, tso->stack_size);
2260                /* If we're debugging, just print out the top of the stack */
2261                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2262                                                 tso->sp+64)));
2263   
2264   unlockTSO(dest);
2265   unlockTSO(tso);
2266
2267   IF_DEBUG(sanity,checkTSO(dest));
2268 #if 0
2269   IF_DEBUG(scheduler,printTSO(dest));
2270 #endif
2271
2272   return dest;
2273 }
2274
2275 static StgTSO *
2276 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2277 {
2278     bdescr *bd, *new_bd;
2279     lnat free_w, tso_size_w;
2280     StgTSO *new_tso;
2281
2282     tso_size_w = tso_sizeW(tso);
2283
2284     if (tso_size_w < MBLOCK_SIZE_W || 
2285         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2286     {
2287         return tso;
2288     }
2289
2290     // don't allow throwTo() to modify the blocked_exceptions queue
2291     // while we are moving the TSO:
2292     lockClosure((StgClosure *)tso);
2293
2294     // this is the number of words we'll free
2295     free_w = round_to_mblocks(tso_size_w/2);
2296
2297     bd = Bdescr((StgPtr)tso);
2298     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2299     bd->free = bd->start + TSO_STRUCT_SIZEW;
2300
2301     new_tso = (StgTSO *)new_bd->start;
2302     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2303     new_tso->stack_size = new_bd->free - new_tso->stack;
2304
2305     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2306                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2307
2308     tso->what_next = ThreadRelocated;
2309     tso->_link = new_tso; // no write barrier reqd: same generation
2310
2311     // The TSO attached to this Task may have moved, so update the
2312     // pointer to it.
2313     if (task->tso == tso) {
2314         task->tso = new_tso;
2315     }
2316
2317     unlockTSO(new_tso);
2318     unlockTSO(tso);
2319
2320     IF_DEBUG(sanity,checkTSO(new_tso));
2321
2322     return new_tso;
2323 }
2324
2325 /* ---------------------------------------------------------------------------
2326    Interrupt execution
2327    - usually called inside a signal handler so it mustn't do anything fancy.   
2328    ------------------------------------------------------------------------ */
2329
2330 void
2331 interruptStgRts(void)
2332 {
2333     sched_state = SCHED_INTERRUPTING;
2334     setContextSwitches();
2335     wakeUpRts();
2336 }
2337
2338 /* -----------------------------------------------------------------------------
2339    Wake up the RTS
2340    
2341    This function causes at least one OS thread to wake up and run the
2342    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2343    an external event has arrived that may need servicing (eg. a
2344    keyboard interrupt).
2345
2346    In the single-threaded RTS we don't do anything here; we only have
2347    one thread anyway, and the event that caused us to want to wake up
2348    will have interrupted any blocking system call in progress anyway.
2349    -------------------------------------------------------------------------- */
2350
2351 void
2352 wakeUpRts(void)
2353 {
2354 #if defined(THREADED_RTS)
2355     // This forces the IO Manager thread to wakeup, which will
2356     // in turn ensure that some OS thread wakes up and runs the
2357     // scheduler loop, which will cause a GC and deadlock check.
2358     ioManagerWakeup();
2359 #endif
2360 }
2361
2362 /* -----------------------------------------------------------------------------
2363  * checkBlackHoles()
2364  *
2365  * Check the blackhole_queue for threads that can be woken up.  We do
2366  * this periodically: before every GC, and whenever the run queue is
2367  * empty.
2368  *
2369  * An elegant solution might be to just wake up all the blocked
2370  * threads with awakenBlockedQueue occasionally: they'll go back to
2371  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2372  * doesn't give us a way to tell whether we've actually managed to
2373  * wake up any threads, so we would be busy-waiting.
2374  *
2375  * -------------------------------------------------------------------------- */
2376
2377 static rtsBool
2378 checkBlackHoles (Capability *cap)
2379 {
2380     StgTSO **prev, *t;
2381     rtsBool any_woke_up = rtsFalse;
2382     StgHalfWord type;
2383
2384     // blackhole_queue is global:
2385     ASSERT_LOCK_HELD(&sched_mutex);
2386
2387     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2388
2389     // ASSUMES: sched_mutex
2390     prev = &blackhole_queue;
2391     t = blackhole_queue;
2392     while (t != END_TSO_QUEUE) {
2393         ASSERT(t->why_blocked == BlockedOnBlackHole);
2394         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2395         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2396             IF_DEBUG(sanity,checkTSO(t));
2397             t = unblockOne(cap, t);
2398             *prev = t;
2399             any_woke_up = rtsTrue;
2400         } else {
2401             prev = &t->_link;
2402             t = t->_link;
2403         }
2404     }
2405
2406     return any_woke_up;
2407 }
2408
2409 /* -----------------------------------------------------------------------------
2410    Deleting threads
2411
2412    This is used for interruption (^C) and forking, and corresponds to
2413    raising an exception but without letting the thread catch the
2414    exception.
2415    -------------------------------------------------------------------------- */
2416
2417 static void 
2418 deleteThread (Capability *cap, StgTSO *tso)
2419 {
2420     // NOTE: must only be called on a TSO that we have exclusive
2421     // access to, because we will call throwToSingleThreaded() below.
2422     // The TSO must be on the run queue of the Capability we own, or 
2423     // we must own all Capabilities.
2424
2425     if (tso->why_blocked != BlockedOnCCall &&
2426         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2427         throwToSingleThreaded(cap,tso,NULL);
2428     }
2429 }
2430
2431 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2432 static void 
2433 deleteThread_(Capability *cap, StgTSO *tso)
2434 { // for forkProcess only:
2435   // like deleteThread(), but we delete threads in foreign calls, too.
2436
2437     if (tso->why_blocked == BlockedOnCCall ||
2438         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2439         unblockOne(cap,tso);
2440         tso->what_next = ThreadKilled;
2441     } else {
2442         deleteThread(cap,tso);
2443     }
2444 }
2445 #endif
2446
2447 /* -----------------------------------------------------------------------------
2448    raiseExceptionHelper
2449    
2450    This function is called by the raise# primitve, just so that we can
2451    move some of the tricky bits of raising an exception from C-- into
2452    C.  Who knows, it might be a useful re-useable thing here too.
2453    -------------------------------------------------------------------------- */
2454
2455 StgWord
2456 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2457 {
2458     Capability *cap = regTableToCapability(reg);
2459     StgThunk *raise_closure = NULL;
2460     StgPtr p, next;
2461     StgRetInfoTable *info;
2462     //
2463     // This closure represents the expression 'raise# E' where E
2464     // is the exception raise.  It is used to overwrite all the
2465     // thunks which are currently under evaluataion.
2466     //
2467
2468     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2469     // LDV profiling: stg_raise_info has THUNK as its closure
2470     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2471     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2472     // 1 does not cause any problem unless profiling is performed.
2473     // However, when LDV profiling goes on, we need to linearly scan
2474     // small object pool, where raise_closure is stored, so we should
2475     // use MIN_UPD_SIZE.
2476     //
2477     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2478     //                                 sizeofW(StgClosure)+1);
2479     //
2480
2481     //
2482     // Walk up the stack, looking for the catch frame.  On the way,
2483     // we update any closures pointed to from update frames with the
2484     // raise closure that we just built.
2485     //
2486     p = tso->sp;
2487     while(1) {
2488         info = get_ret_itbl((StgClosure *)p);
2489         next = p + stack_frame_sizeW((StgClosure *)p);
2490         switch (info->i.type) {
2491             
2492         case UPDATE_FRAME:
2493             // Only create raise_closure if we need to.
2494             if (raise_closure == NULL) {
2495                 raise_closure = 
2496                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2497                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2498                 raise_closure->payload[0] = exception;
2499             }
2500             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2501             p = next;
2502             continue;
2503
2504         case ATOMICALLY_FRAME:
2505             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2506             tso->sp = p;
2507             return ATOMICALLY_FRAME;
2508             
2509         case CATCH_FRAME:
2510             tso->sp = p;
2511             return CATCH_FRAME;
2512
2513         case CATCH_STM_FRAME:
2514             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2515             tso->sp = p;
2516             return CATCH_STM_FRAME;
2517             
2518         case STOP_FRAME:
2519             tso->sp = p;
2520             return STOP_FRAME;
2521
2522         case CATCH_RETRY_FRAME:
2523         default:
2524             p = next; 
2525             continue;
2526         }
2527     }
2528 }
2529
2530
2531 /* -----------------------------------------------------------------------------
2532    findRetryFrameHelper
2533
2534    This function is called by the retry# primitive.  It traverses the stack
2535    leaving tso->sp referring to the frame which should handle the retry.  
2536
2537    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2538    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2539
2540    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2541    create) because retries are not considered to be exceptions, despite the
2542    similar implementation.
2543
2544    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2545    not be created within memory transactions.
2546    -------------------------------------------------------------------------- */
2547
2548 StgWord
2549 findRetryFrameHelper (StgTSO *tso)
2550 {
2551   StgPtr           p, next;
2552   StgRetInfoTable *info;
2553
2554   p = tso -> sp;
2555   while (1) {
2556     info = get_ret_itbl((StgClosure *)p);
2557     next = p + stack_frame_sizeW((StgClosure *)p);
2558     switch (info->i.type) {
2559       
2560     case ATOMICALLY_FRAME:
2561         debugTrace(DEBUG_stm,
2562                    "found ATOMICALLY_FRAME at %p during retry", p);
2563         tso->sp = p;
2564         return ATOMICALLY_FRAME;
2565       
2566     case CATCH_RETRY_FRAME:
2567         debugTrace(DEBUG_stm,
2568                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2569         tso->sp = p;
2570         return CATCH_RETRY_FRAME;
2571       
2572     case CATCH_STM_FRAME: {
2573         StgTRecHeader *trec = tso -> trec;
2574         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2575         debugTrace(DEBUG_stm,
2576                    "found CATCH_STM_FRAME at %p during retry", p);
2577         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2578         stmAbortTransaction(tso -> cap, trec);
2579         stmFreeAbortedTRec(tso -> cap, trec);
2580         tso -> trec = outer;
2581         p = next; 
2582         continue;
2583     }
2584       
2585
2586     default:
2587       ASSERT(info->i.type != CATCH_FRAME);
2588       ASSERT(info->i.type != STOP_FRAME);
2589       p = next; 
2590       continue;
2591     }
2592   }
2593 }
2594
2595 /* -----------------------------------------------------------------------------
2596    resurrectThreads is called after garbage collection on the list of
2597    threads found to be garbage.  Each of these threads will be woken
2598    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2599    on an MVar, or NonTermination if the thread was blocked on a Black
2600    Hole.
2601
2602    Locks: assumes we hold *all* the capabilities.
2603    -------------------------------------------------------------------------- */
2604
2605 void
2606 resurrectThreads (StgTSO *threads)
2607 {
2608     StgTSO *tso, *next;
2609     Capability *cap;
2610     step *step;
2611
2612     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2613         next = tso->global_link;
2614
2615         step = Bdescr((P_)tso)->step;
2616         tso->global_link = step->threads;
2617         step->threads = tso;
2618
2619         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2620         
2621         // Wake up the thread on the Capability it was last on
2622         cap = tso->cap;
2623         
2624         switch (tso->why_blocked) {
2625         case BlockedOnMVar:
2626         case BlockedOnException:
2627             /* Called by GC - sched_mutex lock is currently held. */
2628             throwToSingleThreaded(cap, tso,
2629                                   (StgClosure *)blockedOnDeadMVar_closure);
2630             break;
2631         case BlockedOnBlackHole:
2632             throwToSingleThreaded(cap, tso,
2633                                   (StgClosure *)nonTermination_closure);
2634             break;
2635         case BlockedOnSTM:
2636             throwToSingleThreaded(cap, tso,
2637                                   (StgClosure *)blockedIndefinitely_closure);
2638             break;
2639         case NotBlocked:
2640             /* This might happen if the thread was blocked on a black hole
2641              * belonging to a thread that we've just woken up (raiseAsync
2642              * can wake up threads, remember...).
2643              */
2644             continue;
2645         default:
2646             barf("resurrectThreads: thread blocked in a strange way");
2647         }
2648     }
2649 }
2650
2651 /* -----------------------------------------------------------------------------
2652    performPendingThrowTos is called after garbage collection, and
2653    passed a list of threads that were found to have pending throwTos
2654    (tso->blocked_exceptions was not empty), and were blocked.
2655    Normally this doesn't happen, because we would deliver the
2656    exception directly if the target thread is blocked, but there are
2657    small windows where it might occur on a multiprocessor (see
2658    throwTo()).
2659
2660    NB. we must be holding all the capabilities at this point, just
2661    like resurrectThreads().
2662    -------------------------------------------------------------------------- */
2663
2664 void
2665 performPendingThrowTos (StgTSO *threads)
2666 {
2667     StgTSO *tso, *next;
2668     Capability *cap;
2669     step *step;
2670
2671     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2672         next = tso->global_link;
2673
2674         step = Bdescr((P_)tso)->step;
2675         tso->global_link = step->threads;
2676         step->threads = tso;
2677
2678         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2679         
2680         cap = tso->cap;
2681         maybePerformBlockedException(cap, tso);
2682     }
2683 }