Another shutdown fix
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  *
96  * NB. must be StgWord, we do xchg() on it.
97  */
98 volatile StgWord recent_activity = ACTIVITY_YES;
99
100 /* if this flag is set as well, give up execution
101  * LOCK: none (changes monotonically)
102  */
103 volatile StgWord sched_state = SCHED_RUNNING;
104
105 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
106  *  exists - earlier gccs apparently didn't.
107  *  -= chak
108  */
109 StgTSO dummy_tso;
110
111 /*
112  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
113  * in an MT setting, needed to signal that a worker thread shouldn't hang around
114  * in the scheduler when it is out of work.
115  */
116 rtsBool shutting_down_scheduler = rtsFalse;
117
118 /*
119  * This mutex protects most of the global scheduler data in
120  * the THREADED_RTS runtime.
121  */
122 #if defined(THREADED_RTS)
123 Mutex sched_mutex;
124 #endif
125
126 #if !defined(mingw32_HOST_OS)
127 #define FORKPROCESS_PRIMOP_SUPPORTED
128 #endif
129
130 /* -----------------------------------------------------------------------------
131  * static function prototypes
132  * -------------------------------------------------------------------------- */
133
134 static Capability *schedule (Capability *initialCapability, Task *task);
135
136 //
137 // These function all encapsulate parts of the scheduler loop, and are
138 // abstracted only to make the structure and control flow of the
139 // scheduler clearer.
140 //
141 static void schedulePreLoop (void);
142 static void scheduleFindWork (Capability *cap);
143 #if defined(THREADED_RTS)
144 static void scheduleYield (Capability **pcap, Task *task);
145 #endif
146 static void scheduleStartSignalHandlers (Capability *cap);
147 static void scheduleCheckBlockedThreads (Capability *cap);
148 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
149 static void scheduleCheckBlackHoles (Capability *cap);
150 static void scheduleDetectDeadlock (Capability *cap, Task *task);
151 static void schedulePushWork(Capability *cap, Task *task);
152 #if defined(PARALLEL_HASKELL)
153 static rtsBool scheduleGetRemoteWork(Capability *cap);
154 static void scheduleSendPendingMessages(void);
155 #endif
156 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
157 static void scheduleActivateSpark(Capability *cap);
158 #endif
159 static void schedulePostRunThread(Capability *cap, StgTSO *t);
160 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
161 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
162                                          StgTSO *t);
163 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
164                                     nat prev_what_next );
165 static void scheduleHandleThreadBlocked( StgTSO *t );
166 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
167                                              StgTSO *t );
168 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
169 static Capability *scheduleDoGC(Capability *cap, Task *task,
170                                 rtsBool force_major);
171
172 static rtsBool checkBlackHoles(Capability *cap);
173
174 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
175 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
176
177 static void deleteThread (Capability *cap, StgTSO *tso);
178 static void deleteAllThreads (Capability *cap);
179
180 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
181 static void deleteThread_(Capability *cap, StgTSO *tso);
182 #endif
183
184 #ifdef DEBUG
185 static char *whatNext_strs[] = {
186   "(unknown)",
187   "ThreadRunGHC",
188   "ThreadInterpret",
189   "ThreadKilled",
190   "ThreadRelocated",
191   "ThreadComplete"
192 };
193 #endif
194
195 /* -----------------------------------------------------------------------------
196  * Putting a thread on the run queue: different scheduling policies
197  * -------------------------------------------------------------------------- */
198
199 STATIC_INLINE void
200 addToRunQueue( Capability *cap, StgTSO *t )
201 {
202 #if defined(PARALLEL_HASKELL)
203     if (RtsFlags.ParFlags.doFairScheduling) { 
204         // this does round-robin scheduling; good for concurrency
205         appendToRunQueue(cap,t);
206     } else {
207         // this does unfair scheduling; good for parallelism
208         pushOnRunQueue(cap,t);
209     }
210 #else
211     // this does round-robin scheduling; good for concurrency
212     appendToRunQueue(cap,t);
213 #endif
214 }
215
216 /* ---------------------------------------------------------------------------
217    Main scheduling loop.
218
219    We use round-robin scheduling, each thread returning to the
220    scheduler loop when one of these conditions is detected:
221
222       * out of heap space
223       * timer expires (thread yields)
224       * thread blocks
225       * thread ends
226       * stack overflow
227
228    GRAN version:
229      In a GranSim setup this loop iterates over the global event queue.
230      This revolves around the global event queue, which determines what 
231      to do next. Therefore, it's more complicated than either the 
232      concurrent or the parallel (GUM) setup.
233   This version has been entirely removed (JB 2008/08).
234
235    GUM version:
236      GUM iterates over incoming messages.
237      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
238      and sends out a fish whenever it has nothing to do; in-between
239      doing the actual reductions (shared code below) it processes the
240      incoming messages and deals with delayed operations 
241      (see PendingFetches).
242      This is not the ugliest code you could imagine, but it's bloody close.
243
244   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
245   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
246   as well as future GUM versions. This file has been refurbished to
247   only contain valid code, which is however incomplete, refers to
248   invalid includes etc.
249
250    ------------------------------------------------------------------------ */
251
252 static Capability *
253 schedule (Capability *initialCapability, Task *task)
254 {
255   StgTSO *t;
256   Capability *cap;
257   StgThreadReturnCode ret;
258 #if defined(PARALLEL_HASKELL)
259   rtsBool receivedFinish = rtsFalse;
260 #endif
261   nat prev_what_next;
262   rtsBool ready_to_gc;
263 #if defined(THREADED_RTS)
264   rtsBool first = rtsTrue;
265 #endif
266   
267   cap = initialCapability;
268
269   // Pre-condition: this task owns initialCapability.
270   // The sched_mutex is *NOT* held
271   // NB. on return, we still hold a capability.
272
273   debugTrace (DEBUG_sched, 
274               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
275               task, initialCapability);
276
277   schedulePreLoop();
278
279   // -----------------------------------------------------------
280   // Scheduler loop starts here:
281
282 #if defined(PARALLEL_HASKELL)
283 #define TERMINATION_CONDITION        (!receivedFinish)
284 #else
285 #define TERMINATION_CONDITION        rtsTrue
286 #endif
287
288   while (TERMINATION_CONDITION) {
289
290     // Check whether we have re-entered the RTS from Haskell without
291     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
292     // call).
293     if (cap->in_haskell) {
294           errorBelch("schedule: re-entered unsafely.\n"
295                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
296           stg_exit(EXIT_FAILURE);
297     }
298
299     // The interruption / shutdown sequence.
300     // 
301     // In order to cleanly shut down the runtime, we want to:
302     //   * make sure that all main threads return to their callers
303     //     with the state 'Interrupted'.
304     //   * clean up all OS threads assocated with the runtime
305     //   * free all memory etc.
306     //
307     // So the sequence for ^C goes like this:
308     //
309     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
310     //     arranges for some Capability to wake up
311     //
312     //   * all threads in the system are halted, and the zombies are
313     //     placed on the run queue for cleaning up.  We acquire all
314     //     the capabilities in order to delete the threads, this is
315     //     done by scheduleDoGC() for convenience (because GC already
316     //     needs to acquire all the capabilities).  We can't kill
317     //     threads involved in foreign calls.
318     // 
319     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
320     //
321     //   * sched_state := SCHED_SHUTTING_DOWN
322     //
323     //   * all workers exit when the run queue on their capability
324     //     drains.  All main threads will also exit when their TSO
325     //     reaches the head of the run queue and they can return.
326     //
327     //   * eventually all Capabilities will shut down, and the RTS can
328     //     exit.
329     //
330     //   * We might be left with threads blocked in foreign calls, 
331     //     we should really attempt to kill these somehow (TODO);
332     
333     switch (sched_state) {
334     case SCHED_RUNNING:
335         break;
336     case SCHED_INTERRUPTING:
337         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
338 #if defined(THREADED_RTS)
339         discardSparksCap(cap);
340 #endif
341         /* scheduleDoGC() deletes all the threads */
342         cap = scheduleDoGC(cap,task,rtsFalse);
343
344         // after scheduleDoGC(), we must be shutting down.  Either some
345         // other Capability did the final GC, or we did it above,
346         // either way we can fall through to the SCHED_SHUTTING_DOWN
347         // case now.
348         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
349         // fall through
350
351     case SCHED_SHUTTING_DOWN:
352         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
353         // If we are a worker, just exit.  If we're a bound thread
354         // then we will exit below when we've removed our TSO from
355         // the run queue.
356         if (task->tso == NULL && emptyRunQueue(cap)) {
357             return cap;
358         }
359         break;
360     default:
361         barf("sched_state: %d", sched_state);
362     }
363
364     scheduleFindWork(cap);
365
366     /* work pushing, currently relevant only for THREADED_RTS:
367        (pushes threads, wakes up idle capabilities for stealing) */
368     schedulePushWork(cap,task);
369
370 #if defined(PARALLEL_HASKELL)
371     /* since we perform a blocking receive and continue otherwise,
372        either we never reach here or we definitely have work! */
373     // from here: non-empty run queue
374     ASSERT(!emptyRunQueue(cap));
375
376     if (PacketsWaiting()) {  /* now process incoming messages, if any
377                                 pending...  
378
379                                 CAUTION: scheduleGetRemoteWork called
380                                 above, waits for messages as well! */
381       processMessages(cap, &receivedFinish);
382     }
383 #endif // PARALLEL_HASKELL: non-empty run queue!
384
385     scheduleDetectDeadlock(cap,task);
386
387 #if defined(THREADED_RTS)
388     cap = task->cap;    // reload cap, it might have changed
389 #endif
390
391     // Normally, the only way we can get here with no threads to
392     // run is if a keyboard interrupt received during 
393     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
394     // Additionally, it is not fatal for the
395     // threaded RTS to reach here with no threads to run.
396     //
397     // win32: might be here due to awaitEvent() being abandoned
398     // as a result of a console event having been delivered.
399     
400 #if defined(THREADED_RTS)
401     if (first) 
402     {
403     // XXX: ToDo
404     //     // don't yield the first time, we want a chance to run this
405     //     // thread for a bit, even if there are others banging at the
406     //     // door.
407     //     first = rtsFalse;
408     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
409     }
410
411   yield:
412     scheduleYield(&cap,task);
413     if (emptyRunQueue(cap)) continue; // look for work again
414 #endif
415
416 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
417     if ( emptyRunQueue(cap) ) {
418         ASSERT(sched_state >= SCHED_INTERRUPTING);
419     }
420 #endif
421
422     // 
423     // Get a thread to run
424     //
425     t = popRunQueue(cap);
426
427     // Sanity check the thread we're about to run.  This can be
428     // expensive if there is lots of thread switching going on...
429     IF_DEBUG(sanity,checkTSO(t));
430
431 #if defined(THREADED_RTS)
432     // Check whether we can run this thread in the current task.
433     // If not, we have to pass our capability to the right task.
434     {
435         Task *bound = t->bound;
436       
437         if (bound) {
438             if (bound == task) {
439                 debugTrace(DEBUG_sched,
440                            "### Running thread %lu in bound thread", (unsigned long)t->id);
441                 // yes, the Haskell thread is bound to the current native thread
442             } else {
443                 debugTrace(DEBUG_sched,
444                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
445                 // no, bound to a different Haskell thread: pass to that thread
446                 pushOnRunQueue(cap,t);
447                 continue;
448             }
449         } else {
450             // The thread we want to run is unbound.
451             if (task->tso) { 
452                 debugTrace(DEBUG_sched,
453                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
454                 // no, the current native thread is bound to a different
455                 // Haskell thread, so pass it to any worker thread
456                 pushOnRunQueue(cap,t);
457                 continue; 
458             }
459         }
460     }
461 #endif
462
463     // If we're shutting down, and this thread has not yet been
464     // killed, kill it now.  This sometimes happens when a finalizer
465     // thread is created by the final GC, or a thread previously
466     // in a foreign call returns.
467     if (sched_state >= SCHED_INTERRUPTING &&
468         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
469         deleteThread_(cap,t);
470     }
471
472     /* context switches are initiated by the timer signal, unless
473      * the user specified "context switch as often as possible", with
474      * +RTS -C0
475      */
476     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
477         && !emptyThreadQueues(cap)) {
478         cap->context_switch = 1;
479     }
480          
481 run_thread:
482
483     // CurrentTSO is the thread to run.  t might be different if we
484     // loop back to run_thread, so make sure to set CurrentTSO after
485     // that.
486     cap->r.rCurrentTSO = t;
487
488     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
489                               (long)t->id, whatNext_strs[t->what_next]);
490
491     startHeapProfTimer();
492
493     // Check for exceptions blocked on this thread
494     maybePerformBlockedException (cap, t);
495
496     // ----------------------------------------------------------------------
497     // Run the current thread 
498
499     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
500     ASSERT(t->cap == cap);
501     ASSERT(t->bound ? t->bound->cap == cap : 1);
502
503     prev_what_next = t->what_next;
504
505     errno = t->saved_errno;
506 #if mingw32_HOST_OS
507     SetLastError(t->saved_winerror);
508 #endif
509
510     cap->in_haskell = rtsTrue;
511
512     dirty_TSO(cap,t);
513
514 #if defined(THREADED_RTS)
515     if (recent_activity == ACTIVITY_DONE_GC) {
516         // ACTIVITY_DONE_GC means we turned off the timer signal to
517         // conserve power (see #1623).  Re-enable it here.
518         nat prev;
519         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
520         if (prev == ACTIVITY_DONE_GC) {
521             startTimer();
522         }
523     } else {
524         recent_activity = ACTIVITY_YES;
525     }
526 #endif
527
528     switch (prev_what_next) {
529         
530     case ThreadKilled:
531     case ThreadComplete:
532         /* Thread already finished, return to scheduler. */
533         ret = ThreadFinished;
534         break;
535         
536     case ThreadRunGHC:
537     {
538         StgRegTable *r;
539         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
540         cap = regTableToCapability(r);
541         ret = r->rRet;
542         break;
543     }
544     
545     case ThreadInterpret:
546         cap = interpretBCO(cap);
547         ret = cap->r.rRet;
548         break;
549         
550     default:
551         barf("schedule: invalid what_next field");
552     }
553
554     cap->in_haskell = rtsFalse;
555
556     // The TSO might have moved, eg. if it re-entered the RTS and a GC
557     // happened.  So find the new location:
558     t = cap->r.rCurrentTSO;
559
560     // We have run some Haskell code: there might be blackhole-blocked
561     // threads to wake up now.
562     // Lock-free test here should be ok, we're just setting a flag.
563     if ( blackhole_queue != END_TSO_QUEUE ) {
564         blackholes_need_checking = rtsTrue;
565     }
566     
567     // And save the current errno in this thread.
568     // XXX: possibly bogus for SMP because this thread might already
569     // be running again, see code below.
570     t->saved_errno = errno;
571 #if mingw32_HOST_OS
572     // Similarly for Windows error code
573     t->saved_winerror = GetLastError();
574 #endif
575
576 #if defined(THREADED_RTS)
577     // If ret is ThreadBlocked, and this Task is bound to the TSO that
578     // blocked, we are in limbo - the TSO is now owned by whatever it
579     // is blocked on, and may in fact already have been woken up,
580     // perhaps even on a different Capability.  It may be the case
581     // that task->cap != cap.  We better yield this Capability
582     // immediately and return to normaility.
583     if (ret == ThreadBlocked) {
584         debugTrace(DEBUG_sched,
585                    "--<< thread %lu (%s) stopped: blocked",
586                    (unsigned long)t->id, whatNext_strs[t->what_next]);
587         goto yield;
588     }
589 #endif
590
591     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
592     ASSERT(t->cap == cap);
593
594     // ----------------------------------------------------------------------
595     
596     // Costs for the scheduler are assigned to CCS_SYSTEM
597     stopHeapProfTimer();
598 #if defined(PROFILING)
599     CCCS = CCS_SYSTEM;
600 #endif
601     
602     schedulePostRunThread(cap,t);
603
604     t = threadStackUnderflow(task,t);
605
606     ready_to_gc = rtsFalse;
607
608     switch (ret) {
609     case HeapOverflow:
610         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
611         break;
612
613     case StackOverflow:
614         scheduleHandleStackOverflow(cap,task,t);
615         break;
616
617     case ThreadYielding:
618         if (scheduleHandleYield(cap, t, prev_what_next)) {
619             // shortcut for switching between compiler/interpreter:
620             goto run_thread; 
621         }
622         break;
623
624     case ThreadBlocked:
625         scheduleHandleThreadBlocked(t);
626         break;
627
628     case ThreadFinished:
629         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
630         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
631         break;
632
633     default:
634       barf("schedule: invalid thread return code %d", (int)ret);
635     }
636
637     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
638       cap = scheduleDoGC(cap,task,rtsFalse);
639     }
640   } /* end of while() */
641 }
642
643 /* ----------------------------------------------------------------------------
644  * Setting up the scheduler loop
645  * ------------------------------------------------------------------------- */
646
647 static void
648 schedulePreLoop(void)
649 {
650   // initialisation for scheduler - what cannot go into initScheduler()  
651 }
652
653 /* -----------------------------------------------------------------------------
654  * scheduleFindWork()
655  *
656  * Search for work to do, and handle messages from elsewhere.
657  * -------------------------------------------------------------------------- */
658
659 static void
660 scheduleFindWork (Capability *cap)
661 {
662     scheduleStartSignalHandlers(cap);
663
664     // Only check the black holes here if we've nothing else to do.
665     // During normal execution, the black hole list only gets checked
666     // at GC time, to avoid repeatedly traversing this possibly long
667     // list each time around the scheduler.
668     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
669
670     scheduleCheckWakeupThreads(cap);
671
672     scheduleCheckBlockedThreads(cap);
673
674 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
675     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
676 #endif
677
678 #if defined(PARALLEL_HASKELL)
679     // if messages have been buffered...
680     scheduleSendPendingMessages();
681 #endif
682
683 #if defined(PARALLEL_HASKELL)
684     if (emptyRunQueue(cap)) {
685         receivedFinish = scheduleGetRemoteWork(cap);
686         continue; //  a new round, (hopefully) with new work
687         /* 
688            in GUM, this a) sends out a FISH and returns IF no fish is
689                            out already
690                         b) (blocking) awaits and receives messages
691            
692            in Eden, this is only the blocking receive, as b) in GUM.
693         */
694     }
695 #endif
696 }
697
698 #if defined(THREADED_RTS)
699 STATIC_INLINE rtsBool
700 shouldYieldCapability (Capability *cap, Task *task)
701 {
702     // we need to yield this capability to someone else if..
703     //   - another thread is initiating a GC
704     //   - another Task is returning from a foreign call
705     //   - the thread at the head of the run queue cannot be run
706     //     by this Task (it is bound to another Task, or it is unbound
707     //     and this task it bound).
708     return (waiting_for_gc || 
709             cap->returning_tasks_hd != NULL ||
710             (!emptyRunQueue(cap) && (task->tso == NULL
711                                      ? cap->run_queue_hd->bound != NULL
712                                      : cap->run_queue_hd->bound != task)));
713 }
714
715 // This is the single place where a Task goes to sleep.  There are
716 // two reasons it might need to sleep:
717 //    - there are no threads to run
718 //    - we need to yield this Capability to someone else 
719 //      (see shouldYieldCapability())
720 //
721 // Careful: the scheduler loop is quite delicate.  Make sure you run
722 // the tests in testsuite/concurrent (all ways) after modifying this,
723 // and also check the benchmarks in nofib/parallel for regressions.
724
725 static void
726 scheduleYield (Capability **pcap, Task *task)
727 {
728     Capability *cap = *pcap;
729
730     // if we have work, and we don't need to give up the Capability, continue.
731     if (!shouldYieldCapability(cap,task) && 
732         (!emptyRunQueue(cap) ||
733          blackholes_need_checking ||
734          sched_state >= SCHED_INTERRUPTING))
735         return;
736
737     // otherwise yield (sleep), and keep yielding if necessary.
738     do {
739         yieldCapability(&cap,task);
740     } 
741     while (shouldYieldCapability(cap,task));
742
743     // note there may still be no threads on the run queue at this
744     // point, the caller has to check.
745
746     *pcap = cap;
747     return;
748 }
749 #endif
750     
751 /* -----------------------------------------------------------------------------
752  * schedulePushWork()
753  *
754  * Push work to other Capabilities if we have some.
755  * -------------------------------------------------------------------------- */
756
757 static void
758 schedulePushWork(Capability *cap USED_IF_THREADS, 
759                  Task *task      USED_IF_THREADS)
760 {
761   /* following code not for PARALLEL_HASKELL. I kept the call general,
762      future GUM versions might use pushing in a distributed setup */
763 #if defined(THREADED_RTS)
764
765     Capability *free_caps[n_capabilities], *cap0;
766     nat i, n_free_caps;
767
768     // migration can be turned off with +RTS -qg
769     if (!RtsFlags.ParFlags.migrate) return;
770
771     // Check whether we have more threads on our run queue, or sparks
772     // in our pool, that we could hand to another Capability.
773     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
774         && sparkPoolSizeCap(cap) < 2) {
775         return;
776     }
777
778     // First grab as many free Capabilities as we can.
779     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
780         cap0 = &capabilities[i];
781         if (cap != cap0 && tryGrabCapability(cap0,task)) {
782             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
783                 // it already has some work, we just grabbed it at 
784                 // the wrong moment.  Or maybe it's deadlocked!
785                 releaseCapability(cap0);
786             } else {
787                 free_caps[n_free_caps++] = cap0;
788             }
789         }
790     }
791
792     // we now have n_free_caps free capabilities stashed in
793     // free_caps[].  Share our run queue equally with them.  This is
794     // probably the simplest thing we could do; improvements we might
795     // want to do include:
796     //
797     //   - giving high priority to moving relatively new threads, on 
798     //     the gournds that they haven't had time to build up a
799     //     working set in the cache on this CPU/Capability.
800     //
801     //   - giving low priority to moving long-lived threads
802
803     if (n_free_caps > 0) {
804         StgTSO *prev, *t, *next;
805         rtsBool pushed_to_all;
806
807         debugTrace(DEBUG_sched, 
808                    "cap %d: %s and %d free capabilities, sharing...", 
809                    cap->no, 
810                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
811                    "excess threads on run queue":"sparks to share (>=2)",
812                    n_free_caps);
813
814         i = 0;
815         pushed_to_all = rtsFalse;
816
817         if (cap->run_queue_hd != END_TSO_QUEUE) {
818             prev = cap->run_queue_hd;
819             t = prev->_link;
820             prev->_link = END_TSO_QUEUE;
821             for (; t != END_TSO_QUEUE; t = next) {
822                 next = t->_link;
823                 t->_link = END_TSO_QUEUE;
824                 if (t->what_next == ThreadRelocated
825                     || t->bound == task // don't move my bound thread
826                     || tsoLocked(t)) {  // don't move a locked thread
827                     setTSOLink(cap, prev, t);
828                     prev = t;
829                 } else if (i == n_free_caps) {
830                     pushed_to_all = rtsTrue;
831                     i = 0;
832                     // keep one for us
833                     setTSOLink(cap, prev, t);
834                     prev = t;
835                 } else {
836                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
837                     appendToRunQueue(free_caps[i],t);
838                     if (t->bound) { t->bound->cap = free_caps[i]; }
839                     t->cap = free_caps[i];
840                     i++;
841                 }
842             }
843             cap->run_queue_tl = prev;
844         }
845
846 #ifdef SPARK_PUSHING
847         /* JB I left this code in place, it would work but is not necessary */
848
849         // If there are some free capabilities that we didn't push any
850         // threads to, then try to push a spark to each one.
851         if (!pushed_to_all) {
852             StgClosure *spark;
853             // i is the next free capability to push to
854             for (; i < n_free_caps; i++) {
855                 if (emptySparkPoolCap(free_caps[i])) {
856                     spark = tryStealSpark(cap->sparks);
857                     if (spark != NULL) {
858                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
859                         newSpark(&(free_caps[i]->r), spark);
860                     }
861                 }
862             }
863         }
864 #endif /* SPARK_PUSHING */
865
866         // release the capabilities
867         for (i = 0; i < n_free_caps; i++) {
868             task->cap = free_caps[i];
869             releaseAndWakeupCapability(free_caps[i]);
870         }
871     }
872     task->cap = cap; // reset to point to our Capability.
873
874 #endif /* THREADED_RTS */
875
876 }
877
878 /* ----------------------------------------------------------------------------
879  * Start any pending signal handlers
880  * ------------------------------------------------------------------------- */
881
882 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
883 static void
884 scheduleStartSignalHandlers(Capability *cap)
885 {
886     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
887         // safe outside the lock
888         startSignalHandlers(cap);
889     }
890 }
891 #else
892 static void
893 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
894 {
895 }
896 #endif
897
898 /* ----------------------------------------------------------------------------
899  * Check for blocked threads that can be woken up.
900  * ------------------------------------------------------------------------- */
901
902 static void
903 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
904 {
905 #if !defined(THREADED_RTS)
906     //
907     // Check whether any waiting threads need to be woken up.  If the
908     // run queue is empty, and there are no other tasks running, we
909     // can wait indefinitely for something to happen.
910     //
911     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
912     {
913         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
914     }
915 #endif
916 }
917
918
919 /* ----------------------------------------------------------------------------
920  * Check for threads woken up by other Capabilities
921  * ------------------------------------------------------------------------- */
922
923 static void
924 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
925 {
926 #if defined(THREADED_RTS)
927     // Any threads that were woken up by other Capabilities get
928     // appended to our run queue.
929     if (!emptyWakeupQueue(cap)) {
930         ACQUIRE_LOCK(&cap->lock);
931         if (emptyRunQueue(cap)) {
932             cap->run_queue_hd = cap->wakeup_queue_hd;
933             cap->run_queue_tl = cap->wakeup_queue_tl;
934         } else {
935             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
936             cap->run_queue_tl = cap->wakeup_queue_tl;
937         }
938         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
939         RELEASE_LOCK(&cap->lock);
940     }
941 #endif
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Check for threads blocked on BLACKHOLEs that can be woken up
946  * ------------------------------------------------------------------------- */
947 static void
948 scheduleCheckBlackHoles (Capability *cap)
949 {
950     if ( blackholes_need_checking ) // check without the lock first
951     {
952         ACQUIRE_LOCK(&sched_mutex);
953         if ( blackholes_need_checking ) {
954             blackholes_need_checking = rtsFalse;
955             // important that we reset the flag *before* checking the
956             // blackhole queue, otherwise we could get deadlock.  This
957             // happens as follows: we wake up a thread that
958             // immediately runs on another Capability, blocks on a
959             // blackhole, and then we reset the blackholes_need_checking flag.
960             checkBlackHoles(cap);
961         }
962         RELEASE_LOCK(&sched_mutex);
963     }
964 }
965
966 /* ----------------------------------------------------------------------------
967  * Detect deadlock conditions and attempt to resolve them.
968  * ------------------------------------------------------------------------- */
969
970 static void
971 scheduleDetectDeadlock (Capability *cap, Task *task)
972 {
973
974 #if defined(PARALLEL_HASKELL)
975     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
976     return;
977 #endif
978
979     /* 
980      * Detect deadlock: when we have no threads to run, there are no
981      * threads blocked, waiting for I/O, or sleeping, and all the
982      * other tasks are waiting for work, we must have a deadlock of
983      * some description.
984      */
985     if ( emptyThreadQueues(cap) )
986     {
987 #if defined(THREADED_RTS)
988         /* 
989          * In the threaded RTS, we only check for deadlock if there
990          * has been no activity in a complete timeslice.  This means
991          * we won't eagerly start a full GC just because we don't have
992          * any threads to run currently.
993          */
994         if (recent_activity != ACTIVITY_INACTIVE) return;
995 #endif
996
997         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
998
999         // Garbage collection can release some new threads due to
1000         // either (a) finalizers or (b) threads resurrected because
1001         // they are unreachable and will therefore be sent an
1002         // exception.  Any threads thus released will be immediately
1003         // runnable.
1004         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
1005
1006         recent_activity = ACTIVITY_DONE_GC;
1007         // disable timer signals (see #1623)
1008         stopTimer();
1009         
1010         if ( !emptyRunQueue(cap) ) return;
1011
1012 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1013         /* If we have user-installed signal handlers, then wait
1014          * for signals to arrive rather then bombing out with a
1015          * deadlock.
1016          */
1017         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1018             debugTrace(DEBUG_sched,
1019                        "still deadlocked, waiting for signals...");
1020
1021             awaitUserSignals();
1022
1023             if (signals_pending()) {
1024                 startSignalHandlers(cap);
1025             }
1026
1027             // either we have threads to run, or we were interrupted:
1028             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1029
1030             return;
1031         }
1032 #endif
1033
1034 #if !defined(THREADED_RTS)
1035         /* Probably a real deadlock.  Send the current main thread the
1036          * Deadlock exception.
1037          */
1038         if (task->tso) {
1039             switch (task->tso->why_blocked) {
1040             case BlockedOnSTM:
1041             case BlockedOnBlackHole:
1042             case BlockedOnException:
1043             case BlockedOnMVar:
1044                 throwToSingleThreaded(cap, task->tso, 
1045                                       (StgClosure *)nonTermination_closure);
1046                 return;
1047             default:
1048                 barf("deadlock: main thread blocked in a strange way");
1049             }
1050         }
1051         return;
1052 #endif
1053     }
1054 }
1055
1056
1057 /* ----------------------------------------------------------------------------
1058  * Send pending messages (PARALLEL_HASKELL only)
1059  * ------------------------------------------------------------------------- */
1060
1061 #if defined(PARALLEL_HASKELL)
1062 static void
1063 scheduleSendPendingMessages(void)
1064 {
1065
1066 # if defined(PAR) // global Mem.Mgmt., omit for now
1067     if (PendingFetches != END_BF_QUEUE) {
1068         processFetches();
1069     }
1070 # endif
1071     
1072     if (RtsFlags.ParFlags.BufferTime) {
1073         // if we use message buffering, we must send away all message
1074         // packets which have become too old...
1075         sendOldBuffers(); 
1076     }
1077 }
1078 #endif
1079
1080 /* ----------------------------------------------------------------------------
1081  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1082  * ------------------------------------------------------------------------- */
1083
1084 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1085 static void
1086 scheduleActivateSpark(Capability *cap)
1087 {
1088     if (anySparks())
1089     {
1090         createSparkThread(cap);
1091         debugTrace(DEBUG_sched, "creating a spark thread");
1092     }
1093 }
1094 #endif // PARALLEL_HASKELL || THREADED_RTS
1095
1096 /* ----------------------------------------------------------------------------
1097  * Get work from a remote node (PARALLEL_HASKELL only)
1098  * ------------------------------------------------------------------------- */
1099     
1100 #if defined(PARALLEL_HASKELL)
1101 static rtsBool /* return value used in PARALLEL_HASKELL only */
1102 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1103 {
1104 #if defined(PARALLEL_HASKELL)
1105   rtsBool receivedFinish = rtsFalse;
1106
1107   // idle() , i.e. send all buffers, wait for work
1108   if (RtsFlags.ParFlags.BufferTime) {
1109         IF_PAR_DEBUG(verbose, 
1110                 debugBelch("...send all pending data,"));
1111         {
1112           nat i;
1113           for (i=1; i<=nPEs; i++)
1114             sendImmediately(i); // send all messages away immediately
1115         }
1116   }
1117
1118   /* this would be the place for fishing in GUM... 
1119
1120      if (no-earlier-fish-around) 
1121           sendFish(choosePe());
1122    */
1123
1124   // Eden:just look for incoming messages (blocking receive)
1125   IF_PAR_DEBUG(verbose, 
1126                debugBelch("...wait for incoming messages...\n"));
1127   processMessages(cap, &receivedFinish); // blocking receive...
1128
1129
1130   return receivedFinish;
1131   // reenter scheduling look after having received something
1132
1133 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1134
1135   return rtsFalse; /* return value unused in THREADED_RTS */
1136
1137 #endif /* PARALLEL_HASKELL */
1138 }
1139 #endif // PARALLEL_HASKELL || THREADED_RTS
1140
1141 /* ----------------------------------------------------------------------------
1142  * After running a thread...
1143  * ------------------------------------------------------------------------- */
1144
1145 static void
1146 schedulePostRunThread (Capability *cap, StgTSO *t)
1147 {
1148     // We have to be able to catch transactions that are in an
1149     // infinite loop as a result of seeing an inconsistent view of
1150     // memory, e.g. 
1151     //
1152     //   atomically $ do
1153     //       [a,b] <- mapM readTVar [ta,tb]
1154     //       when (a == b) loop
1155     //
1156     // and a is never equal to b given a consistent view of memory.
1157     //
1158     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1159         if (!stmValidateNestOfTransactions (t -> trec)) {
1160             debugTrace(DEBUG_sched | DEBUG_stm,
1161                        "trec %p found wasting its time", t);
1162             
1163             // strip the stack back to the
1164             // ATOMICALLY_FRAME, aborting the (nested)
1165             // transaction, and saving the stack of any
1166             // partially-evaluated thunks on the heap.
1167             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1168             
1169             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1170         }
1171     }
1172
1173   /* some statistics gathering in the parallel case */
1174 }
1175
1176 /* -----------------------------------------------------------------------------
1177  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1178  * -------------------------------------------------------------------------- */
1179
1180 static rtsBool
1181 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1182 {
1183     // did the task ask for a large block?
1184     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1185         // if so, get one and push it on the front of the nursery.
1186         bdescr *bd;
1187         lnat blocks;
1188         
1189         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1190         
1191         debugTrace(DEBUG_sched,
1192                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1193                    (long)t->id, whatNext_strs[t->what_next], blocks);
1194     
1195         // don't do this if the nursery is (nearly) full, we'll GC first.
1196         if (cap->r.rCurrentNursery->link != NULL ||
1197             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1198                                                // if the nursery has only one block.
1199             
1200             ACQUIRE_SM_LOCK
1201             bd = allocGroup( blocks );
1202             RELEASE_SM_LOCK
1203             cap->r.rNursery->n_blocks += blocks;
1204             
1205             // link the new group into the list
1206             bd->link = cap->r.rCurrentNursery;
1207             bd->u.back = cap->r.rCurrentNursery->u.back;
1208             if (cap->r.rCurrentNursery->u.back != NULL) {
1209                 cap->r.rCurrentNursery->u.back->link = bd;
1210             } else {
1211 #if !defined(THREADED_RTS)
1212                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1213                        g0s0 == cap->r.rNursery);
1214 #endif
1215                 cap->r.rNursery->blocks = bd;
1216             }             
1217             cap->r.rCurrentNursery->u.back = bd;
1218             
1219             // initialise it as a nursery block.  We initialise the
1220             // step, gen_no, and flags field of *every* sub-block in
1221             // this large block, because this is easier than making
1222             // sure that we always find the block head of a large
1223             // block whenever we call Bdescr() (eg. evacuate() and
1224             // isAlive() in the GC would both have to do this, at
1225             // least).
1226             { 
1227                 bdescr *x;
1228                 for (x = bd; x < bd + blocks; x++) {
1229                     x->step = cap->r.rNursery;
1230                     x->gen_no = 0;
1231                     x->flags = 0;
1232                 }
1233             }
1234             
1235             // This assert can be a killer if the app is doing lots
1236             // of large block allocations.
1237             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1238             
1239             // now update the nursery to point to the new block
1240             cap->r.rCurrentNursery = bd;
1241             
1242             // we might be unlucky and have another thread get on the
1243             // run queue before us and steal the large block, but in that
1244             // case the thread will just end up requesting another large
1245             // block.
1246             pushOnRunQueue(cap,t);
1247             return rtsFalse;  /* not actually GC'ing */
1248         }
1249     }
1250     
1251     debugTrace(DEBUG_sched,
1252                "--<< thread %ld (%s) stopped: HeapOverflow",
1253                (long)t->id, whatNext_strs[t->what_next]);
1254
1255     if (cap->context_switch) {
1256         // Sometimes we miss a context switch, e.g. when calling
1257         // primitives in a tight loop, MAYBE_GC() doesn't check the
1258         // context switch flag, and we end up waiting for a GC.
1259         // See #1984, and concurrent/should_run/1984
1260         cap->context_switch = 0;
1261         addToRunQueue(cap,t);
1262     } else {
1263         pushOnRunQueue(cap,t);
1264     }
1265     return rtsTrue;
1266     /* actual GC is done at the end of the while loop in schedule() */
1267 }
1268
1269 /* -----------------------------------------------------------------------------
1270  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1271  * -------------------------------------------------------------------------- */
1272
1273 static void
1274 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1275 {
1276     debugTrace (DEBUG_sched,
1277                 "--<< thread %ld (%s) stopped, StackOverflow", 
1278                 (long)t->id, whatNext_strs[t->what_next]);
1279
1280     /* just adjust the stack for this thread, then pop it back
1281      * on the run queue.
1282      */
1283     { 
1284         /* enlarge the stack */
1285         StgTSO *new_t = threadStackOverflow(cap, t);
1286         
1287         /* The TSO attached to this Task may have moved, so update the
1288          * pointer to it.
1289          */
1290         if (task->tso == t) {
1291             task->tso = new_t;
1292         }
1293         pushOnRunQueue(cap,new_t);
1294     }
1295 }
1296
1297 /* -----------------------------------------------------------------------------
1298  * Handle a thread that returned to the scheduler with ThreadYielding
1299  * -------------------------------------------------------------------------- */
1300
1301 static rtsBool
1302 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1303 {
1304     // Reset the context switch flag.  We don't do this just before
1305     // running the thread, because that would mean we would lose ticks
1306     // during GC, which can lead to unfair scheduling (a thread hogs
1307     // the CPU because the tick always arrives during GC).  This way
1308     // penalises threads that do a lot of allocation, but that seems
1309     // better than the alternative.
1310     cap->context_switch = 0;
1311     
1312     /* put the thread back on the run queue.  Then, if we're ready to
1313      * GC, check whether this is the last task to stop.  If so, wake
1314      * up the GC thread.  getThread will block during a GC until the
1315      * GC is finished.
1316      */
1317 #ifdef DEBUG
1318     if (t->what_next != prev_what_next) {
1319         debugTrace(DEBUG_sched,
1320                    "--<< thread %ld (%s) stopped to switch evaluators", 
1321                    (long)t->id, whatNext_strs[t->what_next]);
1322     } else {
1323         debugTrace(DEBUG_sched,
1324                    "--<< thread %ld (%s) stopped, yielding",
1325                    (long)t->id, whatNext_strs[t->what_next]);
1326     }
1327 #endif
1328     
1329     IF_DEBUG(sanity,
1330              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1331              checkTSO(t));
1332     ASSERT(t->_link == END_TSO_QUEUE);
1333     
1334     // Shortcut if we're just switching evaluators: don't bother
1335     // doing stack squeezing (which can be expensive), just run the
1336     // thread.
1337     if (t->what_next != prev_what_next) {
1338         return rtsTrue;
1339     }
1340
1341     addToRunQueue(cap,t);
1342
1343     return rtsFalse;
1344 }
1345
1346 /* -----------------------------------------------------------------------------
1347  * Handle a thread that returned to the scheduler with ThreadBlocked
1348  * -------------------------------------------------------------------------- */
1349
1350 static void
1351 scheduleHandleThreadBlocked( StgTSO *t
1352 #if !defined(GRAN) && !defined(DEBUG)
1353     STG_UNUSED
1354 #endif
1355     )
1356 {
1357
1358       // We don't need to do anything.  The thread is blocked, and it
1359       // has tidied up its stack and placed itself on whatever queue
1360       // it needs to be on.
1361
1362     // ASSERT(t->why_blocked != NotBlocked);
1363     // Not true: for example,
1364     //    - in THREADED_RTS, the thread may already have been woken
1365     //      up by another Capability.  This actually happens: try
1366     //      conc023 +RTS -N2.
1367     //    - the thread may have woken itself up already, because
1368     //      threadPaused() might have raised a blocked throwTo
1369     //      exception, see maybePerformBlockedException().
1370
1371 #ifdef DEBUG
1372     if (traceClass(DEBUG_sched)) {
1373         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1374                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1375         printThreadBlockage(t);
1376         debugTraceEnd();
1377     }
1378 #endif
1379 }
1380
1381 /* -----------------------------------------------------------------------------
1382  * Handle a thread that returned to the scheduler with ThreadFinished
1383  * -------------------------------------------------------------------------- */
1384
1385 static rtsBool
1386 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1387 {
1388     /* Need to check whether this was a main thread, and if so,
1389      * return with the return value.
1390      *
1391      * We also end up here if the thread kills itself with an
1392      * uncaught exception, see Exception.cmm.
1393      */
1394     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1395                (unsigned long)t->id, whatNext_strs[t->what_next]);
1396
1397       //
1398       // Check whether the thread that just completed was a bound
1399       // thread, and if so return with the result.  
1400       //
1401       // There is an assumption here that all thread completion goes
1402       // through this point; we need to make sure that if a thread
1403       // ends up in the ThreadKilled state, that it stays on the run
1404       // queue so it can be dealt with here.
1405       //
1406
1407       if (t->bound) {
1408
1409           if (t->bound != task) {
1410 #if !defined(THREADED_RTS)
1411               // Must be a bound thread that is not the topmost one.  Leave
1412               // it on the run queue until the stack has unwound to the
1413               // point where we can deal with this.  Leaving it on the run
1414               // queue also ensures that the garbage collector knows about
1415               // this thread and its return value (it gets dropped from the
1416               // step->threads list so there's no other way to find it).
1417               appendToRunQueue(cap,t);
1418               return rtsFalse;
1419 #else
1420               // this cannot happen in the threaded RTS, because a
1421               // bound thread can only be run by the appropriate Task.
1422               barf("finished bound thread that isn't mine");
1423 #endif
1424           }
1425
1426           ASSERT(task->tso == t);
1427
1428           if (t->what_next == ThreadComplete) {
1429               if (task->ret) {
1430                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1431                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1432               }
1433               task->stat = Success;
1434           } else {
1435               if (task->ret) {
1436                   *(task->ret) = NULL;
1437               }
1438               if (sched_state >= SCHED_INTERRUPTING) {
1439                   task->stat = Interrupted;
1440               } else {
1441                   task->stat = Killed;
1442               }
1443           }
1444 #ifdef DEBUG
1445           removeThreadLabel((StgWord)task->tso->id);
1446 #endif
1447           return rtsTrue; // tells schedule() to return
1448       }
1449
1450       return rtsFalse;
1451 }
1452
1453 /* -----------------------------------------------------------------------------
1454  * Perform a heap census
1455  * -------------------------------------------------------------------------- */
1456
1457 static rtsBool
1458 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1459 {
1460     // When we have +RTS -i0 and we're heap profiling, do a census at
1461     // every GC.  This lets us get repeatable runs for debugging.
1462     if (performHeapProfile ||
1463         (RtsFlags.ProfFlags.profileInterval==0 &&
1464          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1465         return rtsTrue;
1466     } else {
1467         return rtsFalse;
1468     }
1469 }
1470
1471 /* -----------------------------------------------------------------------------
1472  * Perform a garbage collection if necessary
1473  * -------------------------------------------------------------------------- */
1474
1475 static Capability *
1476 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1477 {
1478     rtsBool heap_census;
1479 #ifdef THREADED_RTS
1480     /* extern static volatile StgWord waiting_for_gc; 
1481        lives inside capability.c */
1482     rtsBool was_waiting;
1483     nat i;
1484 #endif
1485
1486     if (sched_state == SCHED_SHUTTING_DOWN) {
1487         // The final GC has already been done, and the system is
1488         // shutting down.  We'll probably deadlock if we try to GC
1489         // now.
1490         return cap;
1491     }
1492
1493 #ifdef THREADED_RTS
1494     // In order to GC, there must be no threads running Haskell code.
1495     // Therefore, the GC thread needs to hold *all* the capabilities,
1496     // and release them after the GC has completed.  
1497     //
1498     // This seems to be the simplest way: previous attempts involved
1499     // making all the threads with capabilities give up their
1500     // capabilities and sleep except for the *last* one, which
1501     // actually did the GC.  But it's quite hard to arrange for all
1502     // the other tasks to sleep and stay asleep.
1503     //
1504         
1505     /*  Other capabilities are prevented from running yet more Haskell
1506         threads if waiting_for_gc is set. Tested inside
1507         yieldCapability() and releaseCapability() in Capability.c */
1508
1509     was_waiting = cas(&waiting_for_gc, 0, 1);
1510     if (was_waiting) {
1511         do {
1512             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1513             if (cap) yieldCapability(&cap,task);
1514         } while (waiting_for_gc);
1515         return cap;  // NOTE: task->cap might have changed here
1516     }
1517
1518     setContextSwitches();
1519     for (i=0; i < n_capabilities; i++) {
1520         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1521         if (cap != &capabilities[i]) {
1522             Capability *pcap = &capabilities[i];
1523             // we better hope this task doesn't get migrated to
1524             // another Capability while we're waiting for this one.
1525             // It won't, because load balancing happens while we have
1526             // all the Capabilities, but even so it's a slightly
1527             // unsavoury invariant.
1528             task->cap = pcap;
1529             waitForReturnCapability(&pcap, task);
1530             if (pcap != &capabilities[i]) {
1531                 barf("scheduleDoGC: got the wrong capability");
1532             }
1533         }
1534     }
1535
1536     waiting_for_gc = rtsFalse;
1537 #endif
1538
1539     // so this happens periodically:
1540     if (cap) scheduleCheckBlackHoles(cap);
1541     
1542     IF_DEBUG(scheduler, printAllThreads());
1543
1544     /*
1545      * We now have all the capabilities; if we're in an interrupting
1546      * state, then we should take the opportunity to delete all the
1547      * threads in the system.
1548      */
1549     if (sched_state >= SCHED_INTERRUPTING) {
1550         deleteAllThreads(&capabilities[0]);
1551         sched_state = SCHED_SHUTTING_DOWN;
1552     }
1553     
1554     heap_census = scheduleNeedHeapProfile(rtsTrue);
1555
1556     /* everybody back, start the GC.
1557      * Could do it in this thread, or signal a condition var
1558      * to do it in another thread.  Either way, we need to
1559      * broadcast on gc_pending_cond afterward.
1560      */
1561 #if defined(THREADED_RTS)
1562     debugTrace(DEBUG_sched, "doing GC");
1563 #endif
1564     GarbageCollect(force_major || heap_census);
1565     
1566     if (heap_census) {
1567         debugTrace(DEBUG_sched, "performing heap census");
1568         heapCensus();
1569         performHeapProfile = rtsFalse;
1570     }
1571
1572 #ifdef SPARKBALANCE
1573     /* JB 
1574        Once we are all together... this would be the place to balance all
1575        spark pools. No concurrent stealing or adding of new sparks can
1576        occur. Should be defined in Sparks.c. */
1577     balanceSparkPoolsCaps(n_capabilities, capabilities);
1578 #endif
1579
1580 #if defined(THREADED_RTS)
1581     // release our stash of capabilities.
1582     for (i = 0; i < n_capabilities; i++) {
1583         if (cap != &capabilities[i]) {
1584             task->cap = &capabilities[i];
1585             releaseCapability(&capabilities[i]);
1586         }
1587     }
1588     if (cap) {
1589         task->cap = cap;
1590     } else {
1591         task->cap = NULL;
1592     }
1593 #endif
1594
1595     return cap;
1596 }
1597
1598 /* ---------------------------------------------------------------------------
1599  * Singleton fork(). Do not copy any running threads.
1600  * ------------------------------------------------------------------------- */
1601
1602 pid_t
1603 forkProcess(HsStablePtr *entry
1604 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1605             STG_UNUSED
1606 #endif
1607            )
1608 {
1609 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1610     Task *task;
1611     pid_t pid;
1612     StgTSO* t,*next;
1613     Capability *cap;
1614     nat s;
1615     
1616 #if defined(THREADED_RTS)
1617     if (RtsFlags.ParFlags.nNodes > 1) {
1618         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1619         stg_exit(EXIT_FAILURE);
1620     }
1621 #endif
1622
1623     debugTrace(DEBUG_sched, "forking!");
1624     
1625     // ToDo: for SMP, we should probably acquire *all* the capabilities
1626     cap = rts_lock();
1627     
1628     // no funny business: hold locks while we fork, otherwise if some
1629     // other thread is holding a lock when the fork happens, the data
1630     // structure protected by the lock will forever be in an
1631     // inconsistent state in the child.  See also #1391.
1632     ACQUIRE_LOCK(&sched_mutex);
1633     ACQUIRE_LOCK(&cap->lock);
1634     ACQUIRE_LOCK(&cap->running_task->lock);
1635
1636     pid = fork();
1637     
1638     if (pid) { // parent
1639         
1640         RELEASE_LOCK(&sched_mutex);
1641         RELEASE_LOCK(&cap->lock);
1642         RELEASE_LOCK(&cap->running_task->lock);
1643
1644         // just return the pid
1645         rts_unlock(cap);
1646         return pid;
1647         
1648     } else { // child
1649         
1650 #if defined(THREADED_RTS)
1651         initMutex(&sched_mutex);
1652         initMutex(&cap->lock);
1653         initMutex(&cap->running_task->lock);
1654 #endif
1655
1656         // Now, all OS threads except the thread that forked are
1657         // stopped.  We need to stop all Haskell threads, including
1658         // those involved in foreign calls.  Also we need to delete
1659         // all Tasks, because they correspond to OS threads that are
1660         // now gone.
1661
1662         for (s = 0; s < total_steps; s++) {
1663           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1664             if (t->what_next == ThreadRelocated) {
1665                 next = t->_link;
1666             } else {
1667                 next = t->global_link;
1668                 // don't allow threads to catch the ThreadKilled
1669                 // exception, but we do want to raiseAsync() because these
1670                 // threads may be evaluating thunks that we need later.
1671                 deleteThread_(cap,t);
1672             }
1673           }
1674         }
1675         
1676         // Empty the run queue.  It seems tempting to let all the
1677         // killed threads stay on the run queue as zombies to be
1678         // cleaned up later, but some of them correspond to bound
1679         // threads for which the corresponding Task does not exist.
1680         cap->run_queue_hd = END_TSO_QUEUE;
1681         cap->run_queue_tl = END_TSO_QUEUE;
1682
1683         // Any suspended C-calling Tasks are no more, their OS threads
1684         // don't exist now:
1685         cap->suspended_ccalling_tasks = NULL;
1686
1687         // Empty the threads lists.  Otherwise, the garbage
1688         // collector may attempt to resurrect some of these threads.
1689         for (s = 0; s < total_steps; s++) {
1690             all_steps[s].threads = END_TSO_QUEUE;
1691         }
1692
1693         // Wipe the task list, except the current Task.
1694         ACQUIRE_LOCK(&sched_mutex);
1695         for (task = all_tasks; task != NULL; task=task->all_link) {
1696             if (task != cap->running_task) {
1697 #if defined(THREADED_RTS)
1698                 initMutex(&task->lock); // see #1391
1699 #endif
1700                 discardTask(task);
1701             }
1702         }
1703         RELEASE_LOCK(&sched_mutex);
1704
1705 #if defined(THREADED_RTS)
1706         // Wipe our spare workers list, they no longer exist.  New
1707         // workers will be created if necessary.
1708         cap->spare_workers = NULL;
1709         cap->returning_tasks_hd = NULL;
1710         cap->returning_tasks_tl = NULL;
1711 #endif
1712
1713         // On Unix, all timers are reset in the child, so we need to start
1714         // the timer again.
1715         initTimer();
1716         startTimer();
1717
1718         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1719         rts_checkSchedStatus("forkProcess",cap);
1720         
1721         rts_unlock(cap);
1722         hs_exit();                      // clean up and exit
1723         stg_exit(EXIT_SUCCESS);
1724     }
1725 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1726     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1727     return -1;
1728 #endif
1729 }
1730
1731 /* ---------------------------------------------------------------------------
1732  * Delete all the threads in the system
1733  * ------------------------------------------------------------------------- */
1734    
1735 static void
1736 deleteAllThreads ( Capability *cap )
1737 {
1738     // NOTE: only safe to call if we own all capabilities.
1739
1740     StgTSO* t, *next;
1741     nat s;
1742
1743     debugTrace(DEBUG_sched,"deleting all threads");
1744     for (s = 0; s < total_steps; s++) {
1745       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1746         if (t->what_next == ThreadRelocated) {
1747             next = t->_link;
1748         } else {
1749             next = t->global_link;
1750             deleteThread(cap,t);
1751         }
1752       }
1753     }      
1754
1755     // The run queue now contains a bunch of ThreadKilled threads.  We
1756     // must not throw these away: the main thread(s) will be in there
1757     // somewhere, and the main scheduler loop has to deal with it.
1758     // Also, the run queue is the only thing keeping these threads from
1759     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1760
1761 #if !defined(THREADED_RTS)
1762     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1763     ASSERT(sleeping_queue == END_TSO_QUEUE);
1764 #endif
1765 }
1766
1767 /* -----------------------------------------------------------------------------
1768    Managing the suspended_ccalling_tasks list.
1769    Locks required: sched_mutex
1770    -------------------------------------------------------------------------- */
1771
1772 STATIC_INLINE void
1773 suspendTask (Capability *cap, Task *task)
1774 {
1775     ASSERT(task->next == NULL && task->prev == NULL);
1776     task->next = cap->suspended_ccalling_tasks;
1777     task->prev = NULL;
1778     if (cap->suspended_ccalling_tasks) {
1779         cap->suspended_ccalling_tasks->prev = task;
1780     }
1781     cap->suspended_ccalling_tasks = task;
1782 }
1783
1784 STATIC_INLINE void
1785 recoverSuspendedTask (Capability *cap, Task *task)
1786 {
1787     if (task->prev) {
1788         task->prev->next = task->next;
1789     } else {
1790         ASSERT(cap->suspended_ccalling_tasks == task);
1791         cap->suspended_ccalling_tasks = task->next;
1792     }
1793     if (task->next) {
1794         task->next->prev = task->prev;
1795     }
1796     task->next = task->prev = NULL;
1797 }
1798
1799 /* ---------------------------------------------------------------------------
1800  * Suspending & resuming Haskell threads.
1801  * 
1802  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1803  * its capability before calling the C function.  This allows another
1804  * task to pick up the capability and carry on running Haskell
1805  * threads.  It also means that if the C call blocks, it won't lock
1806  * the whole system.
1807  *
1808  * The Haskell thread making the C call is put to sleep for the
1809  * duration of the call, on the susepended_ccalling_threads queue.  We
1810  * give out a token to the task, which it can use to resume the thread
1811  * on return from the C function.
1812  * ------------------------------------------------------------------------- */
1813    
1814 void *
1815 suspendThread (StgRegTable *reg)
1816 {
1817   Capability *cap;
1818   int saved_errno;
1819   StgTSO *tso;
1820   Task *task;
1821 #if mingw32_HOST_OS
1822   StgWord32 saved_winerror;
1823 #endif
1824
1825   saved_errno = errno;
1826 #if mingw32_HOST_OS
1827   saved_winerror = GetLastError();
1828 #endif
1829
1830   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1831    */
1832   cap = regTableToCapability(reg);
1833
1834   task = cap->running_task;
1835   tso = cap->r.rCurrentTSO;
1836
1837   debugTrace(DEBUG_sched, 
1838              "thread %lu did a safe foreign call", 
1839              (unsigned long)cap->r.rCurrentTSO->id);
1840
1841   // XXX this might not be necessary --SDM
1842   tso->what_next = ThreadRunGHC;
1843
1844   threadPaused(cap,tso);
1845
1846   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1847       tso->why_blocked = BlockedOnCCall;
1848       tso->flags |= TSO_BLOCKEX;
1849       tso->flags &= ~TSO_INTERRUPTIBLE;
1850   } else {
1851       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1852   }
1853
1854   // Hand back capability
1855   task->suspended_tso = tso;
1856
1857   ACQUIRE_LOCK(&cap->lock);
1858
1859   suspendTask(cap,task);
1860   cap->in_haskell = rtsFalse;
1861   releaseCapability_(cap,rtsFalse);
1862   
1863   RELEASE_LOCK(&cap->lock);
1864
1865 #if defined(THREADED_RTS)
1866   /* Preparing to leave the RTS, so ensure there's a native thread/task
1867      waiting to take over.
1868   */
1869   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1870 #endif
1871
1872   errno = saved_errno;
1873 #if mingw32_HOST_OS
1874   SetLastError(saved_winerror);
1875 #endif
1876   return task;
1877 }
1878
1879 StgRegTable *
1880 resumeThread (void *task_)
1881 {
1882     StgTSO *tso;
1883     Capability *cap;
1884     Task *task = task_;
1885     int saved_errno;
1886 #if mingw32_HOST_OS
1887     StgWord32 saved_winerror;
1888 #endif
1889
1890     saved_errno = errno;
1891 #if mingw32_HOST_OS
1892     saved_winerror = GetLastError();
1893 #endif
1894
1895     cap = task->cap;
1896     // Wait for permission to re-enter the RTS with the result.
1897     waitForReturnCapability(&cap,task);
1898     // we might be on a different capability now... but if so, our
1899     // entry on the suspended_ccalling_tasks list will also have been
1900     // migrated.
1901
1902     // Remove the thread from the suspended list
1903     recoverSuspendedTask(cap,task);
1904
1905     tso = task->suspended_tso;
1906     task->suspended_tso = NULL;
1907     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1908     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1909     
1910     if (tso->why_blocked == BlockedOnCCall) {
1911         awakenBlockedExceptionQueue(cap,tso);
1912         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1913     }
1914     
1915     /* Reset blocking status */
1916     tso->why_blocked  = NotBlocked;
1917     
1918     cap->r.rCurrentTSO = tso;
1919     cap->in_haskell = rtsTrue;
1920     errno = saved_errno;
1921 #if mingw32_HOST_OS
1922     SetLastError(saved_winerror);
1923 #endif
1924
1925     /* We might have GC'd, mark the TSO dirty again */
1926     dirty_TSO(cap,tso);
1927
1928     IF_DEBUG(sanity, checkTSO(tso));
1929
1930     return &cap->r;
1931 }
1932
1933 /* ---------------------------------------------------------------------------
1934  * scheduleThread()
1935  *
1936  * scheduleThread puts a thread on the end  of the runnable queue.
1937  * This will usually be done immediately after a thread is created.
1938  * The caller of scheduleThread must create the thread using e.g.
1939  * createThread and push an appropriate closure
1940  * on this thread's stack before the scheduler is invoked.
1941  * ------------------------------------------------------------------------ */
1942
1943 void
1944 scheduleThread(Capability *cap, StgTSO *tso)
1945 {
1946     // The thread goes at the *end* of the run-queue, to avoid possible
1947     // starvation of any threads already on the queue.
1948     appendToRunQueue(cap,tso);
1949 }
1950
1951 void
1952 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1953 {
1954 #if defined(THREADED_RTS)
1955     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1956                               // move this thread from now on.
1957     cpu %= RtsFlags.ParFlags.nNodes;
1958     if (cpu == cap->no) {
1959         appendToRunQueue(cap,tso);
1960     } else {
1961         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1962     }
1963 #else
1964     appendToRunQueue(cap,tso);
1965 #endif
1966 }
1967
1968 Capability *
1969 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1970 {
1971     Task *task;
1972
1973     // We already created/initialised the Task
1974     task = cap->running_task;
1975
1976     // This TSO is now a bound thread; make the Task and TSO
1977     // point to each other.
1978     tso->bound = task;
1979     tso->cap = cap;
1980
1981     task->tso = tso;
1982     task->ret = ret;
1983     task->stat = NoStatus;
1984
1985     appendToRunQueue(cap,tso);
1986
1987     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1988
1989     cap = schedule(cap,task);
1990
1991     ASSERT(task->stat != NoStatus);
1992     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1993
1994     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1995     return cap;
1996 }
1997
1998 /* ----------------------------------------------------------------------------
1999  * Starting Tasks
2000  * ------------------------------------------------------------------------- */
2001
2002 #if defined(THREADED_RTS)
2003 void OSThreadProcAttr
2004 workerStart(Task *task)
2005 {
2006     Capability *cap;
2007
2008     // See startWorkerTask().
2009     ACQUIRE_LOCK(&task->lock);
2010     cap = task->cap;
2011     RELEASE_LOCK(&task->lock);
2012
2013     // set the thread-local pointer to the Task:
2014     taskEnter(task);
2015
2016     // schedule() runs without a lock.
2017     cap = schedule(cap,task);
2018
2019     // On exit from schedule(), we have a Capability.
2020     releaseCapability(cap);
2021     workerTaskStop(task);
2022 }
2023 #endif
2024
2025 /* ---------------------------------------------------------------------------
2026  * initScheduler()
2027  *
2028  * Initialise the scheduler.  This resets all the queues - if the
2029  * queues contained any threads, they'll be garbage collected at the
2030  * next pass.
2031  *
2032  * ------------------------------------------------------------------------ */
2033
2034 void 
2035 initScheduler(void)
2036 {
2037 #if !defined(THREADED_RTS)
2038   blocked_queue_hd  = END_TSO_QUEUE;
2039   blocked_queue_tl  = END_TSO_QUEUE;
2040   sleeping_queue    = END_TSO_QUEUE;
2041 #endif
2042
2043   blackhole_queue   = END_TSO_QUEUE;
2044
2045   sched_state    = SCHED_RUNNING;
2046   recent_activity = ACTIVITY_YES;
2047
2048 #if defined(THREADED_RTS)
2049   /* Initialise the mutex and condition variables used by
2050    * the scheduler. */
2051   initMutex(&sched_mutex);
2052 #endif
2053   
2054   ACQUIRE_LOCK(&sched_mutex);
2055
2056   /* A capability holds the state a native thread needs in
2057    * order to execute STG code. At least one capability is
2058    * floating around (only THREADED_RTS builds have more than one).
2059    */
2060   initCapabilities();
2061
2062   initTaskManager();
2063
2064 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2065   initSparkPools();
2066 #endif
2067
2068 #if defined(THREADED_RTS)
2069   /*
2070    * Eagerly start one worker to run each Capability, except for
2071    * Capability 0.  The idea is that we're probably going to start a
2072    * bound thread on Capability 0 pretty soon, so we don't want a
2073    * worker task hogging it.
2074    */
2075   { 
2076       nat i;
2077       Capability *cap;
2078       for (i = 1; i < n_capabilities; i++) {
2079           cap = &capabilities[i];
2080           ACQUIRE_LOCK(&cap->lock);
2081           startWorkerTask(cap, workerStart);
2082           RELEASE_LOCK(&cap->lock);
2083       }
2084   }
2085 #endif
2086
2087   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2088
2089   RELEASE_LOCK(&sched_mutex);
2090 }
2091
2092 void
2093 exitScheduler(
2094     rtsBool wait_foreign
2095 #if !defined(THREADED_RTS)
2096                          __attribute__((unused))
2097 #endif
2098 )
2099                /* see Capability.c, shutdownCapability() */
2100 {
2101     Task *task = NULL;
2102
2103 #if defined(THREADED_RTS)
2104     ACQUIRE_LOCK(&sched_mutex);
2105     task = newBoundTask();
2106     RELEASE_LOCK(&sched_mutex);
2107 #endif
2108
2109     // If we haven't killed all the threads yet, do it now.
2110     if (sched_state < SCHED_SHUTTING_DOWN) {
2111         sched_state = SCHED_INTERRUPTING;
2112         scheduleDoGC(NULL,task,rtsFalse);    
2113     }
2114     sched_state = SCHED_SHUTTING_DOWN;
2115
2116 #if defined(THREADED_RTS)
2117     { 
2118         nat i;
2119         
2120         for (i = 0; i < n_capabilities; i++) {
2121             shutdownCapability(&capabilities[i], task, wait_foreign);
2122         }
2123         boundTaskExiting(task);
2124         stopTaskManager();
2125     }
2126 #endif
2127 }
2128
2129 void
2130 freeScheduler( void )
2131 {
2132     freeCapabilities();
2133     freeTaskManager();
2134     if (n_capabilities != 1) {
2135         stgFree(capabilities);
2136     }
2137 #if defined(THREADED_RTS)
2138     closeMutex(&sched_mutex);
2139 #endif
2140 }
2141
2142 /* -----------------------------------------------------------------------------
2143    performGC
2144
2145    This is the interface to the garbage collector from Haskell land.
2146    We provide this so that external C code can allocate and garbage
2147    collect when called from Haskell via _ccall_GC.
2148    -------------------------------------------------------------------------- */
2149
2150 static void
2151 performGC_(rtsBool force_major)
2152 {
2153     Task *task;
2154     // We must grab a new Task here, because the existing Task may be
2155     // associated with a particular Capability, and chained onto the 
2156     // suspended_ccalling_tasks queue.
2157     ACQUIRE_LOCK(&sched_mutex);
2158     task = newBoundTask();
2159     RELEASE_LOCK(&sched_mutex);
2160     scheduleDoGC(NULL,task,force_major);
2161     boundTaskExiting(task);
2162 }
2163
2164 void
2165 performGC(void)
2166 {
2167     performGC_(rtsFalse);
2168 }
2169
2170 void
2171 performMajorGC(void)
2172 {
2173     performGC_(rtsTrue);
2174 }
2175
2176 /* -----------------------------------------------------------------------------
2177    Stack overflow
2178
2179    If the thread has reached its maximum stack size, then raise the
2180    StackOverflow exception in the offending thread.  Otherwise
2181    relocate the TSO into a larger chunk of memory and adjust its stack
2182    size appropriately.
2183    -------------------------------------------------------------------------- */
2184
2185 static StgTSO *
2186 threadStackOverflow(Capability *cap, StgTSO *tso)
2187 {
2188   nat new_stack_size, stack_words;
2189   lnat new_tso_size;
2190   StgPtr new_sp;
2191   StgTSO *dest;
2192
2193   IF_DEBUG(sanity,checkTSO(tso));
2194
2195   // don't allow throwTo() to modify the blocked_exceptions queue
2196   // while we are moving the TSO:
2197   lockClosure((StgClosure *)tso);
2198
2199   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2200       // NB. never raise a StackOverflow exception if the thread is
2201       // inside Control.Exceptino.block.  It is impractical to protect
2202       // against stack overflow exceptions, since virtually anything
2203       // can raise one (even 'catch'), so this is the only sensible
2204       // thing to do here.  See bug #767.
2205
2206       debugTrace(DEBUG_gc,
2207                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2208                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2209       IF_DEBUG(gc,
2210                /* If we're debugging, just print out the top of the stack */
2211                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2212                                                 tso->sp+64)));
2213
2214       // Send this thread the StackOverflow exception
2215       unlockTSO(tso);
2216       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2217       return tso;
2218   }
2219
2220   /* Try to double the current stack size.  If that takes us over the
2221    * maximum stack size for this thread, then use the maximum instead
2222    * (that is, unless we're already at or over the max size and we
2223    * can't raise the StackOverflow exception (see above), in which
2224    * case just double the size). Finally round up so the TSO ends up as
2225    * a whole number of blocks.
2226    */
2227   if (tso->stack_size >= tso->max_stack_size) {
2228       new_stack_size = tso->stack_size * 2;
2229   } else { 
2230       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2231   }
2232   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2233                                        TSO_STRUCT_SIZE)/sizeof(W_);
2234   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2235   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2236
2237   debugTrace(DEBUG_sched, 
2238              "increasing stack size from %ld words to %d.",
2239              (long)tso->stack_size, new_stack_size);
2240
2241   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2242   TICK_ALLOC_TSO(new_stack_size,0);
2243
2244   /* copy the TSO block and the old stack into the new area */
2245   memcpy(dest,tso,TSO_STRUCT_SIZE);
2246   stack_words = tso->stack + tso->stack_size - tso->sp;
2247   new_sp = (P_)dest + new_tso_size - stack_words;
2248   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2249
2250   /* relocate the stack pointers... */
2251   dest->sp         = new_sp;
2252   dest->stack_size = new_stack_size;
2253         
2254   /* Mark the old TSO as relocated.  We have to check for relocated
2255    * TSOs in the garbage collector and any primops that deal with TSOs.
2256    *
2257    * It's important to set the sp value to just beyond the end
2258    * of the stack, so we don't attempt to scavenge any part of the
2259    * dead TSO's stack.
2260    */
2261   tso->what_next = ThreadRelocated;
2262   setTSOLink(cap,tso,dest);
2263   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2264   tso->why_blocked = NotBlocked;
2265
2266   IF_PAR_DEBUG(verbose,
2267                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2268                      tso->id, tso, tso->stack_size);
2269                /* If we're debugging, just print out the top of the stack */
2270                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2271                                                 tso->sp+64)));
2272   
2273   unlockTSO(dest);
2274   unlockTSO(tso);
2275
2276   IF_DEBUG(sanity,checkTSO(dest));
2277 #if 0
2278   IF_DEBUG(scheduler,printTSO(dest));
2279 #endif
2280
2281   return dest;
2282 }
2283
2284 static StgTSO *
2285 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2286 {
2287     bdescr *bd, *new_bd;
2288     lnat free_w, tso_size_w;
2289     StgTSO *new_tso;
2290
2291     tso_size_w = tso_sizeW(tso);
2292
2293     if (tso_size_w < MBLOCK_SIZE_W || 
2294         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2295     {
2296         return tso;
2297     }
2298
2299     // don't allow throwTo() to modify the blocked_exceptions queue
2300     // while we are moving the TSO:
2301     lockClosure((StgClosure *)tso);
2302
2303     // this is the number of words we'll free
2304     free_w = round_to_mblocks(tso_size_w/2);
2305
2306     bd = Bdescr((StgPtr)tso);
2307     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2308     bd->free = bd->start + TSO_STRUCT_SIZEW;
2309
2310     new_tso = (StgTSO *)new_bd->start;
2311     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2312     new_tso->stack_size = new_bd->free - new_tso->stack;
2313
2314     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2315                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2316
2317     tso->what_next = ThreadRelocated;
2318     tso->_link = new_tso; // no write barrier reqd: same generation
2319
2320     // The TSO attached to this Task may have moved, so update the
2321     // pointer to it.
2322     if (task->tso == tso) {
2323         task->tso = new_tso;
2324     }
2325
2326     unlockTSO(new_tso);
2327     unlockTSO(tso);
2328
2329     IF_DEBUG(sanity,checkTSO(new_tso));
2330
2331     return new_tso;
2332 }
2333
2334 /* ---------------------------------------------------------------------------
2335    Interrupt execution
2336    - usually called inside a signal handler so it mustn't do anything fancy.   
2337    ------------------------------------------------------------------------ */
2338
2339 void
2340 interruptStgRts(void)
2341 {
2342     sched_state = SCHED_INTERRUPTING;
2343     setContextSwitches();
2344     wakeUpRts();
2345 }
2346
2347 /* -----------------------------------------------------------------------------
2348    Wake up the RTS
2349    
2350    This function causes at least one OS thread to wake up and run the
2351    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2352    an external event has arrived that may need servicing (eg. a
2353    keyboard interrupt).
2354
2355    In the single-threaded RTS we don't do anything here; we only have
2356    one thread anyway, and the event that caused us to want to wake up
2357    will have interrupted any blocking system call in progress anyway.
2358    -------------------------------------------------------------------------- */
2359
2360 void
2361 wakeUpRts(void)
2362 {
2363 #if defined(THREADED_RTS)
2364     // This forces the IO Manager thread to wakeup, which will
2365     // in turn ensure that some OS thread wakes up and runs the
2366     // scheduler loop, which will cause a GC and deadlock check.
2367     ioManagerWakeup();
2368 #endif
2369 }
2370
2371 /* -----------------------------------------------------------------------------
2372  * checkBlackHoles()
2373  *
2374  * Check the blackhole_queue for threads that can be woken up.  We do
2375  * this periodically: before every GC, and whenever the run queue is
2376  * empty.
2377  *
2378  * An elegant solution might be to just wake up all the blocked
2379  * threads with awakenBlockedQueue occasionally: they'll go back to
2380  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2381  * doesn't give us a way to tell whether we've actually managed to
2382  * wake up any threads, so we would be busy-waiting.
2383  *
2384  * -------------------------------------------------------------------------- */
2385
2386 static rtsBool
2387 checkBlackHoles (Capability *cap)
2388 {
2389     StgTSO **prev, *t;
2390     rtsBool any_woke_up = rtsFalse;
2391     StgHalfWord type;
2392
2393     // blackhole_queue is global:
2394     ASSERT_LOCK_HELD(&sched_mutex);
2395
2396     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2397
2398     // ASSUMES: sched_mutex
2399     prev = &blackhole_queue;
2400     t = blackhole_queue;
2401     while (t != END_TSO_QUEUE) {
2402         ASSERT(t->why_blocked == BlockedOnBlackHole);
2403         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2404         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2405             IF_DEBUG(sanity,checkTSO(t));
2406             t = unblockOne(cap, t);
2407             *prev = t;
2408             any_woke_up = rtsTrue;
2409         } else {
2410             prev = &t->_link;
2411             t = t->_link;
2412         }
2413     }
2414
2415     return any_woke_up;
2416 }
2417
2418 /* -----------------------------------------------------------------------------
2419    Deleting threads
2420
2421    This is used for interruption (^C) and forking, and corresponds to
2422    raising an exception but without letting the thread catch the
2423    exception.
2424    -------------------------------------------------------------------------- */
2425
2426 static void 
2427 deleteThread (Capability *cap, StgTSO *tso)
2428 {
2429     // NOTE: must only be called on a TSO that we have exclusive
2430     // access to, because we will call throwToSingleThreaded() below.
2431     // The TSO must be on the run queue of the Capability we own, or 
2432     // we must own all Capabilities.
2433
2434     if (tso->why_blocked != BlockedOnCCall &&
2435         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2436         throwToSingleThreaded(cap,tso,NULL);
2437     }
2438 }
2439
2440 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2441 static void 
2442 deleteThread_(Capability *cap, StgTSO *tso)
2443 { // for forkProcess only:
2444   // like deleteThread(), but we delete threads in foreign calls, too.
2445
2446     if (tso->why_blocked == BlockedOnCCall ||
2447         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2448         unblockOne(cap,tso);
2449         tso->what_next = ThreadKilled;
2450     } else {
2451         deleteThread(cap,tso);
2452     }
2453 }
2454 #endif
2455
2456 /* -----------------------------------------------------------------------------
2457    raiseExceptionHelper
2458    
2459    This function is called by the raise# primitve, just so that we can
2460    move some of the tricky bits of raising an exception from C-- into
2461    C.  Who knows, it might be a useful re-useable thing here too.
2462    -------------------------------------------------------------------------- */
2463
2464 StgWord
2465 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2466 {
2467     Capability *cap = regTableToCapability(reg);
2468     StgThunk *raise_closure = NULL;
2469     StgPtr p, next;
2470     StgRetInfoTable *info;
2471     //
2472     // This closure represents the expression 'raise# E' where E
2473     // is the exception raise.  It is used to overwrite all the
2474     // thunks which are currently under evaluataion.
2475     //
2476
2477     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2478     // LDV profiling: stg_raise_info has THUNK as its closure
2479     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2480     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2481     // 1 does not cause any problem unless profiling is performed.
2482     // However, when LDV profiling goes on, we need to linearly scan
2483     // small object pool, where raise_closure is stored, so we should
2484     // use MIN_UPD_SIZE.
2485     //
2486     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2487     //                                 sizeofW(StgClosure)+1);
2488     //
2489
2490     //
2491     // Walk up the stack, looking for the catch frame.  On the way,
2492     // we update any closures pointed to from update frames with the
2493     // raise closure that we just built.
2494     //
2495     p = tso->sp;
2496     while(1) {
2497         info = get_ret_itbl((StgClosure *)p);
2498         next = p + stack_frame_sizeW((StgClosure *)p);
2499         switch (info->i.type) {
2500             
2501         case UPDATE_FRAME:
2502             // Only create raise_closure if we need to.
2503             if (raise_closure == NULL) {
2504                 raise_closure = 
2505                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2506                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2507                 raise_closure->payload[0] = exception;
2508             }
2509             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2510             p = next;
2511             continue;
2512
2513         case ATOMICALLY_FRAME:
2514             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2515             tso->sp = p;
2516             return ATOMICALLY_FRAME;
2517             
2518         case CATCH_FRAME:
2519             tso->sp = p;
2520             return CATCH_FRAME;
2521
2522         case CATCH_STM_FRAME:
2523             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2524             tso->sp = p;
2525             return CATCH_STM_FRAME;
2526             
2527         case STOP_FRAME:
2528             tso->sp = p;
2529             return STOP_FRAME;
2530
2531         case CATCH_RETRY_FRAME:
2532         default:
2533             p = next; 
2534             continue;
2535         }
2536     }
2537 }
2538
2539
2540 /* -----------------------------------------------------------------------------
2541    findRetryFrameHelper
2542
2543    This function is called by the retry# primitive.  It traverses the stack
2544    leaving tso->sp referring to the frame which should handle the retry.  
2545
2546    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2547    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2548
2549    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2550    create) because retries are not considered to be exceptions, despite the
2551    similar implementation.
2552
2553    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2554    not be created within memory transactions.
2555    -------------------------------------------------------------------------- */
2556
2557 StgWord
2558 findRetryFrameHelper (StgTSO *tso)
2559 {
2560   StgPtr           p, next;
2561   StgRetInfoTable *info;
2562
2563   p = tso -> sp;
2564   while (1) {
2565     info = get_ret_itbl((StgClosure *)p);
2566     next = p + stack_frame_sizeW((StgClosure *)p);
2567     switch (info->i.type) {
2568       
2569     case ATOMICALLY_FRAME:
2570         debugTrace(DEBUG_stm,
2571                    "found ATOMICALLY_FRAME at %p during retry", p);
2572         tso->sp = p;
2573         return ATOMICALLY_FRAME;
2574       
2575     case CATCH_RETRY_FRAME:
2576         debugTrace(DEBUG_stm,
2577                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2578         tso->sp = p;
2579         return CATCH_RETRY_FRAME;
2580       
2581     case CATCH_STM_FRAME: {
2582         StgTRecHeader *trec = tso -> trec;
2583         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2584         debugTrace(DEBUG_stm,
2585                    "found CATCH_STM_FRAME at %p during retry", p);
2586         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2587         stmAbortTransaction(tso -> cap, trec);
2588         stmFreeAbortedTRec(tso -> cap, trec);
2589         tso -> trec = outer;
2590         p = next; 
2591         continue;
2592     }
2593       
2594
2595     default:
2596       ASSERT(info->i.type != CATCH_FRAME);
2597       ASSERT(info->i.type != STOP_FRAME);
2598       p = next; 
2599       continue;
2600     }
2601   }
2602 }
2603
2604 /* -----------------------------------------------------------------------------
2605    resurrectThreads is called after garbage collection on the list of
2606    threads found to be garbage.  Each of these threads will be woken
2607    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2608    on an MVar, or NonTermination if the thread was blocked on a Black
2609    Hole.
2610
2611    Locks: assumes we hold *all* the capabilities.
2612    -------------------------------------------------------------------------- */
2613
2614 void
2615 resurrectThreads (StgTSO *threads)
2616 {
2617     StgTSO *tso, *next;
2618     Capability *cap;
2619     step *step;
2620
2621     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2622         next = tso->global_link;
2623
2624         step = Bdescr((P_)tso)->step;
2625         tso->global_link = step->threads;
2626         step->threads = tso;
2627
2628         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2629         
2630         // Wake up the thread on the Capability it was last on
2631         cap = tso->cap;
2632         
2633         switch (tso->why_blocked) {
2634         case BlockedOnMVar:
2635         case BlockedOnException:
2636             /* Called by GC - sched_mutex lock is currently held. */
2637             throwToSingleThreaded(cap, tso,
2638                                   (StgClosure *)blockedOnDeadMVar_closure);
2639             break;
2640         case BlockedOnBlackHole:
2641             throwToSingleThreaded(cap, tso,
2642                                   (StgClosure *)nonTermination_closure);
2643             break;
2644         case BlockedOnSTM:
2645             throwToSingleThreaded(cap, tso,
2646                                   (StgClosure *)blockedIndefinitely_closure);
2647             break;
2648         case NotBlocked:
2649             /* This might happen if the thread was blocked on a black hole
2650              * belonging to a thread that we've just woken up (raiseAsync
2651              * can wake up threads, remember...).
2652              */
2653             continue;
2654         default:
2655             barf("resurrectThreads: thread blocked in a strange way");
2656         }
2657     }
2658 }
2659
2660 /* -----------------------------------------------------------------------------
2661    performPendingThrowTos is called after garbage collection, and
2662    passed a list of threads that were found to have pending throwTos
2663    (tso->blocked_exceptions was not empty), and were blocked.
2664    Normally this doesn't happen, because we would deliver the
2665    exception directly if the target thread is blocked, but there are
2666    small windows where it might occur on a multiprocessor (see
2667    throwTo()).
2668
2669    NB. we must be holding all the capabilities at this point, just
2670    like resurrectThreads().
2671    -------------------------------------------------------------------------- */
2672
2673 void
2674 performPendingThrowTos (StgTSO *threads)
2675 {
2676     StgTSO *tso, *next;
2677     Capability *cap;
2678     step *step;
2679
2680     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2681         next = tso->global_link;
2682
2683         step = Bdescr((P_)tso)->step;
2684         tso->global_link = step->threads;
2685         step->threads = tso;
2686
2687         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2688         
2689         cap = tso->cap;
2690         maybePerformBlockedException(cap, tso);
2691     }
2692 }