Fix race condition in wakeupThreadOnCapability() (#2574)
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag set by signal handler to precipitate a context switch
93  * LOCK: none (just an advisory flag)
94  */
95 int context_switch = 0;
96
97 /* flag that tracks whether we have done any execution in this time slice.
98  * LOCK: currently none, perhaps we should lock (but needs to be
99  * updated in the fast path of the scheduler).
100  */
101 nat recent_activity = ACTIVITY_YES;
102
103 /* if this flag is set as well, give up execution
104  * LOCK: none (changes once, from false->true)
105  */
106 rtsBool sched_state = SCHED_RUNNING;
107
108 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
109  *  exists - earlier gccs apparently didn't.
110  *  -= chak
111  */
112 StgTSO dummy_tso;
113
114 /*
115  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
116  * in an MT setting, needed to signal that a worker thread shouldn't hang around
117  * in the scheduler when it is out of work.
118  */
119 rtsBool shutting_down_scheduler = rtsFalse;
120
121 /*
122  * This mutex protects most of the global scheduler data in
123  * the THREADED_RTS runtime.
124  */
125 #if defined(THREADED_RTS)
126 Mutex sched_mutex;
127 #endif
128
129 #if !defined(mingw32_HOST_OS)
130 #define FORKPROCESS_PRIMOP_SUPPORTED
131 #endif
132
133 /* -----------------------------------------------------------------------------
134  * static function prototypes
135  * -------------------------------------------------------------------------- */
136
137 static Capability *schedule (Capability *initialCapability, Task *task);
138
139 //
140 // These function all encapsulate parts of the scheduler loop, and are
141 // abstracted only to make the structure and control flow of the
142 // scheduler clearer.
143 //
144 static void schedulePreLoop (void);
145 #if defined(THREADED_RTS)
146 static void schedulePushWork(Capability *cap, Task *task);
147 #endif
148 static void scheduleStartSignalHandlers (Capability *cap);
149 static void scheduleCheckBlockedThreads (Capability *cap);
150 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
151 static void scheduleCheckBlackHoles (Capability *cap);
152 static void scheduleDetectDeadlock (Capability *cap, Task *task);
153 #if defined(PARALLEL_HASKELL)
154 static rtsBool scheduleGetRemoteWork(Capability *cap);
155 static void scheduleSendPendingMessages(void);
156 static void scheduleActivateSpark(Capability *cap);
157 #endif
158 static void schedulePostRunThread(StgTSO *t);
159 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
160 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
161                                          StgTSO *t);
162 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
163                                     nat prev_what_next );
164 static void scheduleHandleThreadBlocked( StgTSO *t );
165 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
166                                              StgTSO *t );
167 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
168 static Capability *scheduleDoGC(Capability *cap, Task *task,
169                                 rtsBool force_major);
170
171 static rtsBool checkBlackHoles(Capability *cap);
172
173 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
174 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
175
176 static void deleteThread (Capability *cap, StgTSO *tso);
177 static void deleteAllThreads (Capability *cap);
178
179 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
180 static void deleteThread_(Capability *cap, StgTSO *tso);
181 #endif
182
183 #ifdef DEBUG
184 static char *whatNext_strs[] = {
185   "(unknown)",
186   "ThreadRunGHC",
187   "ThreadInterpret",
188   "ThreadKilled",
189   "ThreadRelocated",
190   "ThreadComplete"
191 };
192 #endif
193
194 /* -----------------------------------------------------------------------------
195  * Putting a thread on the run queue: different scheduling policies
196  * -------------------------------------------------------------------------- */
197
198 STATIC_INLINE void
199 addToRunQueue( Capability *cap, StgTSO *t )
200 {
201 #if defined(PARALLEL_HASKELL)
202     if (RtsFlags.ParFlags.doFairScheduling) { 
203         // this does round-robin scheduling; good for concurrency
204         appendToRunQueue(cap,t);
205     } else {
206         // this does unfair scheduling; good for parallelism
207         pushOnRunQueue(cap,t);
208     }
209 #else
210     // this does round-robin scheduling; good for concurrency
211     appendToRunQueue(cap,t);
212 #endif
213 }
214
215 /* ---------------------------------------------------------------------------
216    Main scheduling loop.
217
218    We use round-robin scheduling, each thread returning to the
219    scheduler loop when one of these conditions is detected:
220
221       * out of heap space
222       * timer expires (thread yields)
223       * thread blocks
224       * thread ends
225       * stack overflow
226
227    GRAN version:
228      In a GranSim setup this loop iterates over the global event queue.
229      This revolves around the global event queue, which determines what 
230      to do next. Therefore, it's more complicated than either the 
231      concurrent or the parallel (GUM) setup.
232   This version has been entirely removed (JB 2008/08).
233
234    GUM version:
235      GUM iterates over incoming messages.
236      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
237      and sends out a fish whenever it has nothing to do; in-between
238      doing the actual reductions (shared code below) it processes the
239      incoming messages and deals with delayed operations 
240      (see PendingFetches).
241      This is not the ugliest code you could imagine, but it's bloody close.
242
243   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
244   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
245   as well as future GUM versions. This file has been refurbished to
246   only contain valid code, which is however incomplete, refers to
247   invalid includes etc.
248
249    ------------------------------------------------------------------------ */
250
251 static Capability *
252 schedule (Capability *initialCapability, Task *task)
253 {
254   StgTSO *t;
255   Capability *cap;
256   StgThreadReturnCode ret;
257 #if defined(PARALLEL_HASKELL)
258   rtsBool receivedFinish = rtsFalse;
259 #endif
260   nat prev_what_next;
261   rtsBool ready_to_gc;
262 #if defined(THREADED_RTS)
263   rtsBool first = rtsTrue;
264 #endif
265   
266   cap = initialCapability;
267
268   // Pre-condition: this task owns initialCapability.
269   // The sched_mutex is *NOT* held
270   // NB. on return, we still hold a capability.
271
272   debugTrace (DEBUG_sched, 
273               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
274               task, initialCapability);
275
276   schedulePreLoop();
277
278   // -----------------------------------------------------------
279   // Scheduler loop starts here:
280
281 #if defined(PARALLEL_HASKELL)
282 #define TERMINATION_CONDITION        (!receivedFinish)
283 #else
284 #define TERMINATION_CONDITION        rtsTrue
285 #endif
286
287   while (TERMINATION_CONDITION) {
288
289 #if defined(THREADED_RTS)
290       if (first) {
291           // don't yield the first time, we want a chance to run this
292           // thread for a bit, even if there are others banging at the
293           // door.
294           first = rtsFalse;
295           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
296       } else {
297           // Yield the capability to higher-priority tasks if necessary.
298           yieldCapability(&cap, task);
299       }
300 #endif
301       
302 #if defined(THREADED_RTS)
303       schedulePushWork(cap,task);
304 #endif
305
306     // Check whether we have re-entered the RTS from Haskell without
307     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
308     // call).
309     if (cap->in_haskell) {
310           errorBelch("schedule: re-entered unsafely.\n"
311                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
312           stg_exit(EXIT_FAILURE);
313     }
314
315     // The interruption / shutdown sequence.
316     // 
317     // In order to cleanly shut down the runtime, we want to:
318     //   * make sure that all main threads return to their callers
319     //     with the state 'Interrupted'.
320     //   * clean up all OS threads assocated with the runtime
321     //   * free all memory etc.
322     //
323     // So the sequence for ^C goes like this:
324     //
325     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
326     //     arranges for some Capability to wake up
327     //
328     //   * all threads in the system are halted, and the zombies are
329     //     placed on the run queue for cleaning up.  We acquire all
330     //     the capabilities in order to delete the threads, this is
331     //     done by scheduleDoGC() for convenience (because GC already
332     //     needs to acquire all the capabilities).  We can't kill
333     //     threads involved in foreign calls.
334     // 
335     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
336     //
337     //   * sched_state := SCHED_SHUTTING_DOWN
338     //
339     //   * all workers exit when the run queue on their capability
340     //     drains.  All main threads will also exit when their TSO
341     //     reaches the head of the run queue and they can return.
342     //
343     //   * eventually all Capabilities will shut down, and the RTS can
344     //     exit.
345     //
346     //   * We might be left with threads blocked in foreign calls, 
347     //     we should really attempt to kill these somehow (TODO);
348     
349     switch (sched_state) {
350     case SCHED_RUNNING:
351         break;
352     case SCHED_INTERRUPTING:
353         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
354 #if defined(THREADED_RTS)
355         discardSparksCap(cap);
356 #endif
357         /* scheduleDoGC() deletes all the threads */
358         cap = scheduleDoGC(cap,task,rtsFalse);
359         break;
360     case SCHED_SHUTTING_DOWN:
361         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
362         // If we are a worker, just exit.  If we're a bound thread
363         // then we will exit below when we've removed our TSO from
364         // the run queue.
365         if (task->tso == NULL && emptyRunQueue(cap)) {
366             return cap;
367         }
368         break;
369     default:
370         barf("sched_state: %d", sched_state);
371     }
372
373 #if defined(THREADED_RTS)
374     // If the run queue is empty, take a spark and turn it into a thread.
375     {
376         if (emptyRunQueue(cap)) {
377             StgClosure *spark;
378             spark = findSpark(cap);
379             if (spark != NULL) {
380                 debugTrace(DEBUG_sched,
381                            "turning spark of closure %p into a thread",
382                            (StgClosure *)spark);
383                 createSparkThread(cap,spark);     
384             }
385         }
386     }
387 #endif // THREADED_RTS
388
389     scheduleStartSignalHandlers(cap);
390
391     // Only check the black holes here if we've nothing else to do.
392     // During normal execution, the black hole list only gets checked
393     // at GC time, to avoid repeatedly traversing this possibly long
394     // list each time around the scheduler.
395     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
396
397     scheduleCheckWakeupThreads(cap);
398
399     scheduleCheckBlockedThreads(cap);
400
401 #if defined(PARALLEL_HASKELL)
402     /* message processing and work distribution goes here */ 
403
404     /* if messages have been buffered... a NOOP in THREADED_RTS */
405     scheduleSendPendingMessages();
406
407     /* If the run queue is empty,...*/
408     if (emptyRunQueue(cap)) {
409       /* ...take one of our own sparks and turn it into a thread */
410       scheduleActivateSpark(cap);
411
412         /* if this did not work, try to steal a spark from someone else */
413       if (emptyRunQueue(cap)) {
414         receivedFinish = scheduleGetRemoteWork(cap);
415         continue; //  a new round, (hopefully) with new work
416         /* 
417            in GUM, this a) sends out a FISH and returns IF no fish is
418                            out already
419                         b) (blocking) awaits and receives messages
420            
421            in Eden, this is only the blocking receive, as b) in GUM.
422         */
423       }
424     } 
425
426     /* since we perform a blocking receive and continue otherwise,
427        either we never reach here or we definitely have work! */
428     // from here: non-empty run queue
429     ASSERT(!emptyRunQueue(cap));
430
431     if (PacketsWaiting()) {  /* now process incoming messages, if any
432                                 pending...  
433
434                                 CAUTION: scheduleGetRemoteWork called
435                                 above, waits for messages as well! */
436       processMessages(cap, &receivedFinish);
437     }
438 #endif // PARALLEL_HASKELL
439
440     scheduleDetectDeadlock(cap,task);
441 #if defined(THREADED_RTS)
442     cap = task->cap;    // reload cap, it might have changed
443 #endif
444
445     // Normally, the only way we can get here with no threads to
446     // run is if a keyboard interrupt received during 
447     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
448     // Additionally, it is not fatal for the
449     // threaded RTS to reach here with no threads to run.
450     //
451     // win32: might be here due to awaitEvent() being abandoned
452     // as a result of a console event having been delivered.
453     if ( emptyRunQueue(cap) ) {
454 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
455         ASSERT(sched_state >= SCHED_INTERRUPTING);
456 #endif
457         continue; // nothing to do
458     }
459
460     // 
461     // Get a thread to run
462     //
463     t = popRunQueue(cap);
464
465     // Sanity check the thread we're about to run.  This can be
466     // expensive if there is lots of thread switching going on...
467     IF_DEBUG(sanity,checkTSO(t));
468
469 #if defined(THREADED_RTS)
470     // Check whether we can run this thread in the current task.
471     // If not, we have to pass our capability to the right task.
472     {
473         Task *bound = t->bound;
474       
475         if (bound) {
476             if (bound == task) {
477                 debugTrace(DEBUG_sched,
478                            "### Running thread %lu in bound thread", (unsigned long)t->id);
479                 // yes, the Haskell thread is bound to the current native thread
480             } else {
481                 debugTrace(DEBUG_sched,
482                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
483                 // no, bound to a different Haskell thread: pass to that thread
484                 pushOnRunQueue(cap,t);
485                 continue;
486             }
487         } else {
488             // The thread we want to run is unbound.
489             if (task->tso) { 
490                 debugTrace(DEBUG_sched,
491                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
492                 // no, the current native thread is bound to a different
493                 // Haskell thread, so pass it to any worker thread
494                 pushOnRunQueue(cap,t);
495                 continue; 
496             }
497         }
498     }
499 #endif
500
501     /* context switches are initiated by the timer signal, unless
502      * the user specified "context switch as often as possible", with
503      * +RTS -C0
504      */
505     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
506         && !emptyThreadQueues(cap)) {
507         context_switch = 1;
508     }
509          
510 run_thread:
511
512     // CurrentTSO is the thread to run.  t might be different if we
513     // loop back to run_thread, so make sure to set CurrentTSO after
514     // that.
515     cap->r.rCurrentTSO = t;
516
517     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
518                               (long)t->id, whatNext_strs[t->what_next]);
519
520     startHeapProfTimer();
521
522     // Check for exceptions blocked on this thread
523     maybePerformBlockedException (cap, t);
524
525     // ----------------------------------------------------------------------
526     // Run the current thread 
527
528     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
529     ASSERT(t->cap == cap);
530
531     prev_what_next = t->what_next;
532
533     errno = t->saved_errno;
534 #if mingw32_HOST_OS
535     SetLastError(t->saved_winerror);
536 #endif
537
538     cap->in_haskell = rtsTrue;
539
540     dirty_TSO(cap,t);
541
542 #if defined(THREADED_RTS)
543     if (recent_activity == ACTIVITY_DONE_GC) {
544         // ACTIVITY_DONE_GC means we turned off the timer signal to
545         // conserve power (see #1623).  Re-enable it here.
546         nat prev;
547         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
548         if (prev == ACTIVITY_DONE_GC) {
549             startTimer();
550         }
551     } else {
552         recent_activity = ACTIVITY_YES;
553     }
554 #endif
555
556     switch (prev_what_next) {
557         
558     case ThreadKilled:
559     case ThreadComplete:
560         /* Thread already finished, return to scheduler. */
561         ret = ThreadFinished;
562         break;
563         
564     case ThreadRunGHC:
565     {
566         StgRegTable *r;
567         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
568         cap = regTableToCapability(r);
569         ret = r->rRet;
570         break;
571     }
572     
573     case ThreadInterpret:
574         cap = interpretBCO(cap);
575         ret = cap->r.rRet;
576         break;
577         
578     default:
579         barf("schedule: invalid what_next field");
580     }
581
582     cap->in_haskell = rtsFalse;
583
584     // The TSO might have moved, eg. if it re-entered the RTS and a GC
585     // happened.  So find the new location:
586     t = cap->r.rCurrentTSO;
587
588     // We have run some Haskell code: there might be blackhole-blocked
589     // threads to wake up now.
590     // Lock-free test here should be ok, we're just setting a flag.
591     if ( blackhole_queue != END_TSO_QUEUE ) {
592         blackholes_need_checking = rtsTrue;
593     }
594     
595     // And save the current errno in this thread.
596     // XXX: possibly bogus for SMP because this thread might already
597     // be running again, see code below.
598     t->saved_errno = errno;
599 #if mingw32_HOST_OS
600     // Similarly for Windows error code
601     t->saved_winerror = GetLastError();
602 #endif
603
604 #if defined(THREADED_RTS)
605     // If ret is ThreadBlocked, and this Task is bound to the TSO that
606     // blocked, we are in limbo - the TSO is now owned by whatever it
607     // is blocked on, and may in fact already have been woken up,
608     // perhaps even on a different Capability.  It may be the case
609     // that task->cap != cap.  We better yield this Capability
610     // immediately and return to normaility.
611     if (ret == ThreadBlocked) {
612         debugTrace(DEBUG_sched,
613                    "--<< thread %lu (%s) stopped: blocked",
614                    (unsigned long)t->id, whatNext_strs[t->what_next]);
615         continue;
616     }
617 #endif
618
619     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
620     ASSERT(t->cap == cap);
621
622     // ----------------------------------------------------------------------
623     
624     // Costs for the scheduler are assigned to CCS_SYSTEM
625     stopHeapProfTimer();
626 #if defined(PROFILING)
627     CCCS = CCS_SYSTEM;
628 #endif
629     
630     schedulePostRunThread(t);
631
632     t = threadStackUnderflow(task,t);
633
634     ready_to_gc = rtsFalse;
635
636     switch (ret) {
637     case HeapOverflow:
638         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
639         break;
640
641     case StackOverflow:
642         scheduleHandleStackOverflow(cap,task,t);
643         break;
644
645     case ThreadYielding:
646         if (scheduleHandleYield(cap, t, prev_what_next)) {
647             // shortcut for switching between compiler/interpreter:
648             goto run_thread; 
649         }
650         break;
651
652     case ThreadBlocked:
653         scheduleHandleThreadBlocked(t);
654         break;
655
656     case ThreadFinished:
657         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
658         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
659         break;
660
661     default:
662       barf("schedule: invalid thread return code %d", (int)ret);
663     }
664
665     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
666       cap = scheduleDoGC(cap,task,rtsFalse);
667     }
668   } /* end of while() */
669 }
670
671 /* ----------------------------------------------------------------------------
672  * Setting up the scheduler loop
673  * ------------------------------------------------------------------------- */
674
675 static void
676 schedulePreLoop(void)
677 {
678   // initialisation for scheduler - what cannot go into initScheduler()  
679 }
680
681 /* -----------------------------------------------------------------------------
682  * schedulePushWork()
683  *
684  * Push work to other Capabilities if we have some.
685  * -------------------------------------------------------------------------- */
686
687 #if defined(THREADED_RTS)
688 static void
689 schedulePushWork(Capability *cap USED_IF_THREADS, 
690                  Task *task      USED_IF_THREADS)
691 {
692     Capability *free_caps[n_capabilities], *cap0;
693     nat i, n_free_caps;
694
695     // migration can be turned off with +RTS -qg
696     if (!RtsFlags.ParFlags.migrate) return;
697
698     // Check whether we have more threads on our run queue, or sparks
699     // in our pool, that we could hand to another Capability.
700     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
701         && sparkPoolSizeCap(cap) < 2) {
702         return;
703     }
704
705     // First grab as many free Capabilities as we can.
706     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
707         cap0 = &capabilities[i];
708         if (cap != cap0 && tryGrabCapability(cap0,task)) {
709             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
710                 // it already has some work, we just grabbed it at 
711                 // the wrong moment.  Or maybe it's deadlocked!
712                 releaseCapability(cap0);
713             } else {
714                 free_caps[n_free_caps++] = cap0;
715             }
716         }
717     }
718
719     // we now have n_free_caps free capabilities stashed in
720     // free_caps[].  Share our run queue equally with them.  This is
721     // probably the simplest thing we could do; improvements we might
722     // want to do include:
723     //
724     //   - giving high priority to moving relatively new threads, on 
725     //     the gournds that they haven't had time to build up a
726     //     working set in the cache on this CPU/Capability.
727     //
728     //   - giving low priority to moving long-lived threads
729
730     if (n_free_caps > 0) {
731         StgTSO *prev, *t, *next;
732         rtsBool pushed_to_all;
733
734         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
735
736         i = 0;
737         pushed_to_all = rtsFalse;
738
739         if (cap->run_queue_hd != END_TSO_QUEUE) {
740             prev = cap->run_queue_hd;
741             t = prev->_link;
742             prev->_link = END_TSO_QUEUE;
743             for (; t != END_TSO_QUEUE; t = next) {
744                 next = t->_link;
745                 t->_link = END_TSO_QUEUE;
746                 if (t->what_next == ThreadRelocated
747                     || t->bound == task // don't move my bound thread
748                     || tsoLocked(t)) {  // don't move a locked thread
749                     setTSOLink(cap, prev, t);
750                     prev = t;
751                 } else if (i == n_free_caps) {
752                     pushed_to_all = rtsTrue;
753                     i = 0;
754                     // keep one for us
755                     setTSOLink(cap, prev, t);
756                     prev = t;
757                 } else {
758                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
759                     appendToRunQueue(free_caps[i],t);
760                     if (t->bound) { t->bound->cap = free_caps[i]; }
761                     t->cap = free_caps[i];
762                     i++;
763                 }
764             }
765             cap->run_queue_tl = prev;
766         }
767
768         // If there are some free capabilities that we didn't push any
769         // threads to, then try to push a spark to each one.
770         if (!pushed_to_all) {
771             StgClosure *spark;
772             // i is the next free capability to push to
773             for (; i < n_free_caps; i++) {
774                 if (emptySparkPoolCap(free_caps[i])) {
775                     spark = findSpark(cap);
776                     if (spark != NULL) {
777                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
778                         newSpark(&(free_caps[i]->r), spark);
779                     }
780                 }
781             }
782         }
783
784         // release the capabilities
785         for (i = 0; i < n_free_caps; i++) {
786             task->cap = free_caps[i];
787             releaseCapability(free_caps[i]);
788         }
789     }
790     task->cap = cap; // reset to point to our Capability.
791 }
792 #endif
793
794 /* ----------------------------------------------------------------------------
795  * Start any pending signal handlers
796  * ------------------------------------------------------------------------- */
797
798 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
799 static void
800 scheduleStartSignalHandlers(Capability *cap)
801 {
802     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
803         // safe outside the lock
804         startSignalHandlers(cap);
805     }
806 }
807 #else
808 static void
809 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
810 {
811 }
812 #endif
813
814 /* ----------------------------------------------------------------------------
815  * Check for blocked threads that can be woken up.
816  * ------------------------------------------------------------------------- */
817
818 static void
819 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
820 {
821 #if !defined(THREADED_RTS)
822     //
823     // Check whether any waiting threads need to be woken up.  If the
824     // run queue is empty, and there are no other tasks running, we
825     // can wait indefinitely for something to happen.
826     //
827     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
828     {
829         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
830     }
831 #endif
832 }
833
834
835 /* ----------------------------------------------------------------------------
836  * Check for threads woken up by other Capabilities
837  * ------------------------------------------------------------------------- */
838
839 static void
840 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
841 {
842 #if defined(THREADED_RTS)
843     // Any threads that were woken up by other Capabilities get
844     // appended to our run queue.
845     if (!emptyWakeupQueue(cap)) {
846         ACQUIRE_LOCK(&cap->lock);
847         if (emptyRunQueue(cap)) {
848             cap->run_queue_hd = cap->wakeup_queue_hd;
849             cap->run_queue_tl = cap->wakeup_queue_tl;
850         } else {
851             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
852             cap->run_queue_tl = cap->wakeup_queue_tl;
853         }
854         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
855         RELEASE_LOCK(&cap->lock);
856     }
857 #endif
858 }
859
860 /* ----------------------------------------------------------------------------
861  * Check for threads blocked on BLACKHOLEs that can be woken up
862  * ------------------------------------------------------------------------- */
863 static void
864 scheduleCheckBlackHoles (Capability *cap)
865 {
866     if ( blackholes_need_checking ) // check without the lock first
867     {
868         ACQUIRE_LOCK(&sched_mutex);
869         if ( blackholes_need_checking ) {
870             checkBlackHoles(cap);
871             blackholes_need_checking = rtsFalse;
872         }
873         RELEASE_LOCK(&sched_mutex);
874     }
875 }
876
877 /* ----------------------------------------------------------------------------
878  * Detect deadlock conditions and attempt to resolve them.
879  * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleDetectDeadlock (Capability *cap, Task *task)
883 {
884
885 #if defined(PARALLEL_HASKELL)
886     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
887     return;
888 #endif
889
890     /* 
891      * Detect deadlock: when we have no threads to run, there are no
892      * threads blocked, waiting for I/O, or sleeping, and all the
893      * other tasks are waiting for work, we must have a deadlock of
894      * some description.
895      */
896     if ( emptyThreadQueues(cap) )
897     {
898 #if defined(THREADED_RTS)
899         /* 
900          * In the threaded RTS, we only check for deadlock if there
901          * has been no activity in a complete timeslice.  This means
902          * we won't eagerly start a full GC just because we don't have
903          * any threads to run currently.
904          */
905         if (recent_activity != ACTIVITY_INACTIVE) return;
906 #endif
907
908         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
909
910         // Garbage collection can release some new threads due to
911         // either (a) finalizers or (b) threads resurrected because
912         // they are unreachable and will therefore be sent an
913         // exception.  Any threads thus released will be immediately
914         // runnable.
915         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
916
917         recent_activity = ACTIVITY_DONE_GC;
918         // disable timer signals (see #1623)
919         stopTimer();
920         
921         if ( !emptyRunQueue(cap) ) return;
922
923 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
924         /* If we have user-installed signal handlers, then wait
925          * for signals to arrive rather then bombing out with a
926          * deadlock.
927          */
928         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
929             debugTrace(DEBUG_sched,
930                        "still deadlocked, waiting for signals...");
931
932             awaitUserSignals();
933
934             if (signals_pending()) {
935                 startSignalHandlers(cap);
936             }
937
938             // either we have threads to run, or we were interrupted:
939             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
940
941             return;
942         }
943 #endif
944
945 #if !defined(THREADED_RTS)
946         /* Probably a real deadlock.  Send the current main thread the
947          * Deadlock exception.
948          */
949         if (task->tso) {
950             switch (task->tso->why_blocked) {
951             case BlockedOnSTM:
952             case BlockedOnBlackHole:
953             case BlockedOnException:
954             case BlockedOnMVar:
955                 throwToSingleThreaded(cap, task->tso, 
956                                       (StgClosure *)nonTermination_closure);
957                 return;
958             default:
959                 barf("deadlock: main thread blocked in a strange way");
960             }
961         }
962         return;
963 #endif
964     }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969  * Send pending messages (PARALLEL_HASKELL only)
970  * ------------------------------------------------------------------------- */
971
972 static StgTSO *
973 scheduleSendPendingMessages(void)
974 {
975 #if defined(PARALLEL_HASKELL)
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978     if (PendingFetches != END_BF_QUEUE) {
979         processFetches();
980     }
981 # endif
982     
983     if (RtsFlags.ParFlags.BufferTime) {
984         // if we use message buffering, we must send away all message
985         // packets which have become too old...
986         sendOldBuffers(); 
987     }
988 #endif
989 }
990
991 /* ----------------------------------------------------------------------------
992  * Activate spark threads (PARALLEL_HASKELL only)
993  * ------------------------------------------------------------------------- */
994
995 #if defined(PARALLEL_HASKELL)
996 static void
997 scheduleActivateSpark(Capability *cap)
998 {
999     StgClosure *spark;
1000
1001 /* We only want to stay here if the run queue is empty and we want some
1002    work. We try to turn a spark into a thread, and add it to the run
1003    queue, from where it will be picked up in the next iteration of the
1004    scheduler loop.  
1005 */
1006     if (!emptyRunQueue(cap)) 
1007       /* In the threaded RTS, another task might have pushed a thread
1008          on our run queue in the meantime ? But would need a lock.. */
1009       return;
1010
1011     spark = findSpark(cap); // defined in Sparks.c
1012
1013     if (spark != NULL) {
1014       debugTrace(DEBUG_sched,
1015                  "turning spark of closure %p into a thread",
1016                  (StgClosure *)spark);
1017       createSparkThread(cap,spark); // defined in Sparks.c
1018     }
1019 }
1020 #endif // PARALLEL_HASKELL
1021
1022 /* ----------------------------------------------------------------------------
1023  * Get work from a remote node (PARALLEL_HASKELL only)
1024  * ------------------------------------------------------------------------- */
1025     
1026 #if defined(PARALLEL_HASKELL)
1027 static rtsBool
1028 scheduleGetRemoteWork(Capability *cap)
1029 {
1030 #if defined(PARALLEL_HASKELL)
1031   rtsBool receivedFinish = rtsFalse;
1032
1033   // idle() , i.e. send all buffers, wait for work
1034   if (RtsFlags.ParFlags.BufferTime) {
1035         IF_PAR_DEBUG(verbose, 
1036                 debugBelch("...send all pending data,"));
1037         {
1038           nat i;
1039           for (i=1; i<=nPEs; i++)
1040             sendImmediately(i); // send all messages away immediately
1041         }
1042   }
1043
1044   /* this would be the place for fishing in GUM... 
1045
1046      if (no-earlier-fish-around) 
1047           sendFish(choosePe());
1048    */
1049
1050   // Eden:just look for incoming messages (blocking receive)
1051   IF_PAR_DEBUG(verbose, 
1052                debugBelch("...wait for incoming messages...\n"));
1053   processMessages(cap, &receivedFinish); // blocking receive...
1054
1055
1056   return receivedFinish;
1057   // reenter scheduling look after having received something
1058
1059 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1060
1061   return rtsFalse; /* return value unused in THREADED_RTS */
1062
1063 #endif /* PARALLEL_HASKELL */
1064 }
1065 #endif // PARALLEL_HASKELL
1066
1067 /* ----------------------------------------------------------------------------
1068  * After running a thread...
1069  * ------------------------------------------------------------------------- */
1070
1071 static void
1072 schedulePostRunThread (StgTSO *t)
1073 {
1074     // We have to be able to catch transactions that are in an
1075     // infinite loop as a result of seeing an inconsistent view of
1076     // memory, e.g. 
1077     //
1078     //   atomically $ do
1079     //       [a,b] <- mapM readTVar [ta,tb]
1080     //       when (a == b) loop
1081     //
1082     // and a is never equal to b given a consistent view of memory.
1083     //
1084     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1085         if (!stmValidateNestOfTransactions (t -> trec)) {
1086             debugTrace(DEBUG_sched | DEBUG_stm,
1087                        "trec %p found wasting its time", t);
1088             
1089             // strip the stack back to the
1090             // ATOMICALLY_FRAME, aborting the (nested)
1091             // transaction, and saving the stack of any
1092             // partially-evaluated thunks on the heap.
1093             throwToSingleThreaded_(&capabilities[0], t, 
1094                                    NULL, rtsTrue, NULL);
1095             
1096             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1097         }
1098     }
1099
1100   /* some statistics gathering in the parallel case */
1101 }
1102
1103 /* -----------------------------------------------------------------------------
1104  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1105  * -------------------------------------------------------------------------- */
1106
1107 static rtsBool
1108 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1109 {
1110     // did the task ask for a large block?
1111     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1112         // if so, get one and push it on the front of the nursery.
1113         bdescr *bd;
1114         lnat blocks;
1115         
1116         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1117         
1118         debugTrace(DEBUG_sched,
1119                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1120                    (long)t->id, whatNext_strs[t->what_next], blocks);
1121     
1122         // don't do this if the nursery is (nearly) full, we'll GC first.
1123         if (cap->r.rCurrentNursery->link != NULL ||
1124             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1125                                                // if the nursery has only one block.
1126             
1127             ACQUIRE_SM_LOCK
1128             bd = allocGroup( blocks );
1129             RELEASE_SM_LOCK
1130             cap->r.rNursery->n_blocks += blocks;
1131             
1132             // link the new group into the list
1133             bd->link = cap->r.rCurrentNursery;
1134             bd->u.back = cap->r.rCurrentNursery->u.back;
1135             if (cap->r.rCurrentNursery->u.back != NULL) {
1136                 cap->r.rCurrentNursery->u.back->link = bd;
1137             } else {
1138 #if !defined(THREADED_RTS)
1139                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1140                        g0s0 == cap->r.rNursery);
1141 #endif
1142                 cap->r.rNursery->blocks = bd;
1143             }             
1144             cap->r.rCurrentNursery->u.back = bd;
1145             
1146             // initialise it as a nursery block.  We initialise the
1147             // step, gen_no, and flags field of *every* sub-block in
1148             // this large block, because this is easier than making
1149             // sure that we always find the block head of a large
1150             // block whenever we call Bdescr() (eg. evacuate() and
1151             // isAlive() in the GC would both have to do this, at
1152             // least).
1153             { 
1154                 bdescr *x;
1155                 for (x = bd; x < bd + blocks; x++) {
1156                     x->step = cap->r.rNursery;
1157                     x->gen_no = 0;
1158                     x->flags = 0;
1159                 }
1160             }
1161             
1162             // This assert can be a killer if the app is doing lots
1163             // of large block allocations.
1164             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1165             
1166             // now update the nursery to point to the new block
1167             cap->r.rCurrentNursery = bd;
1168             
1169             // we might be unlucky and have another thread get on the
1170             // run queue before us and steal the large block, but in that
1171             // case the thread will just end up requesting another large
1172             // block.
1173             pushOnRunQueue(cap,t);
1174             return rtsFalse;  /* not actually GC'ing */
1175         }
1176     }
1177     
1178     debugTrace(DEBUG_sched,
1179                "--<< thread %ld (%s) stopped: HeapOverflow",
1180                (long)t->id, whatNext_strs[t->what_next]);
1181
1182     if (context_switch) {
1183         // Sometimes we miss a context switch, e.g. when calling
1184         // primitives in a tight loop, MAYBE_GC() doesn't check the
1185         // context switch flag, and we end up waiting for a GC.
1186         // See #1984, and concurrent/should_run/1984
1187         context_switch = 0;
1188         addToRunQueue(cap,t);
1189     } else {
1190         pushOnRunQueue(cap,t);
1191     }
1192     return rtsTrue;
1193     /* actual GC is done at the end of the while loop in schedule() */
1194 }
1195
1196 /* -----------------------------------------------------------------------------
1197  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1198  * -------------------------------------------------------------------------- */
1199
1200 static void
1201 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1202 {
1203     debugTrace (DEBUG_sched,
1204                 "--<< thread %ld (%s) stopped, StackOverflow", 
1205                 (long)t->id, whatNext_strs[t->what_next]);
1206
1207     /* just adjust the stack for this thread, then pop it back
1208      * on the run queue.
1209      */
1210     { 
1211         /* enlarge the stack */
1212         StgTSO *new_t = threadStackOverflow(cap, t);
1213         
1214         /* The TSO attached to this Task may have moved, so update the
1215          * pointer to it.
1216          */
1217         if (task->tso == t) {
1218             task->tso = new_t;
1219         }
1220         pushOnRunQueue(cap,new_t);
1221     }
1222 }
1223
1224 /* -----------------------------------------------------------------------------
1225  * Handle a thread that returned to the scheduler with ThreadYielding
1226  * -------------------------------------------------------------------------- */
1227
1228 static rtsBool
1229 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1230 {
1231     // Reset the context switch flag.  We don't do this just before
1232     // running the thread, because that would mean we would lose ticks
1233     // during GC, which can lead to unfair scheduling (a thread hogs
1234     // the CPU because the tick always arrives during GC).  This way
1235     // penalises threads that do a lot of allocation, but that seems
1236     // better than the alternative.
1237     context_switch = 0;
1238     
1239     /* put the thread back on the run queue.  Then, if we're ready to
1240      * GC, check whether this is the last task to stop.  If so, wake
1241      * up the GC thread.  getThread will block during a GC until the
1242      * GC is finished.
1243      */
1244 #ifdef DEBUG
1245     if (t->what_next != prev_what_next) {
1246         debugTrace(DEBUG_sched,
1247                    "--<< thread %ld (%s) stopped to switch evaluators", 
1248                    (long)t->id, whatNext_strs[t->what_next]);
1249     } else {
1250         debugTrace(DEBUG_sched,
1251                    "--<< thread %ld (%s) stopped, yielding",
1252                    (long)t->id, whatNext_strs[t->what_next]);
1253     }
1254 #endif
1255     
1256     IF_DEBUG(sanity,
1257              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1258              checkTSO(t));
1259     ASSERT(t->_link == END_TSO_QUEUE);
1260     
1261     // Shortcut if we're just switching evaluators: don't bother
1262     // doing stack squeezing (which can be expensive), just run the
1263     // thread.
1264     if (t->what_next != prev_what_next) {
1265         return rtsTrue;
1266     }
1267
1268     addToRunQueue(cap,t);
1269
1270     return rtsFalse;
1271 }
1272
1273 /* -----------------------------------------------------------------------------
1274  * Handle a thread that returned to the scheduler with ThreadBlocked
1275  * -------------------------------------------------------------------------- */
1276
1277 static void
1278 scheduleHandleThreadBlocked( StgTSO *t
1279 #if !defined(GRAN) && !defined(DEBUG)
1280     STG_UNUSED
1281 #endif
1282     )
1283 {
1284
1285       // We don't need to do anything.  The thread is blocked, and it
1286       // has tidied up its stack and placed itself on whatever queue
1287       // it needs to be on.
1288
1289     // ASSERT(t->why_blocked != NotBlocked);
1290     // Not true: for example,
1291     //    - in THREADED_RTS, the thread may already have been woken
1292     //      up by another Capability.  This actually happens: try
1293     //      conc023 +RTS -N2.
1294     //    - the thread may have woken itself up already, because
1295     //      threadPaused() might have raised a blocked throwTo
1296     //      exception, see maybePerformBlockedException().
1297
1298 #ifdef DEBUG
1299     if (traceClass(DEBUG_sched)) {
1300         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1301                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1302         printThreadBlockage(t);
1303         debugTraceEnd();
1304     }
1305 #endif
1306 }
1307
1308 /* -----------------------------------------------------------------------------
1309  * Handle a thread that returned to the scheduler with ThreadFinished
1310  * -------------------------------------------------------------------------- */
1311
1312 static rtsBool
1313 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1314 {
1315     /* Need to check whether this was a main thread, and if so,
1316      * return with the return value.
1317      *
1318      * We also end up here if the thread kills itself with an
1319      * uncaught exception, see Exception.cmm.
1320      */
1321     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1322                (unsigned long)t->id, whatNext_strs[t->what_next]);
1323
1324       //
1325       // Check whether the thread that just completed was a bound
1326       // thread, and if so return with the result.  
1327       //
1328       // There is an assumption here that all thread completion goes
1329       // through this point; we need to make sure that if a thread
1330       // ends up in the ThreadKilled state, that it stays on the run
1331       // queue so it can be dealt with here.
1332       //
1333
1334       if (t->bound) {
1335
1336           if (t->bound != task) {
1337 #if !defined(THREADED_RTS)
1338               // Must be a bound thread that is not the topmost one.  Leave
1339               // it on the run queue until the stack has unwound to the
1340               // point where we can deal with this.  Leaving it on the run
1341               // queue also ensures that the garbage collector knows about
1342               // this thread and its return value (it gets dropped from the
1343               // step->threads list so there's no other way to find it).
1344               appendToRunQueue(cap,t);
1345               return rtsFalse;
1346 #else
1347               // this cannot happen in the threaded RTS, because a
1348               // bound thread can only be run by the appropriate Task.
1349               barf("finished bound thread that isn't mine");
1350 #endif
1351           }
1352
1353           ASSERT(task->tso == t);
1354
1355           if (t->what_next == ThreadComplete) {
1356               if (task->ret) {
1357                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1358                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1359               }
1360               task->stat = Success;
1361           } else {
1362               if (task->ret) {
1363                   *(task->ret) = NULL;
1364               }
1365               if (sched_state >= SCHED_INTERRUPTING) {
1366                   task->stat = Interrupted;
1367               } else {
1368                   task->stat = Killed;
1369               }
1370           }
1371 #ifdef DEBUG
1372           removeThreadLabel((StgWord)task->tso->id);
1373 #endif
1374           return rtsTrue; // tells schedule() to return
1375       }
1376
1377       return rtsFalse;
1378 }
1379
1380 /* -----------------------------------------------------------------------------
1381  * Perform a heap census
1382  * -------------------------------------------------------------------------- */
1383
1384 static rtsBool
1385 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1386 {
1387     // When we have +RTS -i0 and we're heap profiling, do a census at
1388     // every GC.  This lets us get repeatable runs for debugging.
1389     if (performHeapProfile ||
1390         (RtsFlags.ProfFlags.profileInterval==0 &&
1391          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1392         return rtsTrue;
1393     } else {
1394         return rtsFalse;
1395     }
1396 }
1397
1398 /* -----------------------------------------------------------------------------
1399  * Perform a garbage collection if necessary
1400  * -------------------------------------------------------------------------- */
1401
1402 static Capability *
1403 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1404 {
1405     StgTSO *t;
1406     rtsBool heap_census;
1407 #ifdef THREADED_RTS
1408     /* extern static volatile StgWord waiting_for_gc; 
1409        lives inside capability.c */
1410     rtsBool was_waiting;
1411     nat i;
1412 #endif
1413
1414 #ifdef THREADED_RTS
1415     // In order to GC, there must be no threads running Haskell code.
1416     // Therefore, the GC thread needs to hold *all* the capabilities,
1417     // and release them after the GC has completed.  
1418     //
1419     // This seems to be the simplest way: previous attempts involved
1420     // making all the threads with capabilities give up their
1421     // capabilities and sleep except for the *last* one, which
1422     // actually did the GC.  But it's quite hard to arrange for all
1423     // the other tasks to sleep and stay asleep.
1424     //
1425         
1426     /*  Other capabilities are prevented from running yet more Haskell
1427         threads if waiting_for_gc is set. Tested inside
1428         yieldCapability() and releaseCapability() in Capability.c */
1429
1430     was_waiting = cas(&waiting_for_gc, 0, 1);
1431     if (was_waiting) {
1432         do {
1433             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1434             if (cap) yieldCapability(&cap,task);
1435         } while (waiting_for_gc);
1436         return cap;  // NOTE: task->cap might have changed here
1437     }
1438
1439     for (i=0; i < n_capabilities; i++) {
1440         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1441         if (cap != &capabilities[i]) {
1442             Capability *pcap = &capabilities[i];
1443             // we better hope this task doesn't get migrated to
1444             // another Capability while we're waiting for this one.
1445             // It won't, because load balancing happens while we have
1446             // all the Capabilities, but even so it's a slightly
1447             // unsavoury invariant.
1448             task->cap = pcap;
1449             context_switch = 1;
1450             waitForReturnCapability(&pcap, task);
1451             if (pcap != &capabilities[i]) {
1452                 barf("scheduleDoGC: got the wrong capability");
1453             }
1454         }
1455     }
1456
1457     waiting_for_gc = rtsFalse;
1458 #endif
1459
1460     // so this happens periodically:
1461     if (cap) scheduleCheckBlackHoles(cap);
1462     
1463     IF_DEBUG(scheduler, printAllThreads());
1464
1465     /*
1466      * We now have all the capabilities; if we're in an interrupting
1467      * state, then we should take the opportunity to delete all the
1468      * threads in the system.
1469      */
1470     if (sched_state >= SCHED_INTERRUPTING) {
1471         deleteAllThreads(&capabilities[0]);
1472         sched_state = SCHED_SHUTTING_DOWN;
1473     }
1474     
1475     heap_census = scheduleNeedHeapProfile(rtsTrue);
1476
1477     /* everybody back, start the GC.
1478      * Could do it in this thread, or signal a condition var
1479      * to do it in another thread.  Either way, we need to
1480      * broadcast on gc_pending_cond afterward.
1481      */
1482 #if defined(THREADED_RTS)
1483     debugTrace(DEBUG_sched, "doing GC");
1484 #endif
1485     GarbageCollect(force_major || heap_census);
1486     
1487     if (heap_census) {
1488         debugTrace(DEBUG_sched, "performing heap census");
1489         heapCensus();
1490         performHeapProfile = rtsFalse;
1491     }
1492
1493 #if defined(THREADED_RTS)
1494     // release our stash of capabilities.
1495     for (i = 0; i < n_capabilities; i++) {
1496         if (cap != &capabilities[i]) {
1497             task->cap = &capabilities[i];
1498             releaseCapability(&capabilities[i]);
1499         }
1500     }
1501     if (cap) {
1502         task->cap = cap;
1503     } else {
1504         task->cap = NULL;
1505     }
1506 #endif
1507
1508     return cap;
1509 }
1510
1511 /* ---------------------------------------------------------------------------
1512  * Singleton fork(). Do not copy any running threads.
1513  * ------------------------------------------------------------------------- */
1514
1515 pid_t
1516 forkProcess(HsStablePtr *entry
1517 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1518             STG_UNUSED
1519 #endif
1520            )
1521 {
1522 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1523     Task *task;
1524     pid_t pid;
1525     StgTSO* t,*next;
1526     Capability *cap;
1527     nat s;
1528     
1529 #if defined(THREADED_RTS)
1530     if (RtsFlags.ParFlags.nNodes > 1) {
1531         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1532         stg_exit(EXIT_FAILURE);
1533     }
1534 #endif
1535
1536     debugTrace(DEBUG_sched, "forking!");
1537     
1538     // ToDo: for SMP, we should probably acquire *all* the capabilities
1539     cap = rts_lock();
1540     
1541     // no funny business: hold locks while we fork, otherwise if some
1542     // other thread is holding a lock when the fork happens, the data
1543     // structure protected by the lock will forever be in an
1544     // inconsistent state in the child.  See also #1391.
1545     ACQUIRE_LOCK(&sched_mutex);
1546     ACQUIRE_LOCK(&cap->lock);
1547     ACQUIRE_LOCK(&cap->running_task->lock);
1548
1549     pid = fork();
1550     
1551     if (pid) { // parent
1552         
1553         RELEASE_LOCK(&sched_mutex);
1554         RELEASE_LOCK(&cap->lock);
1555         RELEASE_LOCK(&cap->running_task->lock);
1556
1557         // just return the pid
1558         rts_unlock(cap);
1559         return pid;
1560         
1561     } else { // child
1562         
1563 #if defined(THREADED_RTS)
1564         initMutex(&sched_mutex);
1565         initMutex(&cap->lock);
1566         initMutex(&cap->running_task->lock);
1567 #endif
1568
1569         // Now, all OS threads except the thread that forked are
1570         // stopped.  We need to stop all Haskell threads, including
1571         // those involved in foreign calls.  Also we need to delete
1572         // all Tasks, because they correspond to OS threads that are
1573         // now gone.
1574
1575         for (s = 0; s < total_steps; s++) {
1576           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1577             if (t->what_next == ThreadRelocated) {
1578                 next = t->_link;
1579             } else {
1580                 next = t->global_link;
1581                 // don't allow threads to catch the ThreadKilled
1582                 // exception, but we do want to raiseAsync() because these
1583                 // threads may be evaluating thunks that we need later.
1584                 deleteThread_(cap,t);
1585             }
1586           }
1587         }
1588         
1589         // Empty the run queue.  It seems tempting to let all the
1590         // killed threads stay on the run queue as zombies to be
1591         // cleaned up later, but some of them correspond to bound
1592         // threads for which the corresponding Task does not exist.
1593         cap->run_queue_hd = END_TSO_QUEUE;
1594         cap->run_queue_tl = END_TSO_QUEUE;
1595
1596         // Any suspended C-calling Tasks are no more, their OS threads
1597         // don't exist now:
1598         cap->suspended_ccalling_tasks = NULL;
1599
1600         // Empty the threads lists.  Otherwise, the garbage
1601         // collector may attempt to resurrect some of these threads.
1602         for (s = 0; s < total_steps; s++) {
1603             all_steps[s].threads = END_TSO_QUEUE;
1604         }
1605
1606         // Wipe the task list, except the current Task.
1607         ACQUIRE_LOCK(&sched_mutex);
1608         for (task = all_tasks; task != NULL; task=task->all_link) {
1609             if (task != cap->running_task) {
1610 #if defined(THREADED_RTS)
1611                 initMutex(&task->lock); // see #1391
1612 #endif
1613                 discardTask(task);
1614             }
1615         }
1616         RELEASE_LOCK(&sched_mutex);
1617
1618 #if defined(THREADED_RTS)
1619         // Wipe our spare workers list, they no longer exist.  New
1620         // workers will be created if necessary.
1621         cap->spare_workers = NULL;
1622         cap->returning_tasks_hd = NULL;
1623         cap->returning_tasks_tl = NULL;
1624 #endif
1625
1626         // On Unix, all timers are reset in the child, so we need to start
1627         // the timer again.
1628         initTimer();
1629         startTimer();
1630
1631         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1632         rts_checkSchedStatus("forkProcess",cap);
1633         
1634         rts_unlock(cap);
1635         hs_exit();                      // clean up and exit
1636         stg_exit(EXIT_SUCCESS);
1637     }
1638 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1639     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1640     return -1;
1641 #endif
1642 }
1643
1644 /* ---------------------------------------------------------------------------
1645  * Delete all the threads in the system
1646  * ------------------------------------------------------------------------- */
1647    
1648 static void
1649 deleteAllThreads ( Capability *cap )
1650 {
1651     // NOTE: only safe to call if we own all capabilities.
1652
1653     StgTSO* t, *next;
1654     nat s;
1655
1656     debugTrace(DEBUG_sched,"deleting all threads");
1657     for (s = 0; s < total_steps; s++) {
1658       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1659         if (t->what_next == ThreadRelocated) {
1660             next = t->_link;
1661         } else {
1662             next = t->global_link;
1663             deleteThread(cap,t);
1664         }
1665       }
1666     }      
1667
1668     // The run queue now contains a bunch of ThreadKilled threads.  We
1669     // must not throw these away: the main thread(s) will be in there
1670     // somewhere, and the main scheduler loop has to deal with it.
1671     // Also, the run queue is the only thing keeping these threads from
1672     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1673
1674 #if !defined(THREADED_RTS)
1675     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1676     ASSERT(sleeping_queue == END_TSO_QUEUE);
1677 #endif
1678 }
1679
1680 /* -----------------------------------------------------------------------------
1681    Managing the suspended_ccalling_tasks list.
1682    Locks required: sched_mutex
1683    -------------------------------------------------------------------------- */
1684
1685 STATIC_INLINE void
1686 suspendTask (Capability *cap, Task *task)
1687 {
1688     ASSERT(task->next == NULL && task->prev == NULL);
1689     task->next = cap->suspended_ccalling_tasks;
1690     task->prev = NULL;
1691     if (cap->suspended_ccalling_tasks) {
1692         cap->suspended_ccalling_tasks->prev = task;
1693     }
1694     cap->suspended_ccalling_tasks = task;
1695 }
1696
1697 STATIC_INLINE void
1698 recoverSuspendedTask (Capability *cap, Task *task)
1699 {
1700     if (task->prev) {
1701         task->prev->next = task->next;
1702     } else {
1703         ASSERT(cap->suspended_ccalling_tasks == task);
1704         cap->suspended_ccalling_tasks = task->next;
1705     }
1706     if (task->next) {
1707         task->next->prev = task->prev;
1708     }
1709     task->next = task->prev = NULL;
1710 }
1711
1712 /* ---------------------------------------------------------------------------
1713  * Suspending & resuming Haskell threads.
1714  * 
1715  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1716  * its capability before calling the C function.  This allows another
1717  * task to pick up the capability and carry on running Haskell
1718  * threads.  It also means that if the C call blocks, it won't lock
1719  * the whole system.
1720  *
1721  * The Haskell thread making the C call is put to sleep for the
1722  * duration of the call, on the susepended_ccalling_threads queue.  We
1723  * give out a token to the task, which it can use to resume the thread
1724  * on return from the C function.
1725  * ------------------------------------------------------------------------- */
1726    
1727 void *
1728 suspendThread (StgRegTable *reg)
1729 {
1730   Capability *cap;
1731   int saved_errno;
1732   StgTSO *tso;
1733   Task *task;
1734 #if mingw32_HOST_OS
1735   StgWord32 saved_winerror;
1736 #endif
1737
1738   saved_errno = errno;
1739 #if mingw32_HOST_OS
1740   saved_winerror = GetLastError();
1741 #endif
1742
1743   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1744    */
1745   cap = regTableToCapability(reg);
1746
1747   task = cap->running_task;
1748   tso = cap->r.rCurrentTSO;
1749
1750   debugTrace(DEBUG_sched, 
1751              "thread %lu did a safe foreign call", 
1752              (unsigned long)cap->r.rCurrentTSO->id);
1753
1754   // XXX this might not be necessary --SDM
1755   tso->what_next = ThreadRunGHC;
1756
1757   threadPaused(cap,tso);
1758
1759   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1760       tso->why_blocked = BlockedOnCCall;
1761       tso->flags |= TSO_BLOCKEX;
1762       tso->flags &= ~TSO_INTERRUPTIBLE;
1763   } else {
1764       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1765   }
1766
1767   // Hand back capability
1768   task->suspended_tso = tso;
1769
1770   ACQUIRE_LOCK(&cap->lock);
1771
1772   suspendTask(cap,task);
1773   cap->in_haskell = rtsFalse;
1774   releaseCapability_(cap);
1775   
1776   RELEASE_LOCK(&cap->lock);
1777
1778 #if defined(THREADED_RTS)
1779   /* Preparing to leave the RTS, so ensure there's a native thread/task
1780      waiting to take over.
1781   */
1782   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1783 #endif
1784
1785   errno = saved_errno;
1786 #if mingw32_HOST_OS
1787   SetLastError(saved_winerror);
1788 #endif
1789   return task;
1790 }
1791
1792 StgRegTable *
1793 resumeThread (void *task_)
1794 {
1795     StgTSO *tso;
1796     Capability *cap;
1797     Task *task = task_;
1798     int saved_errno;
1799 #if mingw32_HOST_OS
1800     StgWord32 saved_winerror;
1801 #endif
1802
1803     saved_errno = errno;
1804 #if mingw32_HOST_OS
1805     saved_winerror = GetLastError();
1806 #endif
1807
1808     cap = task->cap;
1809     // Wait for permission to re-enter the RTS with the result.
1810     waitForReturnCapability(&cap,task);
1811     // we might be on a different capability now... but if so, our
1812     // entry on the suspended_ccalling_tasks list will also have been
1813     // migrated.
1814
1815     // Remove the thread from the suspended list
1816     recoverSuspendedTask(cap,task);
1817
1818     tso = task->suspended_tso;
1819     task->suspended_tso = NULL;
1820     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1821     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1822     
1823     if (tso->why_blocked == BlockedOnCCall) {
1824         awakenBlockedExceptionQueue(cap,tso);
1825         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1826     }
1827     
1828     /* Reset blocking status */
1829     tso->why_blocked  = NotBlocked;
1830     
1831     cap->r.rCurrentTSO = tso;
1832     cap->in_haskell = rtsTrue;
1833     errno = saved_errno;
1834 #if mingw32_HOST_OS
1835     SetLastError(saved_winerror);
1836 #endif
1837
1838     /* We might have GC'd, mark the TSO dirty again */
1839     dirty_TSO(cap,tso);
1840
1841     IF_DEBUG(sanity, checkTSO(tso));
1842
1843     return &cap->r;
1844 }
1845
1846 /* ---------------------------------------------------------------------------
1847  * scheduleThread()
1848  *
1849  * scheduleThread puts a thread on the end  of the runnable queue.
1850  * This will usually be done immediately after a thread is created.
1851  * The caller of scheduleThread must create the thread using e.g.
1852  * createThread and push an appropriate closure
1853  * on this thread's stack before the scheduler is invoked.
1854  * ------------------------------------------------------------------------ */
1855
1856 void
1857 scheduleThread(Capability *cap, StgTSO *tso)
1858 {
1859     // The thread goes at the *end* of the run-queue, to avoid possible
1860     // starvation of any threads already on the queue.
1861     appendToRunQueue(cap,tso);
1862 }
1863
1864 void
1865 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1866 {
1867 #if defined(THREADED_RTS)
1868     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1869                               // move this thread from now on.
1870     cpu %= RtsFlags.ParFlags.nNodes;
1871     if (cpu == cap->no) {
1872         appendToRunQueue(cap,tso);
1873     } else {
1874         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1875     }
1876 #else
1877     appendToRunQueue(cap,tso);
1878 #endif
1879 }
1880
1881 Capability *
1882 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1883 {
1884     Task *task;
1885
1886     // We already created/initialised the Task
1887     task = cap->running_task;
1888
1889     // This TSO is now a bound thread; make the Task and TSO
1890     // point to each other.
1891     tso->bound = task;
1892     tso->cap = cap;
1893
1894     task->tso = tso;
1895     task->ret = ret;
1896     task->stat = NoStatus;
1897
1898     appendToRunQueue(cap,tso);
1899
1900     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1901
1902     cap = schedule(cap,task);
1903
1904     ASSERT(task->stat != NoStatus);
1905     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1906
1907     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1908     return cap;
1909 }
1910
1911 /* ----------------------------------------------------------------------------
1912  * Starting Tasks
1913  * ------------------------------------------------------------------------- */
1914
1915 #if defined(THREADED_RTS)
1916 void
1917 workerStart(Task *task)
1918 {
1919     Capability *cap;
1920
1921     // See startWorkerTask().
1922     ACQUIRE_LOCK(&task->lock);
1923     cap = task->cap;
1924     RELEASE_LOCK(&task->lock);
1925
1926     // set the thread-local pointer to the Task:
1927     taskEnter(task);
1928
1929     // schedule() runs without a lock.
1930     cap = schedule(cap,task);
1931
1932     // On exit from schedule(), we have a Capability.
1933     releaseCapability(cap);
1934     workerTaskStop(task);
1935 }
1936 #endif
1937
1938 /* ---------------------------------------------------------------------------
1939  * initScheduler()
1940  *
1941  * Initialise the scheduler.  This resets all the queues - if the
1942  * queues contained any threads, they'll be garbage collected at the
1943  * next pass.
1944  *
1945  * ------------------------------------------------------------------------ */
1946
1947 void 
1948 initScheduler(void)
1949 {
1950 #if !defined(THREADED_RTS)
1951   blocked_queue_hd  = END_TSO_QUEUE;
1952   blocked_queue_tl  = END_TSO_QUEUE;
1953   sleeping_queue    = END_TSO_QUEUE;
1954 #endif
1955
1956   blackhole_queue   = END_TSO_QUEUE;
1957
1958   context_switch = 0;
1959   sched_state    = SCHED_RUNNING;
1960   recent_activity = ACTIVITY_YES;
1961
1962 #if defined(THREADED_RTS)
1963   /* Initialise the mutex and condition variables used by
1964    * the scheduler. */
1965   initMutex(&sched_mutex);
1966 #endif
1967   
1968   ACQUIRE_LOCK(&sched_mutex);
1969
1970   /* A capability holds the state a native thread needs in
1971    * order to execute STG code. At least one capability is
1972    * floating around (only THREADED_RTS builds have more than one).
1973    */
1974   initCapabilities();
1975
1976   initTaskManager();
1977
1978 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
1979   initSparkPools();
1980 #endif
1981
1982 #if defined(THREADED_RTS)
1983   /*
1984    * Eagerly start one worker to run each Capability, except for
1985    * Capability 0.  The idea is that we're probably going to start a
1986    * bound thread on Capability 0 pretty soon, so we don't want a
1987    * worker task hogging it.
1988    */
1989   { 
1990       nat i;
1991       Capability *cap;
1992       for (i = 1; i < n_capabilities; i++) {
1993           cap = &capabilities[i];
1994           ACQUIRE_LOCK(&cap->lock);
1995           startWorkerTask(cap, workerStart);
1996           RELEASE_LOCK(&cap->lock);
1997       }
1998   }
1999 #endif
2000
2001   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2002
2003   RELEASE_LOCK(&sched_mutex);
2004 }
2005
2006 void
2007 exitScheduler(
2008     rtsBool wait_foreign
2009 #if !defined(THREADED_RTS)
2010                          __attribute__((unused))
2011 #endif
2012 )
2013                /* see Capability.c, shutdownCapability() */
2014 {
2015     Task *task = NULL;
2016
2017 #if defined(THREADED_RTS)
2018     ACQUIRE_LOCK(&sched_mutex);
2019     task = newBoundTask();
2020     RELEASE_LOCK(&sched_mutex);
2021 #endif
2022
2023     // If we haven't killed all the threads yet, do it now.
2024     if (sched_state < SCHED_SHUTTING_DOWN) {
2025         sched_state = SCHED_INTERRUPTING;
2026         scheduleDoGC(NULL,task,rtsFalse);    
2027     }
2028     sched_state = SCHED_SHUTTING_DOWN;
2029
2030 #if defined(THREADED_RTS)
2031     { 
2032         nat i;
2033         
2034         for (i = 0; i < n_capabilities; i++) {
2035             shutdownCapability(&capabilities[i], task, wait_foreign);
2036         }
2037         boundTaskExiting(task);
2038         stopTaskManager();
2039     }
2040 #else
2041     freeCapability(&MainCapability);
2042 #endif
2043 }
2044
2045 void
2046 freeScheduler( void )
2047 {
2048     freeTaskManager();
2049     if (n_capabilities != 1) {
2050         stgFree(capabilities);
2051     }
2052 #if defined(THREADED_RTS)
2053     closeMutex(&sched_mutex);
2054 #endif
2055 }
2056
2057 /* -----------------------------------------------------------------------------
2058    performGC
2059
2060    This is the interface to the garbage collector from Haskell land.
2061    We provide this so that external C code can allocate and garbage
2062    collect when called from Haskell via _ccall_GC.
2063    -------------------------------------------------------------------------- */
2064
2065 static void
2066 performGC_(rtsBool force_major)
2067 {
2068     Task *task;
2069     // We must grab a new Task here, because the existing Task may be
2070     // associated with a particular Capability, and chained onto the 
2071     // suspended_ccalling_tasks queue.
2072     ACQUIRE_LOCK(&sched_mutex);
2073     task = newBoundTask();
2074     RELEASE_LOCK(&sched_mutex);
2075     scheduleDoGC(NULL,task,force_major);
2076     boundTaskExiting(task);
2077 }
2078
2079 void
2080 performGC(void)
2081 {
2082     performGC_(rtsFalse);
2083 }
2084
2085 void
2086 performMajorGC(void)
2087 {
2088     performGC_(rtsTrue);
2089 }
2090
2091 /* -----------------------------------------------------------------------------
2092    Stack overflow
2093
2094    If the thread has reached its maximum stack size, then raise the
2095    StackOverflow exception in the offending thread.  Otherwise
2096    relocate the TSO into a larger chunk of memory and adjust its stack
2097    size appropriately.
2098    -------------------------------------------------------------------------- */
2099
2100 static StgTSO *
2101 threadStackOverflow(Capability *cap, StgTSO *tso)
2102 {
2103   nat new_stack_size, stack_words;
2104   lnat new_tso_size;
2105   StgPtr new_sp;
2106   StgTSO *dest;
2107
2108   IF_DEBUG(sanity,checkTSO(tso));
2109
2110   // don't allow throwTo() to modify the blocked_exceptions queue
2111   // while we are moving the TSO:
2112   lockClosure((StgClosure *)tso);
2113
2114   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2115       // NB. never raise a StackOverflow exception if the thread is
2116       // inside Control.Exceptino.block.  It is impractical to protect
2117       // against stack overflow exceptions, since virtually anything
2118       // can raise one (even 'catch'), so this is the only sensible
2119       // thing to do here.  See bug #767.
2120
2121       debugTrace(DEBUG_gc,
2122                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2123                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2124       IF_DEBUG(gc,
2125                /* If we're debugging, just print out the top of the stack */
2126                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2127                                                 tso->sp+64)));
2128
2129       // Send this thread the StackOverflow exception
2130       unlockTSO(tso);
2131       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2132       return tso;
2133   }
2134
2135   /* Try to double the current stack size.  If that takes us over the
2136    * maximum stack size for this thread, then use the maximum instead.
2137    * Finally round up so the TSO ends up as a whole number of blocks.
2138    */
2139   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2140   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2141                                        TSO_STRUCT_SIZE)/sizeof(W_);
2142   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2143   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2144
2145   debugTrace(DEBUG_sched, 
2146              "increasing stack size from %ld words to %d.",
2147              (long)tso->stack_size, new_stack_size);
2148
2149   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2150   TICK_ALLOC_TSO(new_stack_size,0);
2151
2152   /* copy the TSO block and the old stack into the new area */
2153   memcpy(dest,tso,TSO_STRUCT_SIZE);
2154   stack_words = tso->stack + tso->stack_size - tso->sp;
2155   new_sp = (P_)dest + new_tso_size - stack_words;
2156   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2157
2158   /* relocate the stack pointers... */
2159   dest->sp         = new_sp;
2160   dest->stack_size = new_stack_size;
2161         
2162   /* Mark the old TSO as relocated.  We have to check for relocated
2163    * TSOs in the garbage collector and any primops that deal with TSOs.
2164    *
2165    * It's important to set the sp value to just beyond the end
2166    * of the stack, so we don't attempt to scavenge any part of the
2167    * dead TSO's stack.
2168    */
2169   tso->what_next = ThreadRelocated;
2170   setTSOLink(cap,tso,dest);
2171   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2172   tso->why_blocked = NotBlocked;
2173
2174   IF_PAR_DEBUG(verbose,
2175                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2176                      tso->id, tso, tso->stack_size);
2177                /* If we're debugging, just print out the top of the stack */
2178                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2179                                                 tso->sp+64)));
2180   
2181   unlockTSO(dest);
2182   unlockTSO(tso);
2183
2184   IF_DEBUG(sanity,checkTSO(dest));
2185 #if 0
2186   IF_DEBUG(scheduler,printTSO(dest));
2187 #endif
2188
2189   return dest;
2190 }
2191
2192 static StgTSO *
2193 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2194 {
2195     bdescr *bd, *new_bd;
2196     lnat new_tso_size_w, tso_size_w;
2197     StgTSO *new_tso;
2198
2199     tso_size_w = tso_sizeW(tso);
2200
2201     if (tso_size_w < MBLOCK_SIZE_W || 
2202         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2203     {
2204         return tso;
2205     }
2206
2207     // don't allow throwTo() to modify the blocked_exceptions queue
2208     // while we are moving the TSO:
2209     lockClosure((StgClosure *)tso);
2210
2211     new_tso_size_w = round_to_mblocks(tso_size_w/2);
2212
2213     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2214                tso->id, tso_size_w, new_tso_size_w);
2215
2216     bd = Bdescr((StgPtr)tso);
2217     new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2218     new_bd->free = bd->free;
2219     bd->free = bd->start + TSO_STRUCT_SIZEW;
2220
2221     new_tso = (StgTSO *)new_bd->start;
2222     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2223     new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2224
2225     tso->what_next = ThreadRelocated;
2226     tso->_link = new_tso; // no write barrier reqd: same generation
2227
2228     // The TSO attached to this Task may have moved, so update the
2229     // pointer to it.
2230     if (task->tso == tso) {
2231         task->tso = new_tso;
2232     }
2233
2234     unlockTSO(new_tso);
2235     unlockTSO(tso);
2236
2237     IF_DEBUG(sanity,checkTSO(new_tso));
2238
2239     return new_tso;
2240 }
2241
2242 /* ---------------------------------------------------------------------------
2243    Interrupt execution
2244    - usually called inside a signal handler so it mustn't do anything fancy.   
2245    ------------------------------------------------------------------------ */
2246
2247 void
2248 interruptStgRts(void)
2249 {
2250     sched_state = SCHED_INTERRUPTING;
2251     context_switch = 1;
2252     wakeUpRts();
2253 }
2254
2255 /* -----------------------------------------------------------------------------
2256    Wake up the RTS
2257    
2258    This function causes at least one OS thread to wake up and run the
2259    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2260    an external event has arrived that may need servicing (eg. a
2261    keyboard interrupt).
2262
2263    In the single-threaded RTS we don't do anything here; we only have
2264    one thread anyway, and the event that caused us to want to wake up
2265    will have interrupted any blocking system call in progress anyway.
2266    -------------------------------------------------------------------------- */
2267
2268 void
2269 wakeUpRts(void)
2270 {
2271 #if defined(THREADED_RTS)
2272     // This forces the IO Manager thread to wakeup, which will
2273     // in turn ensure that some OS thread wakes up and runs the
2274     // scheduler loop, which will cause a GC and deadlock check.
2275     ioManagerWakeup();
2276 #endif
2277 }
2278
2279 /* -----------------------------------------------------------------------------
2280  * checkBlackHoles()
2281  *
2282  * Check the blackhole_queue for threads that can be woken up.  We do
2283  * this periodically: before every GC, and whenever the run queue is
2284  * empty.
2285  *
2286  * An elegant solution might be to just wake up all the blocked
2287  * threads with awakenBlockedQueue occasionally: they'll go back to
2288  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2289  * doesn't give us a way to tell whether we've actually managed to
2290  * wake up any threads, so we would be busy-waiting.
2291  *
2292  * -------------------------------------------------------------------------- */
2293
2294 static rtsBool
2295 checkBlackHoles (Capability *cap)
2296 {
2297     StgTSO **prev, *t;
2298     rtsBool any_woke_up = rtsFalse;
2299     StgHalfWord type;
2300
2301     // blackhole_queue is global:
2302     ASSERT_LOCK_HELD(&sched_mutex);
2303
2304     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2305
2306     // ASSUMES: sched_mutex
2307     prev = &blackhole_queue;
2308     t = blackhole_queue;
2309     while (t != END_TSO_QUEUE) {
2310         ASSERT(t->why_blocked == BlockedOnBlackHole);
2311         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2312         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2313             IF_DEBUG(sanity,checkTSO(t));
2314             t = unblockOne(cap, t);
2315             *prev = t;
2316             any_woke_up = rtsTrue;
2317         } else {
2318             prev = &t->_link;
2319             t = t->_link;
2320         }
2321     }
2322
2323     return any_woke_up;
2324 }
2325
2326 /* -----------------------------------------------------------------------------
2327    Deleting threads
2328
2329    This is used for interruption (^C) and forking, and corresponds to
2330    raising an exception but without letting the thread catch the
2331    exception.
2332    -------------------------------------------------------------------------- */
2333
2334 static void 
2335 deleteThread (Capability *cap, StgTSO *tso)
2336 {
2337     // NOTE: must only be called on a TSO that we have exclusive
2338     // access to, because we will call throwToSingleThreaded() below.
2339     // The TSO must be on the run queue of the Capability we own, or 
2340     // we must own all Capabilities.
2341
2342     if (tso->why_blocked != BlockedOnCCall &&
2343         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2344         throwToSingleThreaded(cap,tso,NULL);
2345     }
2346 }
2347
2348 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2349 static void 
2350 deleteThread_(Capability *cap, StgTSO *tso)
2351 { // for forkProcess only:
2352   // like deleteThread(), but we delete threads in foreign calls, too.
2353
2354     if (tso->why_blocked == BlockedOnCCall ||
2355         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2356         unblockOne(cap,tso);
2357         tso->what_next = ThreadKilled;
2358     } else {
2359         deleteThread(cap,tso);
2360     }
2361 }
2362 #endif
2363
2364 /* -----------------------------------------------------------------------------
2365    raiseExceptionHelper
2366    
2367    This function is called by the raise# primitve, just so that we can
2368    move some of the tricky bits of raising an exception from C-- into
2369    C.  Who knows, it might be a useful re-useable thing here too.
2370    -------------------------------------------------------------------------- */
2371
2372 StgWord
2373 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2374 {
2375     Capability *cap = regTableToCapability(reg);
2376     StgThunk *raise_closure = NULL;
2377     StgPtr p, next;
2378     StgRetInfoTable *info;
2379     //
2380     // This closure represents the expression 'raise# E' where E
2381     // is the exception raise.  It is used to overwrite all the
2382     // thunks which are currently under evaluataion.
2383     //
2384
2385     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2386     // LDV profiling: stg_raise_info has THUNK as its closure
2387     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2388     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2389     // 1 does not cause any problem unless profiling is performed.
2390     // However, when LDV profiling goes on, we need to linearly scan
2391     // small object pool, where raise_closure is stored, so we should
2392     // use MIN_UPD_SIZE.
2393     //
2394     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2395     //                                 sizeofW(StgClosure)+1);
2396     //
2397
2398     //
2399     // Walk up the stack, looking for the catch frame.  On the way,
2400     // we update any closures pointed to from update frames with the
2401     // raise closure that we just built.
2402     //
2403     p = tso->sp;
2404     while(1) {
2405         info = get_ret_itbl((StgClosure *)p);
2406         next = p + stack_frame_sizeW((StgClosure *)p);
2407         switch (info->i.type) {
2408             
2409         case UPDATE_FRAME:
2410             // Only create raise_closure if we need to.
2411             if (raise_closure == NULL) {
2412                 raise_closure = 
2413                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2414                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2415                 raise_closure->payload[0] = exception;
2416             }
2417             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2418             p = next;
2419             continue;
2420
2421         case ATOMICALLY_FRAME:
2422             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2423             tso->sp = p;
2424             return ATOMICALLY_FRAME;
2425             
2426         case CATCH_FRAME:
2427             tso->sp = p;
2428             return CATCH_FRAME;
2429
2430         case CATCH_STM_FRAME:
2431             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2432             tso->sp = p;
2433             return CATCH_STM_FRAME;
2434             
2435         case STOP_FRAME:
2436             tso->sp = p;
2437             return STOP_FRAME;
2438
2439         case CATCH_RETRY_FRAME:
2440         default:
2441             p = next; 
2442             continue;
2443         }
2444     }
2445 }
2446
2447
2448 /* -----------------------------------------------------------------------------
2449    findRetryFrameHelper
2450
2451    This function is called by the retry# primitive.  It traverses the stack
2452    leaving tso->sp referring to the frame which should handle the retry.  
2453
2454    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2455    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2456
2457    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2458    create) because retries are not considered to be exceptions, despite the
2459    similar implementation.
2460
2461    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2462    not be created within memory transactions.
2463    -------------------------------------------------------------------------- */
2464
2465 StgWord
2466 findRetryFrameHelper (StgTSO *tso)
2467 {
2468   StgPtr           p, next;
2469   StgRetInfoTable *info;
2470
2471   p = tso -> sp;
2472   while (1) {
2473     info = get_ret_itbl((StgClosure *)p);
2474     next = p + stack_frame_sizeW((StgClosure *)p);
2475     switch (info->i.type) {
2476       
2477     case ATOMICALLY_FRAME:
2478         debugTrace(DEBUG_stm,
2479                    "found ATOMICALLY_FRAME at %p during retry", p);
2480         tso->sp = p;
2481         return ATOMICALLY_FRAME;
2482       
2483     case CATCH_RETRY_FRAME:
2484         debugTrace(DEBUG_stm,
2485                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2486         tso->sp = p;
2487         return CATCH_RETRY_FRAME;
2488       
2489     case CATCH_STM_FRAME: {
2490         StgTRecHeader *trec = tso -> trec;
2491         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2492         debugTrace(DEBUG_stm,
2493                    "found CATCH_STM_FRAME at %p during retry", p);
2494         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2495         stmAbortTransaction(tso -> cap, trec);
2496         stmFreeAbortedTRec(tso -> cap, trec);
2497         tso -> trec = outer;
2498         p = next; 
2499         continue;
2500     }
2501       
2502
2503     default:
2504       ASSERT(info->i.type != CATCH_FRAME);
2505       ASSERT(info->i.type != STOP_FRAME);
2506       p = next; 
2507       continue;
2508     }
2509   }
2510 }
2511
2512 /* -----------------------------------------------------------------------------
2513    resurrectThreads is called after garbage collection on the list of
2514    threads found to be garbage.  Each of these threads will be woken
2515    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2516    on an MVar, or NonTermination if the thread was blocked on a Black
2517    Hole.
2518
2519    Locks: assumes we hold *all* the capabilities.
2520    -------------------------------------------------------------------------- */
2521
2522 void
2523 resurrectThreads (StgTSO *threads)
2524 {
2525     StgTSO *tso, *next;
2526     Capability *cap;
2527     step *step;
2528
2529     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2530         next = tso->global_link;
2531
2532         step = Bdescr((P_)tso)->step;
2533         tso->global_link = step->threads;
2534         step->threads = tso;
2535
2536         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2537         
2538         // Wake up the thread on the Capability it was last on
2539         cap = tso->cap;
2540         
2541         switch (tso->why_blocked) {
2542         case BlockedOnMVar:
2543         case BlockedOnException:
2544             /* Called by GC - sched_mutex lock is currently held. */
2545             throwToSingleThreaded(cap, tso,
2546                                   (StgClosure *)blockedOnDeadMVar_closure);
2547             break;
2548         case BlockedOnBlackHole:
2549             throwToSingleThreaded(cap, tso,
2550                                   (StgClosure *)nonTermination_closure);
2551             break;
2552         case BlockedOnSTM:
2553             throwToSingleThreaded(cap, tso,
2554                                   (StgClosure *)blockedIndefinitely_closure);
2555             break;
2556         case NotBlocked:
2557             /* This might happen if the thread was blocked on a black hole
2558              * belonging to a thread that we've just woken up (raiseAsync
2559              * can wake up threads, remember...).
2560              */
2561             continue;
2562         default:
2563             barf("resurrectThreads: thread blocked in a strange way");
2564         }
2565     }
2566 }
2567
2568 /* -----------------------------------------------------------------------------
2569    performPendingThrowTos is called after garbage collection, and
2570    passed a list of threads that were found to have pending throwTos
2571    (tso->blocked_exceptions was not empty), and were blocked.
2572    Normally this doesn't happen, because we would deliver the
2573    exception directly if the target thread is blocked, but there are
2574    small windows where it might occur on a multiprocessor (see
2575    throwTo()).
2576
2577    NB. we must be holding all the capabilities at this point, just
2578    like resurrectThreads().
2579    -------------------------------------------------------------------------- */
2580
2581 void
2582 performPendingThrowTos (StgTSO *threads)
2583 {
2584     StgTSO *tso, *next;
2585     Capability *cap;
2586     step *step;
2587
2588     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2589         next = tso->global_link;
2590
2591         step = Bdescr((P_)tso)->step;
2592         tso->global_link = step->threads;
2593         step->threads = tso;
2594
2595         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2596         
2597         cap = tso->cap;
2598         maybePerformBlockedException(cap, tso);
2599     }
2600 }