Fix #2663: we had a hard-wired capabilities[0]
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 #if defined(THREADED_RTS)
141 static void schedulePushWork(Capability *cap, Task *task);
142 #endif
143 static void scheduleStartSignalHandlers (Capability *cap);
144 static void scheduleCheckBlockedThreads (Capability *cap);
145 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
146 static void scheduleCheckBlackHoles (Capability *cap);
147 static void scheduleDetectDeadlock (Capability *cap, Task *task);
148 #if defined(PARALLEL_HASKELL)
149 static rtsBool scheduleGetRemoteWork(Capability *cap);
150 static void scheduleSendPendingMessages(void);
151 static void scheduleActivateSpark(Capability *cap);
152 #endif
153 static void schedulePostRunThread(Capability *cap, StgTSO *t);
154 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
155 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
156                                          StgTSO *t);
157 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
158                                     nat prev_what_next );
159 static void scheduleHandleThreadBlocked( StgTSO *t );
160 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
161                                              StgTSO *t );
162 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
163 static Capability *scheduleDoGC(Capability *cap, Task *task,
164                                 rtsBool force_major);
165
166 static rtsBool checkBlackHoles(Capability *cap);
167
168 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
169 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
170
171 static void deleteThread (Capability *cap, StgTSO *tso);
172 static void deleteAllThreads (Capability *cap);
173
174 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
175 static void deleteThread_(Capability *cap, StgTSO *tso);
176 #endif
177
178 #ifdef DEBUG
179 static char *whatNext_strs[] = {
180   "(unknown)",
181   "ThreadRunGHC",
182   "ThreadInterpret",
183   "ThreadKilled",
184   "ThreadRelocated",
185   "ThreadComplete"
186 };
187 #endif
188
189 /* -----------------------------------------------------------------------------
190  * Putting a thread on the run queue: different scheduling policies
191  * -------------------------------------------------------------------------- */
192
193 STATIC_INLINE void
194 addToRunQueue( Capability *cap, StgTSO *t )
195 {
196 #if defined(PARALLEL_HASKELL)
197     if (RtsFlags.ParFlags.doFairScheduling) { 
198         // this does round-robin scheduling; good for concurrency
199         appendToRunQueue(cap,t);
200     } else {
201         // this does unfair scheduling; good for parallelism
202         pushOnRunQueue(cap,t);
203     }
204 #else
205     // this does round-robin scheduling; good for concurrency
206     appendToRunQueue(cap,t);
207 #endif
208 }
209
210 /* ---------------------------------------------------------------------------
211    Main scheduling loop.
212
213    We use round-robin scheduling, each thread returning to the
214    scheduler loop when one of these conditions is detected:
215
216       * out of heap space
217       * timer expires (thread yields)
218       * thread blocks
219       * thread ends
220       * stack overflow
221
222    GRAN version:
223      In a GranSim setup this loop iterates over the global event queue.
224      This revolves around the global event queue, which determines what 
225      to do next. Therefore, it's more complicated than either the 
226      concurrent or the parallel (GUM) setup.
227   This version has been entirely removed (JB 2008/08).
228
229    GUM version:
230      GUM iterates over incoming messages.
231      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
232      and sends out a fish whenever it has nothing to do; in-between
233      doing the actual reductions (shared code below) it processes the
234      incoming messages and deals with delayed operations 
235      (see PendingFetches).
236      This is not the ugliest code you could imagine, but it's bloody close.
237
238   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
239   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
240   as well as future GUM versions. This file has been refurbished to
241   only contain valid code, which is however incomplete, refers to
242   invalid includes etc.
243
244    ------------------------------------------------------------------------ */
245
246 static Capability *
247 schedule (Capability *initialCapability, Task *task)
248 {
249   StgTSO *t;
250   Capability *cap;
251   StgThreadReturnCode ret;
252 #if defined(PARALLEL_HASKELL)
253   rtsBool receivedFinish = rtsFalse;
254 #endif
255   nat prev_what_next;
256   rtsBool ready_to_gc;
257 #if defined(THREADED_RTS)
258   rtsBool first = rtsTrue;
259 #endif
260   
261   cap = initialCapability;
262
263   // Pre-condition: this task owns initialCapability.
264   // The sched_mutex is *NOT* held
265   // NB. on return, we still hold a capability.
266
267   debugTrace (DEBUG_sched, 
268               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
269               task, initialCapability);
270
271   schedulePreLoop();
272
273   // -----------------------------------------------------------
274   // Scheduler loop starts here:
275
276 #if defined(PARALLEL_HASKELL)
277 #define TERMINATION_CONDITION        (!receivedFinish)
278 #else
279 #define TERMINATION_CONDITION        rtsTrue
280 #endif
281
282   while (TERMINATION_CONDITION) {
283
284 #if defined(THREADED_RTS)
285       if (first) {
286           // don't yield the first time, we want a chance to run this
287           // thread for a bit, even if there are others banging at the
288           // door.
289           first = rtsFalse;
290           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
291       } else {
292           // Yield the capability to higher-priority tasks if necessary.
293           yieldCapability(&cap, task);
294       }
295 #endif
296       
297 #if defined(THREADED_RTS)
298       schedulePushWork(cap,task);
299 #endif
300
301     // Check whether we have re-entered the RTS from Haskell without
302     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
303     // call).
304     if (cap->in_haskell) {
305           errorBelch("schedule: re-entered unsafely.\n"
306                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
307           stg_exit(EXIT_FAILURE);
308     }
309
310     // The interruption / shutdown sequence.
311     // 
312     // In order to cleanly shut down the runtime, we want to:
313     //   * make sure that all main threads return to their callers
314     //     with the state 'Interrupted'.
315     //   * clean up all OS threads assocated with the runtime
316     //   * free all memory etc.
317     //
318     // So the sequence for ^C goes like this:
319     //
320     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
321     //     arranges for some Capability to wake up
322     //
323     //   * all threads in the system are halted, and the zombies are
324     //     placed on the run queue for cleaning up.  We acquire all
325     //     the capabilities in order to delete the threads, this is
326     //     done by scheduleDoGC() for convenience (because GC already
327     //     needs to acquire all the capabilities).  We can't kill
328     //     threads involved in foreign calls.
329     // 
330     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
331     //
332     //   * sched_state := SCHED_SHUTTING_DOWN
333     //
334     //   * all workers exit when the run queue on their capability
335     //     drains.  All main threads will also exit when their TSO
336     //     reaches the head of the run queue and they can return.
337     //
338     //   * eventually all Capabilities will shut down, and the RTS can
339     //     exit.
340     //
341     //   * We might be left with threads blocked in foreign calls, 
342     //     we should really attempt to kill these somehow (TODO);
343     
344     switch (sched_state) {
345     case SCHED_RUNNING:
346         break;
347     case SCHED_INTERRUPTING:
348         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
349 #if defined(THREADED_RTS)
350         discardSparksCap(cap);
351 #endif
352         /* scheduleDoGC() deletes all the threads */
353         cap = scheduleDoGC(cap,task,rtsFalse);
354         break;
355     case SCHED_SHUTTING_DOWN:
356         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
357         // If we are a worker, just exit.  If we're a bound thread
358         // then we will exit below when we've removed our TSO from
359         // the run queue.
360         if (task->tso == NULL && emptyRunQueue(cap)) {
361             return cap;
362         }
363         break;
364     default:
365         barf("sched_state: %d", sched_state);
366     }
367
368 #if defined(THREADED_RTS)
369     // If the run queue is empty, take a spark and turn it into a thread.
370     {
371         if (emptyRunQueue(cap)) {
372             StgClosure *spark;
373             spark = findSpark(cap);
374             if (spark != NULL) {
375                 debugTrace(DEBUG_sched,
376                            "turning spark of closure %p into a thread",
377                            (StgClosure *)spark);
378                 createSparkThread(cap,spark);     
379             }
380         }
381     }
382 #endif // THREADED_RTS
383
384     scheduleStartSignalHandlers(cap);
385
386     // Only check the black holes here if we've nothing else to do.
387     // During normal execution, the black hole list only gets checked
388     // at GC time, to avoid repeatedly traversing this possibly long
389     // list each time around the scheduler.
390     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
391
392     scheduleCheckWakeupThreads(cap);
393
394     scheduleCheckBlockedThreads(cap);
395
396 #if defined(PARALLEL_HASKELL)
397     /* message processing and work distribution goes here */ 
398
399     /* if messages have been buffered... a NOOP in THREADED_RTS */
400     scheduleSendPendingMessages();
401
402     /* If the run queue is empty,...*/
403     if (emptyRunQueue(cap)) {
404       /* ...take one of our own sparks and turn it into a thread */
405       scheduleActivateSpark(cap);
406
407         /* if this did not work, try to steal a spark from someone else */
408       if (emptyRunQueue(cap)) {
409         receivedFinish = scheduleGetRemoteWork(cap);
410         continue; //  a new round, (hopefully) with new work
411         /* 
412            in GUM, this a) sends out a FISH and returns IF no fish is
413                            out already
414                         b) (blocking) awaits and receives messages
415            
416            in Eden, this is only the blocking receive, as b) in GUM.
417         */
418       }
419     } 
420
421     /* since we perform a blocking receive and continue otherwise,
422        either we never reach here or we definitely have work! */
423     // from here: non-empty run queue
424     ASSERT(!emptyRunQueue(cap));
425
426     if (PacketsWaiting()) {  /* now process incoming messages, if any
427                                 pending...  
428
429                                 CAUTION: scheduleGetRemoteWork called
430                                 above, waits for messages as well! */
431       processMessages(cap, &receivedFinish);
432     }
433 #endif // PARALLEL_HASKELL
434
435     scheduleDetectDeadlock(cap,task);
436 #if defined(THREADED_RTS)
437     cap = task->cap;    // reload cap, it might have changed
438 #endif
439
440     // Normally, the only way we can get here with no threads to
441     // run is if a keyboard interrupt received during 
442     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
443     // Additionally, it is not fatal for the
444     // threaded RTS to reach here with no threads to run.
445     //
446     // win32: might be here due to awaitEvent() being abandoned
447     // as a result of a console event having been delivered.
448     if ( emptyRunQueue(cap) ) {
449 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
450         ASSERT(sched_state >= SCHED_INTERRUPTING);
451 #endif
452         continue; // nothing to do
453     }
454
455     // 
456     // Get a thread to run
457     //
458     t = popRunQueue(cap);
459
460     // Sanity check the thread we're about to run.  This can be
461     // expensive if there is lots of thread switching going on...
462     IF_DEBUG(sanity,checkTSO(t));
463
464 #if defined(THREADED_RTS)
465     // Check whether we can run this thread in the current task.
466     // If not, we have to pass our capability to the right task.
467     {
468         Task *bound = t->bound;
469       
470         if (bound) {
471             if (bound == task) {
472                 debugTrace(DEBUG_sched,
473                            "### Running thread %lu in bound thread", (unsigned long)t->id);
474                 // yes, the Haskell thread is bound to the current native thread
475             } else {
476                 debugTrace(DEBUG_sched,
477                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
478                 // no, bound to a different Haskell thread: pass to that thread
479                 pushOnRunQueue(cap,t);
480                 continue;
481             }
482         } else {
483             // The thread we want to run is unbound.
484             if (task->tso) { 
485                 debugTrace(DEBUG_sched,
486                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
487                 // no, the current native thread is bound to a different
488                 // Haskell thread, so pass it to any worker thread
489                 pushOnRunQueue(cap,t);
490                 continue; 
491             }
492         }
493     }
494 #endif
495
496     /* context switches are initiated by the timer signal, unless
497      * the user specified "context switch as often as possible", with
498      * +RTS -C0
499      */
500     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
501         && !emptyThreadQueues(cap)) {
502         cap->context_switch = 1;
503     }
504          
505 run_thread:
506
507     // CurrentTSO is the thread to run.  t might be different if we
508     // loop back to run_thread, so make sure to set CurrentTSO after
509     // that.
510     cap->r.rCurrentTSO = t;
511
512     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
513                               (long)t->id, whatNext_strs[t->what_next]);
514
515     startHeapProfTimer();
516
517     // Check for exceptions blocked on this thread
518     maybePerformBlockedException (cap, t);
519
520     // ----------------------------------------------------------------------
521     // Run the current thread 
522
523     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
524     ASSERT(t->cap == cap);
525
526     prev_what_next = t->what_next;
527
528     errno = t->saved_errno;
529 #if mingw32_HOST_OS
530     SetLastError(t->saved_winerror);
531 #endif
532
533     cap->in_haskell = rtsTrue;
534
535     dirty_TSO(cap,t);
536
537 #if defined(THREADED_RTS)
538     if (recent_activity == ACTIVITY_DONE_GC) {
539         // ACTIVITY_DONE_GC means we turned off the timer signal to
540         // conserve power (see #1623).  Re-enable it here.
541         nat prev;
542         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
543         if (prev == ACTIVITY_DONE_GC) {
544             startTimer();
545         }
546     } else {
547         recent_activity = ACTIVITY_YES;
548     }
549 #endif
550
551     switch (prev_what_next) {
552         
553     case ThreadKilled:
554     case ThreadComplete:
555         /* Thread already finished, return to scheduler. */
556         ret = ThreadFinished;
557         break;
558         
559     case ThreadRunGHC:
560     {
561         StgRegTable *r;
562         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
563         cap = regTableToCapability(r);
564         ret = r->rRet;
565         break;
566     }
567     
568     case ThreadInterpret:
569         cap = interpretBCO(cap);
570         ret = cap->r.rRet;
571         break;
572         
573     default:
574         barf("schedule: invalid what_next field");
575     }
576
577     cap->in_haskell = rtsFalse;
578
579     // The TSO might have moved, eg. if it re-entered the RTS and a GC
580     // happened.  So find the new location:
581     t = cap->r.rCurrentTSO;
582
583     // We have run some Haskell code: there might be blackhole-blocked
584     // threads to wake up now.
585     // Lock-free test here should be ok, we're just setting a flag.
586     if ( blackhole_queue != END_TSO_QUEUE ) {
587         blackholes_need_checking = rtsTrue;
588     }
589     
590     // And save the current errno in this thread.
591     // XXX: possibly bogus for SMP because this thread might already
592     // be running again, see code below.
593     t->saved_errno = errno;
594 #if mingw32_HOST_OS
595     // Similarly for Windows error code
596     t->saved_winerror = GetLastError();
597 #endif
598
599 #if defined(THREADED_RTS)
600     // If ret is ThreadBlocked, and this Task is bound to the TSO that
601     // blocked, we are in limbo - the TSO is now owned by whatever it
602     // is blocked on, and may in fact already have been woken up,
603     // perhaps even on a different Capability.  It may be the case
604     // that task->cap != cap.  We better yield this Capability
605     // immediately and return to normaility.
606     if (ret == ThreadBlocked) {
607         debugTrace(DEBUG_sched,
608                    "--<< thread %lu (%s) stopped: blocked",
609                    (unsigned long)t->id, whatNext_strs[t->what_next]);
610         continue;
611     }
612 #endif
613
614     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
615     ASSERT(t->cap == cap);
616
617     // ----------------------------------------------------------------------
618     
619     // Costs for the scheduler are assigned to CCS_SYSTEM
620     stopHeapProfTimer();
621 #if defined(PROFILING)
622     CCCS = CCS_SYSTEM;
623 #endif
624     
625     schedulePostRunThread(cap,t);
626
627     t = threadStackUnderflow(task,t);
628
629     ready_to_gc = rtsFalse;
630
631     switch (ret) {
632     case HeapOverflow:
633         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
634         break;
635
636     case StackOverflow:
637         scheduleHandleStackOverflow(cap,task,t);
638         break;
639
640     case ThreadYielding:
641         if (scheduleHandleYield(cap, t, prev_what_next)) {
642             // shortcut for switching between compiler/interpreter:
643             goto run_thread; 
644         }
645         break;
646
647     case ThreadBlocked:
648         scheduleHandleThreadBlocked(t);
649         break;
650
651     case ThreadFinished:
652         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
653         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
654         break;
655
656     default:
657       barf("schedule: invalid thread return code %d", (int)ret);
658     }
659
660     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
661       cap = scheduleDoGC(cap,task,rtsFalse);
662     }
663   } /* end of while() */
664 }
665
666 /* ----------------------------------------------------------------------------
667  * Setting up the scheduler loop
668  * ------------------------------------------------------------------------- */
669
670 static void
671 schedulePreLoop(void)
672 {
673   // initialisation for scheduler - what cannot go into initScheduler()  
674 }
675
676 /* -----------------------------------------------------------------------------
677  * schedulePushWork()
678  *
679  * Push work to other Capabilities if we have some.
680  * -------------------------------------------------------------------------- */
681
682 #if defined(THREADED_RTS)
683 static void
684 schedulePushWork(Capability *cap USED_IF_THREADS, 
685                  Task *task      USED_IF_THREADS)
686 {
687     Capability *free_caps[n_capabilities], *cap0;
688     nat i, n_free_caps;
689
690     // migration can be turned off with +RTS -qg
691     if (!RtsFlags.ParFlags.migrate) return;
692
693     // Check whether we have more threads on our run queue, or sparks
694     // in our pool, that we could hand to another Capability.
695     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
696         && sparkPoolSizeCap(cap) < 2) {
697         return;
698     }
699
700     // First grab as many free Capabilities as we can.
701     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
702         cap0 = &capabilities[i];
703         if (cap != cap0 && tryGrabCapability(cap0,task)) {
704             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
705                 // it already has some work, we just grabbed it at 
706                 // the wrong moment.  Or maybe it's deadlocked!
707                 releaseCapability(cap0);
708             } else {
709                 free_caps[n_free_caps++] = cap0;
710             }
711         }
712     }
713
714     // we now have n_free_caps free capabilities stashed in
715     // free_caps[].  Share our run queue equally with them.  This is
716     // probably the simplest thing we could do; improvements we might
717     // want to do include:
718     //
719     //   - giving high priority to moving relatively new threads, on 
720     //     the gournds that they haven't had time to build up a
721     //     working set in the cache on this CPU/Capability.
722     //
723     //   - giving low priority to moving long-lived threads
724
725     if (n_free_caps > 0) {
726         StgTSO *prev, *t, *next;
727         rtsBool pushed_to_all;
728
729         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
730
731         i = 0;
732         pushed_to_all = rtsFalse;
733
734         if (cap->run_queue_hd != END_TSO_QUEUE) {
735             prev = cap->run_queue_hd;
736             t = prev->_link;
737             prev->_link = END_TSO_QUEUE;
738             for (; t != END_TSO_QUEUE; t = next) {
739                 next = t->_link;
740                 t->_link = END_TSO_QUEUE;
741                 if (t->what_next == ThreadRelocated
742                     || t->bound == task // don't move my bound thread
743                     || tsoLocked(t)) {  // don't move a locked thread
744                     setTSOLink(cap, prev, t);
745                     prev = t;
746                 } else if (i == n_free_caps) {
747                     pushed_to_all = rtsTrue;
748                     i = 0;
749                     // keep one for us
750                     setTSOLink(cap, prev, t);
751                     prev = t;
752                 } else {
753                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
754                     appendToRunQueue(free_caps[i],t);
755                     if (t->bound) { t->bound->cap = free_caps[i]; }
756                     t->cap = free_caps[i];
757                     i++;
758                 }
759             }
760             cap->run_queue_tl = prev;
761         }
762
763         // If there are some free capabilities that we didn't push any
764         // threads to, then try to push a spark to each one.
765         if (!pushed_to_all) {
766             StgClosure *spark;
767             // i is the next free capability to push to
768             for (; i < n_free_caps; i++) {
769                 if (emptySparkPoolCap(free_caps[i])) {
770                     spark = findSpark(cap);
771                     if (spark != NULL) {
772                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
773                         newSpark(&(free_caps[i]->r), spark);
774                     }
775                 }
776             }
777         }
778
779         // release the capabilities
780         for (i = 0; i < n_free_caps; i++) {
781             task->cap = free_caps[i];
782             releaseCapability(free_caps[i]);
783         }
784     }
785     task->cap = cap; // reset to point to our Capability.
786 }
787 #endif
788
789 /* ----------------------------------------------------------------------------
790  * Start any pending signal handlers
791  * ------------------------------------------------------------------------- */
792
793 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
794 static void
795 scheduleStartSignalHandlers(Capability *cap)
796 {
797     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
798         // safe outside the lock
799         startSignalHandlers(cap);
800     }
801 }
802 #else
803 static void
804 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
805 {
806 }
807 #endif
808
809 /* ----------------------------------------------------------------------------
810  * Check for blocked threads that can be woken up.
811  * ------------------------------------------------------------------------- */
812
813 static void
814 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
815 {
816 #if !defined(THREADED_RTS)
817     //
818     // Check whether any waiting threads need to be woken up.  If the
819     // run queue is empty, and there are no other tasks running, we
820     // can wait indefinitely for something to happen.
821     //
822     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
823     {
824         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
825     }
826 #endif
827 }
828
829
830 /* ----------------------------------------------------------------------------
831  * Check for threads woken up by other Capabilities
832  * ------------------------------------------------------------------------- */
833
834 static void
835 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
836 {
837 #if defined(THREADED_RTS)
838     // Any threads that were woken up by other Capabilities get
839     // appended to our run queue.
840     if (!emptyWakeupQueue(cap)) {
841         ACQUIRE_LOCK(&cap->lock);
842         if (emptyRunQueue(cap)) {
843             cap->run_queue_hd = cap->wakeup_queue_hd;
844             cap->run_queue_tl = cap->wakeup_queue_tl;
845         } else {
846             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
847             cap->run_queue_tl = cap->wakeup_queue_tl;
848         }
849         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
850         RELEASE_LOCK(&cap->lock);
851     }
852 #endif
853 }
854
855 /* ----------------------------------------------------------------------------
856  * Check for threads blocked on BLACKHOLEs that can be woken up
857  * ------------------------------------------------------------------------- */
858 static void
859 scheduleCheckBlackHoles (Capability *cap)
860 {
861     if ( blackholes_need_checking ) // check without the lock first
862     {
863         ACQUIRE_LOCK(&sched_mutex);
864         if ( blackholes_need_checking ) {
865             checkBlackHoles(cap);
866             blackholes_need_checking = rtsFalse;
867         }
868         RELEASE_LOCK(&sched_mutex);
869     }
870 }
871
872 /* ----------------------------------------------------------------------------
873  * Detect deadlock conditions and attempt to resolve them.
874  * ------------------------------------------------------------------------- */
875
876 static void
877 scheduleDetectDeadlock (Capability *cap, Task *task)
878 {
879
880 #if defined(PARALLEL_HASKELL)
881     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
882     return;
883 #endif
884
885     /* 
886      * Detect deadlock: when we have no threads to run, there are no
887      * threads blocked, waiting for I/O, or sleeping, and all the
888      * other tasks are waiting for work, we must have a deadlock of
889      * some description.
890      */
891     if ( emptyThreadQueues(cap) )
892     {
893 #if defined(THREADED_RTS)
894         /* 
895          * In the threaded RTS, we only check for deadlock if there
896          * has been no activity in a complete timeslice.  This means
897          * we won't eagerly start a full GC just because we don't have
898          * any threads to run currently.
899          */
900         if (recent_activity != ACTIVITY_INACTIVE) return;
901 #endif
902
903         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
904
905         // Garbage collection can release some new threads due to
906         // either (a) finalizers or (b) threads resurrected because
907         // they are unreachable and will therefore be sent an
908         // exception.  Any threads thus released will be immediately
909         // runnable.
910         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
911
912         recent_activity = ACTIVITY_DONE_GC;
913         // disable timer signals (see #1623)
914         stopTimer();
915         
916         if ( !emptyRunQueue(cap) ) return;
917
918 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
919         /* If we have user-installed signal handlers, then wait
920          * for signals to arrive rather then bombing out with a
921          * deadlock.
922          */
923         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
924             debugTrace(DEBUG_sched,
925                        "still deadlocked, waiting for signals...");
926
927             awaitUserSignals();
928
929             if (signals_pending()) {
930                 startSignalHandlers(cap);
931             }
932
933             // either we have threads to run, or we were interrupted:
934             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
935
936             return;
937         }
938 #endif
939
940 #if !defined(THREADED_RTS)
941         /* Probably a real deadlock.  Send the current main thread the
942          * Deadlock exception.
943          */
944         if (task->tso) {
945             switch (task->tso->why_blocked) {
946             case BlockedOnSTM:
947             case BlockedOnBlackHole:
948             case BlockedOnException:
949             case BlockedOnMVar:
950                 throwToSingleThreaded(cap, task->tso, 
951                                       (StgClosure *)nonTermination_closure);
952                 return;
953             default:
954                 barf("deadlock: main thread blocked in a strange way");
955             }
956         }
957         return;
958 #endif
959     }
960 }
961
962
963 /* ----------------------------------------------------------------------------
964  * Send pending messages (PARALLEL_HASKELL only)
965  * ------------------------------------------------------------------------- */
966
967 #if defined(PARALLEL_HASKELL)
968 static StgTSO *
969 scheduleSendPendingMessages(void)
970 {
971
972 # if defined(PAR) // global Mem.Mgmt., omit for now
973     if (PendingFetches != END_BF_QUEUE) {
974         processFetches();
975     }
976 # endif
977     
978     if (RtsFlags.ParFlags.BufferTime) {
979         // if we use message buffering, we must send away all message
980         // packets which have become too old...
981         sendOldBuffers(); 
982     }
983 }
984 #endif
985
986 /* ----------------------------------------------------------------------------
987  * Activate spark threads (PARALLEL_HASKELL only)
988  * ------------------------------------------------------------------------- */
989
990 #if defined(PARALLEL_HASKELL)
991 static void
992 scheduleActivateSpark(Capability *cap)
993 {
994     StgClosure *spark;
995
996 /* We only want to stay here if the run queue is empty and we want some
997    work. We try to turn a spark into a thread, and add it to the run
998    queue, from where it will be picked up in the next iteration of the
999    scheduler loop.  
1000 */
1001     if (!emptyRunQueue(cap)) 
1002       /* In the threaded RTS, another task might have pushed a thread
1003          on our run queue in the meantime ? But would need a lock.. */
1004       return;
1005
1006     spark = findSpark(cap); // defined in Sparks.c
1007
1008     if (spark != NULL) {
1009       debugTrace(DEBUG_sched,
1010                  "turning spark of closure %p into a thread",
1011                  (StgClosure *)spark);
1012       createSparkThread(cap,spark); // defined in Sparks.c
1013     }
1014 }
1015 #endif // PARALLEL_HASKELL
1016
1017 /* ----------------------------------------------------------------------------
1018  * Get work from a remote node (PARALLEL_HASKELL only)
1019  * ------------------------------------------------------------------------- */
1020     
1021 #if defined(PARALLEL_HASKELL)
1022 static rtsBool
1023 scheduleGetRemoteWork(Capability *cap)
1024 {
1025 #if defined(PARALLEL_HASKELL)
1026   rtsBool receivedFinish = rtsFalse;
1027
1028   // idle() , i.e. send all buffers, wait for work
1029   if (RtsFlags.ParFlags.BufferTime) {
1030         IF_PAR_DEBUG(verbose, 
1031                 debugBelch("...send all pending data,"));
1032         {
1033           nat i;
1034           for (i=1; i<=nPEs; i++)
1035             sendImmediately(i); // send all messages away immediately
1036         }
1037   }
1038
1039   /* this would be the place for fishing in GUM... 
1040
1041      if (no-earlier-fish-around) 
1042           sendFish(choosePe());
1043    */
1044
1045   // Eden:just look for incoming messages (blocking receive)
1046   IF_PAR_DEBUG(verbose, 
1047                debugBelch("...wait for incoming messages...\n"));
1048   processMessages(cap, &receivedFinish); // blocking receive...
1049
1050
1051   return receivedFinish;
1052   // reenter scheduling look after having received something
1053
1054 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1055
1056   return rtsFalse; /* return value unused in THREADED_RTS */
1057
1058 #endif /* PARALLEL_HASKELL */
1059 }
1060 #endif // PARALLEL_HASKELL
1061
1062 /* ----------------------------------------------------------------------------
1063  * After running a thread...
1064  * ------------------------------------------------------------------------- */
1065
1066 static void
1067 schedulePostRunThread (Capability *cap, StgTSO *t)
1068 {
1069     // We have to be able to catch transactions that are in an
1070     // infinite loop as a result of seeing an inconsistent view of
1071     // memory, e.g. 
1072     //
1073     //   atomically $ do
1074     //       [a,b] <- mapM readTVar [ta,tb]
1075     //       when (a == b) loop
1076     //
1077     // and a is never equal to b given a consistent view of memory.
1078     //
1079     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1080         if (!stmValidateNestOfTransactions (t -> trec)) {
1081             debugTrace(DEBUG_sched | DEBUG_stm,
1082                        "trec %p found wasting its time", t);
1083             
1084             // strip the stack back to the
1085             // ATOMICALLY_FRAME, aborting the (nested)
1086             // transaction, and saving the stack of any
1087             // partially-evaluated thunks on the heap.
1088             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1089             
1090             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1091         }
1092     }
1093
1094   /* some statistics gathering in the parallel case */
1095 }
1096
1097 /* -----------------------------------------------------------------------------
1098  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1099  * -------------------------------------------------------------------------- */
1100
1101 static rtsBool
1102 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1103 {
1104     // did the task ask for a large block?
1105     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1106         // if so, get one and push it on the front of the nursery.
1107         bdescr *bd;
1108         lnat blocks;
1109         
1110         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1111         
1112         debugTrace(DEBUG_sched,
1113                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1114                    (long)t->id, whatNext_strs[t->what_next], blocks);
1115     
1116         // don't do this if the nursery is (nearly) full, we'll GC first.
1117         if (cap->r.rCurrentNursery->link != NULL ||
1118             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1119                                                // if the nursery has only one block.
1120             
1121             ACQUIRE_SM_LOCK
1122             bd = allocGroup( blocks );
1123             RELEASE_SM_LOCK
1124             cap->r.rNursery->n_blocks += blocks;
1125             
1126             // link the new group into the list
1127             bd->link = cap->r.rCurrentNursery;
1128             bd->u.back = cap->r.rCurrentNursery->u.back;
1129             if (cap->r.rCurrentNursery->u.back != NULL) {
1130                 cap->r.rCurrentNursery->u.back->link = bd;
1131             } else {
1132 #if !defined(THREADED_RTS)
1133                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1134                        g0s0 == cap->r.rNursery);
1135 #endif
1136                 cap->r.rNursery->blocks = bd;
1137             }             
1138             cap->r.rCurrentNursery->u.back = bd;
1139             
1140             // initialise it as a nursery block.  We initialise the
1141             // step, gen_no, and flags field of *every* sub-block in
1142             // this large block, because this is easier than making
1143             // sure that we always find the block head of a large
1144             // block whenever we call Bdescr() (eg. evacuate() and
1145             // isAlive() in the GC would both have to do this, at
1146             // least).
1147             { 
1148                 bdescr *x;
1149                 for (x = bd; x < bd + blocks; x++) {
1150                     x->step = cap->r.rNursery;
1151                     x->gen_no = 0;
1152                     x->flags = 0;
1153                 }
1154             }
1155             
1156             // This assert can be a killer if the app is doing lots
1157             // of large block allocations.
1158             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1159             
1160             // now update the nursery to point to the new block
1161             cap->r.rCurrentNursery = bd;
1162             
1163             // we might be unlucky and have another thread get on the
1164             // run queue before us and steal the large block, but in that
1165             // case the thread will just end up requesting another large
1166             // block.
1167             pushOnRunQueue(cap,t);
1168             return rtsFalse;  /* not actually GC'ing */
1169         }
1170     }
1171     
1172     debugTrace(DEBUG_sched,
1173                "--<< thread %ld (%s) stopped: HeapOverflow",
1174                (long)t->id, whatNext_strs[t->what_next]);
1175
1176     if (cap->context_switch) {
1177         // Sometimes we miss a context switch, e.g. when calling
1178         // primitives in a tight loop, MAYBE_GC() doesn't check the
1179         // context switch flag, and we end up waiting for a GC.
1180         // See #1984, and concurrent/should_run/1984
1181         cap->context_switch = 0;
1182         addToRunQueue(cap,t);
1183     } else {
1184         pushOnRunQueue(cap,t);
1185     }
1186     return rtsTrue;
1187     /* actual GC is done at the end of the while loop in schedule() */
1188 }
1189
1190 /* -----------------------------------------------------------------------------
1191  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1192  * -------------------------------------------------------------------------- */
1193
1194 static void
1195 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1196 {
1197     debugTrace (DEBUG_sched,
1198                 "--<< thread %ld (%s) stopped, StackOverflow", 
1199                 (long)t->id, whatNext_strs[t->what_next]);
1200
1201     /* just adjust the stack for this thread, then pop it back
1202      * on the run queue.
1203      */
1204     { 
1205         /* enlarge the stack */
1206         StgTSO *new_t = threadStackOverflow(cap, t);
1207         
1208         /* The TSO attached to this Task may have moved, so update the
1209          * pointer to it.
1210          */
1211         if (task->tso == t) {
1212             task->tso = new_t;
1213         }
1214         pushOnRunQueue(cap,new_t);
1215     }
1216 }
1217
1218 /* -----------------------------------------------------------------------------
1219  * Handle a thread that returned to the scheduler with ThreadYielding
1220  * -------------------------------------------------------------------------- */
1221
1222 static rtsBool
1223 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1224 {
1225     // Reset the context switch flag.  We don't do this just before
1226     // running the thread, because that would mean we would lose ticks
1227     // during GC, which can lead to unfair scheduling (a thread hogs
1228     // the CPU because the tick always arrives during GC).  This way
1229     // penalises threads that do a lot of allocation, but that seems
1230     // better than the alternative.
1231     cap->context_switch = 0;
1232     
1233     /* put the thread back on the run queue.  Then, if we're ready to
1234      * GC, check whether this is the last task to stop.  If so, wake
1235      * up the GC thread.  getThread will block during a GC until the
1236      * GC is finished.
1237      */
1238 #ifdef DEBUG
1239     if (t->what_next != prev_what_next) {
1240         debugTrace(DEBUG_sched,
1241                    "--<< thread %ld (%s) stopped to switch evaluators", 
1242                    (long)t->id, whatNext_strs[t->what_next]);
1243     } else {
1244         debugTrace(DEBUG_sched,
1245                    "--<< thread %ld (%s) stopped, yielding",
1246                    (long)t->id, whatNext_strs[t->what_next]);
1247     }
1248 #endif
1249     
1250     IF_DEBUG(sanity,
1251              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1252              checkTSO(t));
1253     ASSERT(t->_link == END_TSO_QUEUE);
1254     
1255     // Shortcut if we're just switching evaluators: don't bother
1256     // doing stack squeezing (which can be expensive), just run the
1257     // thread.
1258     if (t->what_next != prev_what_next) {
1259         return rtsTrue;
1260     }
1261
1262     addToRunQueue(cap,t);
1263
1264     return rtsFalse;
1265 }
1266
1267 /* -----------------------------------------------------------------------------
1268  * Handle a thread that returned to the scheduler with ThreadBlocked
1269  * -------------------------------------------------------------------------- */
1270
1271 static void
1272 scheduleHandleThreadBlocked( StgTSO *t
1273 #if !defined(GRAN) && !defined(DEBUG)
1274     STG_UNUSED
1275 #endif
1276     )
1277 {
1278
1279       // We don't need to do anything.  The thread is blocked, and it
1280       // has tidied up its stack and placed itself on whatever queue
1281       // it needs to be on.
1282
1283     // ASSERT(t->why_blocked != NotBlocked);
1284     // Not true: for example,
1285     //    - in THREADED_RTS, the thread may already have been woken
1286     //      up by another Capability.  This actually happens: try
1287     //      conc023 +RTS -N2.
1288     //    - the thread may have woken itself up already, because
1289     //      threadPaused() might have raised a blocked throwTo
1290     //      exception, see maybePerformBlockedException().
1291
1292 #ifdef DEBUG
1293     if (traceClass(DEBUG_sched)) {
1294         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1295                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1296         printThreadBlockage(t);
1297         debugTraceEnd();
1298     }
1299 #endif
1300 }
1301
1302 /* -----------------------------------------------------------------------------
1303  * Handle a thread that returned to the scheduler with ThreadFinished
1304  * -------------------------------------------------------------------------- */
1305
1306 static rtsBool
1307 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1308 {
1309     /* Need to check whether this was a main thread, and if so,
1310      * return with the return value.
1311      *
1312      * We also end up here if the thread kills itself with an
1313      * uncaught exception, see Exception.cmm.
1314      */
1315     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1316                (unsigned long)t->id, whatNext_strs[t->what_next]);
1317
1318       //
1319       // Check whether the thread that just completed was a bound
1320       // thread, and if so return with the result.  
1321       //
1322       // There is an assumption here that all thread completion goes
1323       // through this point; we need to make sure that if a thread
1324       // ends up in the ThreadKilled state, that it stays on the run
1325       // queue so it can be dealt with here.
1326       //
1327
1328       if (t->bound) {
1329
1330           if (t->bound != task) {
1331 #if !defined(THREADED_RTS)
1332               // Must be a bound thread that is not the topmost one.  Leave
1333               // it on the run queue until the stack has unwound to the
1334               // point where we can deal with this.  Leaving it on the run
1335               // queue also ensures that the garbage collector knows about
1336               // this thread and its return value (it gets dropped from the
1337               // step->threads list so there's no other way to find it).
1338               appendToRunQueue(cap,t);
1339               return rtsFalse;
1340 #else
1341               // this cannot happen in the threaded RTS, because a
1342               // bound thread can only be run by the appropriate Task.
1343               barf("finished bound thread that isn't mine");
1344 #endif
1345           }
1346
1347           ASSERT(task->tso == t);
1348
1349           if (t->what_next == ThreadComplete) {
1350               if (task->ret) {
1351                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1352                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1353               }
1354               task->stat = Success;
1355           } else {
1356               if (task->ret) {
1357                   *(task->ret) = NULL;
1358               }
1359               if (sched_state >= SCHED_INTERRUPTING) {
1360                   task->stat = Interrupted;
1361               } else {
1362                   task->stat = Killed;
1363               }
1364           }
1365 #ifdef DEBUG
1366           removeThreadLabel((StgWord)task->tso->id);
1367 #endif
1368           return rtsTrue; // tells schedule() to return
1369       }
1370
1371       return rtsFalse;
1372 }
1373
1374 /* -----------------------------------------------------------------------------
1375  * Perform a heap census
1376  * -------------------------------------------------------------------------- */
1377
1378 static rtsBool
1379 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1380 {
1381     // When we have +RTS -i0 and we're heap profiling, do a census at
1382     // every GC.  This lets us get repeatable runs for debugging.
1383     if (performHeapProfile ||
1384         (RtsFlags.ProfFlags.profileInterval==0 &&
1385          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1386         return rtsTrue;
1387     } else {
1388         return rtsFalse;
1389     }
1390 }
1391
1392 /* -----------------------------------------------------------------------------
1393  * Perform a garbage collection if necessary
1394  * -------------------------------------------------------------------------- */
1395
1396 static Capability *
1397 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1398 {
1399     rtsBool heap_census;
1400 #ifdef THREADED_RTS
1401     /* extern static volatile StgWord waiting_for_gc; 
1402        lives inside capability.c */
1403     rtsBool was_waiting;
1404     nat i;
1405 #endif
1406
1407 #ifdef THREADED_RTS
1408     // In order to GC, there must be no threads running Haskell code.
1409     // Therefore, the GC thread needs to hold *all* the capabilities,
1410     // and release them after the GC has completed.  
1411     //
1412     // This seems to be the simplest way: previous attempts involved
1413     // making all the threads with capabilities give up their
1414     // capabilities and sleep except for the *last* one, which
1415     // actually did the GC.  But it's quite hard to arrange for all
1416     // the other tasks to sleep and stay asleep.
1417     //
1418         
1419     /*  Other capabilities are prevented from running yet more Haskell
1420         threads if waiting_for_gc is set. Tested inside
1421         yieldCapability() and releaseCapability() in Capability.c */
1422
1423     was_waiting = cas(&waiting_for_gc, 0, 1);
1424     if (was_waiting) {
1425         do {
1426             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1427             if (cap) yieldCapability(&cap,task);
1428         } while (waiting_for_gc);
1429         return cap;  // NOTE: task->cap might have changed here
1430     }
1431
1432     setContextSwitches();
1433     for (i=0; i < n_capabilities; i++) {
1434         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1435         if (cap != &capabilities[i]) {
1436             Capability *pcap = &capabilities[i];
1437             // we better hope this task doesn't get migrated to
1438             // another Capability while we're waiting for this one.
1439             // It won't, because load balancing happens while we have
1440             // all the Capabilities, but even so it's a slightly
1441             // unsavoury invariant.
1442             task->cap = pcap;
1443             waitForReturnCapability(&pcap, task);
1444             if (pcap != &capabilities[i]) {
1445                 barf("scheduleDoGC: got the wrong capability");
1446             }
1447         }
1448     }
1449
1450     waiting_for_gc = rtsFalse;
1451 #endif
1452
1453     // so this happens periodically:
1454     if (cap) scheduleCheckBlackHoles(cap);
1455     
1456     IF_DEBUG(scheduler, printAllThreads());
1457
1458     /*
1459      * We now have all the capabilities; if we're in an interrupting
1460      * state, then we should take the opportunity to delete all the
1461      * threads in the system.
1462      */
1463     if (sched_state >= SCHED_INTERRUPTING) {
1464         deleteAllThreads(&capabilities[0]);
1465         sched_state = SCHED_SHUTTING_DOWN;
1466     }
1467     
1468     heap_census = scheduleNeedHeapProfile(rtsTrue);
1469
1470     /* everybody back, start the GC.
1471      * Could do it in this thread, or signal a condition var
1472      * to do it in another thread.  Either way, we need to
1473      * broadcast on gc_pending_cond afterward.
1474      */
1475 #if defined(THREADED_RTS)
1476     debugTrace(DEBUG_sched, "doing GC");
1477 #endif
1478     GarbageCollect(force_major || heap_census);
1479     
1480     if (heap_census) {
1481         debugTrace(DEBUG_sched, "performing heap census");
1482         heapCensus();
1483         performHeapProfile = rtsFalse;
1484     }
1485
1486 #if defined(THREADED_RTS)
1487     // release our stash of capabilities.
1488     for (i = 0; i < n_capabilities; i++) {
1489         if (cap != &capabilities[i]) {
1490             task->cap = &capabilities[i];
1491             releaseCapability(&capabilities[i]);
1492         }
1493     }
1494     if (cap) {
1495         task->cap = cap;
1496     } else {
1497         task->cap = NULL;
1498     }
1499 #endif
1500
1501     return cap;
1502 }
1503
1504 /* ---------------------------------------------------------------------------
1505  * Singleton fork(). Do not copy any running threads.
1506  * ------------------------------------------------------------------------- */
1507
1508 pid_t
1509 forkProcess(HsStablePtr *entry
1510 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1511             STG_UNUSED
1512 #endif
1513            )
1514 {
1515 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1516     Task *task;
1517     pid_t pid;
1518     StgTSO* t,*next;
1519     Capability *cap;
1520     nat s;
1521     
1522 #if defined(THREADED_RTS)
1523     if (RtsFlags.ParFlags.nNodes > 1) {
1524         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1525         stg_exit(EXIT_FAILURE);
1526     }
1527 #endif
1528
1529     debugTrace(DEBUG_sched, "forking!");
1530     
1531     // ToDo: for SMP, we should probably acquire *all* the capabilities
1532     cap = rts_lock();
1533     
1534     // no funny business: hold locks while we fork, otherwise if some
1535     // other thread is holding a lock when the fork happens, the data
1536     // structure protected by the lock will forever be in an
1537     // inconsistent state in the child.  See also #1391.
1538     ACQUIRE_LOCK(&sched_mutex);
1539     ACQUIRE_LOCK(&cap->lock);
1540     ACQUIRE_LOCK(&cap->running_task->lock);
1541
1542     pid = fork();
1543     
1544     if (pid) { // parent
1545         
1546         RELEASE_LOCK(&sched_mutex);
1547         RELEASE_LOCK(&cap->lock);
1548         RELEASE_LOCK(&cap->running_task->lock);
1549
1550         // just return the pid
1551         rts_unlock(cap);
1552         return pid;
1553         
1554     } else { // child
1555         
1556 #if defined(THREADED_RTS)
1557         initMutex(&sched_mutex);
1558         initMutex(&cap->lock);
1559         initMutex(&cap->running_task->lock);
1560 #endif
1561
1562         // Now, all OS threads except the thread that forked are
1563         // stopped.  We need to stop all Haskell threads, including
1564         // those involved in foreign calls.  Also we need to delete
1565         // all Tasks, because they correspond to OS threads that are
1566         // now gone.
1567
1568         for (s = 0; s < total_steps; s++) {
1569           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1570             if (t->what_next == ThreadRelocated) {
1571                 next = t->_link;
1572             } else {
1573                 next = t->global_link;
1574                 // don't allow threads to catch the ThreadKilled
1575                 // exception, but we do want to raiseAsync() because these
1576                 // threads may be evaluating thunks that we need later.
1577                 deleteThread_(cap,t);
1578             }
1579           }
1580         }
1581         
1582         // Empty the run queue.  It seems tempting to let all the
1583         // killed threads stay on the run queue as zombies to be
1584         // cleaned up later, but some of them correspond to bound
1585         // threads for which the corresponding Task does not exist.
1586         cap->run_queue_hd = END_TSO_QUEUE;
1587         cap->run_queue_tl = END_TSO_QUEUE;
1588
1589         // Any suspended C-calling Tasks are no more, their OS threads
1590         // don't exist now:
1591         cap->suspended_ccalling_tasks = NULL;
1592
1593         // Empty the threads lists.  Otherwise, the garbage
1594         // collector may attempt to resurrect some of these threads.
1595         for (s = 0; s < total_steps; s++) {
1596             all_steps[s].threads = END_TSO_QUEUE;
1597         }
1598
1599         // Wipe the task list, except the current Task.
1600         ACQUIRE_LOCK(&sched_mutex);
1601         for (task = all_tasks; task != NULL; task=task->all_link) {
1602             if (task != cap->running_task) {
1603 #if defined(THREADED_RTS)
1604                 initMutex(&task->lock); // see #1391
1605 #endif
1606                 discardTask(task);
1607             }
1608         }
1609         RELEASE_LOCK(&sched_mutex);
1610
1611 #if defined(THREADED_RTS)
1612         // Wipe our spare workers list, they no longer exist.  New
1613         // workers will be created if necessary.
1614         cap->spare_workers = NULL;
1615         cap->returning_tasks_hd = NULL;
1616         cap->returning_tasks_tl = NULL;
1617 #endif
1618
1619         // On Unix, all timers are reset in the child, so we need to start
1620         // the timer again.
1621         initTimer();
1622         startTimer();
1623
1624         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1625         rts_checkSchedStatus("forkProcess",cap);
1626         
1627         rts_unlock(cap);
1628         hs_exit();                      // clean up and exit
1629         stg_exit(EXIT_SUCCESS);
1630     }
1631 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1632     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1633     return -1;
1634 #endif
1635 }
1636
1637 /* ---------------------------------------------------------------------------
1638  * Delete all the threads in the system
1639  * ------------------------------------------------------------------------- */
1640    
1641 static void
1642 deleteAllThreads ( Capability *cap )
1643 {
1644     // NOTE: only safe to call if we own all capabilities.
1645
1646     StgTSO* t, *next;
1647     nat s;
1648
1649     debugTrace(DEBUG_sched,"deleting all threads");
1650     for (s = 0; s < total_steps; s++) {
1651       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1652         if (t->what_next == ThreadRelocated) {
1653             next = t->_link;
1654         } else {
1655             next = t->global_link;
1656             deleteThread(cap,t);
1657         }
1658       }
1659     }      
1660
1661     // The run queue now contains a bunch of ThreadKilled threads.  We
1662     // must not throw these away: the main thread(s) will be in there
1663     // somewhere, and the main scheduler loop has to deal with it.
1664     // Also, the run queue is the only thing keeping these threads from
1665     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1666
1667 #if !defined(THREADED_RTS)
1668     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1669     ASSERT(sleeping_queue == END_TSO_QUEUE);
1670 #endif
1671 }
1672
1673 /* -----------------------------------------------------------------------------
1674    Managing the suspended_ccalling_tasks list.
1675    Locks required: sched_mutex
1676    -------------------------------------------------------------------------- */
1677
1678 STATIC_INLINE void
1679 suspendTask (Capability *cap, Task *task)
1680 {
1681     ASSERT(task->next == NULL && task->prev == NULL);
1682     task->next = cap->suspended_ccalling_tasks;
1683     task->prev = NULL;
1684     if (cap->suspended_ccalling_tasks) {
1685         cap->suspended_ccalling_tasks->prev = task;
1686     }
1687     cap->suspended_ccalling_tasks = task;
1688 }
1689
1690 STATIC_INLINE void
1691 recoverSuspendedTask (Capability *cap, Task *task)
1692 {
1693     if (task->prev) {
1694         task->prev->next = task->next;
1695     } else {
1696         ASSERT(cap->suspended_ccalling_tasks == task);
1697         cap->suspended_ccalling_tasks = task->next;
1698     }
1699     if (task->next) {
1700         task->next->prev = task->prev;
1701     }
1702     task->next = task->prev = NULL;
1703 }
1704
1705 /* ---------------------------------------------------------------------------
1706  * Suspending & resuming Haskell threads.
1707  * 
1708  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1709  * its capability before calling the C function.  This allows another
1710  * task to pick up the capability and carry on running Haskell
1711  * threads.  It also means that if the C call blocks, it won't lock
1712  * the whole system.
1713  *
1714  * The Haskell thread making the C call is put to sleep for the
1715  * duration of the call, on the susepended_ccalling_threads queue.  We
1716  * give out a token to the task, which it can use to resume the thread
1717  * on return from the C function.
1718  * ------------------------------------------------------------------------- */
1719    
1720 void *
1721 suspendThread (StgRegTable *reg)
1722 {
1723   Capability *cap;
1724   int saved_errno;
1725   StgTSO *tso;
1726   Task *task;
1727 #if mingw32_HOST_OS
1728   StgWord32 saved_winerror;
1729 #endif
1730
1731   saved_errno = errno;
1732 #if mingw32_HOST_OS
1733   saved_winerror = GetLastError();
1734 #endif
1735
1736   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1737    */
1738   cap = regTableToCapability(reg);
1739
1740   task = cap->running_task;
1741   tso = cap->r.rCurrentTSO;
1742
1743   debugTrace(DEBUG_sched, 
1744              "thread %lu did a safe foreign call", 
1745              (unsigned long)cap->r.rCurrentTSO->id);
1746
1747   // XXX this might not be necessary --SDM
1748   tso->what_next = ThreadRunGHC;
1749
1750   threadPaused(cap,tso);
1751
1752   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1753       tso->why_blocked = BlockedOnCCall;
1754       tso->flags |= TSO_BLOCKEX;
1755       tso->flags &= ~TSO_INTERRUPTIBLE;
1756   } else {
1757       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1758   }
1759
1760   // Hand back capability
1761   task->suspended_tso = tso;
1762
1763   ACQUIRE_LOCK(&cap->lock);
1764
1765   suspendTask(cap,task);
1766   cap->in_haskell = rtsFalse;
1767   releaseCapability_(cap);
1768   
1769   RELEASE_LOCK(&cap->lock);
1770
1771 #if defined(THREADED_RTS)
1772   /* Preparing to leave the RTS, so ensure there's a native thread/task
1773      waiting to take over.
1774   */
1775   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1776 #endif
1777
1778   errno = saved_errno;
1779 #if mingw32_HOST_OS
1780   SetLastError(saved_winerror);
1781 #endif
1782   return task;
1783 }
1784
1785 StgRegTable *
1786 resumeThread (void *task_)
1787 {
1788     StgTSO *tso;
1789     Capability *cap;
1790     Task *task = task_;
1791     int saved_errno;
1792 #if mingw32_HOST_OS
1793     StgWord32 saved_winerror;
1794 #endif
1795
1796     saved_errno = errno;
1797 #if mingw32_HOST_OS
1798     saved_winerror = GetLastError();
1799 #endif
1800
1801     cap = task->cap;
1802     // Wait for permission to re-enter the RTS with the result.
1803     waitForReturnCapability(&cap,task);
1804     // we might be on a different capability now... but if so, our
1805     // entry on the suspended_ccalling_tasks list will also have been
1806     // migrated.
1807
1808     // Remove the thread from the suspended list
1809     recoverSuspendedTask(cap,task);
1810
1811     tso = task->suspended_tso;
1812     task->suspended_tso = NULL;
1813     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1814     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1815     
1816     if (tso->why_blocked == BlockedOnCCall) {
1817         awakenBlockedExceptionQueue(cap,tso);
1818         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1819     }
1820     
1821     /* Reset blocking status */
1822     tso->why_blocked  = NotBlocked;
1823     
1824     cap->r.rCurrentTSO = tso;
1825     cap->in_haskell = rtsTrue;
1826     errno = saved_errno;
1827 #if mingw32_HOST_OS
1828     SetLastError(saved_winerror);
1829 #endif
1830
1831     /* We might have GC'd, mark the TSO dirty again */
1832     dirty_TSO(cap,tso);
1833
1834     IF_DEBUG(sanity, checkTSO(tso));
1835
1836     return &cap->r;
1837 }
1838
1839 /* ---------------------------------------------------------------------------
1840  * scheduleThread()
1841  *
1842  * scheduleThread puts a thread on the end  of the runnable queue.
1843  * This will usually be done immediately after a thread is created.
1844  * The caller of scheduleThread must create the thread using e.g.
1845  * createThread and push an appropriate closure
1846  * on this thread's stack before the scheduler is invoked.
1847  * ------------------------------------------------------------------------ */
1848
1849 void
1850 scheduleThread(Capability *cap, StgTSO *tso)
1851 {
1852     // The thread goes at the *end* of the run-queue, to avoid possible
1853     // starvation of any threads already on the queue.
1854     appendToRunQueue(cap,tso);
1855 }
1856
1857 void
1858 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1859 {
1860 #if defined(THREADED_RTS)
1861     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1862                               // move this thread from now on.
1863     cpu %= RtsFlags.ParFlags.nNodes;
1864     if (cpu == cap->no) {
1865         appendToRunQueue(cap,tso);
1866     } else {
1867         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1868     }
1869 #else
1870     appendToRunQueue(cap,tso);
1871 #endif
1872 }
1873
1874 Capability *
1875 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1876 {
1877     Task *task;
1878
1879     // We already created/initialised the Task
1880     task = cap->running_task;
1881
1882     // This TSO is now a bound thread; make the Task and TSO
1883     // point to each other.
1884     tso->bound = task;
1885     tso->cap = cap;
1886
1887     task->tso = tso;
1888     task->ret = ret;
1889     task->stat = NoStatus;
1890
1891     appendToRunQueue(cap,tso);
1892
1893     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1894
1895     cap = schedule(cap,task);
1896
1897     ASSERT(task->stat != NoStatus);
1898     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1899
1900     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1901     return cap;
1902 }
1903
1904 /* ----------------------------------------------------------------------------
1905  * Starting Tasks
1906  * ------------------------------------------------------------------------- */
1907
1908 #if defined(THREADED_RTS)
1909 void OSThreadProcAttr
1910 workerStart(Task *task)
1911 {
1912     Capability *cap;
1913
1914     // See startWorkerTask().
1915     ACQUIRE_LOCK(&task->lock);
1916     cap = task->cap;
1917     RELEASE_LOCK(&task->lock);
1918
1919     // set the thread-local pointer to the Task:
1920     taskEnter(task);
1921
1922     // schedule() runs without a lock.
1923     cap = schedule(cap,task);
1924
1925     // On exit from schedule(), we have a Capability.
1926     releaseCapability(cap);
1927     workerTaskStop(task);
1928 }
1929 #endif
1930
1931 /* ---------------------------------------------------------------------------
1932  * initScheduler()
1933  *
1934  * Initialise the scheduler.  This resets all the queues - if the
1935  * queues contained any threads, they'll be garbage collected at the
1936  * next pass.
1937  *
1938  * ------------------------------------------------------------------------ */
1939
1940 void 
1941 initScheduler(void)
1942 {
1943 #if !defined(THREADED_RTS)
1944   blocked_queue_hd  = END_TSO_QUEUE;
1945   blocked_queue_tl  = END_TSO_QUEUE;
1946   sleeping_queue    = END_TSO_QUEUE;
1947 #endif
1948
1949   blackhole_queue   = END_TSO_QUEUE;
1950
1951   sched_state    = SCHED_RUNNING;
1952   recent_activity = ACTIVITY_YES;
1953
1954 #if defined(THREADED_RTS)
1955   /* Initialise the mutex and condition variables used by
1956    * the scheduler. */
1957   initMutex(&sched_mutex);
1958 #endif
1959   
1960   ACQUIRE_LOCK(&sched_mutex);
1961
1962   /* A capability holds the state a native thread needs in
1963    * order to execute STG code. At least one capability is
1964    * floating around (only THREADED_RTS builds have more than one).
1965    */
1966   initCapabilities();
1967
1968   initTaskManager();
1969
1970 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
1971   initSparkPools();
1972 #endif
1973
1974 #if defined(THREADED_RTS)
1975   /*
1976    * Eagerly start one worker to run each Capability, except for
1977    * Capability 0.  The idea is that we're probably going to start a
1978    * bound thread on Capability 0 pretty soon, so we don't want a
1979    * worker task hogging it.
1980    */
1981   { 
1982       nat i;
1983       Capability *cap;
1984       for (i = 1; i < n_capabilities; i++) {
1985           cap = &capabilities[i];
1986           ACQUIRE_LOCK(&cap->lock);
1987           startWorkerTask(cap, workerStart);
1988           RELEASE_LOCK(&cap->lock);
1989       }
1990   }
1991 #endif
1992
1993   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
1994
1995   RELEASE_LOCK(&sched_mutex);
1996 }
1997
1998 void
1999 exitScheduler(
2000     rtsBool wait_foreign
2001 #if !defined(THREADED_RTS)
2002                          __attribute__((unused))
2003 #endif
2004 )
2005                /* see Capability.c, shutdownCapability() */
2006 {
2007     Task *task = NULL;
2008
2009 #if defined(THREADED_RTS)
2010     ACQUIRE_LOCK(&sched_mutex);
2011     task = newBoundTask();
2012     RELEASE_LOCK(&sched_mutex);
2013 #endif
2014
2015     // If we haven't killed all the threads yet, do it now.
2016     if (sched_state < SCHED_SHUTTING_DOWN) {
2017         sched_state = SCHED_INTERRUPTING;
2018         scheduleDoGC(NULL,task,rtsFalse);    
2019     }
2020     sched_state = SCHED_SHUTTING_DOWN;
2021
2022 #if defined(THREADED_RTS)
2023     { 
2024         nat i;
2025         
2026         for (i = 0; i < n_capabilities; i++) {
2027             shutdownCapability(&capabilities[i], task, wait_foreign);
2028         }
2029         boundTaskExiting(task);
2030         stopTaskManager();
2031     }
2032 #else
2033     freeCapability(&MainCapability);
2034 #endif
2035 }
2036
2037 void
2038 freeScheduler( void )
2039 {
2040     freeTaskManager();
2041     if (n_capabilities != 1) {
2042         stgFree(capabilities);
2043     }
2044 #if defined(THREADED_RTS)
2045     closeMutex(&sched_mutex);
2046 #endif
2047 }
2048
2049 /* -----------------------------------------------------------------------------
2050    performGC
2051
2052    This is the interface to the garbage collector from Haskell land.
2053    We provide this so that external C code can allocate and garbage
2054    collect when called from Haskell via _ccall_GC.
2055    -------------------------------------------------------------------------- */
2056
2057 static void
2058 performGC_(rtsBool force_major)
2059 {
2060     Task *task;
2061     // We must grab a new Task here, because the existing Task may be
2062     // associated with a particular Capability, and chained onto the 
2063     // suspended_ccalling_tasks queue.
2064     ACQUIRE_LOCK(&sched_mutex);
2065     task = newBoundTask();
2066     RELEASE_LOCK(&sched_mutex);
2067     scheduleDoGC(NULL,task,force_major);
2068     boundTaskExiting(task);
2069 }
2070
2071 void
2072 performGC(void)
2073 {
2074     performGC_(rtsFalse);
2075 }
2076
2077 void
2078 performMajorGC(void)
2079 {
2080     performGC_(rtsTrue);
2081 }
2082
2083 /* -----------------------------------------------------------------------------
2084    Stack overflow
2085
2086    If the thread has reached its maximum stack size, then raise the
2087    StackOverflow exception in the offending thread.  Otherwise
2088    relocate the TSO into a larger chunk of memory and adjust its stack
2089    size appropriately.
2090    -------------------------------------------------------------------------- */
2091
2092 static StgTSO *
2093 threadStackOverflow(Capability *cap, StgTSO *tso)
2094 {
2095   nat new_stack_size, stack_words;
2096   lnat new_tso_size;
2097   StgPtr new_sp;
2098   StgTSO *dest;
2099
2100   IF_DEBUG(sanity,checkTSO(tso));
2101
2102   // don't allow throwTo() to modify the blocked_exceptions queue
2103   // while we are moving the TSO:
2104   lockClosure((StgClosure *)tso);
2105
2106   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2107       // NB. never raise a StackOverflow exception if the thread is
2108       // inside Control.Exceptino.block.  It is impractical to protect
2109       // against stack overflow exceptions, since virtually anything
2110       // can raise one (even 'catch'), so this is the only sensible
2111       // thing to do here.  See bug #767.
2112
2113       debugTrace(DEBUG_gc,
2114                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2115                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2116       IF_DEBUG(gc,
2117                /* If we're debugging, just print out the top of the stack */
2118                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2119                                                 tso->sp+64)));
2120
2121       // Send this thread the StackOverflow exception
2122       unlockTSO(tso);
2123       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2124       return tso;
2125   }
2126
2127   /* Try to double the current stack size.  If that takes us over the
2128    * maximum stack size for this thread, then use the maximum instead.
2129    * Finally round up so the TSO ends up as a whole number of blocks.
2130    */
2131   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2132   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2133                                        TSO_STRUCT_SIZE)/sizeof(W_);
2134   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2135   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2136
2137   debugTrace(DEBUG_sched, 
2138              "increasing stack size from %ld words to %d.",
2139              (long)tso->stack_size, new_stack_size);
2140
2141   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2142   TICK_ALLOC_TSO(new_stack_size,0);
2143
2144   /* copy the TSO block and the old stack into the new area */
2145   memcpy(dest,tso,TSO_STRUCT_SIZE);
2146   stack_words = tso->stack + tso->stack_size - tso->sp;
2147   new_sp = (P_)dest + new_tso_size - stack_words;
2148   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2149
2150   /* relocate the stack pointers... */
2151   dest->sp         = new_sp;
2152   dest->stack_size = new_stack_size;
2153         
2154   /* Mark the old TSO as relocated.  We have to check for relocated
2155    * TSOs in the garbage collector and any primops that deal with TSOs.
2156    *
2157    * It's important to set the sp value to just beyond the end
2158    * of the stack, so we don't attempt to scavenge any part of the
2159    * dead TSO's stack.
2160    */
2161   tso->what_next = ThreadRelocated;
2162   setTSOLink(cap,tso,dest);
2163   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2164   tso->why_blocked = NotBlocked;
2165
2166   IF_PAR_DEBUG(verbose,
2167                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2168                      tso->id, tso, tso->stack_size);
2169                /* If we're debugging, just print out the top of the stack */
2170                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2171                                                 tso->sp+64)));
2172   
2173   unlockTSO(dest);
2174   unlockTSO(tso);
2175
2176   IF_DEBUG(sanity,checkTSO(dest));
2177 #if 0
2178   IF_DEBUG(scheduler,printTSO(dest));
2179 #endif
2180
2181   return dest;
2182 }
2183
2184 static StgTSO *
2185 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2186 {
2187     bdescr *bd, *new_bd;
2188     lnat free_w, tso_size_w;
2189     StgTSO *new_tso;
2190
2191     tso_size_w = tso_sizeW(tso);
2192
2193     if (tso_size_w < MBLOCK_SIZE_W || 
2194         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2195     {
2196         return tso;
2197     }
2198
2199     // don't allow throwTo() to modify the blocked_exceptions queue
2200     // while we are moving the TSO:
2201     lockClosure((StgClosure *)tso);
2202
2203     // this is the number of words we'll free
2204     free_w = round_to_mblocks(tso_size_w/2);
2205
2206     bd = Bdescr((StgPtr)tso);
2207     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2208     bd->free = bd->start + TSO_STRUCT_SIZEW;
2209
2210     new_tso = (StgTSO *)new_bd->start;
2211     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2212     new_tso->stack_size = new_bd->free - new_tso->stack;
2213
2214     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2215                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2216
2217     tso->what_next = ThreadRelocated;
2218     tso->_link = new_tso; // no write barrier reqd: same generation
2219
2220     // The TSO attached to this Task may have moved, so update the
2221     // pointer to it.
2222     if (task->tso == tso) {
2223         task->tso = new_tso;
2224     }
2225
2226     unlockTSO(new_tso);
2227     unlockTSO(tso);
2228
2229     IF_DEBUG(sanity,checkTSO(new_tso));
2230
2231     return new_tso;
2232 }
2233
2234 /* ---------------------------------------------------------------------------
2235    Interrupt execution
2236    - usually called inside a signal handler so it mustn't do anything fancy.   
2237    ------------------------------------------------------------------------ */
2238
2239 void
2240 interruptStgRts(void)
2241 {
2242     sched_state = SCHED_INTERRUPTING;
2243     setContextSwitches();
2244     wakeUpRts();
2245 }
2246
2247 /* -----------------------------------------------------------------------------
2248    Wake up the RTS
2249    
2250    This function causes at least one OS thread to wake up and run the
2251    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2252    an external event has arrived that may need servicing (eg. a
2253    keyboard interrupt).
2254
2255    In the single-threaded RTS we don't do anything here; we only have
2256    one thread anyway, and the event that caused us to want to wake up
2257    will have interrupted any blocking system call in progress anyway.
2258    -------------------------------------------------------------------------- */
2259
2260 void
2261 wakeUpRts(void)
2262 {
2263 #if defined(THREADED_RTS)
2264     // This forces the IO Manager thread to wakeup, which will
2265     // in turn ensure that some OS thread wakes up and runs the
2266     // scheduler loop, which will cause a GC and deadlock check.
2267     ioManagerWakeup();
2268 #endif
2269 }
2270
2271 /* -----------------------------------------------------------------------------
2272  * checkBlackHoles()
2273  *
2274  * Check the blackhole_queue for threads that can be woken up.  We do
2275  * this periodically: before every GC, and whenever the run queue is
2276  * empty.
2277  *
2278  * An elegant solution might be to just wake up all the blocked
2279  * threads with awakenBlockedQueue occasionally: they'll go back to
2280  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2281  * doesn't give us a way to tell whether we've actually managed to
2282  * wake up any threads, so we would be busy-waiting.
2283  *
2284  * -------------------------------------------------------------------------- */
2285
2286 static rtsBool
2287 checkBlackHoles (Capability *cap)
2288 {
2289     StgTSO **prev, *t;
2290     rtsBool any_woke_up = rtsFalse;
2291     StgHalfWord type;
2292
2293     // blackhole_queue is global:
2294     ASSERT_LOCK_HELD(&sched_mutex);
2295
2296     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2297
2298     // ASSUMES: sched_mutex
2299     prev = &blackhole_queue;
2300     t = blackhole_queue;
2301     while (t != END_TSO_QUEUE) {
2302         ASSERT(t->why_blocked == BlockedOnBlackHole);
2303         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2304         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2305             IF_DEBUG(sanity,checkTSO(t));
2306             t = unblockOne(cap, t);
2307             *prev = t;
2308             any_woke_up = rtsTrue;
2309         } else {
2310             prev = &t->_link;
2311             t = t->_link;
2312         }
2313     }
2314
2315     return any_woke_up;
2316 }
2317
2318 /* -----------------------------------------------------------------------------
2319    Deleting threads
2320
2321    This is used for interruption (^C) and forking, and corresponds to
2322    raising an exception but without letting the thread catch the
2323    exception.
2324    -------------------------------------------------------------------------- */
2325
2326 static void 
2327 deleteThread (Capability *cap, StgTSO *tso)
2328 {
2329     // NOTE: must only be called on a TSO that we have exclusive
2330     // access to, because we will call throwToSingleThreaded() below.
2331     // The TSO must be on the run queue of the Capability we own, or 
2332     // we must own all Capabilities.
2333
2334     if (tso->why_blocked != BlockedOnCCall &&
2335         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2336         throwToSingleThreaded(cap,tso,NULL);
2337     }
2338 }
2339
2340 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2341 static void 
2342 deleteThread_(Capability *cap, StgTSO *tso)
2343 { // for forkProcess only:
2344   // like deleteThread(), but we delete threads in foreign calls, too.
2345
2346     if (tso->why_blocked == BlockedOnCCall ||
2347         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2348         unblockOne(cap,tso);
2349         tso->what_next = ThreadKilled;
2350     } else {
2351         deleteThread(cap,tso);
2352     }
2353 }
2354 #endif
2355
2356 /* -----------------------------------------------------------------------------
2357    raiseExceptionHelper
2358    
2359    This function is called by the raise# primitve, just so that we can
2360    move some of the tricky bits of raising an exception from C-- into
2361    C.  Who knows, it might be a useful re-useable thing here too.
2362    -------------------------------------------------------------------------- */
2363
2364 StgWord
2365 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2366 {
2367     Capability *cap = regTableToCapability(reg);
2368     StgThunk *raise_closure = NULL;
2369     StgPtr p, next;
2370     StgRetInfoTable *info;
2371     //
2372     // This closure represents the expression 'raise# E' where E
2373     // is the exception raise.  It is used to overwrite all the
2374     // thunks which are currently under evaluataion.
2375     //
2376
2377     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2378     // LDV profiling: stg_raise_info has THUNK as its closure
2379     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2380     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2381     // 1 does not cause any problem unless profiling is performed.
2382     // However, when LDV profiling goes on, we need to linearly scan
2383     // small object pool, where raise_closure is stored, so we should
2384     // use MIN_UPD_SIZE.
2385     //
2386     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2387     //                                 sizeofW(StgClosure)+1);
2388     //
2389
2390     //
2391     // Walk up the stack, looking for the catch frame.  On the way,
2392     // we update any closures pointed to from update frames with the
2393     // raise closure that we just built.
2394     //
2395     p = tso->sp;
2396     while(1) {
2397         info = get_ret_itbl((StgClosure *)p);
2398         next = p + stack_frame_sizeW((StgClosure *)p);
2399         switch (info->i.type) {
2400             
2401         case UPDATE_FRAME:
2402             // Only create raise_closure if we need to.
2403             if (raise_closure == NULL) {
2404                 raise_closure = 
2405                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2406                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2407                 raise_closure->payload[0] = exception;
2408             }
2409             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2410             p = next;
2411             continue;
2412
2413         case ATOMICALLY_FRAME:
2414             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2415             tso->sp = p;
2416             return ATOMICALLY_FRAME;
2417             
2418         case CATCH_FRAME:
2419             tso->sp = p;
2420             return CATCH_FRAME;
2421
2422         case CATCH_STM_FRAME:
2423             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2424             tso->sp = p;
2425             return CATCH_STM_FRAME;
2426             
2427         case STOP_FRAME:
2428             tso->sp = p;
2429             return STOP_FRAME;
2430
2431         case CATCH_RETRY_FRAME:
2432         default:
2433             p = next; 
2434             continue;
2435         }
2436     }
2437 }
2438
2439
2440 /* -----------------------------------------------------------------------------
2441    findRetryFrameHelper
2442
2443    This function is called by the retry# primitive.  It traverses the stack
2444    leaving tso->sp referring to the frame which should handle the retry.  
2445
2446    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2447    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2448
2449    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2450    create) because retries are not considered to be exceptions, despite the
2451    similar implementation.
2452
2453    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2454    not be created within memory transactions.
2455    -------------------------------------------------------------------------- */
2456
2457 StgWord
2458 findRetryFrameHelper (StgTSO *tso)
2459 {
2460   StgPtr           p, next;
2461   StgRetInfoTable *info;
2462
2463   p = tso -> sp;
2464   while (1) {
2465     info = get_ret_itbl((StgClosure *)p);
2466     next = p + stack_frame_sizeW((StgClosure *)p);
2467     switch (info->i.type) {
2468       
2469     case ATOMICALLY_FRAME:
2470         debugTrace(DEBUG_stm,
2471                    "found ATOMICALLY_FRAME at %p during retry", p);
2472         tso->sp = p;
2473         return ATOMICALLY_FRAME;
2474       
2475     case CATCH_RETRY_FRAME:
2476         debugTrace(DEBUG_stm,
2477                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2478         tso->sp = p;
2479         return CATCH_RETRY_FRAME;
2480       
2481     case CATCH_STM_FRAME: {
2482         StgTRecHeader *trec = tso -> trec;
2483         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2484         debugTrace(DEBUG_stm,
2485                    "found CATCH_STM_FRAME at %p during retry", p);
2486         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2487         stmAbortTransaction(tso -> cap, trec);
2488         stmFreeAbortedTRec(tso -> cap, trec);
2489         tso -> trec = outer;
2490         p = next; 
2491         continue;
2492     }
2493       
2494
2495     default:
2496       ASSERT(info->i.type != CATCH_FRAME);
2497       ASSERT(info->i.type != STOP_FRAME);
2498       p = next; 
2499       continue;
2500     }
2501   }
2502 }
2503
2504 /* -----------------------------------------------------------------------------
2505    resurrectThreads is called after garbage collection on the list of
2506    threads found to be garbage.  Each of these threads will be woken
2507    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2508    on an MVar, or NonTermination if the thread was blocked on a Black
2509    Hole.
2510
2511    Locks: assumes we hold *all* the capabilities.
2512    -------------------------------------------------------------------------- */
2513
2514 void
2515 resurrectThreads (StgTSO *threads)
2516 {
2517     StgTSO *tso, *next;
2518     Capability *cap;
2519     step *step;
2520
2521     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2522         next = tso->global_link;
2523
2524         step = Bdescr((P_)tso)->step;
2525         tso->global_link = step->threads;
2526         step->threads = tso;
2527
2528         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2529         
2530         // Wake up the thread on the Capability it was last on
2531         cap = tso->cap;
2532         
2533         switch (tso->why_blocked) {
2534         case BlockedOnMVar:
2535         case BlockedOnException:
2536             /* Called by GC - sched_mutex lock is currently held. */
2537             throwToSingleThreaded(cap, tso,
2538                                   (StgClosure *)blockedOnDeadMVar_closure);
2539             break;
2540         case BlockedOnBlackHole:
2541             throwToSingleThreaded(cap, tso,
2542                                   (StgClosure *)nonTermination_closure);
2543             break;
2544         case BlockedOnSTM:
2545             throwToSingleThreaded(cap, tso,
2546                                   (StgClosure *)blockedIndefinitely_closure);
2547             break;
2548         case NotBlocked:
2549             /* This might happen if the thread was blocked on a black hole
2550              * belonging to a thread that we've just woken up (raiseAsync
2551              * can wake up threads, remember...).
2552              */
2553             continue;
2554         default:
2555             barf("resurrectThreads: thread blocked in a strange way");
2556         }
2557     }
2558 }
2559
2560 /* -----------------------------------------------------------------------------
2561    performPendingThrowTos is called after garbage collection, and
2562    passed a list of threads that were found to have pending throwTos
2563    (tso->blocked_exceptions was not empty), and were blocked.
2564    Normally this doesn't happen, because we would deliver the
2565    exception directly if the target thread is blocked, but there are
2566    small windows where it might occur on a multiprocessor (see
2567    throwTo()).
2568
2569    NB. we must be holding all the capabilities at this point, just
2570    like resurrectThreads().
2571    -------------------------------------------------------------------------- */
2572
2573 void
2574 performPendingThrowTos (StgTSO *threads)
2575 {
2576     StgTSO *tso, *next;
2577     Capability *cap;
2578     step *step;
2579
2580     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2581         next = tso->global_link;
2582
2583         step = Bdescr((P_)tso)->step;
2584         tso->global_link = step->threads;
2585         step->threads = tso;
2586
2587         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2588         
2589         cap = tso->cap;
2590         maybePerformBlockedException(cap, tso);
2591     }
2592 }