Run sparks in batches, instead of creating a new thread for each one
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
153 #endif
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
156 #endif
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
160                                          StgTSO *t);
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
162                                     nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
165                                              StgTSO *t );
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168                                 rtsBool force_major);
169
170 static rtsBool checkBlackHoles(Capability *cap);
171
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
174
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
177
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
180 #endif
181
182 #ifdef DEBUG
183 static char *whatNext_strs[] = {
184   "(unknown)",
185   "ThreadRunGHC",
186   "ThreadInterpret",
187   "ThreadKilled",
188   "ThreadRelocated",
189   "ThreadComplete"
190 };
191 #endif
192
193 /* -----------------------------------------------------------------------------
194  * Putting a thread on the run queue: different scheduling policies
195  * -------------------------------------------------------------------------- */
196
197 STATIC_INLINE void
198 addToRunQueue( Capability *cap, StgTSO *t )
199 {
200 #if defined(PARALLEL_HASKELL)
201     if (RtsFlags.ParFlags.doFairScheduling) { 
202         // this does round-robin scheduling; good for concurrency
203         appendToRunQueue(cap,t);
204     } else {
205         // this does unfair scheduling; good for parallelism
206         pushOnRunQueue(cap,t);
207     }
208 #else
209     // this does round-robin scheduling; good for concurrency
210     appendToRunQueue(cap,t);
211 #endif
212 }
213
214 /* ---------------------------------------------------------------------------
215    Main scheduling loop.
216
217    We use round-robin scheduling, each thread returning to the
218    scheduler loop when one of these conditions is detected:
219
220       * out of heap space
221       * timer expires (thread yields)
222       * thread blocks
223       * thread ends
224       * stack overflow
225
226    GRAN version:
227      In a GranSim setup this loop iterates over the global event queue.
228      This revolves around the global event queue, which determines what 
229      to do next. Therefore, it's more complicated than either the 
230      concurrent or the parallel (GUM) setup.
231   This version has been entirely removed (JB 2008/08).
232
233    GUM version:
234      GUM iterates over incoming messages.
235      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236      and sends out a fish whenever it has nothing to do; in-between
237      doing the actual reductions (shared code below) it processes the
238      incoming messages and deals with delayed operations 
239      (see PendingFetches).
240      This is not the ugliest code you could imagine, but it's bloody close.
241
242   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244   as well as future GUM versions. This file has been refurbished to
245   only contain valid code, which is however incomplete, refers to
246   invalid includes etc.
247
248    ------------------------------------------------------------------------ */
249
250 static Capability *
251 schedule (Capability *initialCapability, Task *task)
252 {
253   StgTSO *t;
254   Capability *cap;
255   StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257   rtsBool receivedFinish = rtsFalse;
258 #endif
259   nat prev_what_next;
260   rtsBool ready_to_gc;
261 #if defined(THREADED_RTS)
262   rtsBool first = rtsTrue;
263 #endif
264   
265   cap = initialCapability;
266
267   // Pre-condition: this task owns initialCapability.
268   // The sched_mutex is *NOT* held
269   // NB. on return, we still hold a capability.
270
271   debugTrace (DEBUG_sched, 
272               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273               task, initialCapability);
274
275   schedulePreLoop();
276
277   // -----------------------------------------------------------
278   // Scheduler loop starts here:
279
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION        (!receivedFinish)
282 #else
283 #define TERMINATION_CONDITION        rtsTrue
284 #endif
285
286   while (TERMINATION_CONDITION) {
287
288     // Check whether we have re-entered the RTS from Haskell without
289     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
290     // call).
291     if (cap->in_haskell) {
292           errorBelch("schedule: re-entered unsafely.\n"
293                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
294           stg_exit(EXIT_FAILURE);
295     }
296
297     // The interruption / shutdown sequence.
298     // 
299     // In order to cleanly shut down the runtime, we want to:
300     //   * make sure that all main threads return to their callers
301     //     with the state 'Interrupted'.
302     //   * clean up all OS threads assocated with the runtime
303     //   * free all memory etc.
304     //
305     // So the sequence for ^C goes like this:
306     //
307     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
308     //     arranges for some Capability to wake up
309     //
310     //   * all threads in the system are halted, and the zombies are
311     //     placed on the run queue for cleaning up.  We acquire all
312     //     the capabilities in order to delete the threads, this is
313     //     done by scheduleDoGC() for convenience (because GC already
314     //     needs to acquire all the capabilities).  We can't kill
315     //     threads involved in foreign calls.
316     // 
317     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
318     //
319     //   * sched_state := SCHED_SHUTTING_DOWN
320     //
321     //   * all workers exit when the run queue on their capability
322     //     drains.  All main threads will also exit when their TSO
323     //     reaches the head of the run queue and they can return.
324     //
325     //   * eventually all Capabilities will shut down, and the RTS can
326     //     exit.
327     //
328     //   * We might be left with threads blocked in foreign calls, 
329     //     we should really attempt to kill these somehow (TODO);
330     
331     switch (sched_state) {
332     case SCHED_RUNNING:
333         break;
334     case SCHED_INTERRUPTING:
335         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337         discardSparksCap(cap);
338 #endif
339         /* scheduleDoGC() deletes all the threads */
340         cap = scheduleDoGC(cap,task,rtsFalse);
341         break;
342     case SCHED_SHUTTING_DOWN:
343         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344         // If we are a worker, just exit.  If we're a bound thread
345         // then we will exit below when we've removed our TSO from
346         // the run queue.
347         if (task->tso == NULL && emptyRunQueue(cap)) {
348             return cap;
349         }
350         break;
351     default:
352         barf("sched_state: %d", sched_state);
353     }
354
355     scheduleFindWork(cap);
356
357     /* work pushing, currently relevant only for THREADED_RTS:
358        (pushes threads, wakes up idle capabilities for stealing) */
359     schedulePushWork(cap,task);
360
361 #if defined(PARALLEL_HASKELL)
362     /* since we perform a blocking receive and continue otherwise,
363        either we never reach here or we definitely have work! */
364     // from here: non-empty run queue
365     ASSERT(!emptyRunQueue(cap));
366
367     if (PacketsWaiting()) {  /* now process incoming messages, if any
368                                 pending...  
369
370                                 CAUTION: scheduleGetRemoteWork called
371                                 above, waits for messages as well! */
372       processMessages(cap, &receivedFinish);
373     }
374 #endif // PARALLEL_HASKELL: non-empty run queue!
375
376     scheduleDetectDeadlock(cap,task);
377
378 #if defined(THREADED_RTS)
379     cap = task->cap;    // reload cap, it might have changed
380 #endif
381
382     // Normally, the only way we can get here with no threads to
383     // run is if a keyboard interrupt received during 
384     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385     // Additionally, it is not fatal for the
386     // threaded RTS to reach here with no threads to run.
387     //
388     // win32: might be here due to awaitEvent() being abandoned
389     // as a result of a console event having been delivered.
390     
391 #if defined(THREADED_RTS)
392     if (first) 
393     {
394     // XXX: ToDo
395     //     // don't yield the first time, we want a chance to run this
396     //     // thread for a bit, even if there are others banging at the
397     //     // door.
398     //     first = rtsFalse;
399     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
400     }
401
402   yield:
403     scheduleYield(&cap,task);
404     if (emptyRunQueue(cap)) continue; // look for work again
405 #endif
406
407 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
408     if ( emptyRunQueue(cap) ) {
409         ASSERT(sched_state >= SCHED_INTERRUPTING);
410     }
411 #endif
412
413     // 
414     // Get a thread to run
415     //
416     t = popRunQueue(cap);
417
418     // Sanity check the thread we're about to run.  This can be
419     // expensive if there is lots of thread switching going on...
420     IF_DEBUG(sanity,checkTSO(t));
421
422 #if defined(THREADED_RTS)
423     // Check whether we can run this thread in the current task.
424     // If not, we have to pass our capability to the right task.
425     {
426         Task *bound = t->bound;
427       
428         if (bound) {
429             if (bound == task) {
430                 debugTrace(DEBUG_sched,
431                            "### Running thread %lu in bound thread", (unsigned long)t->id);
432                 // yes, the Haskell thread is bound to the current native thread
433             } else {
434                 debugTrace(DEBUG_sched,
435                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
436                 // no, bound to a different Haskell thread: pass to that thread
437                 pushOnRunQueue(cap,t);
438                 continue;
439             }
440         } else {
441             // The thread we want to run is unbound.
442             if (task->tso) { 
443                 debugTrace(DEBUG_sched,
444                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
445                 // no, the current native thread is bound to a different
446                 // Haskell thread, so pass it to any worker thread
447                 pushOnRunQueue(cap,t);
448                 continue; 
449             }
450         }
451     }
452 #endif
453
454     /* context switches are initiated by the timer signal, unless
455      * the user specified "context switch as often as possible", with
456      * +RTS -C0
457      */
458     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
459         && !emptyThreadQueues(cap)) {
460         cap->context_switch = 1;
461     }
462          
463 run_thread:
464
465     // CurrentTSO is the thread to run.  t might be different if we
466     // loop back to run_thread, so make sure to set CurrentTSO after
467     // that.
468     cap->r.rCurrentTSO = t;
469
470     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
471                               (long)t->id, whatNext_strs[t->what_next]);
472
473     startHeapProfTimer();
474
475     // Check for exceptions blocked on this thread
476     maybePerformBlockedException (cap, t);
477
478     // ----------------------------------------------------------------------
479     // Run the current thread 
480
481     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
482     ASSERT(t->cap == cap);
483     ASSERT(t->bound ? t->bound->cap == cap : 1);
484
485     prev_what_next = t->what_next;
486
487     errno = t->saved_errno;
488 #if mingw32_HOST_OS
489     SetLastError(t->saved_winerror);
490 #endif
491
492     cap->in_haskell = rtsTrue;
493
494     dirty_TSO(cap,t);
495
496 #if defined(THREADED_RTS)
497     if (recent_activity == ACTIVITY_DONE_GC) {
498         // ACTIVITY_DONE_GC means we turned off the timer signal to
499         // conserve power (see #1623).  Re-enable it here.
500         nat prev;
501         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
502         if (prev == ACTIVITY_DONE_GC) {
503             startTimer();
504         }
505     } else {
506         recent_activity = ACTIVITY_YES;
507     }
508 #endif
509
510     switch (prev_what_next) {
511         
512     case ThreadKilled:
513     case ThreadComplete:
514         /* Thread already finished, return to scheduler. */
515         ret = ThreadFinished;
516         break;
517         
518     case ThreadRunGHC:
519     {
520         StgRegTable *r;
521         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
522         cap = regTableToCapability(r);
523         ret = r->rRet;
524         break;
525     }
526     
527     case ThreadInterpret:
528         cap = interpretBCO(cap);
529         ret = cap->r.rRet;
530         break;
531         
532     default:
533         barf("schedule: invalid what_next field");
534     }
535
536     cap->in_haskell = rtsFalse;
537
538     // The TSO might have moved, eg. if it re-entered the RTS and a GC
539     // happened.  So find the new location:
540     t = cap->r.rCurrentTSO;
541
542     // We have run some Haskell code: there might be blackhole-blocked
543     // threads to wake up now.
544     // Lock-free test here should be ok, we're just setting a flag.
545     if ( blackhole_queue != END_TSO_QUEUE ) {
546         blackholes_need_checking = rtsTrue;
547     }
548     
549     // And save the current errno in this thread.
550     // XXX: possibly bogus for SMP because this thread might already
551     // be running again, see code below.
552     t->saved_errno = errno;
553 #if mingw32_HOST_OS
554     // Similarly for Windows error code
555     t->saved_winerror = GetLastError();
556 #endif
557
558 #if defined(THREADED_RTS)
559     // If ret is ThreadBlocked, and this Task is bound to the TSO that
560     // blocked, we are in limbo - the TSO is now owned by whatever it
561     // is blocked on, and may in fact already have been woken up,
562     // perhaps even on a different Capability.  It may be the case
563     // that task->cap != cap.  We better yield this Capability
564     // immediately and return to normaility.
565     if (ret == ThreadBlocked) {
566         debugTrace(DEBUG_sched,
567                    "--<< thread %lu (%s) stopped: blocked",
568                    (unsigned long)t->id, whatNext_strs[t->what_next]);
569         goto yield;
570     }
571 #endif
572
573     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
574     ASSERT(t->cap == cap);
575
576     // ----------------------------------------------------------------------
577     
578     // Costs for the scheduler are assigned to CCS_SYSTEM
579     stopHeapProfTimer();
580 #if defined(PROFILING)
581     CCCS = CCS_SYSTEM;
582 #endif
583     
584     schedulePostRunThread(cap,t);
585
586     t = threadStackUnderflow(task,t);
587
588     ready_to_gc = rtsFalse;
589
590     switch (ret) {
591     case HeapOverflow:
592         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
593         break;
594
595     case StackOverflow:
596         scheduleHandleStackOverflow(cap,task,t);
597         break;
598
599     case ThreadYielding:
600         if (scheduleHandleYield(cap, t, prev_what_next)) {
601             // shortcut for switching between compiler/interpreter:
602             goto run_thread; 
603         }
604         break;
605
606     case ThreadBlocked:
607         scheduleHandleThreadBlocked(t);
608         break;
609
610     case ThreadFinished:
611         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
612         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613         break;
614
615     default:
616       barf("schedule: invalid thread return code %d", (int)ret);
617     }
618
619     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
620       cap = scheduleDoGC(cap,task,rtsFalse);
621     }
622   } /* end of while() */
623 }
624
625 /* ----------------------------------------------------------------------------
626  * Setting up the scheduler loop
627  * ------------------------------------------------------------------------- */
628
629 static void
630 schedulePreLoop(void)
631 {
632   // initialisation for scheduler - what cannot go into initScheduler()  
633 }
634
635 /* -----------------------------------------------------------------------------
636  * scheduleFindWork()
637  *
638  * Search for work to do, and handle messages from elsewhere.
639  * -------------------------------------------------------------------------- */
640
641 static void
642 scheduleFindWork (Capability *cap)
643 {
644     scheduleStartSignalHandlers(cap);
645
646     // Only check the black holes here if we've nothing else to do.
647     // During normal execution, the black hole list only gets checked
648     // at GC time, to avoid repeatedly traversing this possibly long
649     // list each time around the scheduler.
650     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
651
652     scheduleCheckWakeupThreads(cap);
653
654     scheduleCheckBlockedThreads(cap);
655
656 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
657     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
658 #endif
659
660 #if defined(PARALLEL_HASKELL)
661     // if messages have been buffered...
662     scheduleSendPendingMessages();
663 #endif
664
665 #if defined(PARALLEL_HASKELL)
666     if (emptyRunQueue(cap)) {
667         receivedFinish = scheduleGetRemoteWork(cap);
668         continue; //  a new round, (hopefully) with new work
669         /* 
670            in GUM, this a) sends out a FISH and returns IF no fish is
671                            out already
672                         b) (blocking) awaits and receives messages
673            
674            in Eden, this is only the blocking receive, as b) in GUM.
675         */
676     }
677 #endif
678 }
679
680 #if defined(THREADED_RTS)
681 STATIC_INLINE rtsBool
682 shouldYieldCapability (Capability *cap, Task *task)
683 {
684     // we need to yield this capability to someone else if..
685     //   - another thread is initiating a GC
686     //   - another Task is returning from a foreign call
687     //   - the thread at the head of the run queue cannot be run
688     //     by this Task (it is bound to another Task, or it is unbound
689     //     and this task it bound).
690     return (waiting_for_gc || 
691             cap->returning_tasks_hd != NULL ||
692             (!emptyRunQueue(cap) && (task->tso == NULL
693                                      ? cap->run_queue_hd->bound != NULL
694                                      : cap->run_queue_hd->bound != task)));
695 }
696
697 // This is the single place where a Task goes to sleep.  There are
698 // two reasons it might need to sleep:
699 //    - there are no threads to run
700 //    - we need to yield this Capability to someone else 
701 //      (see shouldYieldCapability())
702 //
703 // The return value indicates whether 
704
705 static void
706 scheduleYield (Capability **pcap, Task *task)
707 {
708     Capability *cap = *pcap;
709
710     // if we have work, and we don't need to give up the Capability, continue.
711     if (!shouldYieldCapability(cap,task) && 
712         (!emptyRunQueue(cap) || blackholes_need_checking))
713         return;
714
715     // otherwise yield (sleep), and keep yielding if necessary.
716     do {
717         yieldCapability(&cap,task);
718     } 
719     while (shouldYieldCapability(cap,task));
720
721     // note there may still be no threads on the run queue at this
722     // point, the caller has to check.
723
724     *pcap = cap;
725     return;
726 }
727 #endif
728     
729 /* -----------------------------------------------------------------------------
730  * schedulePushWork()
731  *
732  * Push work to other Capabilities if we have some.
733  * -------------------------------------------------------------------------- */
734
735 static void
736 schedulePushWork(Capability *cap USED_IF_THREADS, 
737                  Task *task      USED_IF_THREADS)
738 {
739   /* following code not for PARALLEL_HASKELL. I kept the call general,
740      future GUM versions might use pushing in a distributed setup */
741 #if defined(THREADED_RTS)
742
743     Capability *free_caps[n_capabilities], *cap0;
744     nat i, n_free_caps;
745
746     // migration can be turned off with +RTS -qg
747     if (!RtsFlags.ParFlags.migrate) return;
748
749     // Check whether we have more threads on our run queue, or sparks
750     // in our pool, that we could hand to another Capability.
751     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
752         && sparkPoolSizeCap(cap) < 2) {
753         return;
754     }
755
756     // First grab as many free Capabilities as we can.
757     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
758         cap0 = &capabilities[i];
759         if (cap != cap0 && tryGrabCapability(cap0,task)) {
760             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
761                 // it already has some work, we just grabbed it at 
762                 // the wrong moment.  Or maybe it's deadlocked!
763                 releaseCapability(cap0);
764             } else {
765                 free_caps[n_free_caps++] = cap0;
766             }
767         }
768     }
769
770     // we now have n_free_caps free capabilities stashed in
771     // free_caps[].  Share our run queue equally with them.  This is
772     // probably the simplest thing we could do; improvements we might
773     // want to do include:
774     //
775     //   - giving high priority to moving relatively new threads, on 
776     //     the gournds that they haven't had time to build up a
777     //     working set in the cache on this CPU/Capability.
778     //
779     //   - giving low priority to moving long-lived threads
780
781     if (n_free_caps > 0) {
782         StgTSO *prev, *t, *next;
783         rtsBool pushed_to_all;
784
785         debugTrace(DEBUG_sched, 
786                    "cap %d: %s and %d free capabilities, sharing...", 
787                    cap->no, 
788                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
789                    "excess threads on run queue":"sparks to share (>=2)",
790                    n_free_caps);
791
792         i = 0;
793         pushed_to_all = rtsFalse;
794
795         if (cap->run_queue_hd != END_TSO_QUEUE) {
796             prev = cap->run_queue_hd;
797             t = prev->_link;
798             prev->_link = END_TSO_QUEUE;
799             for (; t != END_TSO_QUEUE; t = next) {
800                 next = t->_link;
801                 t->_link = END_TSO_QUEUE;
802                 if (t->what_next == ThreadRelocated
803                     || t->bound == task // don't move my bound thread
804                     || tsoLocked(t)) {  // don't move a locked thread
805                     setTSOLink(cap, prev, t);
806                     prev = t;
807                 } else if (i == n_free_caps) {
808                     pushed_to_all = rtsTrue;
809                     i = 0;
810                     // keep one for us
811                     setTSOLink(cap, prev, t);
812                     prev = t;
813                 } else {
814                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
815                     appendToRunQueue(free_caps[i],t);
816                     if (t->bound) { t->bound->cap = free_caps[i]; }
817                     t->cap = free_caps[i];
818                     i++;
819                 }
820             }
821             cap->run_queue_tl = prev;
822         }
823
824 #ifdef SPARK_PUSHING
825         /* JB I left this code in place, it would work but is not necessary */
826
827         // If there are some free capabilities that we didn't push any
828         // threads to, then try to push a spark to each one.
829         if (!pushed_to_all) {
830             StgClosure *spark;
831             // i is the next free capability to push to
832             for (; i < n_free_caps; i++) {
833                 if (emptySparkPoolCap(free_caps[i])) {
834                     spark = tryStealSpark(cap->sparks);
835                     if (spark != NULL) {
836                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
837                         newSpark(&(free_caps[i]->r), spark);
838                     }
839                 }
840             }
841         }
842 #endif /* SPARK_PUSHING */
843
844         // release the capabilities
845         for (i = 0; i < n_free_caps; i++) {
846             task->cap = free_caps[i];
847             releaseAndWakeupCapability(free_caps[i]);
848         }
849     }
850     task->cap = cap; // reset to point to our Capability.
851
852 #endif /* THREADED_RTS */
853
854 }
855
856 /* ----------------------------------------------------------------------------
857  * Start any pending signal handlers
858  * ------------------------------------------------------------------------- */
859
860 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
861 static void
862 scheduleStartSignalHandlers(Capability *cap)
863 {
864     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
865         // safe outside the lock
866         startSignalHandlers(cap);
867     }
868 }
869 #else
870 static void
871 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
872 {
873 }
874 #endif
875
876 /* ----------------------------------------------------------------------------
877  * Check for blocked threads that can be woken up.
878  * ------------------------------------------------------------------------- */
879
880 static void
881 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
882 {
883 #if !defined(THREADED_RTS)
884     //
885     // Check whether any waiting threads need to be woken up.  If the
886     // run queue is empty, and there are no other tasks running, we
887     // can wait indefinitely for something to happen.
888     //
889     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
890     {
891         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
892     }
893 #endif
894 }
895
896
897 /* ----------------------------------------------------------------------------
898  * Check for threads woken up by other Capabilities
899  * ------------------------------------------------------------------------- */
900
901 static void
902 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
903 {
904 #if defined(THREADED_RTS)
905     // Any threads that were woken up by other Capabilities get
906     // appended to our run queue.
907     if (!emptyWakeupQueue(cap)) {
908         ACQUIRE_LOCK(&cap->lock);
909         if (emptyRunQueue(cap)) {
910             cap->run_queue_hd = cap->wakeup_queue_hd;
911             cap->run_queue_tl = cap->wakeup_queue_tl;
912         } else {
913             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
914             cap->run_queue_tl = cap->wakeup_queue_tl;
915         }
916         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
917         RELEASE_LOCK(&cap->lock);
918     }
919 #endif
920 }
921
922 /* ----------------------------------------------------------------------------
923  * Check for threads blocked on BLACKHOLEs that can be woken up
924  * ------------------------------------------------------------------------- */
925 static void
926 scheduleCheckBlackHoles (Capability *cap)
927 {
928     if ( blackholes_need_checking ) // check without the lock first
929     {
930         ACQUIRE_LOCK(&sched_mutex);
931         if ( blackholes_need_checking ) {
932             blackholes_need_checking = rtsFalse;
933             // important that we reset the flag *before* checking the
934             // blackhole queue, otherwise we could get deadlock.  This
935             // happens as follows: we wake up a thread that
936             // immediately runs on another Capability, blocks on a
937             // blackhole, and then we reset the blackholes_need_checking flag.
938             checkBlackHoles(cap);
939         }
940         RELEASE_LOCK(&sched_mutex);
941     }
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Detect deadlock conditions and attempt to resolve them.
946  * ------------------------------------------------------------------------- */
947
948 static void
949 scheduleDetectDeadlock (Capability *cap, Task *task)
950 {
951
952 #if defined(PARALLEL_HASKELL)
953     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
954     return;
955 #endif
956
957     /* 
958      * Detect deadlock: when we have no threads to run, there are no
959      * threads blocked, waiting for I/O, or sleeping, and all the
960      * other tasks are waiting for work, we must have a deadlock of
961      * some description.
962      */
963     if ( emptyThreadQueues(cap) )
964     {
965 #if defined(THREADED_RTS)
966         /* 
967          * In the threaded RTS, we only check for deadlock if there
968          * has been no activity in a complete timeslice.  This means
969          * we won't eagerly start a full GC just because we don't have
970          * any threads to run currently.
971          */
972         if (recent_activity != ACTIVITY_INACTIVE) return;
973 #endif
974
975         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
976
977         // Garbage collection can release some new threads due to
978         // either (a) finalizers or (b) threads resurrected because
979         // they are unreachable and will therefore be sent an
980         // exception.  Any threads thus released will be immediately
981         // runnable.
982         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
983
984         recent_activity = ACTIVITY_DONE_GC;
985         // disable timer signals (see #1623)
986         stopTimer();
987         
988         if ( !emptyRunQueue(cap) ) return;
989
990 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
991         /* If we have user-installed signal handlers, then wait
992          * for signals to arrive rather then bombing out with a
993          * deadlock.
994          */
995         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
996             debugTrace(DEBUG_sched,
997                        "still deadlocked, waiting for signals...");
998
999             awaitUserSignals();
1000
1001             if (signals_pending()) {
1002                 startSignalHandlers(cap);
1003             }
1004
1005             // either we have threads to run, or we were interrupted:
1006             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1007
1008             return;
1009         }
1010 #endif
1011
1012 #if !defined(THREADED_RTS)
1013         /* Probably a real deadlock.  Send the current main thread the
1014          * Deadlock exception.
1015          */
1016         if (task->tso) {
1017             switch (task->tso->why_blocked) {
1018             case BlockedOnSTM:
1019             case BlockedOnBlackHole:
1020             case BlockedOnException:
1021             case BlockedOnMVar:
1022                 throwToSingleThreaded(cap, task->tso, 
1023                                       (StgClosure *)nonTermination_closure);
1024                 return;
1025             default:
1026                 barf("deadlock: main thread blocked in a strange way");
1027             }
1028         }
1029         return;
1030 #endif
1031     }
1032 }
1033
1034
1035 /* ----------------------------------------------------------------------------
1036  * Send pending messages (PARALLEL_HASKELL only)
1037  * ------------------------------------------------------------------------- */
1038
1039 #if defined(PARALLEL_HASKELL)
1040 static void
1041 scheduleSendPendingMessages(void)
1042 {
1043
1044 # if defined(PAR) // global Mem.Mgmt., omit for now
1045     if (PendingFetches != END_BF_QUEUE) {
1046         processFetches();
1047     }
1048 # endif
1049     
1050     if (RtsFlags.ParFlags.BufferTime) {
1051         // if we use message buffering, we must send away all message
1052         // packets which have become too old...
1053         sendOldBuffers(); 
1054     }
1055 }
1056 #endif
1057
1058 /* ----------------------------------------------------------------------------
1059  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1060  * ------------------------------------------------------------------------- */
1061
1062 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1063 static void
1064 scheduleActivateSpark(Capability *cap)
1065 {
1066     if (anySparks())
1067     {
1068         createSparkThread(cap);
1069         debugTrace(DEBUG_sched, "creating a spark thread");
1070     }
1071 }
1072 #endif // PARALLEL_HASKELL || THREADED_RTS
1073
1074 /* ----------------------------------------------------------------------------
1075  * Get work from a remote node (PARALLEL_HASKELL only)
1076  * ------------------------------------------------------------------------- */
1077     
1078 #if defined(PARALLEL_HASKELL)
1079 static rtsBool /* return value used in PARALLEL_HASKELL only */
1080 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1081 {
1082 #if defined(PARALLEL_HASKELL)
1083   rtsBool receivedFinish = rtsFalse;
1084
1085   // idle() , i.e. send all buffers, wait for work
1086   if (RtsFlags.ParFlags.BufferTime) {
1087         IF_PAR_DEBUG(verbose, 
1088                 debugBelch("...send all pending data,"));
1089         {
1090           nat i;
1091           for (i=1; i<=nPEs; i++)
1092             sendImmediately(i); // send all messages away immediately
1093         }
1094   }
1095
1096   /* this would be the place for fishing in GUM... 
1097
1098      if (no-earlier-fish-around) 
1099           sendFish(choosePe());
1100    */
1101
1102   // Eden:just look for incoming messages (blocking receive)
1103   IF_PAR_DEBUG(verbose, 
1104                debugBelch("...wait for incoming messages...\n"));
1105   processMessages(cap, &receivedFinish); // blocking receive...
1106
1107
1108   return receivedFinish;
1109   // reenter scheduling look after having received something
1110
1111 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1112
1113   return rtsFalse; /* return value unused in THREADED_RTS */
1114
1115 #endif /* PARALLEL_HASKELL */
1116 }
1117 #endif // PARALLEL_HASKELL || THREADED_RTS
1118
1119 /* ----------------------------------------------------------------------------
1120  * After running a thread...
1121  * ------------------------------------------------------------------------- */
1122
1123 static void
1124 schedulePostRunThread (Capability *cap, StgTSO *t)
1125 {
1126     // We have to be able to catch transactions that are in an
1127     // infinite loop as a result of seeing an inconsistent view of
1128     // memory, e.g. 
1129     //
1130     //   atomically $ do
1131     //       [a,b] <- mapM readTVar [ta,tb]
1132     //       when (a == b) loop
1133     //
1134     // and a is never equal to b given a consistent view of memory.
1135     //
1136     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1137         if (!stmValidateNestOfTransactions (t -> trec)) {
1138             debugTrace(DEBUG_sched | DEBUG_stm,
1139                        "trec %p found wasting its time", t);
1140             
1141             // strip the stack back to the
1142             // ATOMICALLY_FRAME, aborting the (nested)
1143             // transaction, and saving the stack of any
1144             // partially-evaluated thunks on the heap.
1145             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1146             
1147             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1148         }
1149     }
1150
1151   /* some statistics gathering in the parallel case */
1152 }
1153
1154 /* -----------------------------------------------------------------------------
1155  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1156  * -------------------------------------------------------------------------- */
1157
1158 static rtsBool
1159 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1160 {
1161     // did the task ask for a large block?
1162     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1163         // if so, get one and push it on the front of the nursery.
1164         bdescr *bd;
1165         lnat blocks;
1166         
1167         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1168         
1169         debugTrace(DEBUG_sched,
1170                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1171                    (long)t->id, whatNext_strs[t->what_next], blocks);
1172     
1173         // don't do this if the nursery is (nearly) full, we'll GC first.
1174         if (cap->r.rCurrentNursery->link != NULL ||
1175             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1176                                                // if the nursery has only one block.
1177             
1178             ACQUIRE_SM_LOCK
1179             bd = allocGroup( blocks );
1180             RELEASE_SM_LOCK
1181             cap->r.rNursery->n_blocks += blocks;
1182             
1183             // link the new group into the list
1184             bd->link = cap->r.rCurrentNursery;
1185             bd->u.back = cap->r.rCurrentNursery->u.back;
1186             if (cap->r.rCurrentNursery->u.back != NULL) {
1187                 cap->r.rCurrentNursery->u.back->link = bd;
1188             } else {
1189 #if !defined(THREADED_RTS)
1190                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1191                        g0s0 == cap->r.rNursery);
1192 #endif
1193                 cap->r.rNursery->blocks = bd;
1194             }             
1195             cap->r.rCurrentNursery->u.back = bd;
1196             
1197             // initialise it as a nursery block.  We initialise the
1198             // step, gen_no, and flags field of *every* sub-block in
1199             // this large block, because this is easier than making
1200             // sure that we always find the block head of a large
1201             // block whenever we call Bdescr() (eg. evacuate() and
1202             // isAlive() in the GC would both have to do this, at
1203             // least).
1204             { 
1205                 bdescr *x;
1206                 for (x = bd; x < bd + blocks; x++) {
1207                     x->step = cap->r.rNursery;
1208                     x->gen_no = 0;
1209                     x->flags = 0;
1210                 }
1211             }
1212             
1213             // This assert can be a killer if the app is doing lots
1214             // of large block allocations.
1215             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1216             
1217             // now update the nursery to point to the new block
1218             cap->r.rCurrentNursery = bd;
1219             
1220             // we might be unlucky and have another thread get on the
1221             // run queue before us and steal the large block, but in that
1222             // case the thread will just end up requesting another large
1223             // block.
1224             pushOnRunQueue(cap,t);
1225             return rtsFalse;  /* not actually GC'ing */
1226         }
1227     }
1228     
1229     debugTrace(DEBUG_sched,
1230                "--<< thread %ld (%s) stopped: HeapOverflow",
1231                (long)t->id, whatNext_strs[t->what_next]);
1232
1233     if (cap->context_switch) {
1234         // Sometimes we miss a context switch, e.g. when calling
1235         // primitives in a tight loop, MAYBE_GC() doesn't check the
1236         // context switch flag, and we end up waiting for a GC.
1237         // See #1984, and concurrent/should_run/1984
1238         cap->context_switch = 0;
1239         addToRunQueue(cap,t);
1240     } else {
1241         pushOnRunQueue(cap,t);
1242     }
1243     return rtsTrue;
1244     /* actual GC is done at the end of the while loop in schedule() */
1245 }
1246
1247 /* -----------------------------------------------------------------------------
1248  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1249  * -------------------------------------------------------------------------- */
1250
1251 static void
1252 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1253 {
1254     debugTrace (DEBUG_sched,
1255                 "--<< thread %ld (%s) stopped, StackOverflow", 
1256                 (long)t->id, whatNext_strs[t->what_next]);
1257
1258     /* just adjust the stack for this thread, then pop it back
1259      * on the run queue.
1260      */
1261     { 
1262         /* enlarge the stack */
1263         StgTSO *new_t = threadStackOverflow(cap, t);
1264         
1265         /* The TSO attached to this Task may have moved, so update the
1266          * pointer to it.
1267          */
1268         if (task->tso == t) {
1269             task->tso = new_t;
1270         }
1271         pushOnRunQueue(cap,new_t);
1272     }
1273 }
1274
1275 /* -----------------------------------------------------------------------------
1276  * Handle a thread that returned to the scheduler with ThreadYielding
1277  * -------------------------------------------------------------------------- */
1278
1279 static rtsBool
1280 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1281 {
1282     // Reset the context switch flag.  We don't do this just before
1283     // running the thread, because that would mean we would lose ticks
1284     // during GC, which can lead to unfair scheduling (a thread hogs
1285     // the CPU because the tick always arrives during GC).  This way
1286     // penalises threads that do a lot of allocation, but that seems
1287     // better than the alternative.
1288     cap->context_switch = 0;
1289     
1290     /* put the thread back on the run queue.  Then, if we're ready to
1291      * GC, check whether this is the last task to stop.  If so, wake
1292      * up the GC thread.  getThread will block during a GC until the
1293      * GC is finished.
1294      */
1295 #ifdef DEBUG
1296     if (t->what_next != prev_what_next) {
1297         debugTrace(DEBUG_sched,
1298                    "--<< thread %ld (%s) stopped to switch evaluators", 
1299                    (long)t->id, whatNext_strs[t->what_next]);
1300     } else {
1301         debugTrace(DEBUG_sched,
1302                    "--<< thread %ld (%s) stopped, yielding",
1303                    (long)t->id, whatNext_strs[t->what_next]);
1304     }
1305 #endif
1306     
1307     IF_DEBUG(sanity,
1308              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1309              checkTSO(t));
1310     ASSERT(t->_link == END_TSO_QUEUE);
1311     
1312     // Shortcut if we're just switching evaluators: don't bother
1313     // doing stack squeezing (which can be expensive), just run the
1314     // thread.
1315     if (t->what_next != prev_what_next) {
1316         return rtsTrue;
1317     }
1318
1319     addToRunQueue(cap,t);
1320
1321     return rtsFalse;
1322 }
1323
1324 /* -----------------------------------------------------------------------------
1325  * Handle a thread that returned to the scheduler with ThreadBlocked
1326  * -------------------------------------------------------------------------- */
1327
1328 static void
1329 scheduleHandleThreadBlocked( StgTSO *t
1330 #if !defined(GRAN) && !defined(DEBUG)
1331     STG_UNUSED
1332 #endif
1333     )
1334 {
1335
1336       // We don't need to do anything.  The thread is blocked, and it
1337       // has tidied up its stack and placed itself on whatever queue
1338       // it needs to be on.
1339
1340     // ASSERT(t->why_blocked != NotBlocked);
1341     // Not true: for example,
1342     //    - in THREADED_RTS, the thread may already have been woken
1343     //      up by another Capability.  This actually happens: try
1344     //      conc023 +RTS -N2.
1345     //    - the thread may have woken itself up already, because
1346     //      threadPaused() might have raised a blocked throwTo
1347     //      exception, see maybePerformBlockedException().
1348
1349 #ifdef DEBUG
1350     if (traceClass(DEBUG_sched)) {
1351         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1352                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1353         printThreadBlockage(t);
1354         debugTraceEnd();
1355     }
1356 #endif
1357 }
1358
1359 /* -----------------------------------------------------------------------------
1360  * Handle a thread that returned to the scheduler with ThreadFinished
1361  * -------------------------------------------------------------------------- */
1362
1363 static rtsBool
1364 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1365 {
1366     /* Need to check whether this was a main thread, and if so,
1367      * return with the return value.
1368      *
1369      * We also end up here if the thread kills itself with an
1370      * uncaught exception, see Exception.cmm.
1371      */
1372     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1373                (unsigned long)t->id, whatNext_strs[t->what_next]);
1374
1375       //
1376       // Check whether the thread that just completed was a bound
1377       // thread, and if so return with the result.  
1378       //
1379       // There is an assumption here that all thread completion goes
1380       // through this point; we need to make sure that if a thread
1381       // ends up in the ThreadKilled state, that it stays on the run
1382       // queue so it can be dealt with here.
1383       //
1384
1385       if (t->bound) {
1386
1387           if (t->bound != task) {
1388 #if !defined(THREADED_RTS)
1389               // Must be a bound thread that is not the topmost one.  Leave
1390               // it on the run queue until the stack has unwound to the
1391               // point where we can deal with this.  Leaving it on the run
1392               // queue also ensures that the garbage collector knows about
1393               // this thread and its return value (it gets dropped from the
1394               // step->threads list so there's no other way to find it).
1395               appendToRunQueue(cap,t);
1396               return rtsFalse;
1397 #else
1398               // this cannot happen in the threaded RTS, because a
1399               // bound thread can only be run by the appropriate Task.
1400               barf("finished bound thread that isn't mine");
1401 #endif
1402           }
1403
1404           ASSERT(task->tso == t);
1405
1406           if (t->what_next == ThreadComplete) {
1407               if (task->ret) {
1408                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1409                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1410               }
1411               task->stat = Success;
1412           } else {
1413               if (task->ret) {
1414                   *(task->ret) = NULL;
1415               }
1416               if (sched_state >= SCHED_INTERRUPTING) {
1417                   task->stat = Interrupted;
1418               } else {
1419                   task->stat = Killed;
1420               }
1421           }
1422 #ifdef DEBUG
1423           removeThreadLabel((StgWord)task->tso->id);
1424 #endif
1425           return rtsTrue; // tells schedule() to return
1426       }
1427
1428       return rtsFalse;
1429 }
1430
1431 /* -----------------------------------------------------------------------------
1432  * Perform a heap census
1433  * -------------------------------------------------------------------------- */
1434
1435 static rtsBool
1436 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1437 {
1438     // When we have +RTS -i0 and we're heap profiling, do a census at
1439     // every GC.  This lets us get repeatable runs for debugging.
1440     if (performHeapProfile ||
1441         (RtsFlags.ProfFlags.profileInterval==0 &&
1442          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1443         return rtsTrue;
1444     } else {
1445         return rtsFalse;
1446     }
1447 }
1448
1449 /* -----------------------------------------------------------------------------
1450  * Perform a garbage collection if necessary
1451  * -------------------------------------------------------------------------- */
1452
1453 static Capability *
1454 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1455 {
1456     rtsBool heap_census;
1457 #ifdef THREADED_RTS
1458     /* extern static volatile StgWord waiting_for_gc; 
1459        lives inside capability.c */
1460     rtsBool was_waiting;
1461     nat i;
1462 #endif
1463
1464 #ifdef THREADED_RTS
1465     // In order to GC, there must be no threads running Haskell code.
1466     // Therefore, the GC thread needs to hold *all* the capabilities,
1467     // and release them after the GC has completed.  
1468     //
1469     // This seems to be the simplest way: previous attempts involved
1470     // making all the threads with capabilities give up their
1471     // capabilities and sleep except for the *last* one, which
1472     // actually did the GC.  But it's quite hard to arrange for all
1473     // the other tasks to sleep and stay asleep.
1474     //
1475         
1476     /*  Other capabilities are prevented from running yet more Haskell
1477         threads if waiting_for_gc is set. Tested inside
1478         yieldCapability() and releaseCapability() in Capability.c */
1479
1480     was_waiting = cas(&waiting_for_gc, 0, 1);
1481     if (was_waiting) {
1482         do {
1483             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1484             if (cap) yieldCapability(&cap,task);
1485         } while (waiting_for_gc);
1486         return cap;  // NOTE: task->cap might have changed here
1487     }
1488
1489     setContextSwitches();
1490     for (i=0; i < n_capabilities; i++) {
1491         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1492         if (cap != &capabilities[i]) {
1493             Capability *pcap = &capabilities[i];
1494             // we better hope this task doesn't get migrated to
1495             // another Capability while we're waiting for this one.
1496             // It won't, because load balancing happens while we have
1497             // all the Capabilities, but even so it's a slightly
1498             // unsavoury invariant.
1499             task->cap = pcap;
1500             waitForReturnCapability(&pcap, task);
1501             if (pcap != &capabilities[i]) {
1502                 barf("scheduleDoGC: got the wrong capability");
1503             }
1504         }
1505     }
1506
1507     waiting_for_gc = rtsFalse;
1508 #endif
1509
1510     // so this happens periodically:
1511     if (cap) scheduleCheckBlackHoles(cap);
1512     
1513     IF_DEBUG(scheduler, printAllThreads());
1514
1515     /*
1516      * We now have all the capabilities; if we're in an interrupting
1517      * state, then we should take the opportunity to delete all the
1518      * threads in the system.
1519      */
1520     if (sched_state >= SCHED_INTERRUPTING) {
1521         deleteAllThreads(&capabilities[0]);
1522         sched_state = SCHED_SHUTTING_DOWN;
1523     }
1524     
1525     heap_census = scheduleNeedHeapProfile(rtsTrue);
1526
1527     /* everybody back, start the GC.
1528      * Could do it in this thread, or signal a condition var
1529      * to do it in another thread.  Either way, we need to
1530      * broadcast on gc_pending_cond afterward.
1531      */
1532 #if defined(THREADED_RTS)
1533     debugTrace(DEBUG_sched, "doing GC");
1534 #endif
1535     GarbageCollect(force_major || heap_census);
1536     
1537     if (heap_census) {
1538         debugTrace(DEBUG_sched, "performing heap census");
1539         heapCensus();
1540         performHeapProfile = rtsFalse;
1541     }
1542
1543 #ifdef SPARKBALANCE
1544     /* JB 
1545        Once we are all together... this would be the place to balance all
1546        spark pools. No concurrent stealing or adding of new sparks can
1547        occur. Should be defined in Sparks.c. */
1548     balanceSparkPoolsCaps(n_capabilities, capabilities);
1549 #endif
1550
1551 #if defined(THREADED_RTS)
1552     // release our stash of capabilities.
1553     for (i = 0; i < n_capabilities; i++) {
1554         if (cap != &capabilities[i]) {
1555             task->cap = &capabilities[i];
1556             releaseCapability(&capabilities[i]);
1557         }
1558     }
1559     if (cap) {
1560         task->cap = cap;
1561     } else {
1562         task->cap = NULL;
1563     }
1564 #endif
1565
1566     return cap;
1567 }
1568
1569 /* ---------------------------------------------------------------------------
1570  * Singleton fork(). Do not copy any running threads.
1571  * ------------------------------------------------------------------------- */
1572
1573 pid_t
1574 forkProcess(HsStablePtr *entry
1575 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1576             STG_UNUSED
1577 #endif
1578            )
1579 {
1580 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1581     Task *task;
1582     pid_t pid;
1583     StgTSO* t,*next;
1584     Capability *cap;
1585     nat s;
1586     
1587 #if defined(THREADED_RTS)
1588     if (RtsFlags.ParFlags.nNodes > 1) {
1589         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1590         stg_exit(EXIT_FAILURE);
1591     }
1592 #endif
1593
1594     debugTrace(DEBUG_sched, "forking!");
1595     
1596     // ToDo: for SMP, we should probably acquire *all* the capabilities
1597     cap = rts_lock();
1598     
1599     // no funny business: hold locks while we fork, otherwise if some
1600     // other thread is holding a lock when the fork happens, the data
1601     // structure protected by the lock will forever be in an
1602     // inconsistent state in the child.  See also #1391.
1603     ACQUIRE_LOCK(&sched_mutex);
1604     ACQUIRE_LOCK(&cap->lock);
1605     ACQUIRE_LOCK(&cap->running_task->lock);
1606
1607     pid = fork();
1608     
1609     if (pid) { // parent
1610         
1611         RELEASE_LOCK(&sched_mutex);
1612         RELEASE_LOCK(&cap->lock);
1613         RELEASE_LOCK(&cap->running_task->lock);
1614
1615         // just return the pid
1616         rts_unlock(cap);
1617         return pid;
1618         
1619     } else { // child
1620         
1621 #if defined(THREADED_RTS)
1622         initMutex(&sched_mutex);
1623         initMutex(&cap->lock);
1624         initMutex(&cap->running_task->lock);
1625 #endif
1626
1627         // Now, all OS threads except the thread that forked are
1628         // stopped.  We need to stop all Haskell threads, including
1629         // those involved in foreign calls.  Also we need to delete
1630         // all Tasks, because they correspond to OS threads that are
1631         // now gone.
1632
1633         for (s = 0; s < total_steps; s++) {
1634           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1635             if (t->what_next == ThreadRelocated) {
1636                 next = t->_link;
1637             } else {
1638                 next = t->global_link;
1639                 // don't allow threads to catch the ThreadKilled
1640                 // exception, but we do want to raiseAsync() because these
1641                 // threads may be evaluating thunks that we need later.
1642                 deleteThread_(cap,t);
1643             }
1644           }
1645         }
1646         
1647         // Empty the run queue.  It seems tempting to let all the
1648         // killed threads stay on the run queue as zombies to be
1649         // cleaned up later, but some of them correspond to bound
1650         // threads for which the corresponding Task does not exist.
1651         cap->run_queue_hd = END_TSO_QUEUE;
1652         cap->run_queue_tl = END_TSO_QUEUE;
1653
1654         // Any suspended C-calling Tasks are no more, their OS threads
1655         // don't exist now:
1656         cap->suspended_ccalling_tasks = NULL;
1657
1658         // Empty the threads lists.  Otherwise, the garbage
1659         // collector may attempt to resurrect some of these threads.
1660         for (s = 0; s < total_steps; s++) {
1661             all_steps[s].threads = END_TSO_QUEUE;
1662         }
1663
1664         // Wipe the task list, except the current Task.
1665         ACQUIRE_LOCK(&sched_mutex);
1666         for (task = all_tasks; task != NULL; task=task->all_link) {
1667             if (task != cap->running_task) {
1668 #if defined(THREADED_RTS)
1669                 initMutex(&task->lock); // see #1391
1670 #endif
1671                 discardTask(task);
1672             }
1673         }
1674         RELEASE_LOCK(&sched_mutex);
1675
1676 #if defined(THREADED_RTS)
1677         // Wipe our spare workers list, they no longer exist.  New
1678         // workers will be created if necessary.
1679         cap->spare_workers = NULL;
1680         cap->returning_tasks_hd = NULL;
1681         cap->returning_tasks_tl = NULL;
1682 #endif
1683
1684         // On Unix, all timers are reset in the child, so we need to start
1685         // the timer again.
1686         initTimer();
1687         startTimer();
1688
1689         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1690         rts_checkSchedStatus("forkProcess",cap);
1691         
1692         rts_unlock(cap);
1693         hs_exit();                      // clean up and exit
1694         stg_exit(EXIT_SUCCESS);
1695     }
1696 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1697     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1698     return -1;
1699 #endif
1700 }
1701
1702 /* ---------------------------------------------------------------------------
1703  * Delete all the threads in the system
1704  * ------------------------------------------------------------------------- */
1705    
1706 static void
1707 deleteAllThreads ( Capability *cap )
1708 {
1709     // NOTE: only safe to call if we own all capabilities.
1710
1711     StgTSO* t, *next;
1712     nat s;
1713
1714     debugTrace(DEBUG_sched,"deleting all threads");
1715     for (s = 0; s < total_steps; s++) {
1716       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1717         if (t->what_next == ThreadRelocated) {
1718             next = t->_link;
1719         } else {
1720             next = t->global_link;
1721             deleteThread(cap,t);
1722         }
1723       }
1724     }      
1725
1726     // The run queue now contains a bunch of ThreadKilled threads.  We
1727     // must not throw these away: the main thread(s) will be in there
1728     // somewhere, and the main scheduler loop has to deal with it.
1729     // Also, the run queue is the only thing keeping these threads from
1730     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1731
1732 #if !defined(THREADED_RTS)
1733     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1734     ASSERT(sleeping_queue == END_TSO_QUEUE);
1735 #endif
1736 }
1737
1738 /* -----------------------------------------------------------------------------
1739    Managing the suspended_ccalling_tasks list.
1740    Locks required: sched_mutex
1741    -------------------------------------------------------------------------- */
1742
1743 STATIC_INLINE void
1744 suspendTask (Capability *cap, Task *task)
1745 {
1746     ASSERT(task->next == NULL && task->prev == NULL);
1747     task->next = cap->suspended_ccalling_tasks;
1748     task->prev = NULL;
1749     if (cap->suspended_ccalling_tasks) {
1750         cap->suspended_ccalling_tasks->prev = task;
1751     }
1752     cap->suspended_ccalling_tasks = task;
1753 }
1754
1755 STATIC_INLINE void
1756 recoverSuspendedTask (Capability *cap, Task *task)
1757 {
1758     if (task->prev) {
1759         task->prev->next = task->next;
1760     } else {
1761         ASSERT(cap->suspended_ccalling_tasks == task);
1762         cap->suspended_ccalling_tasks = task->next;
1763     }
1764     if (task->next) {
1765         task->next->prev = task->prev;
1766     }
1767     task->next = task->prev = NULL;
1768 }
1769
1770 /* ---------------------------------------------------------------------------
1771  * Suspending & resuming Haskell threads.
1772  * 
1773  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1774  * its capability before calling the C function.  This allows another
1775  * task to pick up the capability and carry on running Haskell
1776  * threads.  It also means that if the C call blocks, it won't lock
1777  * the whole system.
1778  *
1779  * The Haskell thread making the C call is put to sleep for the
1780  * duration of the call, on the susepended_ccalling_threads queue.  We
1781  * give out a token to the task, which it can use to resume the thread
1782  * on return from the C function.
1783  * ------------------------------------------------------------------------- */
1784    
1785 void *
1786 suspendThread (StgRegTable *reg)
1787 {
1788   Capability *cap;
1789   int saved_errno;
1790   StgTSO *tso;
1791   Task *task;
1792 #if mingw32_HOST_OS
1793   StgWord32 saved_winerror;
1794 #endif
1795
1796   saved_errno = errno;
1797 #if mingw32_HOST_OS
1798   saved_winerror = GetLastError();
1799 #endif
1800
1801   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1802    */
1803   cap = regTableToCapability(reg);
1804
1805   task = cap->running_task;
1806   tso = cap->r.rCurrentTSO;
1807
1808   debugTrace(DEBUG_sched, 
1809              "thread %lu did a safe foreign call", 
1810              (unsigned long)cap->r.rCurrentTSO->id);
1811
1812   // XXX this might not be necessary --SDM
1813   tso->what_next = ThreadRunGHC;
1814
1815   threadPaused(cap,tso);
1816
1817   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1818       tso->why_blocked = BlockedOnCCall;
1819       tso->flags |= TSO_BLOCKEX;
1820       tso->flags &= ~TSO_INTERRUPTIBLE;
1821   } else {
1822       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1823   }
1824
1825   // Hand back capability
1826   task->suspended_tso = tso;
1827
1828   ACQUIRE_LOCK(&cap->lock);
1829
1830   suspendTask(cap,task);
1831   cap->in_haskell = rtsFalse;
1832   releaseCapability_(cap,rtsFalse);
1833   
1834   RELEASE_LOCK(&cap->lock);
1835
1836 #if defined(THREADED_RTS)
1837   /* Preparing to leave the RTS, so ensure there's a native thread/task
1838      waiting to take over.
1839   */
1840   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1841 #endif
1842
1843   errno = saved_errno;
1844 #if mingw32_HOST_OS
1845   SetLastError(saved_winerror);
1846 #endif
1847   return task;
1848 }
1849
1850 StgRegTable *
1851 resumeThread (void *task_)
1852 {
1853     StgTSO *tso;
1854     Capability *cap;
1855     Task *task = task_;
1856     int saved_errno;
1857 #if mingw32_HOST_OS
1858     StgWord32 saved_winerror;
1859 #endif
1860
1861     saved_errno = errno;
1862 #if mingw32_HOST_OS
1863     saved_winerror = GetLastError();
1864 #endif
1865
1866     cap = task->cap;
1867     // Wait for permission to re-enter the RTS with the result.
1868     waitForReturnCapability(&cap,task);
1869     // we might be on a different capability now... but if so, our
1870     // entry on the suspended_ccalling_tasks list will also have been
1871     // migrated.
1872
1873     // Remove the thread from the suspended list
1874     recoverSuspendedTask(cap,task);
1875
1876     tso = task->suspended_tso;
1877     task->suspended_tso = NULL;
1878     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1879     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1880     
1881     if (tso->why_blocked == BlockedOnCCall) {
1882         awakenBlockedExceptionQueue(cap,tso);
1883         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1884     }
1885     
1886     /* Reset blocking status */
1887     tso->why_blocked  = NotBlocked;
1888     
1889     cap->r.rCurrentTSO = tso;
1890     cap->in_haskell = rtsTrue;
1891     errno = saved_errno;
1892 #if mingw32_HOST_OS
1893     SetLastError(saved_winerror);
1894 #endif
1895
1896     /* We might have GC'd, mark the TSO dirty again */
1897     dirty_TSO(cap,tso);
1898
1899     IF_DEBUG(sanity, checkTSO(tso));
1900
1901     return &cap->r;
1902 }
1903
1904 /* ---------------------------------------------------------------------------
1905  * scheduleThread()
1906  *
1907  * scheduleThread puts a thread on the end  of the runnable queue.
1908  * This will usually be done immediately after a thread is created.
1909  * The caller of scheduleThread must create the thread using e.g.
1910  * createThread and push an appropriate closure
1911  * on this thread's stack before the scheduler is invoked.
1912  * ------------------------------------------------------------------------ */
1913
1914 void
1915 scheduleThread(Capability *cap, StgTSO *tso)
1916 {
1917     // The thread goes at the *end* of the run-queue, to avoid possible
1918     // starvation of any threads already on the queue.
1919     appendToRunQueue(cap,tso);
1920 }
1921
1922 void
1923 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1924 {
1925 #if defined(THREADED_RTS)
1926     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1927                               // move this thread from now on.
1928     cpu %= RtsFlags.ParFlags.nNodes;
1929     if (cpu == cap->no) {
1930         appendToRunQueue(cap,tso);
1931     } else {
1932         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1933     }
1934 #else
1935     appendToRunQueue(cap,tso);
1936 #endif
1937 }
1938
1939 Capability *
1940 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1941 {
1942     Task *task;
1943
1944     // We already created/initialised the Task
1945     task = cap->running_task;
1946
1947     // This TSO is now a bound thread; make the Task and TSO
1948     // point to each other.
1949     tso->bound = task;
1950     tso->cap = cap;
1951
1952     task->tso = tso;
1953     task->ret = ret;
1954     task->stat = NoStatus;
1955
1956     appendToRunQueue(cap,tso);
1957
1958     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1959
1960     cap = schedule(cap,task);
1961
1962     ASSERT(task->stat != NoStatus);
1963     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1964
1965     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1966     return cap;
1967 }
1968
1969 /* ----------------------------------------------------------------------------
1970  * Starting Tasks
1971  * ------------------------------------------------------------------------- */
1972
1973 #if defined(THREADED_RTS)
1974 void OSThreadProcAttr
1975 workerStart(Task *task)
1976 {
1977     Capability *cap;
1978
1979     // See startWorkerTask().
1980     ACQUIRE_LOCK(&task->lock);
1981     cap = task->cap;
1982     RELEASE_LOCK(&task->lock);
1983
1984     // set the thread-local pointer to the Task:
1985     taskEnter(task);
1986
1987     // schedule() runs without a lock.
1988     cap = schedule(cap,task);
1989
1990     // On exit from schedule(), we have a Capability.
1991     releaseCapability(cap);
1992     workerTaskStop(task);
1993 }
1994 #endif
1995
1996 /* ---------------------------------------------------------------------------
1997  * initScheduler()
1998  *
1999  * Initialise the scheduler.  This resets all the queues - if the
2000  * queues contained any threads, they'll be garbage collected at the
2001  * next pass.
2002  *
2003  * ------------------------------------------------------------------------ */
2004
2005 void 
2006 initScheduler(void)
2007 {
2008 #if !defined(THREADED_RTS)
2009   blocked_queue_hd  = END_TSO_QUEUE;
2010   blocked_queue_tl  = END_TSO_QUEUE;
2011   sleeping_queue    = END_TSO_QUEUE;
2012 #endif
2013
2014   blackhole_queue   = END_TSO_QUEUE;
2015
2016   sched_state    = SCHED_RUNNING;
2017   recent_activity = ACTIVITY_YES;
2018
2019 #if defined(THREADED_RTS)
2020   /* Initialise the mutex and condition variables used by
2021    * the scheduler. */
2022   initMutex(&sched_mutex);
2023 #endif
2024   
2025   ACQUIRE_LOCK(&sched_mutex);
2026
2027   /* A capability holds the state a native thread needs in
2028    * order to execute STG code. At least one capability is
2029    * floating around (only THREADED_RTS builds have more than one).
2030    */
2031   initCapabilities();
2032
2033   initTaskManager();
2034
2035 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2036   initSparkPools();
2037 #endif
2038
2039 #if defined(THREADED_RTS)
2040   /*
2041    * Eagerly start one worker to run each Capability, except for
2042    * Capability 0.  The idea is that we're probably going to start a
2043    * bound thread on Capability 0 pretty soon, so we don't want a
2044    * worker task hogging it.
2045    */
2046   { 
2047       nat i;
2048       Capability *cap;
2049       for (i = 1; i < n_capabilities; i++) {
2050           cap = &capabilities[i];
2051           ACQUIRE_LOCK(&cap->lock);
2052           startWorkerTask(cap, workerStart);
2053           RELEASE_LOCK(&cap->lock);
2054       }
2055   }
2056 #endif
2057
2058   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2059
2060   RELEASE_LOCK(&sched_mutex);
2061 }
2062
2063 void
2064 exitScheduler(
2065     rtsBool wait_foreign
2066 #if !defined(THREADED_RTS)
2067                          __attribute__((unused))
2068 #endif
2069 )
2070                /* see Capability.c, shutdownCapability() */
2071 {
2072     Task *task = NULL;
2073
2074 #if defined(THREADED_RTS)
2075     ACQUIRE_LOCK(&sched_mutex);
2076     task = newBoundTask();
2077     RELEASE_LOCK(&sched_mutex);
2078 #endif
2079
2080     // If we haven't killed all the threads yet, do it now.
2081     if (sched_state < SCHED_SHUTTING_DOWN) {
2082         sched_state = SCHED_INTERRUPTING;
2083         scheduleDoGC(NULL,task,rtsFalse);    
2084     }
2085     sched_state = SCHED_SHUTTING_DOWN;
2086
2087 #if defined(THREADED_RTS)
2088     { 
2089         nat i;
2090         
2091         for (i = 0; i < n_capabilities; i++) {
2092             shutdownCapability(&capabilities[i], task, wait_foreign);
2093         }
2094         boundTaskExiting(task);
2095         stopTaskManager();
2096     }
2097 #endif
2098 }
2099
2100 void
2101 freeScheduler( void )
2102 {
2103     freeCapabilities();
2104     freeTaskManager();
2105     if (n_capabilities != 1) {
2106         stgFree(capabilities);
2107     }
2108 #if defined(THREADED_RTS)
2109     closeMutex(&sched_mutex);
2110 #endif
2111 }
2112
2113 /* -----------------------------------------------------------------------------
2114    performGC
2115
2116    This is the interface to the garbage collector from Haskell land.
2117    We provide this so that external C code can allocate and garbage
2118    collect when called from Haskell via _ccall_GC.
2119    -------------------------------------------------------------------------- */
2120
2121 static void
2122 performGC_(rtsBool force_major)
2123 {
2124     Task *task;
2125     // We must grab a new Task here, because the existing Task may be
2126     // associated with a particular Capability, and chained onto the 
2127     // suspended_ccalling_tasks queue.
2128     ACQUIRE_LOCK(&sched_mutex);
2129     task = newBoundTask();
2130     RELEASE_LOCK(&sched_mutex);
2131     scheduleDoGC(NULL,task,force_major);
2132     boundTaskExiting(task);
2133 }
2134
2135 void
2136 performGC(void)
2137 {
2138     performGC_(rtsFalse);
2139 }
2140
2141 void
2142 performMajorGC(void)
2143 {
2144     performGC_(rtsTrue);
2145 }
2146
2147 /* -----------------------------------------------------------------------------
2148    Stack overflow
2149
2150    If the thread has reached its maximum stack size, then raise the
2151    StackOverflow exception in the offending thread.  Otherwise
2152    relocate the TSO into a larger chunk of memory and adjust its stack
2153    size appropriately.
2154    -------------------------------------------------------------------------- */
2155
2156 static StgTSO *
2157 threadStackOverflow(Capability *cap, StgTSO *tso)
2158 {
2159   nat new_stack_size, stack_words;
2160   lnat new_tso_size;
2161   StgPtr new_sp;
2162   StgTSO *dest;
2163
2164   IF_DEBUG(sanity,checkTSO(tso));
2165
2166   // don't allow throwTo() to modify the blocked_exceptions queue
2167   // while we are moving the TSO:
2168   lockClosure((StgClosure *)tso);
2169
2170   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2171       // NB. never raise a StackOverflow exception if the thread is
2172       // inside Control.Exceptino.block.  It is impractical to protect
2173       // against stack overflow exceptions, since virtually anything
2174       // can raise one (even 'catch'), so this is the only sensible
2175       // thing to do here.  See bug #767.
2176
2177       debugTrace(DEBUG_gc,
2178                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2179                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2180       IF_DEBUG(gc,
2181                /* If we're debugging, just print out the top of the stack */
2182                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2183                                                 tso->sp+64)));
2184
2185       // Send this thread the StackOverflow exception
2186       unlockTSO(tso);
2187       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2188       return tso;
2189   }
2190
2191   /* Try to double the current stack size.  If that takes us over the
2192    * maximum stack size for this thread, then use the maximum instead
2193    * (that is, unless we're already at or over the max size and we
2194    * can't raise the StackOverflow exception (see above), in which
2195    * case just double the size). Finally round up so the TSO ends up as
2196    * a whole number of blocks.
2197    */
2198   if (tso->stack_size >= tso->max_stack_size) {
2199       new_stack_size = tso->stack_size * 2;
2200   } else { 
2201       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2202   }
2203   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2204                                        TSO_STRUCT_SIZE)/sizeof(W_);
2205   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2206   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2207
2208   debugTrace(DEBUG_sched, 
2209              "increasing stack size from %ld words to %d.",
2210              (long)tso->stack_size, new_stack_size);
2211
2212   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2213   TICK_ALLOC_TSO(new_stack_size,0);
2214
2215   /* copy the TSO block and the old stack into the new area */
2216   memcpy(dest,tso,TSO_STRUCT_SIZE);
2217   stack_words = tso->stack + tso->stack_size - tso->sp;
2218   new_sp = (P_)dest + new_tso_size - stack_words;
2219   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2220
2221   /* relocate the stack pointers... */
2222   dest->sp         = new_sp;
2223   dest->stack_size = new_stack_size;
2224         
2225   /* Mark the old TSO as relocated.  We have to check for relocated
2226    * TSOs in the garbage collector and any primops that deal with TSOs.
2227    *
2228    * It's important to set the sp value to just beyond the end
2229    * of the stack, so we don't attempt to scavenge any part of the
2230    * dead TSO's stack.
2231    */
2232   tso->what_next = ThreadRelocated;
2233   setTSOLink(cap,tso,dest);
2234   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2235   tso->why_blocked = NotBlocked;
2236
2237   IF_PAR_DEBUG(verbose,
2238                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2239                      tso->id, tso, tso->stack_size);
2240                /* If we're debugging, just print out the top of the stack */
2241                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2242                                                 tso->sp+64)));
2243   
2244   unlockTSO(dest);
2245   unlockTSO(tso);
2246
2247   IF_DEBUG(sanity,checkTSO(dest));
2248 #if 0
2249   IF_DEBUG(scheduler,printTSO(dest));
2250 #endif
2251
2252   return dest;
2253 }
2254
2255 static StgTSO *
2256 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2257 {
2258     bdescr *bd, *new_bd;
2259     lnat free_w, tso_size_w;
2260     StgTSO *new_tso;
2261
2262     tso_size_w = tso_sizeW(tso);
2263
2264     if (tso_size_w < MBLOCK_SIZE_W || 
2265         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2266     {
2267         return tso;
2268     }
2269
2270     // don't allow throwTo() to modify the blocked_exceptions queue
2271     // while we are moving the TSO:
2272     lockClosure((StgClosure *)tso);
2273
2274     // this is the number of words we'll free
2275     free_w = round_to_mblocks(tso_size_w/2);
2276
2277     bd = Bdescr((StgPtr)tso);
2278     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2279     bd->free = bd->start + TSO_STRUCT_SIZEW;
2280
2281     new_tso = (StgTSO *)new_bd->start;
2282     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2283     new_tso->stack_size = new_bd->free - new_tso->stack;
2284
2285     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2286                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2287
2288     tso->what_next = ThreadRelocated;
2289     tso->_link = new_tso; // no write barrier reqd: same generation
2290
2291     // The TSO attached to this Task may have moved, so update the
2292     // pointer to it.
2293     if (task->tso == tso) {
2294         task->tso = new_tso;
2295     }
2296
2297     unlockTSO(new_tso);
2298     unlockTSO(tso);
2299
2300     IF_DEBUG(sanity,checkTSO(new_tso));
2301
2302     return new_tso;
2303 }
2304
2305 /* ---------------------------------------------------------------------------
2306    Interrupt execution
2307    - usually called inside a signal handler so it mustn't do anything fancy.   
2308    ------------------------------------------------------------------------ */
2309
2310 void
2311 interruptStgRts(void)
2312 {
2313     sched_state = SCHED_INTERRUPTING;
2314     setContextSwitches();
2315     wakeUpRts();
2316 }
2317
2318 /* -----------------------------------------------------------------------------
2319    Wake up the RTS
2320    
2321    This function causes at least one OS thread to wake up and run the
2322    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2323    an external event has arrived that may need servicing (eg. a
2324    keyboard interrupt).
2325
2326    In the single-threaded RTS we don't do anything here; we only have
2327    one thread anyway, and the event that caused us to want to wake up
2328    will have interrupted any blocking system call in progress anyway.
2329    -------------------------------------------------------------------------- */
2330
2331 void
2332 wakeUpRts(void)
2333 {
2334 #if defined(THREADED_RTS)
2335     // This forces the IO Manager thread to wakeup, which will
2336     // in turn ensure that some OS thread wakes up and runs the
2337     // scheduler loop, which will cause a GC and deadlock check.
2338     ioManagerWakeup();
2339 #endif
2340 }
2341
2342 /* -----------------------------------------------------------------------------
2343  * checkBlackHoles()
2344  *
2345  * Check the blackhole_queue for threads that can be woken up.  We do
2346  * this periodically: before every GC, and whenever the run queue is
2347  * empty.
2348  *
2349  * An elegant solution might be to just wake up all the blocked
2350  * threads with awakenBlockedQueue occasionally: they'll go back to
2351  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2352  * doesn't give us a way to tell whether we've actually managed to
2353  * wake up any threads, so we would be busy-waiting.
2354  *
2355  * -------------------------------------------------------------------------- */
2356
2357 static rtsBool
2358 checkBlackHoles (Capability *cap)
2359 {
2360     StgTSO **prev, *t;
2361     rtsBool any_woke_up = rtsFalse;
2362     StgHalfWord type;
2363
2364     // blackhole_queue is global:
2365     ASSERT_LOCK_HELD(&sched_mutex);
2366
2367     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2368
2369     // ASSUMES: sched_mutex
2370     prev = &blackhole_queue;
2371     t = blackhole_queue;
2372     while (t != END_TSO_QUEUE) {
2373         ASSERT(t->why_blocked == BlockedOnBlackHole);
2374         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2375         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2376             IF_DEBUG(sanity,checkTSO(t));
2377             t = unblockOne(cap, t);
2378             *prev = t;
2379             any_woke_up = rtsTrue;
2380         } else {
2381             prev = &t->_link;
2382             t = t->_link;
2383         }
2384     }
2385
2386     return any_woke_up;
2387 }
2388
2389 /* -----------------------------------------------------------------------------
2390    Deleting threads
2391
2392    This is used for interruption (^C) and forking, and corresponds to
2393    raising an exception but without letting the thread catch the
2394    exception.
2395    -------------------------------------------------------------------------- */
2396
2397 static void 
2398 deleteThread (Capability *cap, StgTSO *tso)
2399 {
2400     // NOTE: must only be called on a TSO that we have exclusive
2401     // access to, because we will call throwToSingleThreaded() below.
2402     // The TSO must be on the run queue of the Capability we own, or 
2403     // we must own all Capabilities.
2404
2405     if (tso->why_blocked != BlockedOnCCall &&
2406         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2407         throwToSingleThreaded(cap,tso,NULL);
2408     }
2409 }
2410
2411 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2412 static void 
2413 deleteThread_(Capability *cap, StgTSO *tso)
2414 { // for forkProcess only:
2415   // like deleteThread(), but we delete threads in foreign calls, too.
2416
2417     if (tso->why_blocked == BlockedOnCCall ||
2418         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2419         unblockOne(cap,tso);
2420         tso->what_next = ThreadKilled;
2421     } else {
2422         deleteThread(cap,tso);
2423     }
2424 }
2425 #endif
2426
2427 /* -----------------------------------------------------------------------------
2428    raiseExceptionHelper
2429    
2430    This function is called by the raise# primitve, just so that we can
2431    move some of the tricky bits of raising an exception from C-- into
2432    C.  Who knows, it might be a useful re-useable thing here too.
2433    -------------------------------------------------------------------------- */
2434
2435 StgWord
2436 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2437 {
2438     Capability *cap = regTableToCapability(reg);
2439     StgThunk *raise_closure = NULL;
2440     StgPtr p, next;
2441     StgRetInfoTable *info;
2442     //
2443     // This closure represents the expression 'raise# E' where E
2444     // is the exception raise.  It is used to overwrite all the
2445     // thunks which are currently under evaluataion.
2446     //
2447
2448     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2449     // LDV profiling: stg_raise_info has THUNK as its closure
2450     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2451     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2452     // 1 does not cause any problem unless profiling is performed.
2453     // However, when LDV profiling goes on, we need to linearly scan
2454     // small object pool, where raise_closure is stored, so we should
2455     // use MIN_UPD_SIZE.
2456     //
2457     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2458     //                                 sizeofW(StgClosure)+1);
2459     //
2460
2461     //
2462     // Walk up the stack, looking for the catch frame.  On the way,
2463     // we update any closures pointed to from update frames with the
2464     // raise closure that we just built.
2465     //
2466     p = tso->sp;
2467     while(1) {
2468         info = get_ret_itbl((StgClosure *)p);
2469         next = p + stack_frame_sizeW((StgClosure *)p);
2470         switch (info->i.type) {
2471             
2472         case UPDATE_FRAME:
2473             // Only create raise_closure if we need to.
2474             if (raise_closure == NULL) {
2475                 raise_closure = 
2476                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2477                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2478                 raise_closure->payload[0] = exception;
2479             }
2480             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2481             p = next;
2482             continue;
2483
2484         case ATOMICALLY_FRAME:
2485             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2486             tso->sp = p;
2487             return ATOMICALLY_FRAME;
2488             
2489         case CATCH_FRAME:
2490             tso->sp = p;
2491             return CATCH_FRAME;
2492
2493         case CATCH_STM_FRAME:
2494             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2495             tso->sp = p;
2496             return CATCH_STM_FRAME;
2497             
2498         case STOP_FRAME:
2499             tso->sp = p;
2500             return STOP_FRAME;
2501
2502         case CATCH_RETRY_FRAME:
2503         default:
2504             p = next; 
2505             continue;
2506         }
2507     }
2508 }
2509
2510
2511 /* -----------------------------------------------------------------------------
2512    findRetryFrameHelper
2513
2514    This function is called by the retry# primitive.  It traverses the stack
2515    leaving tso->sp referring to the frame which should handle the retry.  
2516
2517    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2518    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2519
2520    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2521    create) because retries are not considered to be exceptions, despite the
2522    similar implementation.
2523
2524    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2525    not be created within memory transactions.
2526    -------------------------------------------------------------------------- */
2527
2528 StgWord
2529 findRetryFrameHelper (StgTSO *tso)
2530 {
2531   StgPtr           p, next;
2532   StgRetInfoTable *info;
2533
2534   p = tso -> sp;
2535   while (1) {
2536     info = get_ret_itbl((StgClosure *)p);
2537     next = p + stack_frame_sizeW((StgClosure *)p);
2538     switch (info->i.type) {
2539       
2540     case ATOMICALLY_FRAME:
2541         debugTrace(DEBUG_stm,
2542                    "found ATOMICALLY_FRAME at %p during retry", p);
2543         tso->sp = p;
2544         return ATOMICALLY_FRAME;
2545       
2546     case CATCH_RETRY_FRAME:
2547         debugTrace(DEBUG_stm,
2548                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2549         tso->sp = p;
2550         return CATCH_RETRY_FRAME;
2551       
2552     case CATCH_STM_FRAME: {
2553         StgTRecHeader *trec = tso -> trec;
2554         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2555         debugTrace(DEBUG_stm,
2556                    "found CATCH_STM_FRAME at %p during retry", p);
2557         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2558         stmAbortTransaction(tso -> cap, trec);
2559         stmFreeAbortedTRec(tso -> cap, trec);
2560         tso -> trec = outer;
2561         p = next; 
2562         continue;
2563     }
2564       
2565
2566     default:
2567       ASSERT(info->i.type != CATCH_FRAME);
2568       ASSERT(info->i.type != STOP_FRAME);
2569       p = next; 
2570       continue;
2571     }
2572   }
2573 }
2574
2575 /* -----------------------------------------------------------------------------
2576    resurrectThreads is called after garbage collection on the list of
2577    threads found to be garbage.  Each of these threads will be woken
2578    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2579    on an MVar, or NonTermination if the thread was blocked on a Black
2580    Hole.
2581
2582    Locks: assumes we hold *all* the capabilities.
2583    -------------------------------------------------------------------------- */
2584
2585 void
2586 resurrectThreads (StgTSO *threads)
2587 {
2588     StgTSO *tso, *next;
2589     Capability *cap;
2590     step *step;
2591
2592     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2593         next = tso->global_link;
2594
2595         step = Bdescr((P_)tso)->step;
2596         tso->global_link = step->threads;
2597         step->threads = tso;
2598
2599         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2600         
2601         // Wake up the thread on the Capability it was last on
2602         cap = tso->cap;
2603         
2604         switch (tso->why_blocked) {
2605         case BlockedOnMVar:
2606         case BlockedOnException:
2607             /* Called by GC - sched_mutex lock is currently held. */
2608             throwToSingleThreaded(cap, tso,
2609                                   (StgClosure *)blockedOnDeadMVar_closure);
2610             break;
2611         case BlockedOnBlackHole:
2612             throwToSingleThreaded(cap, tso,
2613                                   (StgClosure *)nonTermination_closure);
2614             break;
2615         case BlockedOnSTM:
2616             throwToSingleThreaded(cap, tso,
2617                                   (StgClosure *)blockedIndefinitely_closure);
2618             break;
2619         case NotBlocked:
2620             /* This might happen if the thread was blocked on a black hole
2621              * belonging to a thread that we've just woken up (raiseAsync
2622              * can wake up threads, remember...).
2623              */
2624             continue;
2625         default:
2626             barf("resurrectThreads: thread blocked in a strange way");
2627         }
2628     }
2629 }
2630
2631 /* -----------------------------------------------------------------------------
2632    performPendingThrowTos is called after garbage collection, and
2633    passed a list of threads that were found to have pending throwTos
2634    (tso->blocked_exceptions was not empty), and were blocked.
2635    Normally this doesn't happen, because we would deliver the
2636    exception directly if the target thread is blocked, but there are
2637    small windows where it might occur on a multiprocessor (see
2638    throwTo()).
2639
2640    NB. we must be holding all the capabilities at this point, just
2641    like resurrectThreads().
2642    -------------------------------------------------------------------------- */
2643
2644 void
2645 performPendingThrowTos (StgTSO *threads)
2646 {
2647     StgTSO *tso, *next;
2648     Capability *cap;
2649     step *step;
2650
2651     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2652         next = tso->global_link;
2653
2654         step = Bdescr((P_)tso)->step;
2655         tso->global_link = step->threads;
2656         step->threads = tso;
2657
2658         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2659         
2660         cap = tso->cap;
2661         maybePerformBlockedException(cap, tso);
2662     }
2663 }