Scheduler code cleanup
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag set by signal handler to precipitate a context switch
93  * LOCK: none (just an advisory flag)
94  */
95 int context_switch = 0;
96
97 /* flag that tracks whether we have done any execution in this time slice.
98  * LOCK: currently none, perhaps we should lock (but needs to be
99  * updated in the fast path of the scheduler).
100  */
101 nat recent_activity = ACTIVITY_YES;
102
103 /* if this flag is set as well, give up execution
104  * LOCK: none (changes once, from false->true)
105  */
106 rtsBool sched_state = SCHED_RUNNING;
107
108 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
109  *  exists - earlier gccs apparently didn't.
110  *  -= chak
111  */
112 StgTSO dummy_tso;
113
114 /*
115  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
116  * in an MT setting, needed to signal that a worker thread shouldn't hang around
117  * in the scheduler when it is out of work.
118  */
119 rtsBool shutting_down_scheduler = rtsFalse;
120
121 /*
122  * This mutex protects most of the global scheduler data in
123  * the THREADED_RTS runtime.
124  */
125 #if defined(THREADED_RTS)
126 Mutex sched_mutex;
127 #endif
128
129 #if !defined(mingw32_HOST_OS)
130 #define FORKPROCESS_PRIMOP_SUPPORTED
131 #endif
132
133 /* -----------------------------------------------------------------------------
134  * static function prototypes
135  * -------------------------------------------------------------------------- */
136
137 static Capability *schedule (Capability *initialCapability, Task *task);
138
139 //
140 // These function all encapsulate parts of the scheduler loop, and are
141 // abstracted only to make the structure and control flow of the
142 // scheduler clearer.
143 //
144 static void schedulePreLoop (void);
145 #if defined(THREADED_RTS)
146 static void schedulePushWork(Capability *cap, Task *task);
147 #endif
148 static void scheduleStartSignalHandlers (Capability *cap);
149 static void scheduleCheckBlockedThreads (Capability *cap);
150 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
151 static void scheduleCheckBlackHoles (Capability *cap);
152 static void scheduleDetectDeadlock (Capability *cap, Task *task);
153 #if defined(PARALLEL_HASKELL)
154 static rtsBool scheduleGetRemoteWork(Capability *cap);
155 static void scheduleSendPendingMessages(void);
156 static void scheduleActivateSpark(Capability *cap);
157 #endif
158 static void schedulePostRunThread(StgTSO *t);
159 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
160 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
161                                          StgTSO *t);
162 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
163                                     nat prev_what_next );
164 static void scheduleHandleThreadBlocked( StgTSO *t );
165 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
166                                              StgTSO *t );
167 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
168 static Capability *scheduleDoGC(Capability *cap, Task *task,
169                                 rtsBool force_major);
170
171 static rtsBool checkBlackHoles(Capability *cap);
172
173 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
174 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
175
176 static void deleteThread (Capability *cap, StgTSO *tso);
177 static void deleteAllThreads (Capability *cap);
178
179 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
180 static void deleteThread_(Capability *cap, StgTSO *tso);
181 #endif
182
183 #ifdef DEBUG
184 static char *whatNext_strs[] = {
185   "(unknown)",
186   "ThreadRunGHC",
187   "ThreadInterpret",
188   "ThreadKilled",
189   "ThreadRelocated",
190   "ThreadComplete"
191 };
192 #endif
193
194 /* -----------------------------------------------------------------------------
195  * Putting a thread on the run queue: different scheduling policies
196  * -------------------------------------------------------------------------- */
197
198 STATIC_INLINE void
199 addToRunQueue( Capability *cap, StgTSO *t )
200 {
201 #if defined(PARALLEL_HASKELL)
202     if (RtsFlags.ParFlags.doFairScheduling) { 
203         // this does round-robin scheduling; good for concurrency
204         appendToRunQueue(cap,t);
205     } else {
206         // this does unfair scheduling; good for parallelism
207         pushOnRunQueue(cap,t);
208     }
209 #else
210     // this does round-robin scheduling; good for concurrency
211     appendToRunQueue(cap,t);
212 #endif
213 }
214
215 /* ---------------------------------------------------------------------------
216    Main scheduling loop.
217
218    We use round-robin scheduling, each thread returning to the
219    scheduler loop when one of these conditions is detected:
220
221       * out of heap space
222       * timer expires (thread yields)
223       * thread blocks
224       * thread ends
225       * stack overflow
226
227    GRAN version:
228      In a GranSim setup this loop iterates over the global event queue.
229      This revolves around the global event queue, which determines what 
230      to do next. Therefore, it's more complicated than either the 
231      concurrent or the parallel (GUM) setup.
232   This version has been entirely removed (JB 2008/08).
233
234    GUM version:
235      GUM iterates over incoming messages.
236      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
237      and sends out a fish whenever it has nothing to do; in-between
238      doing the actual reductions (shared code below) it processes the
239      incoming messages and deals with delayed operations 
240      (see PendingFetches).
241      This is not the ugliest code you could imagine, but it's bloody close.
242
243   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
244   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
245   as well as future GUM versions. This file has been refurbished to
246   only contain valid code, which is however incomplete, refers to
247   invalid includes etc.
248
249    ------------------------------------------------------------------------ */
250
251 static Capability *
252 schedule (Capability *initialCapability, Task *task)
253 {
254   StgTSO *t;
255   Capability *cap;
256   StgThreadReturnCode ret;
257 #if defined(PARALLEL_HASKELL)
258   rtsBool receivedFinish = rtsFalse;
259 #endif
260   nat prev_what_next;
261   rtsBool ready_to_gc;
262 #if defined(THREADED_RTS)
263   rtsBool first = rtsTrue;
264 #endif
265   
266   cap = initialCapability;
267
268   // Pre-condition: this task owns initialCapability.
269   // The sched_mutex is *NOT* held
270   // NB. on return, we still hold a capability.
271
272   debugTrace (DEBUG_sched, 
273               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
274               task, initialCapability);
275
276   schedulePreLoop();
277
278   // -----------------------------------------------------------
279   // Scheduler loop starts here:
280
281 #if defined(PARALLEL_HASKELL)
282 #define TERMINATION_CONDITION        (!receivedFinish)
283 #else
284 #define TERMINATION_CONDITION        rtsTrue
285 #endif
286
287   while (TERMINATION_CONDITION) {
288
289 #if defined(THREADED_RTS)
290       if (first) {
291           // don't yield the first time, we want a chance to run this
292           // thread for a bit, even if there are others banging at the
293           // door.
294           first = rtsFalse;
295           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
296       } else {
297           // Yield the capability to higher-priority tasks if necessary.
298           yieldCapability(&cap, task);
299       }
300 #endif
301       
302 #if defined(THREADED_RTS)
303       schedulePushWork(cap,task);
304 #endif
305
306     // Check whether we have re-entered the RTS from Haskell without
307     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
308     // call).
309     if (cap->in_haskell) {
310           errorBelch("schedule: re-entered unsafely.\n"
311                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
312           stg_exit(EXIT_FAILURE);
313     }
314
315     // The interruption / shutdown sequence.
316     // 
317     // In order to cleanly shut down the runtime, we want to:
318     //   * make sure that all main threads return to their callers
319     //     with the state 'Interrupted'.
320     //   * clean up all OS threads assocated with the runtime
321     //   * free all memory etc.
322     //
323     // So the sequence for ^C goes like this:
324     //
325     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
326     //     arranges for some Capability to wake up
327     //
328     //   * all threads in the system are halted, and the zombies are
329     //     placed on the run queue for cleaning up.  We acquire all
330     //     the capabilities in order to delete the threads, this is
331     //     done by scheduleDoGC() for convenience (because GC already
332     //     needs to acquire all the capabilities).  We can't kill
333     //     threads involved in foreign calls.
334     // 
335     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
336     //
337     //   * sched_state := SCHED_SHUTTING_DOWN
338     //
339     //   * all workers exit when the run queue on their capability
340     //     drains.  All main threads will also exit when their TSO
341     //     reaches the head of the run queue and they can return.
342     //
343     //   * eventually all Capabilities will shut down, and the RTS can
344     //     exit.
345     //
346     //   * We might be left with threads blocked in foreign calls, 
347     //     we should really attempt to kill these somehow (TODO);
348     
349     switch (sched_state) {
350     case SCHED_RUNNING:
351         break;
352     case SCHED_INTERRUPTING:
353         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
354 #if defined(THREADED_RTS)
355         discardSparksCap(cap);
356 #endif
357         /* scheduleDoGC() deletes all the threads */
358         cap = scheduleDoGC(cap,task,rtsFalse);
359         break;
360     case SCHED_SHUTTING_DOWN:
361         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
362         // If we are a worker, just exit.  If we're a bound thread
363         // then we will exit below when we've removed our TSO from
364         // the run queue.
365         if (task->tso == NULL && emptyRunQueue(cap)) {
366             return cap;
367         }
368         break;
369     default:
370         barf("sched_state: %d", sched_state);
371     }
372
373 #if defined(THREADED_RTS)
374     // If the run queue is empty, take a spark and turn it into a thread.
375     {
376         if (emptyRunQueue(cap)) {
377             StgClosure *spark;
378             spark = findSpark(cap);
379             if (spark != NULL) {
380                 debugTrace(DEBUG_sched,
381                            "turning spark of closure %p into a thread",
382                            (StgClosure *)spark);
383                 createSparkThread(cap,spark);     
384             }
385         }
386     }
387 #endif // THREADED_RTS
388
389     scheduleStartSignalHandlers(cap);
390
391     // Only check the black holes here if we've nothing else to do.
392     // During normal execution, the black hole list only gets checked
393     // at GC time, to avoid repeatedly traversing this possibly long
394     // list each time around the scheduler.
395     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
396
397     scheduleCheckWakeupThreads(cap);
398
399     scheduleCheckBlockedThreads(cap);
400
401 #if defined(PARALLEL_HASKELL)
402     /* message processing and work distribution goes here */ 
403
404     /* if messages have been buffered... a NOOP in THREADED_RTS */
405     scheduleSendPendingMessages();
406
407     /* If the run queue is empty,...*/
408     if (emptyRunQueue(cap)) {
409       /* ...take one of our own sparks and turn it into a thread */
410       scheduleActivateSpark(cap);
411
412         /* if this did not work, try to steal a spark from someone else */
413       if (emptyRunQueue(cap)) {
414         receivedFinish = scheduleGetRemoteWork(cap);
415         continue; //  a new round, (hopefully) with new work
416         /* 
417            in GUM, this a) sends out a FISH and returns IF no fish is
418                            out already
419                         b) (blocking) awaits and receives messages
420            
421            in Eden, this is only the blocking receive, as b) in GUM.
422         */
423       }
424     } 
425
426     /* since we perform a blocking receive and continue otherwise,
427        either we never reach here or we definitely have work! */
428     // from here: non-empty run queue
429     ASSERT(!emptyRunQueue(cap));
430
431     if (PacketsWaiting()) {  /* now process incoming messages, if any
432                                 pending...  
433
434                                 CAUTION: scheduleGetRemoteWork called
435                                 above, waits for messages as well! */
436       processMessages(cap, &receivedFinish);
437     }
438 #endif // PARALLEL_HASKELL
439
440     scheduleDetectDeadlock(cap,task);
441 #if defined(THREADED_RTS)
442     cap = task->cap;    // reload cap, it might have changed
443 #endif
444
445     // Normally, the only way we can get here with no threads to
446     // run is if a keyboard interrupt received during 
447     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
448     // Additionally, it is not fatal for the
449     // threaded RTS to reach here with no threads to run.
450     //
451     // win32: might be here due to awaitEvent() being abandoned
452     // as a result of a console event having been delivered.
453     if ( emptyRunQueue(cap) ) {
454 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
455         ASSERT(sched_state >= SCHED_INTERRUPTING);
456 #endif
457         continue; // nothing to do
458     }
459
460     // 
461     // Get a thread to run
462     //
463     t = popRunQueue(cap);
464
465     // Sanity check the thread we're about to run.  This can be
466     // expensive if there is lots of thread switching going on...
467     IF_DEBUG(sanity,checkTSO(t));
468
469 #if defined(THREADED_RTS)
470     // Check whether we can run this thread in the current task.
471     // If not, we have to pass our capability to the right task.
472     {
473         Task *bound = t->bound;
474       
475         if (bound) {
476             if (bound == task) {
477                 debugTrace(DEBUG_sched,
478                            "### Running thread %lu in bound thread", (unsigned long)t->id);
479                 // yes, the Haskell thread is bound to the current native thread
480             } else {
481                 debugTrace(DEBUG_sched,
482                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
483                 // no, bound to a different Haskell thread: pass to that thread
484                 pushOnRunQueue(cap,t);
485                 continue;
486             }
487         } else {
488             // The thread we want to run is unbound.
489             if (task->tso) { 
490                 debugTrace(DEBUG_sched,
491                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
492                 // no, the current native thread is bound to a different
493                 // Haskell thread, so pass it to any worker thread
494                 pushOnRunQueue(cap,t);
495                 continue; 
496             }
497         }
498     }
499 #endif
500
501     /* context switches are initiated by the timer signal, unless
502      * the user specified "context switch as often as possible", with
503      * +RTS -C0
504      */
505     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
506         && !emptyThreadQueues(cap)) {
507         context_switch = 1;
508     }
509          
510 run_thread:
511
512     // CurrentTSO is the thread to run.  t might be different if we
513     // loop back to run_thread, so make sure to set CurrentTSO after
514     // that.
515     cap->r.rCurrentTSO = t;
516
517     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
518                               (long)t->id, whatNext_strs[t->what_next]);
519
520     startHeapProfTimer();
521
522     // Check for exceptions blocked on this thread
523     maybePerformBlockedException (cap, t);
524
525     // ----------------------------------------------------------------------
526     // Run the current thread 
527
528     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
529     ASSERT(t->cap == cap);
530
531     prev_what_next = t->what_next;
532
533     errno = t->saved_errno;
534 #if mingw32_HOST_OS
535     SetLastError(t->saved_winerror);
536 #endif
537
538     cap->in_haskell = rtsTrue;
539
540     dirty_TSO(cap,t);
541
542 #if defined(THREADED_RTS)
543     if (recent_activity == ACTIVITY_DONE_GC) {
544         // ACTIVITY_DONE_GC means we turned off the timer signal to
545         // conserve power (see #1623).  Re-enable it here.
546         nat prev;
547         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
548         if (prev == ACTIVITY_DONE_GC) {
549             startTimer();
550         }
551     } else {
552         recent_activity = ACTIVITY_YES;
553     }
554 #endif
555
556     switch (prev_what_next) {
557         
558     case ThreadKilled:
559     case ThreadComplete:
560         /* Thread already finished, return to scheduler. */
561         ret = ThreadFinished;
562         break;
563         
564     case ThreadRunGHC:
565     {
566         StgRegTable *r;
567         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
568         cap = regTableToCapability(r);
569         ret = r->rRet;
570         break;
571     }
572     
573     case ThreadInterpret:
574         cap = interpretBCO(cap);
575         ret = cap->r.rRet;
576         break;
577         
578     default:
579         barf("schedule: invalid what_next field");
580     }
581
582     cap->in_haskell = rtsFalse;
583
584     // The TSO might have moved, eg. if it re-entered the RTS and a GC
585     // happened.  So find the new location:
586     t = cap->r.rCurrentTSO;
587
588     // We have run some Haskell code: there might be blackhole-blocked
589     // threads to wake up now.
590     // Lock-free test here should be ok, we're just setting a flag.
591     if ( blackhole_queue != END_TSO_QUEUE ) {
592         blackholes_need_checking = rtsTrue;
593     }
594     
595     // And save the current errno in this thread.
596     // XXX: possibly bogus for SMP because this thread might already
597     // be running again, see code below.
598     t->saved_errno = errno;
599 #if mingw32_HOST_OS
600     // Similarly for Windows error code
601     t->saved_winerror = GetLastError();
602 #endif
603
604 #if defined(THREADED_RTS)
605     // If ret is ThreadBlocked, and this Task is bound to the TSO that
606     // blocked, we are in limbo - the TSO is now owned by whatever it
607     // is blocked on, and may in fact already have been woken up,
608     // perhaps even on a different Capability.  It may be the case
609     // that task->cap != cap.  We better yield this Capability
610     // immediately and return to normaility.
611     if (ret == ThreadBlocked) {
612         debugTrace(DEBUG_sched,
613                    "--<< thread %lu (%s) stopped: blocked",
614                    (unsigned long)t->id, whatNext_strs[t->what_next]);
615         continue;
616     }
617 #endif
618
619     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
620     ASSERT(t->cap == cap);
621
622     // ----------------------------------------------------------------------
623     
624     // Costs for the scheduler are assigned to CCS_SYSTEM
625     stopHeapProfTimer();
626 #if defined(PROFILING)
627     CCCS = CCS_SYSTEM;
628 #endif
629     
630     schedulePostRunThread(t);
631
632     t = threadStackUnderflow(task,t);
633
634     ready_to_gc = rtsFalse;
635
636     switch (ret) {
637     case HeapOverflow:
638         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
639         break;
640
641     case StackOverflow:
642         scheduleHandleStackOverflow(cap,task,t);
643         break;
644
645     case ThreadYielding:
646         if (scheduleHandleYield(cap, t, prev_what_next)) {
647             // shortcut for switching between compiler/interpreter:
648             goto run_thread; 
649         }
650         break;
651
652     case ThreadBlocked:
653         scheduleHandleThreadBlocked(t);
654         break;
655
656     case ThreadFinished:
657         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
658         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
659         break;
660
661     default:
662       barf("schedule: invalid thread return code %d", (int)ret);
663     }
664
665     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
666       cap = scheduleDoGC(cap,task,rtsFalse);
667     }
668   } /* end of while() */
669 }
670
671 /* ----------------------------------------------------------------------------
672  * Setting up the scheduler loop
673  * ------------------------------------------------------------------------- */
674
675 static void
676 schedulePreLoop(void)
677 {
678   // initialisation for scheduler - what cannot go into initScheduler()  
679 }
680
681 /* -----------------------------------------------------------------------------
682  * schedulePushWork()
683  *
684  * Push work to other Capabilities if we have some.
685  * -------------------------------------------------------------------------- */
686
687 #if defined(THREADED_RTS)
688 static void
689 schedulePushWork(Capability *cap USED_IF_THREADS, 
690                  Task *task      USED_IF_THREADS)
691 {
692     Capability *free_caps[n_capabilities], *cap0;
693     nat i, n_free_caps;
694
695     // migration can be turned off with +RTS -qg
696     if (!RtsFlags.ParFlags.migrate) return;
697
698     // Check whether we have more threads on our run queue, or sparks
699     // in our pool, that we could hand to another Capability.
700     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
701         && sparkPoolSizeCap(cap) < 2) {
702         return;
703     }
704
705     // First grab as many free Capabilities as we can.
706     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
707         cap0 = &capabilities[i];
708         if (cap != cap0 && tryGrabCapability(cap0,task)) {
709             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
710                 // it already has some work, we just grabbed it at 
711                 // the wrong moment.  Or maybe it's deadlocked!
712                 releaseCapability(cap0);
713             } else {
714                 free_caps[n_free_caps++] = cap0;
715             }
716         }
717     }
718
719     // we now have n_free_caps free capabilities stashed in
720     // free_caps[].  Share our run queue equally with them.  This is
721     // probably the simplest thing we could do; improvements we might
722     // want to do include:
723     //
724     //   - giving high priority to moving relatively new threads, on 
725     //     the gournds that they haven't had time to build up a
726     //     working set in the cache on this CPU/Capability.
727     //
728     //   - giving low priority to moving long-lived threads
729
730     if (n_free_caps > 0) {
731         StgTSO *prev, *t, *next;
732         rtsBool pushed_to_all;
733
734         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
735
736         i = 0;
737         pushed_to_all = rtsFalse;
738
739         if (cap->run_queue_hd != END_TSO_QUEUE) {
740             prev = cap->run_queue_hd;
741             t = prev->_link;
742             prev->_link = END_TSO_QUEUE;
743             for (; t != END_TSO_QUEUE; t = next) {
744                 next = t->_link;
745                 t->_link = END_TSO_QUEUE;
746                 if (t->what_next == ThreadRelocated
747                     || t->bound == task // don't move my bound thread
748                     || tsoLocked(t)) {  // don't move a locked thread
749                     setTSOLink(cap, prev, t);
750                     prev = t;
751                 } else if (i == n_free_caps) {
752                     pushed_to_all = rtsTrue;
753                     i = 0;
754                     // keep one for us
755                     setTSOLink(cap, prev, t);
756                     prev = t;
757                 } else {
758                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
759                     appendToRunQueue(free_caps[i],t);
760                     if (t->bound) { t->bound->cap = free_caps[i]; }
761                     t->cap = free_caps[i];
762                     i++;
763                 }
764             }
765             cap->run_queue_tl = prev;
766         }
767
768         // If there are some free capabilities that we didn't push any
769         // threads to, then try to push a spark to each one.
770         if (!pushed_to_all) {
771             StgClosure *spark;
772             // i is the next free capability to push to
773             for (; i < n_free_caps; i++) {
774                 if (emptySparkPoolCap(free_caps[i])) {
775                     spark = findSpark(cap);
776                     if (spark != NULL) {
777                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
778                         newSpark(&(free_caps[i]->r), spark);
779                     }
780                 }
781             }
782         }
783
784         // release the capabilities
785         for (i = 0; i < n_free_caps; i++) {
786             task->cap = free_caps[i];
787             releaseCapability(free_caps[i]);
788         }
789     }
790     task->cap = cap; // reset to point to our Capability.
791 }
792 #endif
793
794 /* ----------------------------------------------------------------------------
795  * Start any pending signal handlers
796  * ------------------------------------------------------------------------- */
797
798 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
799 static void
800 scheduleStartSignalHandlers(Capability *cap)
801 {
802     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
803         // safe outside the lock
804         startSignalHandlers(cap);
805     }
806 }
807 #else
808 static void
809 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
810 {
811 }
812 #endif
813
814 /* ----------------------------------------------------------------------------
815  * Check for blocked threads that can be woken up.
816  * ------------------------------------------------------------------------- */
817
818 static void
819 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
820 {
821 #if !defined(THREADED_RTS)
822     //
823     // Check whether any waiting threads need to be woken up.  If the
824     // run queue is empty, and there are no other tasks running, we
825     // can wait indefinitely for something to happen.
826     //
827     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
828     {
829         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
830     }
831 #endif
832 }
833
834
835 /* ----------------------------------------------------------------------------
836  * Check for threads woken up by other Capabilities
837  * ------------------------------------------------------------------------- */
838
839 static void
840 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
841 {
842 #if defined(THREADED_RTS)
843     // Any threads that were woken up by other Capabilities get
844     // appended to our run queue.
845     if (!emptyWakeupQueue(cap)) {
846         ACQUIRE_LOCK(&cap->lock);
847         if (emptyRunQueue(cap)) {
848             cap->run_queue_hd = cap->wakeup_queue_hd;
849             cap->run_queue_tl = cap->wakeup_queue_tl;
850         } else {
851             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
852             cap->run_queue_tl = cap->wakeup_queue_tl;
853         }
854         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
855         RELEASE_LOCK(&cap->lock);
856     }
857 #endif
858 }
859
860 /* ----------------------------------------------------------------------------
861  * Check for threads blocked on BLACKHOLEs that can be woken up
862  * ------------------------------------------------------------------------- */
863 static void
864 scheduleCheckBlackHoles (Capability *cap)
865 {
866     if ( blackholes_need_checking ) // check without the lock first
867     {
868         ACQUIRE_LOCK(&sched_mutex);
869         if ( blackholes_need_checking ) {
870             checkBlackHoles(cap);
871             blackholes_need_checking = rtsFalse;
872         }
873         RELEASE_LOCK(&sched_mutex);
874     }
875 }
876
877 /* ----------------------------------------------------------------------------
878  * Detect deadlock conditions and attempt to resolve them.
879  * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleDetectDeadlock (Capability *cap, Task *task)
883 {
884
885 #if defined(PARALLEL_HASKELL)
886     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
887     return;
888 #endif
889
890     /* 
891      * Detect deadlock: when we have no threads to run, there are no
892      * threads blocked, waiting for I/O, or sleeping, and all the
893      * other tasks are waiting for work, we must have a deadlock of
894      * some description.
895      */
896     if ( emptyThreadQueues(cap) )
897     {
898 #if defined(THREADED_RTS)
899         /* 
900          * In the threaded RTS, we only check for deadlock if there
901          * has been no activity in a complete timeslice.  This means
902          * we won't eagerly start a full GC just because we don't have
903          * any threads to run currently.
904          */
905         if (recent_activity != ACTIVITY_INACTIVE) return;
906 #endif
907
908         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
909
910         // Garbage collection can release some new threads due to
911         // either (a) finalizers or (b) threads resurrected because
912         // they are unreachable and will therefore be sent an
913         // exception.  Any threads thus released will be immediately
914         // runnable.
915         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
916
917         recent_activity = ACTIVITY_DONE_GC;
918         // disable timer signals (see #1623)
919         stopTimer();
920         
921         if ( !emptyRunQueue(cap) ) return;
922
923 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
924         /* If we have user-installed signal handlers, then wait
925          * for signals to arrive rather then bombing out with a
926          * deadlock.
927          */
928         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
929             debugTrace(DEBUG_sched,
930                        "still deadlocked, waiting for signals...");
931
932             awaitUserSignals();
933
934             if (signals_pending()) {
935                 startSignalHandlers(cap);
936             }
937
938             // either we have threads to run, or we were interrupted:
939             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
940
941             return;
942         }
943 #endif
944
945 #if !defined(THREADED_RTS)
946         /* Probably a real deadlock.  Send the current main thread the
947          * Deadlock exception.
948          */
949         if (task->tso) {
950             switch (task->tso->why_blocked) {
951             case BlockedOnSTM:
952             case BlockedOnBlackHole:
953             case BlockedOnException:
954             case BlockedOnMVar:
955                 throwToSingleThreaded(cap, task->tso, 
956                                       (StgClosure *)nonTermination_closure);
957                 return;
958             default:
959                 barf("deadlock: main thread blocked in a strange way");
960             }
961         }
962         return;
963 #endif
964     }
965 }
966
967
968 /* ----------------------------------------------------------------------------
969  * Send pending messages (PARALLEL_HASKELL only)
970  * ------------------------------------------------------------------------- */
971
972 static StgTSO *
973 scheduleSendPendingMessages(void)
974 {
975 #if defined(PARALLEL_HASKELL)
976
977 # if defined(PAR) // global Mem.Mgmt., omit for now
978     if (PendingFetches != END_BF_QUEUE) {
979         processFetches();
980     }
981 # endif
982     
983     if (RtsFlags.ParFlags.BufferTime) {
984         // if we use message buffering, we must send away all message
985         // packets which have become too old...
986         sendOldBuffers(); 
987     }
988 #endif
989 }
990
991 /* ----------------------------------------------------------------------------
992  * Activate spark threads (PARALLEL_HASKELL only)
993  * ------------------------------------------------------------------------- */
994
995 #if defined(PARALLEL_HASKELL)
996 static void
997 scheduleActivateSpark(Capability *cap)
998 {
999     StgClosure *spark;
1000
1001 /* We only want to stay here if the run queue is empty and we want some
1002    work. We try to turn a spark into a thread, and add it to the run
1003    queue, from where it will be picked up in the next iteration of the
1004    scheduler loop.  
1005 */
1006     if (!emptyRunQueue(cap)) 
1007       /* In the threaded RTS, another task might have pushed a thread
1008          on our run queue in the meantime ? But would need a lock.. */
1009       return;
1010
1011     spark = findSpark(cap); // defined in Sparks.c
1012
1013     if (spark != NULL) {
1014       debugTrace(DEBUG_sched,
1015                  "turning spark of closure %p into a thread",
1016                  (StgClosure *)spark);
1017       createSparkThread(cap,spark); // defined in Sparks.c
1018     }
1019 }
1020 #endif // PARALLEL_HASKELL
1021
1022 /* ----------------------------------------------------------------------------
1023  * Get work from a remote node (PARALLEL_HASKELL only)
1024  * ------------------------------------------------------------------------- */
1025     
1026 #if defined(PARALLEL_HASKELL)
1027 static rtsBool
1028 scheduleGetRemoteWork(Capability *cap)
1029 {
1030 #if defined(PARALLEL_HASKELL)
1031   rtsBool receivedFinish = rtsFalse;
1032
1033   // idle() , i.e. send all buffers, wait for work
1034   if (RtsFlags.ParFlags.BufferTime) {
1035         IF_PAR_DEBUG(verbose, 
1036                 debugBelch("...send all pending data,"));
1037         {
1038           nat i;
1039           for (i=1; i<=nPEs; i++)
1040             sendImmediately(i); // send all messages away immediately
1041         }
1042   }
1043
1044   /* this would be the place for fishing in GUM... 
1045
1046      if (no-earlier-fish-around) 
1047           sendFish(choosePe());
1048    */
1049
1050   // Eden:just look for incoming messages (blocking receive)
1051   IF_PAR_DEBUG(verbose, 
1052                debugBelch("...wait for incoming messages...\n"));
1053   processMessages(cap, &receivedFinish); // blocking receive...
1054
1055
1056   return receivedFinish;
1057   // reenter scheduling look after having received something
1058
1059 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1060
1061   return rtsFalse; /* return value unused in THREADED_RTS */
1062
1063 #endif /* PARALLEL_HASKELL */
1064 }
1065 #endif // PARALLEL_HASKELL
1066
1067 /* ----------------------------------------------------------------------------
1068  * After running a thread...
1069  * ------------------------------------------------------------------------- */
1070
1071 static void
1072 schedulePostRunThread (StgTSO *t)
1073 {
1074     // We have to be able to catch transactions that are in an
1075     // infinite loop as a result of seeing an inconsistent view of
1076     // memory, e.g. 
1077     //
1078     //   atomically $ do
1079     //       [a,b] <- mapM readTVar [ta,tb]
1080     //       when (a == b) loop
1081     //
1082     // and a is never equal to b given a consistent view of memory.
1083     //
1084     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1085         if (!stmValidateNestOfTransactions (t -> trec)) {
1086             debugTrace(DEBUG_sched | DEBUG_stm,
1087                        "trec %p found wasting its time", t);
1088             
1089             // strip the stack back to the
1090             // ATOMICALLY_FRAME, aborting the (nested)
1091             // transaction, and saving the stack of any
1092             // partially-evaluated thunks on the heap.
1093             throwToSingleThreaded_(&capabilities[0], t, 
1094                                    NULL, rtsTrue, NULL);
1095             
1096             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1097         }
1098     }
1099
1100   /* some statistics gathering in the parallel case */
1101 }
1102
1103 /* -----------------------------------------------------------------------------
1104  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1105  * -------------------------------------------------------------------------- */
1106
1107 static rtsBool
1108 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1109 {
1110     // did the task ask for a large block?
1111     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1112         // if so, get one and push it on the front of the nursery.
1113         bdescr *bd;
1114         lnat blocks;
1115         
1116         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1117         
1118         debugTrace(DEBUG_sched,
1119                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1120                    (long)t->id, whatNext_strs[t->what_next], blocks);
1121     
1122         // don't do this if the nursery is (nearly) full, we'll GC first.
1123         if (cap->r.rCurrentNursery->link != NULL ||
1124             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1125                                                // if the nursery has only one block.
1126             
1127             ACQUIRE_SM_LOCK
1128             bd = allocGroup( blocks );
1129             RELEASE_SM_LOCK
1130             cap->r.rNursery->n_blocks += blocks;
1131             
1132             // link the new group into the list
1133             bd->link = cap->r.rCurrentNursery;
1134             bd->u.back = cap->r.rCurrentNursery->u.back;
1135             if (cap->r.rCurrentNursery->u.back != NULL) {
1136                 cap->r.rCurrentNursery->u.back->link = bd;
1137             } else {
1138 #if !defined(THREADED_RTS)
1139                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1140                        g0s0 == cap->r.rNursery);
1141 #endif
1142                 cap->r.rNursery->blocks = bd;
1143             }             
1144             cap->r.rCurrentNursery->u.back = bd;
1145             
1146             // initialise it as a nursery block.  We initialise the
1147             // step, gen_no, and flags field of *every* sub-block in
1148             // this large block, because this is easier than making
1149             // sure that we always find the block head of a large
1150             // block whenever we call Bdescr() (eg. evacuate() and
1151             // isAlive() in the GC would both have to do this, at
1152             // least).
1153             { 
1154                 bdescr *x;
1155                 for (x = bd; x < bd + blocks; x++) {
1156                     x->step = cap->r.rNursery;
1157                     x->gen_no = 0;
1158                     x->flags = 0;
1159                 }
1160             }
1161             
1162             // This assert can be a killer if the app is doing lots
1163             // of large block allocations.
1164             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1165             
1166             // now update the nursery to point to the new block
1167             cap->r.rCurrentNursery = bd;
1168             
1169             // we might be unlucky and have another thread get on the
1170             // run queue before us and steal the large block, but in that
1171             // case the thread will just end up requesting another large
1172             // block.
1173             pushOnRunQueue(cap,t);
1174             return rtsFalse;  /* not actually GC'ing */
1175         }
1176     }
1177     
1178     debugTrace(DEBUG_sched,
1179                "--<< thread %ld (%s) stopped: HeapOverflow",
1180                (long)t->id, whatNext_strs[t->what_next]);
1181
1182     if (context_switch) {
1183         // Sometimes we miss a context switch, e.g. when calling
1184         // primitives in a tight loop, MAYBE_GC() doesn't check the
1185         // context switch flag, and we end up waiting for a GC.
1186         // See #1984, and concurrent/should_run/1984
1187         context_switch = 0;
1188         addToRunQueue(cap,t);
1189     } else {
1190         pushOnRunQueue(cap,t);
1191     }
1192     return rtsTrue;
1193     /* actual GC is done at the end of the while loop in schedule() */
1194 }
1195
1196 /* -----------------------------------------------------------------------------
1197  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1198  * -------------------------------------------------------------------------- */
1199
1200 static void
1201 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1202 {
1203     debugTrace (DEBUG_sched,
1204                 "--<< thread %ld (%s) stopped, StackOverflow", 
1205                 (long)t->id, whatNext_strs[t->what_next]);
1206
1207     /* just adjust the stack for this thread, then pop it back
1208      * on the run queue.
1209      */
1210     { 
1211         /* enlarge the stack */
1212         StgTSO *new_t = threadStackOverflow(cap, t);
1213         
1214         /* The TSO attached to this Task may have moved, so update the
1215          * pointer to it.
1216          */
1217         if (task->tso == t) {
1218             task->tso = new_t;
1219         }
1220         pushOnRunQueue(cap,new_t);
1221     }
1222 }
1223
1224 /* -----------------------------------------------------------------------------
1225  * Handle a thread that returned to the scheduler with ThreadYielding
1226  * -------------------------------------------------------------------------- */
1227
1228 static rtsBool
1229 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1230 {
1231     // Reset the context switch flag.  We don't do this just before
1232     // running the thread, because that would mean we would lose ticks
1233     // during GC, which can lead to unfair scheduling (a thread hogs
1234     // the CPU because the tick always arrives during GC).  This way
1235     // penalises threads that do a lot of allocation, but that seems
1236     // better than the alternative.
1237     context_switch = 0;
1238     
1239     /* put the thread back on the run queue.  Then, if we're ready to
1240      * GC, check whether this is the last task to stop.  If so, wake
1241      * up the GC thread.  getThread will block during a GC until the
1242      * GC is finished.
1243      */
1244 #ifdef DEBUG
1245     if (t->what_next != prev_what_next) {
1246         debugTrace(DEBUG_sched,
1247                    "--<< thread %ld (%s) stopped to switch evaluators", 
1248                    (long)t->id, whatNext_strs[t->what_next]);
1249     } else {
1250         debugTrace(DEBUG_sched,
1251                    "--<< thread %ld (%s) stopped, yielding",
1252                    (long)t->id, whatNext_strs[t->what_next]);
1253     }
1254 #endif
1255     
1256     IF_DEBUG(sanity,
1257              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1258              checkTSO(t));
1259     ASSERT(t->_link == END_TSO_QUEUE);
1260     
1261     // Shortcut if we're just switching evaluators: don't bother
1262     // doing stack squeezing (which can be expensive), just run the
1263     // thread.
1264     if (t->what_next != prev_what_next) {
1265         return rtsTrue;
1266     }
1267
1268     addToRunQueue(cap,t);
1269
1270     return rtsFalse;
1271 }
1272
1273 /* -----------------------------------------------------------------------------
1274  * Handle a thread that returned to the scheduler with ThreadBlocked
1275  * -------------------------------------------------------------------------- */
1276
1277 static void
1278 scheduleHandleThreadBlocked( StgTSO *t
1279 #if !defined(GRAN) && !defined(DEBUG)
1280     STG_UNUSED
1281 #endif
1282     )
1283 {
1284
1285       // We don't need to do anything.  The thread is blocked, and it
1286       // has tidied up its stack and placed itself on whatever queue
1287       // it needs to be on.
1288
1289     // ASSERT(t->why_blocked != NotBlocked);
1290     // Not true: for example,
1291     //    - in THREADED_RTS, the thread may already have been woken
1292     //      up by another Capability.  This actually happens: try
1293     //      conc023 +RTS -N2.
1294     //    - the thread may have woken itself up already, because
1295     //      threadPaused() might have raised a blocked throwTo
1296     //      exception, see maybePerformBlockedException().
1297
1298 #ifdef DEBUG
1299     if (traceClass(DEBUG_sched)) {
1300         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1301                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1302         printThreadBlockage(t);
1303         debugTraceEnd();
1304     }
1305 #endif
1306 }
1307
1308 /* -----------------------------------------------------------------------------
1309  * Handle a thread that returned to the scheduler with ThreadFinished
1310  * -------------------------------------------------------------------------- */
1311
1312 static rtsBool
1313 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1314 {
1315     /* Need to check whether this was a main thread, and if so,
1316      * return with the return value.
1317      *
1318      * We also end up here if the thread kills itself with an
1319      * uncaught exception, see Exception.cmm.
1320      */
1321     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1322                (unsigned long)t->id, whatNext_strs[t->what_next]);
1323
1324       //
1325       // Check whether the thread that just completed was a bound
1326       // thread, and if so return with the result.  
1327       //
1328       // There is an assumption here that all thread completion goes
1329       // through this point; we need to make sure that if a thread
1330       // ends up in the ThreadKilled state, that it stays on the run
1331       // queue so it can be dealt with here.
1332       //
1333
1334       if (t->bound) {
1335
1336           if (t->bound != task) {
1337 #if !defined(THREADED_RTS)
1338               // Must be a bound thread that is not the topmost one.  Leave
1339               // it on the run queue until the stack has unwound to the
1340               // point where we can deal with this.  Leaving it on the run
1341               // queue also ensures that the garbage collector knows about
1342               // this thread and its return value (it gets dropped from the
1343               // step->threads list so there's no other way to find it).
1344               appendToRunQueue(cap,t);
1345               return rtsFalse;
1346 #else
1347               // this cannot happen in the threaded RTS, because a
1348               // bound thread can only be run by the appropriate Task.
1349               barf("finished bound thread that isn't mine");
1350 #endif
1351           }
1352
1353           ASSERT(task->tso == t);
1354
1355           if (t->what_next == ThreadComplete) {
1356               if (task->ret) {
1357                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1358                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1359               }
1360               task->stat = Success;
1361           } else {
1362               if (task->ret) {
1363                   *(task->ret) = NULL;
1364               }
1365               if (sched_state >= SCHED_INTERRUPTING) {
1366                   task->stat = Interrupted;
1367               } else {
1368                   task->stat = Killed;
1369               }
1370           }
1371 #ifdef DEBUG
1372           removeThreadLabel((StgWord)task->tso->id);
1373 #endif
1374           return rtsTrue; // tells schedule() to return
1375       }
1376
1377       return rtsFalse;
1378 }
1379
1380 /* -----------------------------------------------------------------------------
1381  * Perform a heap census
1382  * -------------------------------------------------------------------------- */
1383
1384 static rtsBool
1385 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1386 {
1387     // When we have +RTS -i0 and we're heap profiling, do a census at
1388     // every GC.  This lets us get repeatable runs for debugging.
1389     if (performHeapProfile ||
1390         (RtsFlags.ProfFlags.profileInterval==0 &&
1391          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1392         return rtsTrue;
1393     } else {
1394         return rtsFalse;
1395     }
1396 }
1397
1398 /* -----------------------------------------------------------------------------
1399  * Perform a garbage collection if necessary
1400  * -------------------------------------------------------------------------- */
1401
1402 static Capability *
1403 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1404 {
1405     StgTSO *t;
1406     rtsBool heap_census;
1407 #ifdef THREADED_RTS
1408     static volatile StgWord waiting_for_gc;
1409     rtsBool was_waiting;
1410     nat i;
1411 #endif
1412
1413 #ifdef THREADED_RTS
1414     // In order to GC, there must be no threads running Haskell code.
1415     // Therefore, the GC thread needs to hold *all* the capabilities,
1416     // and release them after the GC has completed.  
1417     //
1418     // This seems to be the simplest way: previous attempts involved
1419     // making all the threads with capabilities give up their
1420     // capabilities and sleep except for the *last* one, which
1421     // actually did the GC.  But it's quite hard to arrange for all
1422     // the other tasks to sleep and stay asleep.
1423     //
1424         
1425     was_waiting = cas(&waiting_for_gc, 0, 1);
1426     if (was_waiting) {
1427         do {
1428             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1429             if (cap) yieldCapability(&cap,task);
1430         } while (waiting_for_gc);
1431         return cap;  // NOTE: task->cap might have changed here
1432     }
1433
1434     for (i=0; i < n_capabilities; i++) {
1435         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1436         if (cap != &capabilities[i]) {
1437             Capability *pcap = &capabilities[i];
1438             // we better hope this task doesn't get migrated to
1439             // another Capability while we're waiting for this one.
1440             // It won't, because load balancing happens while we have
1441             // all the Capabilities, but even so it's a slightly
1442             // unsavoury invariant.
1443             task->cap = pcap;
1444             context_switch = 1;
1445             waitForReturnCapability(&pcap, task);
1446             if (pcap != &capabilities[i]) {
1447                 barf("scheduleDoGC: got the wrong capability");
1448             }
1449         }
1450     }
1451
1452     waiting_for_gc = rtsFalse;
1453 #endif
1454
1455     // so this happens periodically:
1456     if (cap) scheduleCheckBlackHoles(cap);
1457     
1458     IF_DEBUG(scheduler, printAllThreads());
1459
1460     /*
1461      * We now have all the capabilities; if we're in an interrupting
1462      * state, then we should take the opportunity to delete all the
1463      * threads in the system.
1464      */
1465     if (sched_state >= SCHED_INTERRUPTING) {
1466         deleteAllThreads(&capabilities[0]);
1467         sched_state = SCHED_SHUTTING_DOWN;
1468     }
1469     
1470     heap_census = scheduleNeedHeapProfile(rtsTrue);
1471
1472     /* everybody back, start the GC.
1473      * Could do it in this thread, or signal a condition var
1474      * to do it in another thread.  Either way, we need to
1475      * broadcast on gc_pending_cond afterward.
1476      */
1477 #if defined(THREADED_RTS)
1478     debugTrace(DEBUG_sched, "doing GC");
1479 #endif
1480     GarbageCollect(force_major || heap_census);
1481     
1482     if (heap_census) {
1483         debugTrace(DEBUG_sched, "performing heap census");
1484         heapCensus();
1485         performHeapProfile = rtsFalse;
1486     }
1487
1488 #if defined(THREADED_RTS)
1489     // release our stash of capabilities.
1490     for (i = 0; i < n_capabilities; i++) {
1491         if (cap != &capabilities[i]) {
1492             task->cap = &capabilities[i];
1493             releaseCapability(&capabilities[i]);
1494         }
1495     }
1496     if (cap) {
1497         task->cap = cap;
1498     } else {
1499         task->cap = NULL;
1500     }
1501 #endif
1502
1503     return cap;
1504 }
1505
1506 /* ---------------------------------------------------------------------------
1507  * Singleton fork(). Do not copy any running threads.
1508  * ------------------------------------------------------------------------- */
1509
1510 pid_t
1511 forkProcess(HsStablePtr *entry
1512 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1513             STG_UNUSED
1514 #endif
1515            )
1516 {
1517 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1518     Task *task;
1519     pid_t pid;
1520     StgTSO* t,*next;
1521     Capability *cap;
1522     nat s;
1523     
1524 #if defined(THREADED_RTS)
1525     if (RtsFlags.ParFlags.nNodes > 1) {
1526         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1527         stg_exit(EXIT_FAILURE);
1528     }
1529 #endif
1530
1531     debugTrace(DEBUG_sched, "forking!");
1532     
1533     // ToDo: for SMP, we should probably acquire *all* the capabilities
1534     cap = rts_lock();
1535     
1536     // no funny business: hold locks while we fork, otherwise if some
1537     // other thread is holding a lock when the fork happens, the data
1538     // structure protected by the lock will forever be in an
1539     // inconsistent state in the child.  See also #1391.
1540     ACQUIRE_LOCK(&sched_mutex);
1541     ACQUIRE_LOCK(&cap->lock);
1542     ACQUIRE_LOCK(&cap->running_task->lock);
1543
1544     pid = fork();
1545     
1546     if (pid) { // parent
1547         
1548         RELEASE_LOCK(&sched_mutex);
1549         RELEASE_LOCK(&cap->lock);
1550         RELEASE_LOCK(&cap->running_task->lock);
1551
1552         // just return the pid
1553         rts_unlock(cap);
1554         return pid;
1555         
1556     } else { // child
1557         
1558 #if defined(THREADED_RTS)
1559         initMutex(&sched_mutex);
1560         initMutex(&cap->lock);
1561         initMutex(&cap->running_task->lock);
1562 #endif
1563
1564         // Now, all OS threads except the thread that forked are
1565         // stopped.  We need to stop all Haskell threads, including
1566         // those involved in foreign calls.  Also we need to delete
1567         // all Tasks, because they correspond to OS threads that are
1568         // now gone.
1569
1570         for (s = 0; s < total_steps; s++) {
1571           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1572             if (t->what_next == ThreadRelocated) {
1573                 next = t->_link;
1574             } else {
1575                 next = t->global_link;
1576                 // don't allow threads to catch the ThreadKilled
1577                 // exception, but we do want to raiseAsync() because these
1578                 // threads may be evaluating thunks that we need later.
1579                 deleteThread_(cap,t);
1580             }
1581           }
1582         }
1583         
1584         // Empty the run queue.  It seems tempting to let all the
1585         // killed threads stay on the run queue as zombies to be
1586         // cleaned up later, but some of them correspond to bound
1587         // threads for which the corresponding Task does not exist.
1588         cap->run_queue_hd = END_TSO_QUEUE;
1589         cap->run_queue_tl = END_TSO_QUEUE;
1590
1591         // Any suspended C-calling Tasks are no more, their OS threads
1592         // don't exist now:
1593         cap->suspended_ccalling_tasks = NULL;
1594
1595         // Empty the threads lists.  Otherwise, the garbage
1596         // collector may attempt to resurrect some of these threads.
1597         for (s = 0; s < total_steps; s++) {
1598             all_steps[s].threads = END_TSO_QUEUE;
1599         }
1600
1601         // Wipe the task list, except the current Task.
1602         ACQUIRE_LOCK(&sched_mutex);
1603         for (task = all_tasks; task != NULL; task=task->all_link) {
1604             if (task != cap->running_task) {
1605 #if defined(THREADED_RTS)
1606                 initMutex(&task->lock); // see #1391
1607 #endif
1608                 discardTask(task);
1609             }
1610         }
1611         RELEASE_LOCK(&sched_mutex);
1612
1613 #if defined(THREADED_RTS)
1614         // Wipe our spare workers list, they no longer exist.  New
1615         // workers will be created if necessary.
1616         cap->spare_workers = NULL;
1617         cap->returning_tasks_hd = NULL;
1618         cap->returning_tasks_tl = NULL;
1619 #endif
1620
1621         // On Unix, all timers are reset in the child, so we need to start
1622         // the timer again.
1623         initTimer();
1624         startTimer();
1625
1626         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1627         rts_checkSchedStatus("forkProcess",cap);
1628         
1629         rts_unlock(cap);
1630         hs_exit();                      // clean up and exit
1631         stg_exit(EXIT_SUCCESS);
1632     }
1633 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1634     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1635     return -1;
1636 #endif
1637 }
1638
1639 /* ---------------------------------------------------------------------------
1640  * Delete all the threads in the system
1641  * ------------------------------------------------------------------------- */
1642    
1643 static void
1644 deleteAllThreads ( Capability *cap )
1645 {
1646     // NOTE: only safe to call if we own all capabilities.
1647
1648     StgTSO* t, *next;
1649     nat s;
1650
1651     debugTrace(DEBUG_sched,"deleting all threads");
1652     for (s = 0; s < total_steps; s++) {
1653       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1654         if (t->what_next == ThreadRelocated) {
1655             next = t->_link;
1656         } else {
1657             next = t->global_link;
1658             deleteThread(cap,t);
1659         }
1660       }
1661     }      
1662
1663     // The run queue now contains a bunch of ThreadKilled threads.  We
1664     // must not throw these away: the main thread(s) will be in there
1665     // somewhere, and the main scheduler loop has to deal with it.
1666     // Also, the run queue is the only thing keeping these threads from
1667     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1668
1669 #if !defined(THREADED_RTS)
1670     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1671     ASSERT(sleeping_queue == END_TSO_QUEUE);
1672 #endif
1673 }
1674
1675 /* -----------------------------------------------------------------------------
1676    Managing the suspended_ccalling_tasks list.
1677    Locks required: sched_mutex
1678    -------------------------------------------------------------------------- */
1679
1680 STATIC_INLINE void
1681 suspendTask (Capability *cap, Task *task)
1682 {
1683     ASSERT(task->next == NULL && task->prev == NULL);
1684     task->next = cap->suspended_ccalling_tasks;
1685     task->prev = NULL;
1686     if (cap->suspended_ccalling_tasks) {
1687         cap->suspended_ccalling_tasks->prev = task;
1688     }
1689     cap->suspended_ccalling_tasks = task;
1690 }
1691
1692 STATIC_INLINE void
1693 recoverSuspendedTask (Capability *cap, Task *task)
1694 {
1695     if (task->prev) {
1696         task->prev->next = task->next;
1697     } else {
1698         ASSERT(cap->suspended_ccalling_tasks == task);
1699         cap->suspended_ccalling_tasks = task->next;
1700     }
1701     if (task->next) {
1702         task->next->prev = task->prev;
1703     }
1704     task->next = task->prev = NULL;
1705 }
1706
1707 /* ---------------------------------------------------------------------------
1708  * Suspending & resuming Haskell threads.
1709  * 
1710  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1711  * its capability before calling the C function.  This allows another
1712  * task to pick up the capability and carry on running Haskell
1713  * threads.  It also means that if the C call blocks, it won't lock
1714  * the whole system.
1715  *
1716  * The Haskell thread making the C call is put to sleep for the
1717  * duration of the call, on the susepended_ccalling_threads queue.  We
1718  * give out a token to the task, which it can use to resume the thread
1719  * on return from the C function.
1720  * ------------------------------------------------------------------------- */
1721    
1722 void *
1723 suspendThread (StgRegTable *reg)
1724 {
1725   Capability *cap;
1726   int saved_errno;
1727   StgTSO *tso;
1728   Task *task;
1729 #if mingw32_HOST_OS
1730   StgWord32 saved_winerror;
1731 #endif
1732
1733   saved_errno = errno;
1734 #if mingw32_HOST_OS
1735   saved_winerror = GetLastError();
1736 #endif
1737
1738   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1739    */
1740   cap = regTableToCapability(reg);
1741
1742   task = cap->running_task;
1743   tso = cap->r.rCurrentTSO;
1744
1745   debugTrace(DEBUG_sched, 
1746              "thread %lu did a safe foreign call", 
1747              (unsigned long)cap->r.rCurrentTSO->id);
1748
1749   // XXX this might not be necessary --SDM
1750   tso->what_next = ThreadRunGHC;
1751
1752   threadPaused(cap,tso);
1753
1754   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1755       tso->why_blocked = BlockedOnCCall;
1756       tso->flags |= TSO_BLOCKEX;
1757       tso->flags &= ~TSO_INTERRUPTIBLE;
1758   } else {
1759       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1760   }
1761
1762   // Hand back capability
1763   task->suspended_tso = tso;
1764
1765   ACQUIRE_LOCK(&cap->lock);
1766
1767   suspendTask(cap,task);
1768   cap->in_haskell = rtsFalse;
1769   releaseCapability_(cap);
1770   
1771   RELEASE_LOCK(&cap->lock);
1772
1773 #if defined(THREADED_RTS)
1774   /* Preparing to leave the RTS, so ensure there's a native thread/task
1775      waiting to take over.
1776   */
1777   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1778 #endif
1779
1780   errno = saved_errno;
1781 #if mingw32_HOST_OS
1782   SetLastError(saved_winerror);
1783 #endif
1784   return task;
1785 }
1786
1787 StgRegTable *
1788 resumeThread (void *task_)
1789 {
1790     StgTSO *tso;
1791     Capability *cap;
1792     Task *task = task_;
1793     int saved_errno;
1794 #if mingw32_HOST_OS
1795     StgWord32 saved_winerror;
1796 #endif
1797
1798     saved_errno = errno;
1799 #if mingw32_HOST_OS
1800     saved_winerror = GetLastError();
1801 #endif
1802
1803     cap = task->cap;
1804     // Wait for permission to re-enter the RTS with the result.
1805     waitForReturnCapability(&cap,task);
1806     // we might be on a different capability now... but if so, our
1807     // entry on the suspended_ccalling_tasks list will also have been
1808     // migrated.
1809
1810     // Remove the thread from the suspended list
1811     recoverSuspendedTask(cap,task);
1812
1813     tso = task->suspended_tso;
1814     task->suspended_tso = NULL;
1815     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1816     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1817     
1818     if (tso->why_blocked == BlockedOnCCall) {
1819         awakenBlockedExceptionQueue(cap,tso);
1820         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1821     }
1822     
1823     /* Reset blocking status */
1824     tso->why_blocked  = NotBlocked;
1825     
1826     cap->r.rCurrentTSO = tso;
1827     cap->in_haskell = rtsTrue;
1828     errno = saved_errno;
1829 #if mingw32_HOST_OS
1830     SetLastError(saved_winerror);
1831 #endif
1832
1833     /* We might have GC'd, mark the TSO dirty again */
1834     dirty_TSO(cap,tso);
1835
1836     IF_DEBUG(sanity, checkTSO(tso));
1837
1838     return &cap->r;
1839 }
1840
1841 /* ---------------------------------------------------------------------------
1842  * scheduleThread()
1843  *
1844  * scheduleThread puts a thread on the end  of the runnable queue.
1845  * This will usually be done immediately after a thread is created.
1846  * The caller of scheduleThread must create the thread using e.g.
1847  * createThread and push an appropriate closure
1848  * on this thread's stack before the scheduler is invoked.
1849  * ------------------------------------------------------------------------ */
1850
1851 void
1852 scheduleThread(Capability *cap, StgTSO *tso)
1853 {
1854     // The thread goes at the *end* of the run-queue, to avoid possible
1855     // starvation of any threads already on the queue.
1856     appendToRunQueue(cap,tso);
1857 }
1858
1859 void
1860 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1861 {
1862 #if defined(THREADED_RTS)
1863     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1864                               // move this thread from now on.
1865     cpu %= RtsFlags.ParFlags.nNodes;
1866     if (cpu == cap->no) {
1867         appendToRunQueue(cap,tso);
1868     } else {
1869         migrateThreadToCapability_lock(&capabilities[cpu],tso);
1870     }
1871 #else
1872     appendToRunQueue(cap,tso);
1873 #endif
1874 }
1875
1876 Capability *
1877 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1878 {
1879     Task *task;
1880
1881     // We already created/initialised the Task
1882     task = cap->running_task;
1883
1884     // This TSO is now a bound thread; make the Task and TSO
1885     // point to each other.
1886     tso->bound = task;
1887     tso->cap = cap;
1888
1889     task->tso = tso;
1890     task->ret = ret;
1891     task->stat = NoStatus;
1892
1893     appendToRunQueue(cap,tso);
1894
1895     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1896
1897     cap = schedule(cap,task);
1898
1899     ASSERT(task->stat != NoStatus);
1900     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1901
1902     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1903     return cap;
1904 }
1905
1906 /* ----------------------------------------------------------------------------
1907  * Starting Tasks
1908  * ------------------------------------------------------------------------- */
1909
1910 #if defined(THREADED_RTS)
1911 void
1912 workerStart(Task *task)
1913 {
1914     Capability *cap;
1915
1916     // See startWorkerTask().
1917     ACQUIRE_LOCK(&task->lock);
1918     cap = task->cap;
1919     RELEASE_LOCK(&task->lock);
1920
1921     // set the thread-local pointer to the Task:
1922     taskEnter(task);
1923
1924     // schedule() runs without a lock.
1925     cap = schedule(cap,task);
1926
1927     // On exit from schedule(), we have a Capability.
1928     releaseCapability(cap);
1929     workerTaskStop(task);
1930 }
1931 #endif
1932
1933 /* ---------------------------------------------------------------------------
1934  * initScheduler()
1935  *
1936  * Initialise the scheduler.  This resets all the queues - if the
1937  * queues contained any threads, they'll be garbage collected at the
1938  * next pass.
1939  *
1940  * ------------------------------------------------------------------------ */
1941
1942 void 
1943 initScheduler(void)
1944 {
1945 #if !defined(THREADED_RTS)
1946   blocked_queue_hd  = END_TSO_QUEUE;
1947   blocked_queue_tl  = END_TSO_QUEUE;
1948   sleeping_queue    = END_TSO_QUEUE;
1949 #endif
1950
1951   blackhole_queue   = END_TSO_QUEUE;
1952
1953   context_switch = 0;
1954   sched_state    = SCHED_RUNNING;
1955   recent_activity = ACTIVITY_YES;
1956
1957 #if defined(THREADED_RTS)
1958   /* Initialise the mutex and condition variables used by
1959    * the scheduler. */
1960   initMutex(&sched_mutex);
1961 #endif
1962   
1963   ACQUIRE_LOCK(&sched_mutex);
1964
1965   /* A capability holds the state a native thread needs in
1966    * order to execute STG code. At least one capability is
1967    * floating around (only THREADED_RTS builds have more than one).
1968    */
1969   initCapabilities();
1970
1971   initTaskManager();
1972
1973 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
1974   initSparkPools();
1975 #endif
1976
1977 #if defined(THREADED_RTS)
1978   /*
1979    * Eagerly start one worker to run each Capability, except for
1980    * Capability 0.  The idea is that we're probably going to start a
1981    * bound thread on Capability 0 pretty soon, so we don't want a
1982    * worker task hogging it.
1983    */
1984   { 
1985       nat i;
1986       Capability *cap;
1987       for (i = 1; i < n_capabilities; i++) {
1988           cap = &capabilities[i];
1989           ACQUIRE_LOCK(&cap->lock);
1990           startWorkerTask(cap, workerStart);
1991           RELEASE_LOCK(&cap->lock);
1992       }
1993   }
1994 #endif
1995
1996   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
1997
1998   RELEASE_LOCK(&sched_mutex);
1999 }
2000
2001 void
2002 exitScheduler(
2003     rtsBool wait_foreign
2004 #if !defined(THREADED_RTS)
2005                          __attribute__((unused))
2006 #endif
2007 )
2008                /* see Capability.c, shutdownCapability() */
2009 {
2010     Task *task = NULL;
2011
2012 #if defined(THREADED_RTS)
2013     ACQUIRE_LOCK(&sched_mutex);
2014     task = newBoundTask();
2015     RELEASE_LOCK(&sched_mutex);
2016 #endif
2017
2018     // If we haven't killed all the threads yet, do it now.
2019     if (sched_state < SCHED_SHUTTING_DOWN) {
2020         sched_state = SCHED_INTERRUPTING;
2021         scheduleDoGC(NULL,task,rtsFalse);    
2022     }
2023     sched_state = SCHED_SHUTTING_DOWN;
2024
2025 #if defined(THREADED_RTS)
2026     { 
2027         nat i;
2028         
2029         for (i = 0; i < n_capabilities; i++) {
2030             shutdownCapability(&capabilities[i], task, wait_foreign);
2031         }
2032         boundTaskExiting(task);
2033         stopTaskManager();
2034     }
2035 #else
2036     freeCapability(&MainCapability);
2037 #endif
2038 }
2039
2040 void
2041 freeScheduler( void )
2042 {
2043     freeTaskManager();
2044     if (n_capabilities != 1) {
2045         stgFree(capabilities);
2046     }
2047 #if defined(THREADED_RTS)
2048     closeMutex(&sched_mutex);
2049 #endif
2050 }
2051
2052 /* -----------------------------------------------------------------------------
2053    performGC
2054
2055    This is the interface to the garbage collector from Haskell land.
2056    We provide this so that external C code can allocate and garbage
2057    collect when called from Haskell via _ccall_GC.
2058    -------------------------------------------------------------------------- */
2059
2060 static void
2061 performGC_(rtsBool force_major)
2062 {
2063     Task *task;
2064     // We must grab a new Task here, because the existing Task may be
2065     // associated with a particular Capability, and chained onto the 
2066     // suspended_ccalling_tasks queue.
2067     ACQUIRE_LOCK(&sched_mutex);
2068     task = newBoundTask();
2069     RELEASE_LOCK(&sched_mutex);
2070     scheduleDoGC(NULL,task,force_major);
2071     boundTaskExiting(task);
2072 }
2073
2074 void
2075 performGC(void)
2076 {
2077     performGC_(rtsFalse);
2078 }
2079
2080 void
2081 performMajorGC(void)
2082 {
2083     performGC_(rtsTrue);
2084 }
2085
2086 /* -----------------------------------------------------------------------------
2087    Stack overflow
2088
2089    If the thread has reached its maximum stack size, then raise the
2090    StackOverflow exception in the offending thread.  Otherwise
2091    relocate the TSO into a larger chunk of memory and adjust its stack
2092    size appropriately.
2093    -------------------------------------------------------------------------- */
2094
2095 static StgTSO *
2096 threadStackOverflow(Capability *cap, StgTSO *tso)
2097 {
2098   nat new_stack_size, stack_words;
2099   lnat new_tso_size;
2100   StgPtr new_sp;
2101   StgTSO *dest;
2102
2103   IF_DEBUG(sanity,checkTSO(tso));
2104
2105   // don't allow throwTo() to modify the blocked_exceptions queue
2106   // while we are moving the TSO:
2107   lockClosure((StgClosure *)tso);
2108
2109   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2110       // NB. never raise a StackOverflow exception if the thread is
2111       // inside Control.Exceptino.block.  It is impractical to protect
2112       // against stack overflow exceptions, since virtually anything
2113       // can raise one (even 'catch'), so this is the only sensible
2114       // thing to do here.  See bug #767.
2115
2116       debugTrace(DEBUG_gc,
2117                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2118                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2119       IF_DEBUG(gc,
2120                /* If we're debugging, just print out the top of the stack */
2121                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2122                                                 tso->sp+64)));
2123
2124       // Send this thread the StackOverflow exception
2125       unlockTSO(tso);
2126       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2127       return tso;
2128   }
2129
2130   /* Try to double the current stack size.  If that takes us over the
2131    * maximum stack size for this thread, then use the maximum instead.
2132    * Finally round up so the TSO ends up as a whole number of blocks.
2133    */
2134   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2135   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2136                                        TSO_STRUCT_SIZE)/sizeof(W_);
2137   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2138   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2139
2140   debugTrace(DEBUG_sched, 
2141              "increasing stack size from %ld words to %d.",
2142              (long)tso->stack_size, new_stack_size);
2143
2144   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2145   TICK_ALLOC_TSO(new_stack_size,0);
2146
2147   /* copy the TSO block and the old stack into the new area */
2148   memcpy(dest,tso,TSO_STRUCT_SIZE);
2149   stack_words = tso->stack + tso->stack_size - tso->sp;
2150   new_sp = (P_)dest + new_tso_size - stack_words;
2151   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2152
2153   /* relocate the stack pointers... */
2154   dest->sp         = new_sp;
2155   dest->stack_size = new_stack_size;
2156         
2157   /* Mark the old TSO as relocated.  We have to check for relocated
2158    * TSOs in the garbage collector and any primops that deal with TSOs.
2159    *
2160    * It's important to set the sp value to just beyond the end
2161    * of the stack, so we don't attempt to scavenge any part of the
2162    * dead TSO's stack.
2163    */
2164   tso->what_next = ThreadRelocated;
2165   setTSOLink(cap,tso,dest);
2166   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2167   tso->why_blocked = NotBlocked;
2168
2169   IF_PAR_DEBUG(verbose,
2170                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2171                      tso->id, tso, tso->stack_size);
2172                /* If we're debugging, just print out the top of the stack */
2173                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2174                                                 tso->sp+64)));
2175   
2176   unlockTSO(dest);
2177   unlockTSO(tso);
2178
2179   IF_DEBUG(sanity,checkTSO(dest));
2180 #if 0
2181   IF_DEBUG(scheduler,printTSO(dest));
2182 #endif
2183
2184   return dest;
2185 }
2186
2187 static StgTSO *
2188 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2189 {
2190     bdescr *bd, *new_bd;
2191     lnat new_tso_size_w, tso_size_w;
2192     StgTSO *new_tso;
2193
2194     tso_size_w = tso_sizeW(tso);
2195
2196     if (tso_size_w < MBLOCK_SIZE_W || 
2197         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2198     {
2199         return tso;
2200     }
2201
2202     // don't allow throwTo() to modify the blocked_exceptions queue
2203     // while we are moving the TSO:
2204     lockClosure((StgClosure *)tso);
2205
2206     new_tso_size_w = round_to_mblocks(tso_size_w/2);
2207
2208     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2209                tso->id, tso_size_w, new_tso_size_w);
2210
2211     bd = Bdescr((StgPtr)tso);
2212     new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2213     new_bd->free = bd->free;
2214     bd->free = bd->start + TSO_STRUCT_SIZEW;
2215
2216     new_tso = (StgTSO *)new_bd->start;
2217     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2218     new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2219
2220     tso->what_next = ThreadRelocated;
2221     tso->_link = new_tso; // no write barrier reqd: same generation
2222
2223     // The TSO attached to this Task may have moved, so update the
2224     // pointer to it.
2225     if (task->tso == tso) {
2226         task->tso = new_tso;
2227     }
2228
2229     unlockTSO(new_tso);
2230     unlockTSO(tso);
2231
2232     IF_DEBUG(sanity,checkTSO(new_tso));
2233
2234     return new_tso;
2235 }
2236
2237 /* ---------------------------------------------------------------------------
2238    Interrupt execution
2239    - usually called inside a signal handler so it mustn't do anything fancy.   
2240    ------------------------------------------------------------------------ */
2241
2242 void
2243 interruptStgRts(void)
2244 {
2245     sched_state = SCHED_INTERRUPTING;
2246     context_switch = 1;
2247     wakeUpRts();
2248 }
2249
2250 /* -----------------------------------------------------------------------------
2251    Wake up the RTS
2252    
2253    This function causes at least one OS thread to wake up and run the
2254    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2255    an external event has arrived that may need servicing (eg. a
2256    keyboard interrupt).
2257
2258    In the single-threaded RTS we don't do anything here; we only have
2259    one thread anyway, and the event that caused us to want to wake up
2260    will have interrupted any blocking system call in progress anyway.
2261    -------------------------------------------------------------------------- */
2262
2263 void
2264 wakeUpRts(void)
2265 {
2266 #if defined(THREADED_RTS)
2267     // This forces the IO Manager thread to wakeup, which will
2268     // in turn ensure that some OS thread wakes up and runs the
2269     // scheduler loop, which will cause a GC and deadlock check.
2270     ioManagerWakeup();
2271 #endif
2272 }
2273
2274 /* -----------------------------------------------------------------------------
2275  * checkBlackHoles()
2276  *
2277  * Check the blackhole_queue for threads that can be woken up.  We do
2278  * this periodically: before every GC, and whenever the run queue is
2279  * empty.
2280  *
2281  * An elegant solution might be to just wake up all the blocked
2282  * threads with awakenBlockedQueue occasionally: they'll go back to
2283  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2284  * doesn't give us a way to tell whether we've actually managed to
2285  * wake up any threads, so we would be busy-waiting.
2286  *
2287  * -------------------------------------------------------------------------- */
2288
2289 static rtsBool
2290 checkBlackHoles (Capability *cap)
2291 {
2292     StgTSO **prev, *t;
2293     rtsBool any_woke_up = rtsFalse;
2294     StgHalfWord type;
2295
2296     // blackhole_queue is global:
2297     ASSERT_LOCK_HELD(&sched_mutex);
2298
2299     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2300
2301     // ASSUMES: sched_mutex
2302     prev = &blackhole_queue;
2303     t = blackhole_queue;
2304     while (t != END_TSO_QUEUE) {
2305         ASSERT(t->why_blocked == BlockedOnBlackHole);
2306         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2307         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2308             IF_DEBUG(sanity,checkTSO(t));
2309             t = unblockOne(cap, t);
2310             // urk, the threads migrate to the current capability
2311             // here, but we'd like to keep them on the original one.
2312             *prev = t;
2313             any_woke_up = rtsTrue;
2314         } else {
2315             prev = &t->_link;
2316             t = t->_link;
2317         }
2318     }
2319
2320     return any_woke_up;
2321 }
2322
2323 /* -----------------------------------------------------------------------------
2324    Deleting threads
2325
2326    This is used for interruption (^C) and forking, and corresponds to
2327    raising an exception but without letting the thread catch the
2328    exception.
2329    -------------------------------------------------------------------------- */
2330
2331 static void 
2332 deleteThread (Capability *cap, StgTSO *tso)
2333 {
2334     // NOTE: must only be called on a TSO that we have exclusive
2335     // access to, because we will call throwToSingleThreaded() below.
2336     // The TSO must be on the run queue of the Capability we own, or 
2337     // we must own all Capabilities.
2338
2339     if (tso->why_blocked != BlockedOnCCall &&
2340         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2341         throwToSingleThreaded(cap,tso,NULL);
2342     }
2343 }
2344
2345 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2346 static void 
2347 deleteThread_(Capability *cap, StgTSO *tso)
2348 { // for forkProcess only:
2349   // like deleteThread(), but we delete threads in foreign calls, too.
2350
2351     if (tso->why_blocked == BlockedOnCCall ||
2352         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2353         unblockOne(cap,tso);
2354         tso->what_next = ThreadKilled;
2355     } else {
2356         deleteThread(cap,tso);
2357     }
2358 }
2359 #endif
2360
2361 /* -----------------------------------------------------------------------------
2362    raiseExceptionHelper
2363    
2364    This function is called by the raise# primitve, just so that we can
2365    move some of the tricky bits of raising an exception from C-- into
2366    C.  Who knows, it might be a useful re-useable thing here too.
2367    -------------------------------------------------------------------------- */
2368
2369 StgWord
2370 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2371 {
2372     Capability *cap = regTableToCapability(reg);
2373     StgThunk *raise_closure = NULL;
2374     StgPtr p, next;
2375     StgRetInfoTable *info;
2376     //
2377     // This closure represents the expression 'raise# E' where E
2378     // is the exception raise.  It is used to overwrite all the
2379     // thunks which are currently under evaluataion.
2380     //
2381
2382     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2383     // LDV profiling: stg_raise_info has THUNK as its closure
2384     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2385     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2386     // 1 does not cause any problem unless profiling is performed.
2387     // However, when LDV profiling goes on, we need to linearly scan
2388     // small object pool, where raise_closure is stored, so we should
2389     // use MIN_UPD_SIZE.
2390     //
2391     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2392     //                                 sizeofW(StgClosure)+1);
2393     //
2394
2395     //
2396     // Walk up the stack, looking for the catch frame.  On the way,
2397     // we update any closures pointed to from update frames with the
2398     // raise closure that we just built.
2399     //
2400     p = tso->sp;
2401     while(1) {
2402         info = get_ret_itbl((StgClosure *)p);
2403         next = p + stack_frame_sizeW((StgClosure *)p);
2404         switch (info->i.type) {
2405             
2406         case UPDATE_FRAME:
2407             // Only create raise_closure if we need to.
2408             if (raise_closure == NULL) {
2409                 raise_closure = 
2410                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2411                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2412                 raise_closure->payload[0] = exception;
2413             }
2414             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2415             p = next;
2416             continue;
2417
2418         case ATOMICALLY_FRAME:
2419             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2420             tso->sp = p;
2421             return ATOMICALLY_FRAME;
2422             
2423         case CATCH_FRAME:
2424             tso->sp = p;
2425             return CATCH_FRAME;
2426
2427         case CATCH_STM_FRAME:
2428             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2429             tso->sp = p;
2430             return CATCH_STM_FRAME;
2431             
2432         case STOP_FRAME:
2433             tso->sp = p;
2434             return STOP_FRAME;
2435
2436         case CATCH_RETRY_FRAME:
2437         default:
2438             p = next; 
2439             continue;
2440         }
2441     }
2442 }
2443
2444
2445 /* -----------------------------------------------------------------------------
2446    findRetryFrameHelper
2447
2448    This function is called by the retry# primitive.  It traverses the stack
2449    leaving tso->sp referring to the frame which should handle the retry.  
2450
2451    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2452    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2453
2454    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2455    create) because retries are not considered to be exceptions, despite the
2456    similar implementation.
2457
2458    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2459    not be created within memory transactions.
2460    -------------------------------------------------------------------------- */
2461
2462 StgWord
2463 findRetryFrameHelper (StgTSO *tso)
2464 {
2465   StgPtr           p, next;
2466   StgRetInfoTable *info;
2467
2468   p = tso -> sp;
2469   while (1) {
2470     info = get_ret_itbl((StgClosure *)p);
2471     next = p + stack_frame_sizeW((StgClosure *)p);
2472     switch (info->i.type) {
2473       
2474     case ATOMICALLY_FRAME:
2475         debugTrace(DEBUG_stm,
2476                    "found ATOMICALLY_FRAME at %p during retry", p);
2477         tso->sp = p;
2478         return ATOMICALLY_FRAME;
2479       
2480     case CATCH_RETRY_FRAME:
2481         debugTrace(DEBUG_stm,
2482                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2483         tso->sp = p;
2484         return CATCH_RETRY_FRAME;
2485       
2486     case CATCH_STM_FRAME: {
2487         StgTRecHeader *trec = tso -> trec;
2488         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2489         debugTrace(DEBUG_stm,
2490                    "found CATCH_STM_FRAME at %p during retry", p);
2491         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2492         stmAbortTransaction(tso -> cap, trec);
2493         stmFreeAbortedTRec(tso -> cap, trec);
2494         tso -> trec = outer;
2495         p = next; 
2496         continue;
2497     }
2498       
2499
2500     default:
2501       ASSERT(info->i.type != CATCH_FRAME);
2502       ASSERT(info->i.type != STOP_FRAME);
2503       p = next; 
2504       continue;
2505     }
2506   }
2507 }
2508
2509 /* -----------------------------------------------------------------------------
2510    resurrectThreads is called after garbage collection on the list of
2511    threads found to be garbage.  Each of these threads will be woken
2512    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2513    on an MVar, or NonTermination if the thread was blocked on a Black
2514    Hole.
2515
2516    Locks: assumes we hold *all* the capabilities.
2517    -------------------------------------------------------------------------- */
2518
2519 void
2520 resurrectThreads (StgTSO *threads)
2521 {
2522     StgTSO *tso, *next;
2523     Capability *cap;
2524     step *step;
2525
2526     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2527         next = tso->global_link;
2528
2529         step = Bdescr((P_)tso)->step;
2530         tso->global_link = step->threads;
2531         step->threads = tso;
2532
2533         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2534         
2535         // Wake up the thread on the Capability it was last on
2536         cap = tso->cap;
2537         
2538         switch (tso->why_blocked) {
2539         case BlockedOnMVar:
2540         case BlockedOnException:
2541             /* Called by GC - sched_mutex lock is currently held. */
2542             throwToSingleThreaded(cap, tso,
2543                                   (StgClosure *)blockedOnDeadMVar_closure);
2544             break;
2545         case BlockedOnBlackHole:
2546             throwToSingleThreaded(cap, tso,
2547                                   (StgClosure *)nonTermination_closure);
2548             break;
2549         case BlockedOnSTM:
2550             throwToSingleThreaded(cap, tso,
2551                                   (StgClosure *)blockedIndefinitely_closure);
2552             break;
2553         case NotBlocked:
2554             /* This might happen if the thread was blocked on a black hole
2555              * belonging to a thread that we've just woken up (raiseAsync
2556              * can wake up threads, remember...).
2557              */
2558             continue;
2559         default:
2560             barf("resurrectThreads: thread blocked in a strange way");
2561         }
2562     }
2563 }
2564
2565 /* -----------------------------------------------------------------------------
2566    performPendingThrowTos is called after garbage collection, and
2567    passed a list of threads that were found to have pending throwTos
2568    (tso->blocked_exceptions was not empty), and were blocked.
2569    Normally this doesn't happen, because we would deliver the
2570    exception directly if the target thread is blocked, but there are
2571    small windows where it might occur on a multiprocessor (see
2572    throwTo()).
2573
2574    NB. we must be holding all the capabilities at this point, just
2575    like resurrectThreads().
2576    -------------------------------------------------------------------------- */
2577
2578 void
2579 performPendingThrowTos (StgTSO *threads)
2580 {
2581     StgTSO *tso, *next;
2582     Capability *cap;
2583     step *step;
2584
2585     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2586         next = tso->global_link;
2587
2588         step = Bdescr((P_)tso)->step;
2589         tso->global_link = step->threads;
2590         step->threads = tso;
2591
2592         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2593         
2594         cap = tso->cap;
2595         maybePerformBlockedException(cap, tso);
2596     }
2597 }