Use OSThreadProcAttr for workerStart
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag set by signal handler to precipitate a context switch
93  * LOCK: none (just an advisory flag)
94  */
95 int context_switch = 0;
96
97 /* flag that tracks whether we have done any execution in this time slice.
98  * LOCK: currently none, perhaps we should lock (but needs to be
99  * updated in the fast path of the scheduler).
100  */
101 nat recent_activity = ACTIVITY_YES;
102
103 /* if this flag is set as well, give up execution
104  * LOCK: none (changes once, from false->true)
105  */
106 rtsBool sched_state = SCHED_RUNNING;
107
108 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
109  *  exists - earlier gccs apparently didn't.
110  *  -= chak
111  */
112 StgTSO dummy_tso;
113
114 /*
115  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
116  * in an MT setting, needed to signal that a worker thread shouldn't hang around
117  * in the scheduler when it is out of work.
118  */
119 rtsBool shutting_down_scheduler = rtsFalse;
120
121 /*
122  * This mutex protects most of the global scheduler data in
123  * the THREADED_RTS runtime.
124  */
125 #if defined(THREADED_RTS)
126 Mutex sched_mutex;
127 #endif
128
129 #if !defined(mingw32_HOST_OS)
130 #define FORKPROCESS_PRIMOP_SUPPORTED
131 #endif
132
133 /* -----------------------------------------------------------------------------
134  * static function prototypes
135  * -------------------------------------------------------------------------- */
136
137 static Capability *schedule (Capability *initialCapability, Task *task);
138
139 //
140 // These function all encapsulate parts of the scheduler loop, and are
141 // abstracted only to make the structure and control flow of the
142 // scheduler clearer.
143 //
144 static void schedulePreLoop (void);
145 #if defined(THREADED_RTS)
146 static void schedulePushWork(Capability *cap, Task *task);
147 #endif
148 static void scheduleStartSignalHandlers (Capability *cap);
149 static void scheduleCheckBlockedThreads (Capability *cap);
150 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
151 static void scheduleCheckBlackHoles (Capability *cap);
152 static void scheduleDetectDeadlock (Capability *cap, Task *task);
153 #if defined(PARALLEL_HASKELL)
154 static rtsBool scheduleGetRemoteWork(Capability *cap);
155 static void scheduleSendPendingMessages(void);
156 static void scheduleActivateSpark(Capability *cap);
157 #endif
158 static void schedulePostRunThread(StgTSO *t);
159 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
160 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
161                                          StgTSO *t);
162 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
163                                     nat prev_what_next );
164 static void scheduleHandleThreadBlocked( StgTSO *t );
165 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
166                                              StgTSO *t );
167 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
168 static Capability *scheduleDoGC(Capability *cap, Task *task,
169                                 rtsBool force_major);
170
171 static rtsBool checkBlackHoles(Capability *cap);
172
173 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
174 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
175
176 static void deleteThread (Capability *cap, StgTSO *tso);
177 static void deleteAllThreads (Capability *cap);
178
179 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
180 static void deleteThread_(Capability *cap, StgTSO *tso);
181 #endif
182
183 #ifdef DEBUG
184 static char *whatNext_strs[] = {
185   "(unknown)",
186   "ThreadRunGHC",
187   "ThreadInterpret",
188   "ThreadKilled",
189   "ThreadRelocated",
190   "ThreadComplete"
191 };
192 #endif
193
194 /* -----------------------------------------------------------------------------
195  * Putting a thread on the run queue: different scheduling policies
196  * -------------------------------------------------------------------------- */
197
198 STATIC_INLINE void
199 addToRunQueue( Capability *cap, StgTSO *t )
200 {
201 #if defined(PARALLEL_HASKELL)
202     if (RtsFlags.ParFlags.doFairScheduling) { 
203         // this does round-robin scheduling; good for concurrency
204         appendToRunQueue(cap,t);
205     } else {
206         // this does unfair scheduling; good for parallelism
207         pushOnRunQueue(cap,t);
208     }
209 #else
210     // this does round-robin scheduling; good for concurrency
211     appendToRunQueue(cap,t);
212 #endif
213 }
214
215 /* ---------------------------------------------------------------------------
216    Main scheduling loop.
217
218    We use round-robin scheduling, each thread returning to the
219    scheduler loop when one of these conditions is detected:
220
221       * out of heap space
222       * timer expires (thread yields)
223       * thread blocks
224       * thread ends
225       * stack overflow
226
227    GRAN version:
228      In a GranSim setup this loop iterates over the global event queue.
229      This revolves around the global event queue, which determines what 
230      to do next. Therefore, it's more complicated than either the 
231      concurrent or the parallel (GUM) setup.
232   This version has been entirely removed (JB 2008/08).
233
234    GUM version:
235      GUM iterates over incoming messages.
236      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
237      and sends out a fish whenever it has nothing to do; in-between
238      doing the actual reductions (shared code below) it processes the
239      incoming messages and deals with delayed operations 
240      (see PendingFetches).
241      This is not the ugliest code you could imagine, but it's bloody close.
242
243   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
244   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
245   as well as future GUM versions. This file has been refurbished to
246   only contain valid code, which is however incomplete, refers to
247   invalid includes etc.
248
249    ------------------------------------------------------------------------ */
250
251 static Capability *
252 schedule (Capability *initialCapability, Task *task)
253 {
254   StgTSO *t;
255   Capability *cap;
256   StgThreadReturnCode ret;
257 #if defined(PARALLEL_HASKELL)
258   rtsBool receivedFinish = rtsFalse;
259 #endif
260   nat prev_what_next;
261   rtsBool ready_to_gc;
262 #if defined(THREADED_RTS)
263   rtsBool first = rtsTrue;
264 #endif
265   
266   cap = initialCapability;
267
268   // Pre-condition: this task owns initialCapability.
269   // The sched_mutex is *NOT* held
270   // NB. on return, we still hold a capability.
271
272   debugTrace (DEBUG_sched, 
273               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
274               task, initialCapability);
275
276   schedulePreLoop();
277
278   // -----------------------------------------------------------
279   // Scheduler loop starts here:
280
281 #if defined(PARALLEL_HASKELL)
282 #define TERMINATION_CONDITION        (!receivedFinish)
283 #else
284 #define TERMINATION_CONDITION        rtsTrue
285 #endif
286
287   while (TERMINATION_CONDITION) {
288
289 #if defined(THREADED_RTS)
290       if (first) {
291           // don't yield the first time, we want a chance to run this
292           // thread for a bit, even if there are others banging at the
293           // door.
294           first = rtsFalse;
295           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
296       } else {
297           // Yield the capability to higher-priority tasks if necessary.
298           yieldCapability(&cap, task);
299       }
300 #endif
301       
302 #if defined(THREADED_RTS)
303       schedulePushWork(cap,task);
304 #endif
305
306     // Check whether we have re-entered the RTS from Haskell without
307     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
308     // call).
309     if (cap->in_haskell) {
310           errorBelch("schedule: re-entered unsafely.\n"
311                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
312           stg_exit(EXIT_FAILURE);
313     }
314
315     // The interruption / shutdown sequence.
316     // 
317     // In order to cleanly shut down the runtime, we want to:
318     //   * make sure that all main threads return to their callers
319     //     with the state 'Interrupted'.
320     //   * clean up all OS threads assocated with the runtime
321     //   * free all memory etc.
322     //
323     // So the sequence for ^C goes like this:
324     //
325     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
326     //     arranges for some Capability to wake up
327     //
328     //   * all threads in the system are halted, and the zombies are
329     //     placed on the run queue for cleaning up.  We acquire all
330     //     the capabilities in order to delete the threads, this is
331     //     done by scheduleDoGC() for convenience (because GC already
332     //     needs to acquire all the capabilities).  We can't kill
333     //     threads involved in foreign calls.
334     // 
335     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
336     //
337     //   * sched_state := SCHED_SHUTTING_DOWN
338     //
339     //   * all workers exit when the run queue on their capability
340     //     drains.  All main threads will also exit when their TSO
341     //     reaches the head of the run queue and they can return.
342     //
343     //   * eventually all Capabilities will shut down, and the RTS can
344     //     exit.
345     //
346     //   * We might be left with threads blocked in foreign calls, 
347     //     we should really attempt to kill these somehow (TODO);
348     
349     switch (sched_state) {
350     case SCHED_RUNNING:
351         break;
352     case SCHED_INTERRUPTING:
353         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
354 #if defined(THREADED_RTS)
355         discardSparksCap(cap);
356 #endif
357         /* scheduleDoGC() deletes all the threads */
358         cap = scheduleDoGC(cap,task,rtsFalse);
359         break;
360     case SCHED_SHUTTING_DOWN:
361         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
362         // If we are a worker, just exit.  If we're a bound thread
363         // then we will exit below when we've removed our TSO from
364         // the run queue.
365         if (task->tso == NULL && emptyRunQueue(cap)) {
366             return cap;
367         }
368         break;
369     default:
370         barf("sched_state: %d", sched_state);
371     }
372
373 #if defined(THREADED_RTS)
374     // If the run queue is empty, take a spark and turn it into a thread.
375     {
376         if (emptyRunQueue(cap)) {
377             StgClosure *spark;
378             spark = findSpark(cap);
379             if (spark != NULL) {
380                 debugTrace(DEBUG_sched,
381                            "turning spark of closure %p into a thread",
382                            (StgClosure *)spark);
383                 createSparkThread(cap,spark);     
384             }
385         }
386     }
387 #endif // THREADED_RTS
388
389     scheduleStartSignalHandlers(cap);
390
391     // Only check the black holes here if we've nothing else to do.
392     // During normal execution, the black hole list only gets checked
393     // at GC time, to avoid repeatedly traversing this possibly long
394     // list each time around the scheduler.
395     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
396
397     scheduleCheckWakeupThreads(cap);
398
399     scheduleCheckBlockedThreads(cap);
400
401 #if defined(PARALLEL_HASKELL)
402     /* message processing and work distribution goes here */ 
403
404     /* if messages have been buffered... a NOOP in THREADED_RTS */
405     scheduleSendPendingMessages();
406
407     /* If the run queue is empty,...*/
408     if (emptyRunQueue(cap)) {
409       /* ...take one of our own sparks and turn it into a thread */
410       scheduleActivateSpark(cap);
411
412         /* if this did not work, try to steal a spark from someone else */
413       if (emptyRunQueue(cap)) {
414         receivedFinish = scheduleGetRemoteWork(cap);
415         continue; //  a new round, (hopefully) with new work
416         /* 
417            in GUM, this a) sends out a FISH and returns IF no fish is
418                            out already
419                         b) (blocking) awaits and receives messages
420            
421            in Eden, this is only the blocking receive, as b) in GUM.
422         */
423       }
424     } 
425
426     /* since we perform a blocking receive and continue otherwise,
427        either we never reach here or we definitely have work! */
428     // from here: non-empty run queue
429     ASSERT(!emptyRunQueue(cap));
430
431     if (PacketsWaiting()) {  /* now process incoming messages, if any
432                                 pending...  
433
434                                 CAUTION: scheduleGetRemoteWork called
435                                 above, waits for messages as well! */
436       processMessages(cap, &receivedFinish);
437     }
438 #endif // PARALLEL_HASKELL
439
440     scheduleDetectDeadlock(cap,task);
441 #if defined(THREADED_RTS)
442     cap = task->cap;    // reload cap, it might have changed
443 #endif
444
445     // Normally, the only way we can get here with no threads to
446     // run is if a keyboard interrupt received during 
447     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
448     // Additionally, it is not fatal for the
449     // threaded RTS to reach here with no threads to run.
450     //
451     // win32: might be here due to awaitEvent() being abandoned
452     // as a result of a console event having been delivered.
453     if ( emptyRunQueue(cap) ) {
454 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
455         ASSERT(sched_state >= SCHED_INTERRUPTING);
456 #endif
457         continue; // nothing to do
458     }
459
460     // 
461     // Get a thread to run
462     //
463     t = popRunQueue(cap);
464
465     // Sanity check the thread we're about to run.  This can be
466     // expensive if there is lots of thread switching going on...
467     IF_DEBUG(sanity,checkTSO(t));
468
469 #if defined(THREADED_RTS)
470     // Check whether we can run this thread in the current task.
471     // If not, we have to pass our capability to the right task.
472     {
473         Task *bound = t->bound;
474       
475         if (bound) {
476             if (bound == task) {
477                 debugTrace(DEBUG_sched,
478                            "### Running thread %lu in bound thread", (unsigned long)t->id);
479                 // yes, the Haskell thread is bound to the current native thread
480             } else {
481                 debugTrace(DEBUG_sched,
482                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
483                 // no, bound to a different Haskell thread: pass to that thread
484                 pushOnRunQueue(cap,t);
485                 continue;
486             }
487         } else {
488             // The thread we want to run is unbound.
489             if (task->tso) { 
490                 debugTrace(DEBUG_sched,
491                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
492                 // no, the current native thread is bound to a different
493                 // Haskell thread, so pass it to any worker thread
494                 pushOnRunQueue(cap,t);
495                 continue; 
496             }
497         }
498     }
499 #endif
500
501     /* context switches are initiated by the timer signal, unless
502      * the user specified "context switch as often as possible", with
503      * +RTS -C0
504      */
505     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
506         && !emptyThreadQueues(cap)) {
507         context_switch = 1;
508     }
509          
510 run_thread:
511
512     // CurrentTSO is the thread to run.  t might be different if we
513     // loop back to run_thread, so make sure to set CurrentTSO after
514     // that.
515     cap->r.rCurrentTSO = t;
516
517     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
518                               (long)t->id, whatNext_strs[t->what_next]);
519
520     startHeapProfTimer();
521
522     // Check for exceptions blocked on this thread
523     maybePerformBlockedException (cap, t);
524
525     // ----------------------------------------------------------------------
526     // Run the current thread 
527
528     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
529     ASSERT(t->cap == cap);
530
531     prev_what_next = t->what_next;
532
533     errno = t->saved_errno;
534 #if mingw32_HOST_OS
535     SetLastError(t->saved_winerror);
536 #endif
537
538     cap->in_haskell = rtsTrue;
539
540     dirty_TSO(cap,t);
541
542 #if defined(THREADED_RTS)
543     if (recent_activity == ACTIVITY_DONE_GC) {
544         // ACTIVITY_DONE_GC means we turned off the timer signal to
545         // conserve power (see #1623).  Re-enable it here.
546         nat prev;
547         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
548         if (prev == ACTIVITY_DONE_GC) {
549             startTimer();
550         }
551     } else {
552         recent_activity = ACTIVITY_YES;
553     }
554 #endif
555
556     switch (prev_what_next) {
557         
558     case ThreadKilled:
559     case ThreadComplete:
560         /* Thread already finished, return to scheduler. */
561         ret = ThreadFinished;
562         break;
563         
564     case ThreadRunGHC:
565     {
566         StgRegTable *r;
567         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
568         cap = regTableToCapability(r);
569         ret = r->rRet;
570         break;
571     }
572     
573     case ThreadInterpret:
574         cap = interpretBCO(cap);
575         ret = cap->r.rRet;
576         break;
577         
578     default:
579         barf("schedule: invalid what_next field");
580     }
581
582     cap->in_haskell = rtsFalse;
583
584     // The TSO might have moved, eg. if it re-entered the RTS and a GC
585     // happened.  So find the new location:
586     t = cap->r.rCurrentTSO;
587
588     // We have run some Haskell code: there might be blackhole-blocked
589     // threads to wake up now.
590     // Lock-free test here should be ok, we're just setting a flag.
591     if ( blackhole_queue != END_TSO_QUEUE ) {
592         blackholes_need_checking = rtsTrue;
593     }
594     
595     // And save the current errno in this thread.
596     // XXX: possibly bogus for SMP because this thread might already
597     // be running again, see code below.
598     t->saved_errno = errno;
599 #if mingw32_HOST_OS
600     // Similarly for Windows error code
601     t->saved_winerror = GetLastError();
602 #endif
603
604 #if defined(THREADED_RTS)
605     // If ret is ThreadBlocked, and this Task is bound to the TSO that
606     // blocked, we are in limbo - the TSO is now owned by whatever it
607     // is blocked on, and may in fact already have been woken up,
608     // perhaps even on a different Capability.  It may be the case
609     // that task->cap != cap.  We better yield this Capability
610     // immediately and return to normaility.
611     if (ret == ThreadBlocked) {
612         debugTrace(DEBUG_sched,
613                    "--<< thread %lu (%s) stopped: blocked",
614                    (unsigned long)t->id, whatNext_strs[t->what_next]);
615         continue;
616     }
617 #endif
618
619     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
620     ASSERT(t->cap == cap);
621
622     // ----------------------------------------------------------------------
623     
624     // Costs for the scheduler are assigned to CCS_SYSTEM
625     stopHeapProfTimer();
626 #if defined(PROFILING)
627     CCCS = CCS_SYSTEM;
628 #endif
629     
630     schedulePostRunThread(t);
631
632     t = threadStackUnderflow(task,t);
633
634     ready_to_gc = rtsFalse;
635
636     switch (ret) {
637     case HeapOverflow:
638         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
639         break;
640
641     case StackOverflow:
642         scheduleHandleStackOverflow(cap,task,t);
643         break;
644
645     case ThreadYielding:
646         if (scheduleHandleYield(cap, t, prev_what_next)) {
647             // shortcut for switching between compiler/interpreter:
648             goto run_thread; 
649         }
650         break;
651
652     case ThreadBlocked:
653         scheduleHandleThreadBlocked(t);
654         break;
655
656     case ThreadFinished:
657         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
658         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
659         break;
660
661     default:
662       barf("schedule: invalid thread return code %d", (int)ret);
663     }
664
665     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
666       cap = scheduleDoGC(cap,task,rtsFalse);
667     }
668   } /* end of while() */
669 }
670
671 /* ----------------------------------------------------------------------------
672  * Setting up the scheduler loop
673  * ------------------------------------------------------------------------- */
674
675 static void
676 schedulePreLoop(void)
677 {
678   // initialisation for scheduler - what cannot go into initScheduler()  
679 }
680
681 /* -----------------------------------------------------------------------------
682  * schedulePushWork()
683  *
684  * Push work to other Capabilities if we have some.
685  * -------------------------------------------------------------------------- */
686
687 #if defined(THREADED_RTS)
688 static void
689 schedulePushWork(Capability *cap USED_IF_THREADS, 
690                  Task *task      USED_IF_THREADS)
691 {
692     Capability *free_caps[n_capabilities], *cap0;
693     nat i, n_free_caps;
694
695     // migration can be turned off with +RTS -qg
696     if (!RtsFlags.ParFlags.migrate) return;
697
698     // Check whether we have more threads on our run queue, or sparks
699     // in our pool, that we could hand to another Capability.
700     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
701         && sparkPoolSizeCap(cap) < 2) {
702         return;
703     }
704
705     // First grab as many free Capabilities as we can.
706     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
707         cap0 = &capabilities[i];
708         if (cap != cap0 && tryGrabCapability(cap0,task)) {
709             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
710                 // it already has some work, we just grabbed it at 
711                 // the wrong moment.  Or maybe it's deadlocked!
712                 releaseCapability(cap0);
713             } else {
714                 free_caps[n_free_caps++] = cap0;
715             }
716         }
717     }
718
719     // we now have n_free_caps free capabilities stashed in
720     // free_caps[].  Share our run queue equally with them.  This is
721     // probably the simplest thing we could do; improvements we might
722     // want to do include:
723     //
724     //   - giving high priority to moving relatively new threads, on 
725     //     the gournds that they haven't had time to build up a
726     //     working set in the cache on this CPU/Capability.
727     //
728     //   - giving low priority to moving long-lived threads
729
730     if (n_free_caps > 0) {
731         StgTSO *prev, *t, *next;
732         rtsBool pushed_to_all;
733
734         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
735
736         i = 0;
737         pushed_to_all = rtsFalse;
738
739         if (cap->run_queue_hd != END_TSO_QUEUE) {
740             prev = cap->run_queue_hd;
741             t = prev->_link;
742             prev->_link = END_TSO_QUEUE;
743             for (; t != END_TSO_QUEUE; t = next) {
744                 next = t->_link;
745                 t->_link = END_TSO_QUEUE;
746                 if (t->what_next == ThreadRelocated
747                     || t->bound == task // don't move my bound thread
748                     || tsoLocked(t)) {  // don't move a locked thread
749                     setTSOLink(cap, prev, t);
750                     prev = t;
751                 } else if (i == n_free_caps) {
752                     pushed_to_all = rtsTrue;
753                     i = 0;
754                     // keep one for us
755                     setTSOLink(cap, prev, t);
756                     prev = t;
757                 } else {
758                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
759                     appendToRunQueue(free_caps[i],t);
760                     if (t->bound) { t->bound->cap = free_caps[i]; }
761                     t->cap = free_caps[i];
762                     i++;
763                 }
764             }
765             cap->run_queue_tl = prev;
766         }
767
768         // If there are some free capabilities that we didn't push any
769         // threads to, then try to push a spark to each one.
770         if (!pushed_to_all) {
771             StgClosure *spark;
772             // i is the next free capability to push to
773             for (; i < n_free_caps; i++) {
774                 if (emptySparkPoolCap(free_caps[i])) {
775                     spark = findSpark(cap);
776                     if (spark != NULL) {
777                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
778                         newSpark(&(free_caps[i]->r), spark);
779                     }
780                 }
781             }
782         }
783
784         // release the capabilities
785         for (i = 0; i < n_free_caps; i++) {
786             task->cap = free_caps[i];
787             releaseCapability(free_caps[i]);
788         }
789     }
790     task->cap = cap; // reset to point to our Capability.
791 }
792 #endif
793
794 /* ----------------------------------------------------------------------------
795  * Start any pending signal handlers
796  * ------------------------------------------------------------------------- */
797
798 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
799 static void
800 scheduleStartSignalHandlers(Capability *cap)
801 {
802     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
803         // safe outside the lock
804         startSignalHandlers(cap);
805     }
806 }
807 #else
808 static void
809 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
810 {
811 }
812 #endif
813
814 /* ----------------------------------------------------------------------------
815  * Check for blocked threads that can be woken up.
816  * ------------------------------------------------------------------------- */
817
818 static void
819 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
820 {
821 #if !defined(THREADED_RTS)
822     //
823     // Check whether any waiting threads need to be woken up.  If the
824     // run queue is empty, and there are no other tasks running, we
825     // can wait indefinitely for something to happen.
826     //
827     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
828     {
829         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
830     }
831 #endif
832 }
833
834
835 /* ----------------------------------------------------------------------------
836  * Check for threads woken up by other Capabilities
837  * ------------------------------------------------------------------------- */
838
839 static void
840 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
841 {
842 #if defined(THREADED_RTS)
843     // Any threads that were woken up by other Capabilities get
844     // appended to our run queue.
845     if (!emptyWakeupQueue(cap)) {
846         ACQUIRE_LOCK(&cap->lock);
847         if (emptyRunQueue(cap)) {
848             cap->run_queue_hd = cap->wakeup_queue_hd;
849             cap->run_queue_tl = cap->wakeup_queue_tl;
850         } else {
851             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
852             cap->run_queue_tl = cap->wakeup_queue_tl;
853         }
854         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
855         RELEASE_LOCK(&cap->lock);
856     }
857 #endif
858 }
859
860 /* ----------------------------------------------------------------------------
861  * Check for threads blocked on BLACKHOLEs that can be woken up
862  * ------------------------------------------------------------------------- */
863 static void
864 scheduleCheckBlackHoles (Capability *cap)
865 {
866     if ( blackholes_need_checking ) // check without the lock first
867     {
868         ACQUIRE_LOCK(&sched_mutex);
869         if ( blackholes_need_checking ) {
870             checkBlackHoles(cap);
871             blackholes_need_checking = rtsFalse;
872         }
873         RELEASE_LOCK(&sched_mutex);
874     }
875 }
876
877 /* ----------------------------------------------------------------------------
878  * Detect deadlock conditions and attempt to resolve them.
879  * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleDetectDeadlock (Capability *cap, Task *task)
883 {
884
885 #if defined(PARALLEL_HASKELL)
886     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
887     return;
888 #endif
889
890     /* 
891      * Detect deadlock: when we have no threads to run, there are no
892      * threads blocked, waiting for I/O, or sleeping, and all the
893      * other tasks are waiting for work, we must have a deadlock of
894      * some description.
895      */
896     if ( emptyThreadQueues(cap) )
897     {
898 #if defined(THREADED_RTS)
899         /* 
900          * In the threaded RTS, we only check for deadlock if there
901          * has been no activity in a complete timeslice.  This means
902          * we won't eagerly start a full GC just because we don't have
903          * any threads to run currently.
904          */
905         if (recent_activity != ACTIVITY_INACTIVE) return;
906 #endif
907
908         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
909
910         // Garbage collection can release some new threads due to
911         // either (a) finalizers or (b) threads resurrected because
912         // they are unreachable and will therefore be sent an
913         // exception.  Any threads thus released will be immediately
914         // runnable.
915         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
916
917         recent_activity = ACTIVITY_DONE_GC;
918         // disable timer signals (see #1623)
919         stopTimer();
920         
921         if ( !emptyRunQueue(cap) ) return;
922
923 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
924         /* If we have user-installed signal handlers, then wait
925          * for signals to arrive rather then bombing out with a
926          * deadlock.
927          */
928         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
929             debugTrace(DEBUG_sched,
930                        "still deadlocked, waiting for signals...");
931
932             awaitUserSignals();
933
934             if (signals_pending()) {
935                 startSignalHandlers(cap);
936             }
937
938             // either we have threads to run, or we were interrupted:
939             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
940
941             return;
942         }
943 #endif
944
945 #if !defined(THREADED_RTS)
946         /* Probably a real deadlock.  Send the current main thread the
947          * Deadlock exception.
948          */
949         if (task->tso) {
950             switch (task->tso->why_blocked) {
951             case BlockedOnSTM:
952             case BlockedOnBlackHole:
953             case BlockedOnException:
954             case BlockedOnMVar:
955                 throwToSingleThreaded(cap, task->tso, 
956                                       (StgClosure *)nonTermination_closure);
957                 return;
958             default:
959                 barf("deadlock: main thread blocked in a strange way");
960             }
961         }
962         return;
963 #endif
964     }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969  * Send pending messages (PARALLEL_HASKELL only)
970  * ------------------------------------------------------------------------- */
971
972 #if defined(PARALLEL_HASKELL)
973 static StgTSO *
974 scheduleSendPendingMessages(void)
975 {
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978     if (PendingFetches != END_BF_QUEUE) {
979         processFetches();
980     }
981 # endif
982     
983     if (RtsFlags.ParFlags.BufferTime) {
984         // if we use message buffering, we must send away all message
985         // packets which have become too old...
986         sendOldBuffers(); 
987     }
988 }
989 #endif
990
991 /* ----------------------------------------------------------------------------
992  * Activate spark threads (PARALLEL_HASKELL only)
993  * ------------------------------------------------------------------------- */
994
995 #if defined(PARALLEL_HASKELL)
996 static void
997 scheduleActivateSpark(Capability *cap)
998 {
999     StgClosure *spark;
1000
1001 /* We only want to stay here if the run queue is empty and we want some
1002    work. We try to turn a spark into a thread, and add it to the run
1003    queue, from where it will be picked up in the next iteration of the
1004    scheduler loop.  
1005 */
1006     if (!emptyRunQueue(cap)) 
1007       /* In the threaded RTS, another task might have pushed a thread
1008          on our run queue in the meantime ? But would need a lock.. */
1009       return;
1010
1011     spark = findSpark(cap); // defined in Sparks.c
1012
1013     if (spark != NULL) {
1014       debugTrace(DEBUG_sched,
1015                  "turning spark of closure %p into a thread",
1016                  (StgClosure *)spark);
1017       createSparkThread(cap,spark); // defined in Sparks.c
1018     }
1019 }
1020 #endif // PARALLEL_HASKELL
1021
1022 /* ----------------------------------------------------------------------------
1023  * Get work from a remote node (PARALLEL_HASKELL only)
1024  * ------------------------------------------------------------------------- */
1025     
1026 #if defined(PARALLEL_HASKELL)
1027 static rtsBool
1028 scheduleGetRemoteWork(Capability *cap)
1029 {
1030 #if defined(PARALLEL_HASKELL)
1031   rtsBool receivedFinish = rtsFalse;
1032
1033   // idle() , i.e. send all buffers, wait for work
1034   if (RtsFlags.ParFlags.BufferTime) {
1035         IF_PAR_DEBUG(verbose, 
1036                 debugBelch("...send all pending data,"));
1037         {
1038           nat i;
1039           for (i=1; i<=nPEs; i++)
1040             sendImmediately(i); // send all messages away immediately
1041         }
1042   }
1043
1044   /* this would be the place for fishing in GUM... 
1045
1046      if (no-earlier-fish-around) 
1047           sendFish(choosePe());
1048    */
1049
1050   // Eden:just look for incoming messages (blocking receive)
1051   IF_PAR_DEBUG(verbose, 
1052                debugBelch("...wait for incoming messages...\n"));
1053   processMessages(cap, &receivedFinish); // blocking receive...
1054
1055
1056   return receivedFinish;
1057   // reenter scheduling look after having received something
1058
1059 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1060
1061   return rtsFalse; /* return value unused in THREADED_RTS */
1062
1063 #endif /* PARALLEL_HASKELL */
1064 }
1065 #endif // PARALLEL_HASKELL
1066
1067 /* ----------------------------------------------------------------------------
1068  * After running a thread...
1069  * ------------------------------------------------------------------------- */
1070
1071 static void
1072 schedulePostRunThread (StgTSO *t)
1073 {
1074     // We have to be able to catch transactions that are in an
1075     // infinite loop as a result of seeing an inconsistent view of
1076     // memory, e.g. 
1077     //
1078     //   atomically $ do
1079     //       [a,b] <- mapM readTVar [ta,tb]
1080     //       when (a == b) loop
1081     //
1082     // and a is never equal to b given a consistent view of memory.
1083     //
1084     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1085         if (!stmValidateNestOfTransactions (t -> trec)) {
1086             debugTrace(DEBUG_sched | DEBUG_stm,
1087                        "trec %p found wasting its time", t);
1088             
1089             // strip the stack back to the
1090             // ATOMICALLY_FRAME, aborting the (nested)
1091             // transaction, and saving the stack of any
1092             // partially-evaluated thunks on the heap.
1093             throwToSingleThreaded_(&capabilities[0], t, 
1094                                    NULL, rtsTrue, NULL);
1095             
1096             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1097         }
1098     }
1099
1100   /* some statistics gathering in the parallel case */
1101 }
1102
1103 /* -----------------------------------------------------------------------------
1104  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1105  * -------------------------------------------------------------------------- */
1106
1107 static rtsBool
1108 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1109 {
1110     // did the task ask for a large block?
1111     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1112         // if so, get one and push it on the front of the nursery.
1113         bdescr *bd;
1114         lnat blocks;
1115         
1116         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1117         
1118         debugTrace(DEBUG_sched,
1119                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1120                    (long)t->id, whatNext_strs[t->what_next], blocks);
1121     
1122         // don't do this if the nursery is (nearly) full, we'll GC first.
1123         if (cap->r.rCurrentNursery->link != NULL ||
1124             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1125                                                // if the nursery has only one block.
1126             
1127             ACQUIRE_SM_LOCK
1128             bd = allocGroup( blocks );
1129             RELEASE_SM_LOCK
1130             cap->r.rNursery->n_blocks += blocks;
1131             
1132             // link the new group into the list
1133             bd->link = cap->r.rCurrentNursery;
1134             bd->u.back = cap->r.rCurrentNursery->u.back;
1135             if (cap->r.rCurrentNursery->u.back != NULL) {
1136                 cap->r.rCurrentNursery->u.back->link = bd;
1137             } else {
1138 #if !defined(THREADED_RTS)
1139                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1140                        g0s0 == cap->r.rNursery);
1141 #endif
1142                 cap->r.rNursery->blocks = bd;
1143             }             
1144             cap->r.rCurrentNursery->u.back = bd;
1145             
1146             // initialise it as a nursery block.  We initialise the
1147             // step, gen_no, and flags field of *every* sub-block in
1148             // this large block, because this is easier than making
1149             // sure that we always find the block head of a large
1150             // block whenever we call Bdescr() (eg. evacuate() and
1151             // isAlive() in the GC would both have to do this, at
1152             // least).
1153             { 
1154                 bdescr *x;
1155                 for (x = bd; x < bd + blocks; x++) {
1156                     x->step = cap->r.rNursery;
1157                     x->gen_no = 0;
1158                     x->flags = 0;
1159                 }
1160             }
1161             
1162             // This assert can be a killer if the app is doing lots
1163             // of large block allocations.
1164             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1165             
1166             // now update the nursery to point to the new block
1167             cap->r.rCurrentNursery = bd;
1168             
1169             // we might be unlucky and have another thread get on the
1170             // run queue before us and steal the large block, but in that
1171             // case the thread will just end up requesting another large
1172             // block.
1173             pushOnRunQueue(cap,t);
1174             return rtsFalse;  /* not actually GC'ing */
1175         }
1176     }
1177     
1178     debugTrace(DEBUG_sched,
1179                "--<< thread %ld (%s) stopped: HeapOverflow",
1180                (long)t->id, whatNext_strs[t->what_next]);
1181
1182     if (context_switch) {
1183         // Sometimes we miss a context switch, e.g. when calling
1184         // primitives in a tight loop, MAYBE_GC() doesn't check the
1185         // context switch flag, and we end up waiting for a GC.
1186         // See #1984, and concurrent/should_run/1984
1187         context_switch = 0;
1188         addToRunQueue(cap,t);
1189     } else {
1190         pushOnRunQueue(cap,t);
1191     }
1192     return rtsTrue;
1193     /* actual GC is done at the end of the while loop in schedule() */
1194 }
1195
1196 /* -----------------------------------------------------------------------------
1197  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1198  * -------------------------------------------------------------------------- */
1199
1200 static void
1201 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1202 {
1203     debugTrace (DEBUG_sched,
1204                 "--<< thread %ld (%s) stopped, StackOverflow", 
1205                 (long)t->id, whatNext_strs[t->what_next]);
1206
1207     /* just adjust the stack for this thread, then pop it back
1208      * on the run queue.
1209      */
1210     { 
1211         /* enlarge the stack */
1212         StgTSO *new_t = threadStackOverflow(cap, t);
1213         
1214         /* The TSO attached to this Task may have moved, so update the
1215          * pointer to it.
1216          */
1217         if (task->tso == t) {
1218             task->tso = new_t;
1219         }
1220         pushOnRunQueue(cap,new_t);
1221     }
1222 }
1223
1224 /* -----------------------------------------------------------------------------
1225  * Handle a thread that returned to the scheduler with ThreadYielding
1226  * -------------------------------------------------------------------------- */
1227
1228 static rtsBool
1229 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1230 {
1231     // Reset the context switch flag.  We don't do this just before
1232     // running the thread, because that would mean we would lose ticks
1233     // during GC, which can lead to unfair scheduling (a thread hogs
1234     // the CPU because the tick always arrives during GC).  This way
1235     // penalises threads that do a lot of allocation, but that seems
1236     // better than the alternative.
1237     context_switch = 0;
1238     
1239     /* put the thread back on the run queue.  Then, if we're ready to
1240      * GC, check whether this is the last task to stop.  If so, wake
1241      * up the GC thread.  getThread will block during a GC until the
1242      * GC is finished.
1243      */
1244 #ifdef DEBUG
1245     if (t->what_next != prev_what_next) {
1246         debugTrace(DEBUG_sched,
1247                    "--<< thread %ld (%s) stopped to switch evaluators", 
1248                    (long)t->id, whatNext_strs[t->what_next]);
1249     } else {
1250         debugTrace(DEBUG_sched,
1251                    "--<< thread %ld (%s) stopped, yielding",
1252                    (long)t->id, whatNext_strs[t->what_next]);
1253     }
1254 #endif
1255     
1256     IF_DEBUG(sanity,
1257              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1258              checkTSO(t));
1259     ASSERT(t->_link == END_TSO_QUEUE);
1260     
1261     // Shortcut if we're just switching evaluators: don't bother
1262     // doing stack squeezing (which can be expensive), just run the
1263     // thread.
1264     if (t->what_next != prev_what_next) {
1265         return rtsTrue;
1266     }
1267
1268     addToRunQueue(cap,t);
1269
1270     return rtsFalse;
1271 }
1272
1273 /* -----------------------------------------------------------------------------
1274  * Handle a thread that returned to the scheduler with ThreadBlocked
1275  * -------------------------------------------------------------------------- */
1276
1277 static void
1278 scheduleHandleThreadBlocked( StgTSO *t
1279 #if !defined(GRAN) && !defined(DEBUG)
1280     STG_UNUSED
1281 #endif
1282     )
1283 {
1284
1285       // We don't need to do anything.  The thread is blocked, and it
1286       // has tidied up its stack and placed itself on whatever queue
1287       // it needs to be on.
1288
1289     // ASSERT(t->why_blocked != NotBlocked);
1290     // Not true: for example,
1291     //    - in THREADED_RTS, the thread may already have been woken
1292     //      up by another Capability.  This actually happens: try
1293     //      conc023 +RTS -N2.
1294     //    - the thread may have woken itself up already, because
1295     //      threadPaused() might have raised a blocked throwTo
1296     //      exception, see maybePerformBlockedException().
1297
1298 #ifdef DEBUG
1299     if (traceClass(DEBUG_sched)) {
1300         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1301                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1302         printThreadBlockage(t);
1303         debugTraceEnd();
1304     }
1305 #endif
1306 }
1307
1308 /* -----------------------------------------------------------------------------
1309  * Handle a thread that returned to the scheduler with ThreadFinished
1310  * -------------------------------------------------------------------------- */
1311
1312 static rtsBool
1313 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1314 {
1315     /* Need to check whether this was a main thread, and if so,
1316      * return with the return value.
1317      *
1318      * We also end up here if the thread kills itself with an
1319      * uncaught exception, see Exception.cmm.
1320      */
1321     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1322                (unsigned long)t->id, whatNext_strs[t->what_next]);
1323
1324       //
1325       // Check whether the thread that just completed was a bound
1326       // thread, and if so return with the result.  
1327       //
1328       // There is an assumption here that all thread completion goes
1329       // through this point; we need to make sure that if a thread
1330       // ends up in the ThreadKilled state, that it stays on the run
1331       // queue so it can be dealt with here.
1332       //
1333
1334       if (t->bound) {
1335
1336           if (t->bound != task) {
1337 #if !defined(THREADED_RTS)
1338               // Must be a bound thread that is not the topmost one.  Leave
1339               // it on the run queue until the stack has unwound to the
1340               // point where we can deal with this.  Leaving it on the run
1341               // queue also ensures that the garbage collector knows about
1342               // this thread and its return value (it gets dropped from the
1343               // step->threads list so there's no other way to find it).
1344               appendToRunQueue(cap,t);
1345               return rtsFalse;
1346 #else
1347               // this cannot happen in the threaded RTS, because a
1348               // bound thread can only be run by the appropriate Task.
1349               barf("finished bound thread that isn't mine");
1350 #endif
1351           }
1352
1353           ASSERT(task->tso == t);
1354
1355           if (t->what_next == ThreadComplete) {
1356               if (task->ret) {
1357                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1358                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1359               }
1360               task->stat = Success;
1361           } else {
1362               if (task->ret) {
1363                   *(task->ret) = NULL;
1364               }
1365               if (sched_state >= SCHED_INTERRUPTING) {
1366                   task->stat = Interrupted;
1367               } else {
1368                   task->stat = Killed;
1369               }
1370           }
1371 #ifdef DEBUG
1372           removeThreadLabel((StgWord)task->tso->id);
1373 #endif
1374           return rtsTrue; // tells schedule() to return
1375       }
1376
1377       return rtsFalse;
1378 }
1379
1380 /* -----------------------------------------------------------------------------
1381  * Perform a heap census
1382  * -------------------------------------------------------------------------- */
1383
1384 static rtsBool
1385 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1386 {
1387     // When we have +RTS -i0 and we're heap profiling, do a census at
1388     // every GC.  This lets us get repeatable runs for debugging.
1389     if (performHeapProfile ||
1390         (RtsFlags.ProfFlags.profileInterval==0 &&
1391          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1392         return rtsTrue;
1393     } else {
1394         return rtsFalse;
1395     }
1396 }
1397
1398 /* -----------------------------------------------------------------------------
1399  * Perform a garbage collection if necessary
1400  * -------------------------------------------------------------------------- */
1401
1402 static Capability *
1403 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1404 {
1405     rtsBool heap_census;
1406 #ifdef THREADED_RTS
1407     /* extern static volatile StgWord waiting_for_gc; 
1408        lives inside capability.c */
1409     rtsBool was_waiting;
1410     nat i;
1411 #endif
1412
1413 #ifdef THREADED_RTS
1414     // In order to GC, there must be no threads running Haskell code.
1415     // Therefore, the GC thread needs to hold *all* the capabilities,
1416     // and release them after the GC has completed.  
1417     //
1418     // This seems to be the simplest way: previous attempts involved
1419     // making all the threads with capabilities give up their
1420     // capabilities and sleep except for the *last* one, which
1421     // actually did the GC.  But it's quite hard to arrange for all
1422     // the other tasks to sleep and stay asleep.
1423     //
1424         
1425     /*  Other capabilities are prevented from running yet more Haskell
1426         threads if waiting_for_gc is set. Tested inside
1427         yieldCapability() and releaseCapability() in Capability.c */
1428
1429     was_waiting = cas(&waiting_for_gc, 0, 1);
1430     if (was_waiting) {
1431         do {
1432             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1433             if (cap) yieldCapability(&cap,task);
1434         } while (waiting_for_gc);
1435         return cap;  // NOTE: task->cap might have changed here
1436     }
1437
1438     for (i=0; i < n_capabilities; i++) {
1439         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1440         if (cap != &capabilities[i]) {
1441             Capability *pcap = &capabilities[i];
1442             // we better hope this task doesn't get migrated to
1443             // another Capability while we're waiting for this one.
1444             // It won't, because load balancing happens while we have
1445             // all the Capabilities, but even so it's a slightly
1446             // unsavoury invariant.
1447             task->cap = pcap;
1448             context_switch = 1;
1449             waitForReturnCapability(&pcap, task);
1450             if (pcap != &capabilities[i]) {
1451                 barf("scheduleDoGC: got the wrong capability");
1452             }
1453         }
1454     }
1455
1456     waiting_for_gc = rtsFalse;
1457 #endif
1458
1459     // so this happens periodically:
1460     if (cap) scheduleCheckBlackHoles(cap);
1461     
1462     IF_DEBUG(scheduler, printAllThreads());
1463
1464     /*
1465      * We now have all the capabilities; if we're in an interrupting
1466      * state, then we should take the opportunity to delete all the
1467      * threads in the system.
1468      */
1469     if (sched_state >= SCHED_INTERRUPTING) {
1470         deleteAllThreads(&capabilities[0]);
1471         sched_state = SCHED_SHUTTING_DOWN;
1472     }
1473     
1474     heap_census = scheduleNeedHeapProfile(rtsTrue);
1475
1476     /* everybody back, start the GC.
1477      * Could do it in this thread, or signal a condition var
1478      * to do it in another thread.  Either way, we need to
1479      * broadcast on gc_pending_cond afterward.
1480      */
1481 #if defined(THREADED_RTS)
1482     debugTrace(DEBUG_sched, "doing GC");
1483 #endif
1484     GarbageCollect(force_major || heap_census);
1485     
1486     if (heap_census) {
1487         debugTrace(DEBUG_sched, "performing heap census");
1488         heapCensus();
1489         performHeapProfile = rtsFalse;
1490     }
1491
1492 #if defined(THREADED_RTS)
1493     // release our stash of capabilities.
1494     for (i = 0; i < n_capabilities; i++) {
1495         if (cap != &capabilities[i]) {
1496             task->cap = &capabilities[i];
1497             releaseCapability(&capabilities[i]);
1498         }
1499     }
1500     if (cap) {
1501         task->cap = cap;
1502     } else {
1503         task->cap = NULL;
1504     }
1505 #endif
1506
1507     return cap;
1508 }
1509
1510 /* ---------------------------------------------------------------------------
1511  * Singleton fork(). Do not copy any running threads.
1512  * ------------------------------------------------------------------------- */
1513
1514 pid_t
1515 forkProcess(HsStablePtr *entry
1516 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1517             STG_UNUSED
1518 #endif
1519            )
1520 {
1521 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1522     Task *task;
1523     pid_t pid;
1524     StgTSO* t,*next;
1525     Capability *cap;
1526     nat s;
1527     
1528 #if defined(THREADED_RTS)
1529     if (RtsFlags.ParFlags.nNodes > 1) {
1530         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1531         stg_exit(EXIT_FAILURE);
1532     }
1533 #endif
1534
1535     debugTrace(DEBUG_sched, "forking!");
1536     
1537     // ToDo: for SMP, we should probably acquire *all* the capabilities
1538     cap = rts_lock();
1539     
1540     // no funny business: hold locks while we fork, otherwise if some
1541     // other thread is holding a lock when the fork happens, the data
1542     // structure protected by the lock will forever be in an
1543     // inconsistent state in the child.  See also #1391.
1544     ACQUIRE_LOCK(&sched_mutex);
1545     ACQUIRE_LOCK(&cap->lock);
1546     ACQUIRE_LOCK(&cap->running_task->lock);
1547
1548     pid = fork();
1549     
1550     if (pid) { // parent
1551         
1552         RELEASE_LOCK(&sched_mutex);
1553         RELEASE_LOCK(&cap->lock);
1554         RELEASE_LOCK(&cap->running_task->lock);
1555
1556         // just return the pid
1557         rts_unlock(cap);
1558         return pid;
1559         
1560     } else { // child
1561         
1562 #if defined(THREADED_RTS)
1563         initMutex(&sched_mutex);
1564         initMutex(&cap->lock);
1565         initMutex(&cap->running_task->lock);
1566 #endif
1567
1568         // Now, all OS threads except the thread that forked are
1569         // stopped.  We need to stop all Haskell threads, including
1570         // those involved in foreign calls.  Also we need to delete
1571         // all Tasks, because they correspond to OS threads that are
1572         // now gone.
1573
1574         for (s = 0; s < total_steps; s++) {
1575           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1576             if (t->what_next == ThreadRelocated) {
1577                 next = t->_link;
1578             } else {
1579                 next = t->global_link;
1580                 // don't allow threads to catch the ThreadKilled
1581                 // exception, but we do want to raiseAsync() because these
1582                 // threads may be evaluating thunks that we need later.
1583                 deleteThread_(cap,t);
1584             }
1585           }
1586         }
1587         
1588         // Empty the run queue.  It seems tempting to let all the
1589         // killed threads stay on the run queue as zombies to be
1590         // cleaned up later, but some of them correspond to bound
1591         // threads for which the corresponding Task does not exist.
1592         cap->run_queue_hd = END_TSO_QUEUE;
1593         cap->run_queue_tl = END_TSO_QUEUE;
1594
1595         // Any suspended C-calling Tasks are no more, their OS threads
1596         // don't exist now:
1597         cap->suspended_ccalling_tasks = NULL;
1598
1599         // Empty the threads lists.  Otherwise, the garbage
1600         // collector may attempt to resurrect some of these threads.
1601         for (s = 0; s < total_steps; s++) {
1602             all_steps[s].threads = END_TSO_QUEUE;
1603         }
1604
1605         // Wipe the task list, except the current Task.
1606         ACQUIRE_LOCK(&sched_mutex);
1607         for (task = all_tasks; task != NULL; task=task->all_link) {
1608             if (task != cap->running_task) {
1609 #if defined(THREADED_RTS)
1610                 initMutex(&task->lock); // see #1391
1611 #endif
1612                 discardTask(task);
1613             }
1614         }
1615         RELEASE_LOCK(&sched_mutex);
1616
1617 #if defined(THREADED_RTS)
1618         // Wipe our spare workers list, they no longer exist.  New
1619         // workers will be created if necessary.
1620         cap->spare_workers = NULL;
1621         cap->returning_tasks_hd = NULL;
1622         cap->returning_tasks_tl = NULL;
1623 #endif
1624
1625         // On Unix, all timers are reset in the child, so we need to start
1626         // the timer again.
1627         initTimer();
1628         startTimer();
1629
1630         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1631         rts_checkSchedStatus("forkProcess",cap);
1632         
1633         rts_unlock(cap);
1634         hs_exit();                      // clean up and exit
1635         stg_exit(EXIT_SUCCESS);
1636     }
1637 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1638     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1639     return -1;
1640 #endif
1641 }
1642
1643 /* ---------------------------------------------------------------------------
1644  * Delete all the threads in the system
1645  * ------------------------------------------------------------------------- */
1646    
1647 static void
1648 deleteAllThreads ( Capability *cap )
1649 {
1650     // NOTE: only safe to call if we own all capabilities.
1651
1652     StgTSO* t, *next;
1653     nat s;
1654
1655     debugTrace(DEBUG_sched,"deleting all threads");
1656     for (s = 0; s < total_steps; s++) {
1657       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1658         if (t->what_next == ThreadRelocated) {
1659             next = t->_link;
1660         } else {
1661             next = t->global_link;
1662             deleteThread(cap,t);
1663         }
1664       }
1665     }      
1666
1667     // The run queue now contains a bunch of ThreadKilled threads.  We
1668     // must not throw these away: the main thread(s) will be in there
1669     // somewhere, and the main scheduler loop has to deal with it.
1670     // Also, the run queue is the only thing keeping these threads from
1671     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1672
1673 #if !defined(THREADED_RTS)
1674     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1675     ASSERT(sleeping_queue == END_TSO_QUEUE);
1676 #endif
1677 }
1678
1679 /* -----------------------------------------------------------------------------
1680    Managing the suspended_ccalling_tasks list.
1681    Locks required: sched_mutex
1682    -------------------------------------------------------------------------- */
1683
1684 STATIC_INLINE void
1685 suspendTask (Capability *cap, Task *task)
1686 {
1687     ASSERT(task->next == NULL && task->prev == NULL);
1688     task->next = cap->suspended_ccalling_tasks;
1689     task->prev = NULL;
1690     if (cap->suspended_ccalling_tasks) {
1691         cap->suspended_ccalling_tasks->prev = task;
1692     }
1693     cap->suspended_ccalling_tasks = task;
1694 }
1695
1696 STATIC_INLINE void
1697 recoverSuspendedTask (Capability *cap, Task *task)
1698 {
1699     if (task->prev) {
1700         task->prev->next = task->next;
1701     } else {
1702         ASSERT(cap->suspended_ccalling_tasks == task);
1703         cap->suspended_ccalling_tasks = task->next;
1704     }
1705     if (task->next) {
1706         task->next->prev = task->prev;
1707     }
1708     task->next = task->prev = NULL;
1709 }
1710
1711 /* ---------------------------------------------------------------------------
1712  * Suspending & resuming Haskell threads.
1713  * 
1714  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1715  * its capability before calling the C function.  This allows another
1716  * task to pick up the capability and carry on running Haskell
1717  * threads.  It also means that if the C call blocks, it won't lock
1718  * the whole system.
1719  *
1720  * The Haskell thread making the C call is put to sleep for the
1721  * duration of the call, on the susepended_ccalling_threads queue.  We
1722  * give out a token to the task, which it can use to resume the thread
1723  * on return from the C function.
1724  * ------------------------------------------------------------------------- */
1725    
1726 void *
1727 suspendThread (StgRegTable *reg)
1728 {
1729   Capability *cap;
1730   int saved_errno;
1731   StgTSO *tso;
1732   Task *task;
1733 #if mingw32_HOST_OS
1734   StgWord32 saved_winerror;
1735 #endif
1736
1737   saved_errno = errno;
1738 #if mingw32_HOST_OS
1739   saved_winerror = GetLastError();
1740 #endif
1741
1742   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1743    */
1744   cap = regTableToCapability(reg);
1745
1746   task = cap->running_task;
1747   tso = cap->r.rCurrentTSO;
1748
1749   debugTrace(DEBUG_sched, 
1750              "thread %lu did a safe foreign call", 
1751              (unsigned long)cap->r.rCurrentTSO->id);
1752
1753   // XXX this might not be necessary --SDM
1754   tso->what_next = ThreadRunGHC;
1755
1756   threadPaused(cap,tso);
1757
1758   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1759       tso->why_blocked = BlockedOnCCall;
1760       tso->flags |= TSO_BLOCKEX;
1761       tso->flags &= ~TSO_INTERRUPTIBLE;
1762   } else {
1763       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1764   }
1765
1766   // Hand back capability
1767   task->suspended_tso = tso;
1768
1769   ACQUIRE_LOCK(&cap->lock);
1770
1771   suspendTask(cap,task);
1772   cap->in_haskell = rtsFalse;
1773   releaseCapability_(cap);
1774   
1775   RELEASE_LOCK(&cap->lock);
1776
1777 #if defined(THREADED_RTS)
1778   /* Preparing to leave the RTS, so ensure there's a native thread/task
1779      waiting to take over.
1780   */
1781   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1782 #endif
1783
1784   errno = saved_errno;
1785 #if mingw32_HOST_OS
1786   SetLastError(saved_winerror);
1787 #endif
1788   return task;
1789 }
1790
1791 StgRegTable *
1792 resumeThread (void *task_)
1793 {
1794     StgTSO *tso;
1795     Capability *cap;
1796     Task *task = task_;
1797     int saved_errno;
1798 #if mingw32_HOST_OS
1799     StgWord32 saved_winerror;
1800 #endif
1801
1802     saved_errno = errno;
1803 #if mingw32_HOST_OS
1804     saved_winerror = GetLastError();
1805 #endif
1806
1807     cap = task->cap;
1808     // Wait for permission to re-enter the RTS with the result.
1809     waitForReturnCapability(&cap,task);
1810     // we might be on a different capability now... but if so, our
1811     // entry on the suspended_ccalling_tasks list will also have been
1812     // migrated.
1813
1814     // Remove the thread from the suspended list
1815     recoverSuspendedTask(cap,task);
1816
1817     tso = task->suspended_tso;
1818     task->suspended_tso = NULL;
1819     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1820     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1821     
1822     if (tso->why_blocked == BlockedOnCCall) {
1823         awakenBlockedExceptionQueue(cap,tso);
1824         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1825     }
1826     
1827     /* Reset blocking status */
1828     tso->why_blocked  = NotBlocked;
1829     
1830     cap->r.rCurrentTSO = tso;
1831     cap->in_haskell = rtsTrue;
1832     errno = saved_errno;
1833 #if mingw32_HOST_OS
1834     SetLastError(saved_winerror);
1835 #endif
1836
1837     /* We might have GC'd, mark the TSO dirty again */
1838     dirty_TSO(cap,tso);
1839
1840     IF_DEBUG(sanity, checkTSO(tso));
1841
1842     return &cap->r;
1843 }
1844
1845 /* ---------------------------------------------------------------------------
1846  * scheduleThread()
1847  *
1848  * scheduleThread puts a thread on the end  of the runnable queue.
1849  * This will usually be done immediately after a thread is created.
1850  * The caller of scheduleThread must create the thread using e.g.
1851  * createThread and push an appropriate closure
1852  * on this thread's stack before the scheduler is invoked.
1853  * ------------------------------------------------------------------------ */
1854
1855 void
1856 scheduleThread(Capability *cap, StgTSO *tso)
1857 {
1858     // The thread goes at the *end* of the run-queue, to avoid possible
1859     // starvation of any threads already on the queue.
1860     appendToRunQueue(cap,tso);
1861 }
1862
1863 void
1864 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1865 {
1866 #if defined(THREADED_RTS)
1867     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1868                               // move this thread from now on.
1869     cpu %= RtsFlags.ParFlags.nNodes;
1870     if (cpu == cap->no) {
1871         appendToRunQueue(cap,tso);
1872     } else {
1873         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1874     }
1875 #else
1876     appendToRunQueue(cap,tso);
1877 #endif
1878 }
1879
1880 Capability *
1881 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1882 {
1883     Task *task;
1884
1885     // We already created/initialised the Task
1886     task = cap->running_task;
1887
1888     // This TSO is now a bound thread; make the Task and TSO
1889     // point to each other.
1890     tso->bound = task;
1891     tso->cap = cap;
1892
1893     task->tso = tso;
1894     task->ret = ret;
1895     task->stat = NoStatus;
1896
1897     appendToRunQueue(cap,tso);
1898
1899     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1900
1901     cap = schedule(cap,task);
1902
1903     ASSERT(task->stat != NoStatus);
1904     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1905
1906     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1907     return cap;
1908 }
1909
1910 /* ----------------------------------------------------------------------------
1911  * Starting Tasks
1912  * ------------------------------------------------------------------------- */
1913
1914 #if defined(THREADED_RTS)
1915 void OSThreadProcAttr
1916 workerStart(Task *task)
1917 {
1918     Capability *cap;
1919
1920     // See startWorkerTask().
1921     ACQUIRE_LOCK(&task->lock);
1922     cap = task->cap;
1923     RELEASE_LOCK(&task->lock);
1924
1925     // set the thread-local pointer to the Task:
1926     taskEnter(task);
1927
1928     // schedule() runs without a lock.
1929     cap = schedule(cap,task);
1930
1931     // On exit from schedule(), we have a Capability.
1932     releaseCapability(cap);
1933     workerTaskStop(task);
1934 }
1935 #endif
1936
1937 /* ---------------------------------------------------------------------------
1938  * initScheduler()
1939  *
1940  * Initialise the scheduler.  This resets all the queues - if the
1941  * queues contained any threads, they'll be garbage collected at the
1942  * next pass.
1943  *
1944  * ------------------------------------------------------------------------ */
1945
1946 void 
1947 initScheduler(void)
1948 {
1949 #if !defined(THREADED_RTS)
1950   blocked_queue_hd  = END_TSO_QUEUE;
1951   blocked_queue_tl  = END_TSO_QUEUE;
1952   sleeping_queue    = END_TSO_QUEUE;
1953 #endif
1954
1955   blackhole_queue   = END_TSO_QUEUE;
1956
1957   context_switch = 0;
1958   sched_state    = SCHED_RUNNING;
1959   recent_activity = ACTIVITY_YES;
1960
1961 #if defined(THREADED_RTS)
1962   /* Initialise the mutex and condition variables used by
1963    * the scheduler. */
1964   initMutex(&sched_mutex);
1965 #endif
1966   
1967   ACQUIRE_LOCK(&sched_mutex);
1968
1969   /* A capability holds the state a native thread needs in
1970    * order to execute STG code. At least one capability is
1971    * floating around (only THREADED_RTS builds have more than one).
1972    */
1973   initCapabilities();
1974
1975   initTaskManager();
1976
1977 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
1978   initSparkPools();
1979 #endif
1980
1981 #if defined(THREADED_RTS)
1982   /*
1983    * Eagerly start one worker to run each Capability, except for
1984    * Capability 0.  The idea is that we're probably going to start a
1985    * bound thread on Capability 0 pretty soon, so we don't want a
1986    * worker task hogging it.
1987    */
1988   { 
1989       nat i;
1990       Capability *cap;
1991       for (i = 1; i < n_capabilities; i++) {
1992           cap = &capabilities[i];
1993           ACQUIRE_LOCK(&cap->lock);
1994           startWorkerTask(cap, workerStart);
1995           RELEASE_LOCK(&cap->lock);
1996       }
1997   }
1998 #endif
1999
2000   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2001
2002   RELEASE_LOCK(&sched_mutex);
2003 }
2004
2005 void
2006 exitScheduler(
2007     rtsBool wait_foreign
2008 #if !defined(THREADED_RTS)
2009                          __attribute__((unused))
2010 #endif
2011 )
2012                /* see Capability.c, shutdownCapability() */
2013 {
2014     Task *task = NULL;
2015
2016 #if defined(THREADED_RTS)
2017     ACQUIRE_LOCK(&sched_mutex);
2018     task = newBoundTask();
2019     RELEASE_LOCK(&sched_mutex);
2020 #endif
2021
2022     // If we haven't killed all the threads yet, do it now.
2023     if (sched_state < SCHED_SHUTTING_DOWN) {
2024         sched_state = SCHED_INTERRUPTING;
2025         scheduleDoGC(NULL,task,rtsFalse);    
2026     }
2027     sched_state = SCHED_SHUTTING_DOWN;
2028
2029 #if defined(THREADED_RTS)
2030     { 
2031         nat i;
2032         
2033         for (i = 0; i < n_capabilities; i++) {
2034             shutdownCapability(&capabilities[i], task, wait_foreign);
2035         }
2036         boundTaskExiting(task);
2037         stopTaskManager();
2038     }
2039 #else
2040     freeCapability(&MainCapability);
2041 #endif
2042 }
2043
2044 void
2045 freeScheduler( void )
2046 {
2047     freeTaskManager();
2048     if (n_capabilities != 1) {
2049         stgFree(capabilities);
2050     }
2051 #if defined(THREADED_RTS)
2052     closeMutex(&sched_mutex);
2053 #endif
2054 }
2055
2056 /* -----------------------------------------------------------------------------
2057    performGC
2058
2059    This is the interface to the garbage collector from Haskell land.
2060    We provide this so that external C code can allocate and garbage
2061    collect when called from Haskell via _ccall_GC.
2062    -------------------------------------------------------------------------- */
2063
2064 static void
2065 performGC_(rtsBool force_major)
2066 {
2067     Task *task;
2068     // We must grab a new Task here, because the existing Task may be
2069     // associated with a particular Capability, and chained onto the 
2070     // suspended_ccalling_tasks queue.
2071     ACQUIRE_LOCK(&sched_mutex);
2072     task = newBoundTask();
2073     RELEASE_LOCK(&sched_mutex);
2074     scheduleDoGC(NULL,task,force_major);
2075     boundTaskExiting(task);
2076 }
2077
2078 void
2079 performGC(void)
2080 {
2081     performGC_(rtsFalse);
2082 }
2083
2084 void
2085 performMajorGC(void)
2086 {
2087     performGC_(rtsTrue);
2088 }
2089
2090 /* -----------------------------------------------------------------------------
2091    Stack overflow
2092
2093    If the thread has reached its maximum stack size, then raise the
2094    StackOverflow exception in the offending thread.  Otherwise
2095    relocate the TSO into a larger chunk of memory and adjust its stack
2096    size appropriately.
2097    -------------------------------------------------------------------------- */
2098
2099 static StgTSO *
2100 threadStackOverflow(Capability *cap, StgTSO *tso)
2101 {
2102   nat new_stack_size, stack_words;
2103   lnat new_tso_size;
2104   StgPtr new_sp;
2105   StgTSO *dest;
2106
2107   IF_DEBUG(sanity,checkTSO(tso));
2108
2109   // don't allow throwTo() to modify the blocked_exceptions queue
2110   // while we are moving the TSO:
2111   lockClosure((StgClosure *)tso);
2112
2113   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2114       // NB. never raise a StackOverflow exception if the thread is
2115       // inside Control.Exceptino.block.  It is impractical to protect
2116       // against stack overflow exceptions, since virtually anything
2117       // can raise one (even 'catch'), so this is the only sensible
2118       // thing to do here.  See bug #767.
2119
2120       debugTrace(DEBUG_gc,
2121                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2122                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2123       IF_DEBUG(gc,
2124                /* If we're debugging, just print out the top of the stack */
2125                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2126                                                 tso->sp+64)));
2127
2128       // Send this thread the StackOverflow exception
2129       unlockTSO(tso);
2130       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2131       return tso;
2132   }
2133
2134   /* Try to double the current stack size.  If that takes us over the
2135    * maximum stack size for this thread, then use the maximum instead.
2136    * Finally round up so the TSO ends up as a whole number of blocks.
2137    */
2138   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2139   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2140                                        TSO_STRUCT_SIZE)/sizeof(W_);
2141   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2142   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2143
2144   debugTrace(DEBUG_sched, 
2145              "increasing stack size from %ld words to %d.",
2146              (long)tso->stack_size, new_stack_size);
2147
2148   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2149   TICK_ALLOC_TSO(new_stack_size,0);
2150
2151   /* copy the TSO block and the old stack into the new area */
2152   memcpy(dest,tso,TSO_STRUCT_SIZE);
2153   stack_words = tso->stack + tso->stack_size - tso->sp;
2154   new_sp = (P_)dest + new_tso_size - stack_words;
2155   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2156
2157   /* relocate the stack pointers... */
2158   dest->sp         = new_sp;
2159   dest->stack_size = new_stack_size;
2160         
2161   /* Mark the old TSO as relocated.  We have to check for relocated
2162    * TSOs in the garbage collector and any primops that deal with TSOs.
2163    *
2164    * It's important to set the sp value to just beyond the end
2165    * of the stack, so we don't attempt to scavenge any part of the
2166    * dead TSO's stack.
2167    */
2168   tso->what_next = ThreadRelocated;
2169   setTSOLink(cap,tso,dest);
2170   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2171   tso->why_blocked = NotBlocked;
2172
2173   IF_PAR_DEBUG(verbose,
2174                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2175                      tso->id, tso, tso->stack_size);
2176                /* If we're debugging, just print out the top of the stack */
2177                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2178                                                 tso->sp+64)));
2179   
2180   unlockTSO(dest);
2181   unlockTSO(tso);
2182
2183   IF_DEBUG(sanity,checkTSO(dest));
2184 #if 0
2185   IF_DEBUG(scheduler,printTSO(dest));
2186 #endif
2187
2188   return dest;
2189 }
2190
2191 static StgTSO *
2192 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2193 {
2194     bdescr *bd, *new_bd;
2195     lnat new_tso_size_w, tso_size_w;
2196     StgTSO *new_tso;
2197
2198     tso_size_w = tso_sizeW(tso);
2199
2200     if (tso_size_w < MBLOCK_SIZE_W || 
2201         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2202     {
2203         return tso;
2204     }
2205
2206     // don't allow throwTo() to modify the blocked_exceptions queue
2207     // while we are moving the TSO:
2208     lockClosure((StgClosure *)tso);
2209
2210     new_tso_size_w = round_to_mblocks(tso_size_w/2);
2211
2212     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2213                (long)tso->id, tso_size_w, new_tso_size_w);
2214
2215     bd = Bdescr((StgPtr)tso);
2216     new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2217     new_bd->free = bd->free;
2218     bd->free = bd->start + TSO_STRUCT_SIZEW;
2219
2220     new_tso = (StgTSO *)new_bd->start;
2221     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2222     new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2223
2224     tso->what_next = ThreadRelocated;
2225     tso->_link = new_tso; // no write barrier reqd: same generation
2226
2227     // The TSO attached to this Task may have moved, so update the
2228     // pointer to it.
2229     if (task->tso == tso) {
2230         task->tso = new_tso;
2231     }
2232
2233     unlockTSO(new_tso);
2234     unlockTSO(tso);
2235
2236     IF_DEBUG(sanity,checkTSO(new_tso));
2237
2238     return new_tso;
2239 }
2240
2241 /* ---------------------------------------------------------------------------
2242    Interrupt execution
2243    - usually called inside a signal handler so it mustn't do anything fancy.   
2244    ------------------------------------------------------------------------ */
2245
2246 void
2247 interruptStgRts(void)
2248 {
2249     sched_state = SCHED_INTERRUPTING;
2250     context_switch = 1;
2251     wakeUpRts();
2252 }
2253
2254 /* -----------------------------------------------------------------------------
2255    Wake up the RTS
2256    
2257    This function causes at least one OS thread to wake up and run the
2258    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2259    an external event has arrived that may need servicing (eg. a
2260    keyboard interrupt).
2261
2262    In the single-threaded RTS we don't do anything here; we only have
2263    one thread anyway, and the event that caused us to want to wake up
2264    will have interrupted any blocking system call in progress anyway.
2265    -------------------------------------------------------------------------- */
2266
2267 void
2268 wakeUpRts(void)
2269 {
2270 #if defined(THREADED_RTS)
2271     // This forces the IO Manager thread to wakeup, which will
2272     // in turn ensure that some OS thread wakes up and runs the
2273     // scheduler loop, which will cause a GC and deadlock check.
2274     ioManagerWakeup();
2275 #endif
2276 }
2277
2278 /* -----------------------------------------------------------------------------
2279  * checkBlackHoles()
2280  *
2281  * Check the blackhole_queue for threads that can be woken up.  We do
2282  * this periodically: before every GC, and whenever the run queue is
2283  * empty.
2284  *
2285  * An elegant solution might be to just wake up all the blocked
2286  * threads with awakenBlockedQueue occasionally: they'll go back to
2287  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2288  * doesn't give us a way to tell whether we've actually managed to
2289  * wake up any threads, so we would be busy-waiting.
2290  *
2291  * -------------------------------------------------------------------------- */
2292
2293 static rtsBool
2294 checkBlackHoles (Capability *cap)
2295 {
2296     StgTSO **prev, *t;
2297     rtsBool any_woke_up = rtsFalse;
2298     StgHalfWord type;
2299
2300     // blackhole_queue is global:
2301     ASSERT_LOCK_HELD(&sched_mutex);
2302
2303     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2304
2305     // ASSUMES: sched_mutex
2306     prev = &blackhole_queue;
2307     t = blackhole_queue;
2308     while (t != END_TSO_QUEUE) {
2309         ASSERT(t->why_blocked == BlockedOnBlackHole);
2310         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2311         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2312             IF_DEBUG(sanity,checkTSO(t));
2313             t = unblockOne(cap, t);
2314             *prev = t;
2315             any_woke_up = rtsTrue;
2316         } else {
2317             prev = &t->_link;
2318             t = t->_link;
2319         }
2320     }
2321
2322     return any_woke_up;
2323 }
2324
2325 /* -----------------------------------------------------------------------------
2326    Deleting threads
2327
2328    This is used for interruption (^C) and forking, and corresponds to
2329    raising an exception but without letting the thread catch the
2330    exception.
2331    -------------------------------------------------------------------------- */
2332
2333 static void 
2334 deleteThread (Capability *cap, StgTSO *tso)
2335 {
2336     // NOTE: must only be called on a TSO that we have exclusive
2337     // access to, because we will call throwToSingleThreaded() below.
2338     // The TSO must be on the run queue of the Capability we own, or 
2339     // we must own all Capabilities.
2340
2341     if (tso->why_blocked != BlockedOnCCall &&
2342         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2343         throwToSingleThreaded(cap,tso,NULL);
2344     }
2345 }
2346
2347 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2348 static void 
2349 deleteThread_(Capability *cap, StgTSO *tso)
2350 { // for forkProcess only:
2351   // like deleteThread(), but we delete threads in foreign calls, too.
2352
2353     if (tso->why_blocked == BlockedOnCCall ||
2354         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2355         unblockOne(cap,tso);
2356         tso->what_next = ThreadKilled;
2357     } else {
2358         deleteThread(cap,tso);
2359     }
2360 }
2361 #endif
2362
2363 /* -----------------------------------------------------------------------------
2364    raiseExceptionHelper
2365    
2366    This function is called by the raise# primitve, just so that we can
2367    move some of the tricky bits of raising an exception from C-- into
2368    C.  Who knows, it might be a useful re-useable thing here too.
2369    -------------------------------------------------------------------------- */
2370
2371 StgWord
2372 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2373 {
2374     Capability *cap = regTableToCapability(reg);
2375     StgThunk *raise_closure = NULL;
2376     StgPtr p, next;
2377     StgRetInfoTable *info;
2378     //
2379     // This closure represents the expression 'raise# E' where E
2380     // is the exception raise.  It is used to overwrite all the
2381     // thunks which are currently under evaluataion.
2382     //
2383
2384     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2385     // LDV profiling: stg_raise_info has THUNK as its closure
2386     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2387     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2388     // 1 does not cause any problem unless profiling is performed.
2389     // However, when LDV profiling goes on, we need to linearly scan
2390     // small object pool, where raise_closure is stored, so we should
2391     // use MIN_UPD_SIZE.
2392     //
2393     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2394     //                                 sizeofW(StgClosure)+1);
2395     //
2396
2397     //
2398     // Walk up the stack, looking for the catch frame.  On the way,
2399     // we update any closures pointed to from update frames with the
2400     // raise closure that we just built.
2401     //
2402     p = tso->sp;
2403     while(1) {
2404         info = get_ret_itbl((StgClosure *)p);
2405         next = p + stack_frame_sizeW((StgClosure *)p);
2406         switch (info->i.type) {
2407             
2408         case UPDATE_FRAME:
2409             // Only create raise_closure if we need to.
2410             if (raise_closure == NULL) {
2411                 raise_closure = 
2412                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2413                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2414                 raise_closure->payload[0] = exception;
2415             }
2416             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2417             p = next;
2418             continue;
2419
2420         case ATOMICALLY_FRAME:
2421             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2422             tso->sp = p;
2423             return ATOMICALLY_FRAME;
2424             
2425         case CATCH_FRAME:
2426             tso->sp = p;
2427             return CATCH_FRAME;
2428
2429         case CATCH_STM_FRAME:
2430             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2431             tso->sp = p;
2432             return CATCH_STM_FRAME;
2433             
2434         case STOP_FRAME:
2435             tso->sp = p;
2436             return STOP_FRAME;
2437
2438         case CATCH_RETRY_FRAME:
2439         default:
2440             p = next; 
2441             continue;
2442         }
2443     }
2444 }
2445
2446
2447 /* -----------------------------------------------------------------------------
2448    findRetryFrameHelper
2449
2450    This function is called by the retry# primitive.  It traverses the stack
2451    leaving tso->sp referring to the frame which should handle the retry.  
2452
2453    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2454    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2455
2456    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2457    create) because retries are not considered to be exceptions, despite the
2458    similar implementation.
2459
2460    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2461    not be created within memory transactions.
2462    -------------------------------------------------------------------------- */
2463
2464 StgWord
2465 findRetryFrameHelper (StgTSO *tso)
2466 {
2467   StgPtr           p, next;
2468   StgRetInfoTable *info;
2469
2470   p = tso -> sp;
2471   while (1) {
2472     info = get_ret_itbl((StgClosure *)p);
2473     next = p + stack_frame_sizeW((StgClosure *)p);
2474     switch (info->i.type) {
2475       
2476     case ATOMICALLY_FRAME:
2477         debugTrace(DEBUG_stm,
2478                    "found ATOMICALLY_FRAME at %p during retry", p);
2479         tso->sp = p;
2480         return ATOMICALLY_FRAME;
2481       
2482     case CATCH_RETRY_FRAME:
2483         debugTrace(DEBUG_stm,
2484                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2485         tso->sp = p;
2486         return CATCH_RETRY_FRAME;
2487       
2488     case CATCH_STM_FRAME: {
2489         StgTRecHeader *trec = tso -> trec;
2490         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2491         debugTrace(DEBUG_stm,
2492                    "found CATCH_STM_FRAME at %p during retry", p);
2493         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2494         stmAbortTransaction(tso -> cap, trec);
2495         stmFreeAbortedTRec(tso -> cap, trec);
2496         tso -> trec = outer;
2497         p = next; 
2498         continue;
2499     }
2500       
2501
2502     default:
2503       ASSERT(info->i.type != CATCH_FRAME);
2504       ASSERT(info->i.type != STOP_FRAME);
2505       p = next; 
2506       continue;
2507     }
2508   }
2509 }
2510
2511 /* -----------------------------------------------------------------------------
2512    resurrectThreads is called after garbage collection on the list of
2513    threads found to be garbage.  Each of these threads will be woken
2514    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2515    on an MVar, or NonTermination if the thread was blocked on a Black
2516    Hole.
2517
2518    Locks: assumes we hold *all* the capabilities.
2519    -------------------------------------------------------------------------- */
2520
2521 void
2522 resurrectThreads (StgTSO *threads)
2523 {
2524     StgTSO *tso, *next;
2525     Capability *cap;
2526     step *step;
2527
2528     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2529         next = tso->global_link;
2530
2531         step = Bdescr((P_)tso)->step;
2532         tso->global_link = step->threads;
2533         step->threads = tso;
2534
2535         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2536         
2537         // Wake up the thread on the Capability it was last on
2538         cap = tso->cap;
2539         
2540         switch (tso->why_blocked) {
2541         case BlockedOnMVar:
2542         case BlockedOnException:
2543             /* Called by GC - sched_mutex lock is currently held. */
2544             throwToSingleThreaded(cap, tso,
2545                                   (StgClosure *)blockedOnDeadMVar_closure);
2546             break;
2547         case BlockedOnBlackHole:
2548             throwToSingleThreaded(cap, tso,
2549                                   (StgClosure *)nonTermination_closure);
2550             break;
2551         case BlockedOnSTM:
2552             throwToSingleThreaded(cap, tso,
2553                                   (StgClosure *)blockedIndefinitely_closure);
2554             break;
2555         case NotBlocked:
2556             /* This might happen if the thread was blocked on a black hole
2557              * belonging to a thread that we've just woken up (raiseAsync
2558              * can wake up threads, remember...).
2559              */
2560             continue;
2561         default:
2562             barf("resurrectThreads: thread blocked in a strange way");
2563         }
2564     }
2565 }
2566
2567 /* -----------------------------------------------------------------------------
2568    performPendingThrowTos is called after garbage collection, and
2569    passed a list of threads that were found to have pending throwTos
2570    (tso->blocked_exceptions was not empty), and were blocked.
2571    Normally this doesn't happen, because we would deliver the
2572    exception directly if the target thread is blocked, but there are
2573    small windows where it might occur on a multiprocessor (see
2574    throwTo()).
2575
2576    NB. we must be holding all the capabilities at this point, just
2577    like resurrectThreads().
2578    -------------------------------------------------------------------------- */
2579
2580 void
2581 performPendingThrowTos (StgTSO *threads)
2582 {
2583     StgTSO *tso, *next;
2584     Capability *cap;
2585     step *step;
2586
2587     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2588         next = tso->global_link;
2589
2590         step = Bdescr((P_)tso)->step;
2591         tso->global_link = step->threads;
2592         step->threads = tso;
2593
2594         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2595         
2596         cap = tso->cap;
2597         maybePerformBlockedException(cap, tso);
2598     }
2599 }