Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12
13 #include "sm/Storage.h"
14 #include "RtsUtils.h"
15 #include "StgRun.h"
16 #include "Schedule.h"
17 #include "Interpreter.h"
18 #include "Printer.h"
19 #include "RtsSignals.h"
20 #include "sm/Sanity.h"
21 #include "Stats.h"
22 #include "STM.h"
23 #include "Prelude.h"
24 #include "ThreadLabels.h"
25 #include "Updates.h"
26 #include "Proftimer.h"
27 #include "ProfHeap.h"
28 #include "Weak.h"
29 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
30 #include "Sparks.h"
31 #include "Capability.h"
32 #include "Task.h"
33 #include "AwaitEvent.h"
34 #if defined(mingw32_HOST_OS)
35 #include "win32/IOManager.h"
36 #endif
37 #include "Trace.h"
38 #include "RaiseAsync.h"
39 #include "Threads.h"
40 #include "Timer.h"
41 #include "ThreadPaused.h"
42 #include "Messages.h"
43
44 #ifdef HAVE_SYS_TYPES_H
45 #include <sys/types.h>
46 #endif
47 #ifdef HAVE_UNISTD_H
48 #include <unistd.h>
49 #endif
50
51 #include <string.h>
52 #include <stdlib.h>
53 #include <stdarg.h>
54
55 #ifdef HAVE_ERRNO_H
56 #include <errno.h>
57 #endif
58
59 /* -----------------------------------------------------------------------------
60  * Global variables
61  * -------------------------------------------------------------------------- */
62
63 #if !defined(THREADED_RTS)
64 // Blocked/sleeping thrads
65 StgTSO *blocked_queue_hd = NULL;
66 StgTSO *blocked_queue_tl = NULL;
67 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
68 #endif
69
70 /* Set to true when the latest garbage collection failed to reclaim
71  * enough space, and the runtime should proceed to shut itself down in
72  * an orderly fashion (emitting profiling info etc.)
73  */
74 rtsBool heap_overflow = rtsFalse;
75
76 /* flag that tracks whether we have done any execution in this time slice.
77  * LOCK: currently none, perhaps we should lock (but needs to be
78  * updated in the fast path of the scheduler).
79  *
80  * NB. must be StgWord, we do xchg() on it.
81  */
82 volatile StgWord recent_activity = ACTIVITY_YES;
83
84 /* if this flag is set as well, give up execution
85  * LOCK: none (changes monotonically)
86  */
87 volatile StgWord sched_state = SCHED_RUNNING;
88
89 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
90  *  exists - earlier gccs apparently didn't.
91  *  -= chak
92  */
93 StgTSO dummy_tso;
94
95 /*
96  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
97  * in an MT setting, needed to signal that a worker thread shouldn't hang around
98  * in the scheduler when it is out of work.
99  */
100 rtsBool shutting_down_scheduler = rtsFalse;
101
102 /*
103  * This mutex protects most of the global scheduler data in
104  * the THREADED_RTS runtime.
105  */
106 #if defined(THREADED_RTS)
107 Mutex sched_mutex;
108 #endif
109
110 #if !defined(mingw32_HOST_OS)
111 #define FORKPROCESS_PRIMOP_SUPPORTED
112 #endif
113
114 /* -----------------------------------------------------------------------------
115  * static function prototypes
116  * -------------------------------------------------------------------------- */
117
118 static Capability *schedule (Capability *initialCapability, Task *task);
119
120 //
121 // These function all encapsulate parts of the scheduler loop, and are
122 // abstracted only to make the structure and control flow of the
123 // scheduler clearer.
124 //
125 static void schedulePreLoop (void);
126 static void scheduleFindWork (Capability *cap);
127 #if defined(THREADED_RTS)
128 static void scheduleYield (Capability **pcap, Task *task);
129 #endif
130 static void scheduleStartSignalHandlers (Capability *cap);
131 static void scheduleCheckBlockedThreads (Capability *cap);
132 static void scheduleProcessInbox(Capability *cap);
133 static void scheduleDetectDeadlock (Capability *cap, Task *task);
134 static void schedulePushWork(Capability *cap, Task *task);
135 #if defined(THREADED_RTS)
136 static void scheduleActivateSpark(Capability *cap);
137 #endif
138 static void schedulePostRunThread(Capability *cap, StgTSO *t);
139 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
140 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
141                                          StgTSO *t);
142 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
143                                     nat prev_what_next );
144 static void scheduleHandleThreadBlocked( StgTSO *t );
145 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
146                                              StgTSO *t );
147 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
148 static Capability *scheduleDoGC(Capability *cap, Task *task,
149                                 rtsBool force_major);
150
151 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
152 static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
153
154 static void deleteThread (Capability *cap, StgTSO *tso);
155 static void deleteAllThreads (Capability *cap);
156
157 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
158 static void deleteThread_(Capability *cap, StgTSO *tso);
159 #endif
160
161 /* ---------------------------------------------------------------------------
162    Main scheduling loop.
163
164    We use round-robin scheduling, each thread returning to the
165    scheduler loop when one of these conditions is detected:
166
167       * out of heap space
168       * timer expires (thread yields)
169       * thread blocks
170       * thread ends
171       * stack overflow
172
173    GRAN version:
174      In a GranSim setup this loop iterates over the global event queue.
175      This revolves around the global event queue, which determines what 
176      to do next. Therefore, it's more complicated than either the 
177      concurrent or the parallel (GUM) setup.
178   This version has been entirely removed (JB 2008/08).
179
180    GUM version:
181      GUM iterates over incoming messages.
182      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
183      and sends out a fish whenever it has nothing to do; in-between
184      doing the actual reductions (shared code below) it processes the
185      incoming messages and deals with delayed operations 
186      (see PendingFetches).
187      This is not the ugliest code you could imagine, but it's bloody close.
188
189   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
190   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
191   as well as future GUM versions. This file has been refurbished to
192   only contain valid code, which is however incomplete, refers to
193   invalid includes etc.
194
195    ------------------------------------------------------------------------ */
196
197 static Capability *
198 schedule (Capability *initialCapability, Task *task)
199 {
200   StgTSO *t;
201   Capability *cap;
202   StgThreadReturnCode ret;
203   nat prev_what_next;
204   rtsBool ready_to_gc;
205 #if defined(THREADED_RTS)
206   rtsBool first = rtsTrue;
207 #endif
208   
209   cap = initialCapability;
210
211   // Pre-condition: this task owns initialCapability.
212   // The sched_mutex is *NOT* held
213   // NB. on return, we still hold a capability.
214
215   debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
216
217   schedulePreLoop();
218
219   // -----------------------------------------------------------
220   // Scheduler loop starts here:
221
222   while (1) {
223
224     // Check whether we have re-entered the RTS from Haskell without
225     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
226     // call).
227     if (cap->in_haskell) {
228           errorBelch("schedule: re-entered unsafely.\n"
229                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
230           stg_exit(EXIT_FAILURE);
231     }
232
233     // The interruption / shutdown sequence.
234     // 
235     // In order to cleanly shut down the runtime, we want to:
236     //   * make sure that all main threads return to their callers
237     //     with the state 'Interrupted'.
238     //   * clean up all OS threads assocated with the runtime
239     //   * free all memory etc.
240     //
241     // So the sequence for ^C goes like this:
242     //
243     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
244     //     arranges for some Capability to wake up
245     //
246     //   * all threads in the system are halted, and the zombies are
247     //     placed on the run queue for cleaning up.  We acquire all
248     //     the capabilities in order to delete the threads, this is
249     //     done by scheduleDoGC() for convenience (because GC already
250     //     needs to acquire all the capabilities).  We can't kill
251     //     threads involved in foreign calls.
252     // 
253     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
254     //
255     //   * sched_state := SCHED_SHUTTING_DOWN
256     //
257     //   * all workers exit when the run queue on their capability
258     //     drains.  All main threads will also exit when their TSO
259     //     reaches the head of the run queue and they can return.
260     //
261     //   * eventually all Capabilities will shut down, and the RTS can
262     //     exit.
263     //
264     //   * We might be left with threads blocked in foreign calls, 
265     //     we should really attempt to kill these somehow (TODO);
266     
267     switch (sched_state) {
268     case SCHED_RUNNING:
269         break;
270     case SCHED_INTERRUPTING:
271         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
272 #if defined(THREADED_RTS)
273         discardSparksCap(cap);
274 #endif
275         /* scheduleDoGC() deletes all the threads */
276         cap = scheduleDoGC(cap,task,rtsFalse);
277
278         // after scheduleDoGC(), we must be shutting down.  Either some
279         // other Capability did the final GC, or we did it above,
280         // either way we can fall through to the SCHED_SHUTTING_DOWN
281         // case now.
282         ASSERT(sched_state == SCHED_SHUTTING_DOWN);
283         // fall through
284
285     case SCHED_SHUTTING_DOWN:
286         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
287         // If we are a worker, just exit.  If we're a bound thread
288         // then we will exit below when we've removed our TSO from
289         // the run queue.
290         if (!isBoundTask(task) && emptyRunQueue(cap)) {
291             return cap;
292         }
293         break;
294     default:
295         barf("sched_state: %d", sched_state);
296     }
297
298     scheduleFindWork(cap);
299
300     /* work pushing, currently relevant only for THREADED_RTS:
301        (pushes threads, wakes up idle capabilities for stealing) */
302     schedulePushWork(cap,task);
303
304     scheduleDetectDeadlock(cap,task);
305
306 #if defined(THREADED_RTS)
307     cap = task->cap;    // reload cap, it might have changed
308 #endif
309
310     // Normally, the only way we can get here with no threads to
311     // run is if a keyboard interrupt received during 
312     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
313     // Additionally, it is not fatal for the
314     // threaded RTS to reach here with no threads to run.
315     //
316     // win32: might be here due to awaitEvent() being abandoned
317     // as a result of a console event having been delivered.
318     
319 #if defined(THREADED_RTS)
320     if (first) 
321     {
322     // XXX: ToDo
323     //     // don't yield the first time, we want a chance to run this
324     //     // thread for a bit, even if there are others banging at the
325     //     // door.
326     //     first = rtsFalse;
327     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
328     }
329
330     scheduleYield(&cap,task);
331
332     if (emptyRunQueue(cap)) continue; // look for work again
333 #endif
334
335 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
336     if ( emptyRunQueue(cap) ) {
337         ASSERT(sched_state >= SCHED_INTERRUPTING);
338     }
339 #endif
340
341     // 
342     // Get a thread to run
343     //
344     t = popRunQueue(cap);
345
346     // Sanity check the thread we're about to run.  This can be
347     // expensive if there is lots of thread switching going on...
348     IF_DEBUG(sanity,checkTSO(t));
349
350 #if defined(THREADED_RTS)
351     // Check whether we can run this thread in the current task.
352     // If not, we have to pass our capability to the right task.
353     {
354         InCall *bound = t->bound;
355       
356         if (bound) {
357             if (bound->task == task) {
358                 // yes, the Haskell thread is bound to the current native thread
359             } else {
360                 debugTrace(DEBUG_sched,
361                            "thread %lu bound to another OS thread",
362                            (unsigned long)t->id);
363                 // no, bound to a different Haskell thread: pass to that thread
364                 pushOnRunQueue(cap,t);
365                 continue;
366             }
367         } else {
368             // The thread we want to run is unbound.
369             if (task->incall->tso) { 
370                 debugTrace(DEBUG_sched,
371                            "this OS thread cannot run thread %lu",
372                            (unsigned long)t->id);
373                 // no, the current native thread is bound to a different
374                 // Haskell thread, so pass it to any worker thread
375                 pushOnRunQueue(cap,t);
376                 continue; 
377             }
378         }
379     }
380 #endif
381
382     // If we're shutting down, and this thread has not yet been
383     // killed, kill it now.  This sometimes happens when a finalizer
384     // thread is created by the final GC, or a thread previously
385     // in a foreign call returns.
386     if (sched_state >= SCHED_INTERRUPTING &&
387         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
388         deleteThread(cap,t);
389     }
390
391     /* context switches are initiated by the timer signal, unless
392      * the user specified "context switch as often as possible", with
393      * +RTS -C0
394      */
395     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
396         && !emptyThreadQueues(cap)) {
397         cap->context_switch = 1;
398     }
399          
400 run_thread:
401
402     // CurrentTSO is the thread to run.  t might be different if we
403     // loop back to run_thread, so make sure to set CurrentTSO after
404     // that.
405     cap->r.rCurrentTSO = t;
406
407     startHeapProfTimer();
408
409     // ----------------------------------------------------------------------
410     // Run the current thread 
411
412     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
413     ASSERT(t->cap == cap);
414     ASSERT(t->bound ? t->bound->task->cap == cap : 1);
415
416     prev_what_next = t->what_next;
417
418     errno = t->saved_errno;
419 #if mingw32_HOST_OS
420     SetLastError(t->saved_winerror);
421 #endif
422
423     cap->in_haskell = rtsTrue;
424
425     dirty_TSO(cap,t);
426
427 #if defined(THREADED_RTS)
428     if (recent_activity == ACTIVITY_DONE_GC) {
429         // ACTIVITY_DONE_GC means we turned off the timer signal to
430         // conserve power (see #1623).  Re-enable it here.
431         nat prev;
432         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
433         if (prev == ACTIVITY_DONE_GC) {
434             startTimer();
435         }
436     } else if (recent_activity != ACTIVITY_INACTIVE) {
437         // If we reached ACTIVITY_INACTIVE, then don't reset it until
438         // we've done the GC.  The thread running here might just be
439         // the IO manager thread that handle_tick() woke up via
440         // wakeUpRts().
441         recent_activity = ACTIVITY_YES;
442     }
443 #endif
444
445     traceEventRunThread(cap, t);
446
447     switch (prev_what_next) {
448         
449     case ThreadKilled:
450     case ThreadComplete:
451         /* Thread already finished, return to scheduler. */
452         ret = ThreadFinished;
453         break;
454         
455     case ThreadRunGHC:
456     {
457         StgRegTable *r;
458         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
459         cap = regTableToCapability(r);
460         ret = r->rRet;
461         break;
462     }
463     
464     case ThreadInterpret:
465         cap = interpretBCO(cap);
466         ret = cap->r.rRet;
467         break;
468         
469     default:
470         barf("schedule: invalid what_next field");
471     }
472
473     cap->in_haskell = rtsFalse;
474
475     // The TSO might have moved, eg. if it re-entered the RTS and a GC
476     // happened.  So find the new location:
477     t = cap->r.rCurrentTSO;
478
479     // And save the current errno in this thread.
480     // XXX: possibly bogus for SMP because this thread might already
481     // be running again, see code below.
482     t->saved_errno = errno;
483 #if mingw32_HOST_OS
484     // Similarly for Windows error code
485     t->saved_winerror = GetLastError();
486 #endif
487
488     traceEventStopThread(cap, t, ret);
489
490     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
491     ASSERT(t->cap == cap);
492
493     // ----------------------------------------------------------------------
494     
495     // Costs for the scheduler are assigned to CCS_SYSTEM
496     stopHeapProfTimer();
497 #if defined(PROFILING)
498     CCCS = CCS_SYSTEM;
499 #endif
500     
501     schedulePostRunThread(cap,t);
502
503     if (ret != StackOverflow) {
504         t = threadStackUnderflow(cap,task,t);
505     }
506
507     ready_to_gc = rtsFalse;
508
509     switch (ret) {
510     case HeapOverflow:
511         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
512         break;
513
514     case StackOverflow:
515         scheduleHandleStackOverflow(cap,task,t);
516         break;
517
518     case ThreadYielding:
519         if (scheduleHandleYield(cap, t, prev_what_next)) {
520             // shortcut for switching between compiler/interpreter:
521             goto run_thread; 
522         }
523         break;
524
525     case ThreadBlocked:
526         scheduleHandleThreadBlocked(t);
527         break;
528
529     case ThreadFinished:
530         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
531         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
532         break;
533
534     default:
535       barf("schedule: invalid thread return code %d", (int)ret);
536     }
537
538     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
539       cap = scheduleDoGC(cap,task,rtsFalse);
540     }
541   } /* end of while() */
542 }
543
544 /* -----------------------------------------------------------------------------
545  * Run queue operations
546  * -------------------------------------------------------------------------- */
547
548 void
549 removeFromRunQueue (Capability *cap, StgTSO *tso)
550 {
551     if (tso->block_info.prev == END_TSO_QUEUE) {
552         ASSERT(cap->run_queue_hd == tso);
553         cap->run_queue_hd = tso->_link;
554     } else {
555         setTSOLink(cap, tso->block_info.prev, tso->_link);
556     }
557     if (tso->_link == END_TSO_QUEUE) {
558         ASSERT(cap->run_queue_tl == tso);
559         cap->run_queue_tl = tso->block_info.prev;
560     } else {
561         setTSOPrev(cap, tso->_link, tso->block_info.prev);
562     }
563     tso->_link = tso->block_info.prev = END_TSO_QUEUE;
564
565     IF_DEBUG(sanity, checkRunQueue(cap));
566 }
567
568 /* ----------------------------------------------------------------------------
569  * Setting up the scheduler loop
570  * ------------------------------------------------------------------------- */
571
572 static void
573 schedulePreLoop(void)
574 {
575   // initialisation for scheduler - what cannot go into initScheduler()  
576 }
577
578 /* -----------------------------------------------------------------------------
579  * scheduleFindWork()
580  *
581  * Search for work to do, and handle messages from elsewhere.
582  * -------------------------------------------------------------------------- */
583
584 static void
585 scheduleFindWork (Capability *cap)
586 {
587     scheduleStartSignalHandlers(cap);
588
589     scheduleProcessInbox(cap);
590
591     scheduleCheckBlockedThreads(cap);
592
593 #if defined(THREADED_RTS)
594     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
595 #endif
596 }
597
598 #if defined(THREADED_RTS)
599 STATIC_INLINE rtsBool
600 shouldYieldCapability (Capability *cap, Task *task)
601 {
602     // we need to yield this capability to someone else if..
603     //   - another thread is initiating a GC
604     //   - another Task is returning from a foreign call
605     //   - the thread at the head of the run queue cannot be run
606     //     by this Task (it is bound to another Task, or it is unbound
607     //     and this task it bound).
608     return (waiting_for_gc || 
609             cap->returning_tasks_hd != NULL ||
610             (!emptyRunQueue(cap) && (task->incall->tso == NULL
611                                      ? cap->run_queue_hd->bound != NULL
612                                      : cap->run_queue_hd->bound != task->incall)));
613 }
614
615 // This is the single place where a Task goes to sleep.  There are
616 // two reasons it might need to sleep:
617 //    - there are no threads to run
618 //    - we need to yield this Capability to someone else 
619 //      (see shouldYieldCapability())
620 //
621 // Careful: the scheduler loop is quite delicate.  Make sure you run
622 // the tests in testsuite/concurrent (all ways) after modifying this,
623 // and also check the benchmarks in nofib/parallel for regressions.
624
625 static void
626 scheduleYield (Capability **pcap, Task *task)
627 {
628     Capability *cap = *pcap;
629
630     // if we have work, and we don't need to give up the Capability, continue.
631     //
632     if (!shouldYieldCapability(cap,task) && 
633         (!emptyRunQueue(cap) ||
634          !emptyInbox(cap) ||
635          sched_state >= SCHED_INTERRUPTING))
636         return;
637
638     // otherwise yield (sleep), and keep yielding if necessary.
639     do {
640         yieldCapability(&cap,task);
641     } 
642     while (shouldYieldCapability(cap,task));
643
644     // note there may still be no threads on the run queue at this
645     // point, the caller has to check.
646
647     *pcap = cap;
648     return;
649 }
650 #endif
651     
652 /* -----------------------------------------------------------------------------
653  * schedulePushWork()
654  *
655  * Push work to other Capabilities if we have some.
656  * -------------------------------------------------------------------------- */
657
658 static void
659 schedulePushWork(Capability *cap USED_IF_THREADS, 
660                  Task *task      USED_IF_THREADS)
661 {
662   /* following code not for PARALLEL_HASKELL. I kept the call general,
663      future GUM versions might use pushing in a distributed setup */
664 #if defined(THREADED_RTS)
665
666     Capability *free_caps[n_capabilities], *cap0;
667     nat i, n_free_caps;
668
669     // migration can be turned off with +RTS -qm
670     if (!RtsFlags.ParFlags.migrate) return;
671
672     // Check whether we have more threads on our run queue, or sparks
673     // in our pool, that we could hand to another Capability.
674     if (cap->run_queue_hd == END_TSO_QUEUE) {
675         if (sparkPoolSizeCap(cap) < 2) return;
676     } else {
677         if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
678             sparkPoolSizeCap(cap) < 1) return;
679     }
680
681     // First grab as many free Capabilities as we can.
682     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
683         cap0 = &capabilities[i];
684         if (cap != cap0 && tryGrabCapability(cap0,task)) {
685             if (!emptyRunQueue(cap0)
686                 || cap->returning_tasks_hd != NULL
687                 || cap->inbox != (Message*)END_TSO_QUEUE) {
688                 // it already has some work, we just grabbed it at 
689                 // the wrong moment.  Or maybe it's deadlocked!
690                 releaseCapability(cap0);
691             } else {
692                 free_caps[n_free_caps++] = cap0;
693             }
694         }
695     }
696
697     // we now have n_free_caps free capabilities stashed in
698     // free_caps[].  Share our run queue equally with them.  This is
699     // probably the simplest thing we could do; improvements we might
700     // want to do include:
701     //
702     //   - giving high priority to moving relatively new threads, on 
703     //     the gournds that they haven't had time to build up a
704     //     working set in the cache on this CPU/Capability.
705     //
706     //   - giving low priority to moving long-lived threads
707
708     if (n_free_caps > 0) {
709         StgTSO *prev, *t, *next;
710         rtsBool pushed_to_all;
711
712         debugTrace(DEBUG_sched, 
713                    "cap %d: %s and %d free capabilities, sharing...", 
714                    cap->no, 
715                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
716                    "excess threads on run queue":"sparks to share (>=2)",
717                    n_free_caps);
718
719         i = 0;
720         pushed_to_all = rtsFalse;
721
722         if (cap->run_queue_hd != END_TSO_QUEUE) {
723             prev = cap->run_queue_hd;
724             t = prev->_link;
725             prev->_link = END_TSO_QUEUE;
726             for (; t != END_TSO_QUEUE; t = next) {
727                 next = t->_link;
728                 t->_link = END_TSO_QUEUE;
729                 if (t->what_next == ThreadRelocated
730                     || t->bound == task->incall // don't move my bound thread
731                     || tsoLocked(t)) {  // don't move a locked thread
732                     setTSOLink(cap, prev, t);
733                     setTSOPrev(cap, t, prev);
734                     prev = t;
735                 } else if (i == n_free_caps) {
736                     pushed_to_all = rtsTrue;
737                     i = 0;
738                     // keep one for us
739                     setTSOLink(cap, prev, t);
740                     setTSOPrev(cap, t, prev);
741                     prev = t;
742                 } else {
743                     appendToRunQueue(free_caps[i],t);
744
745                     traceEventMigrateThread (cap, t, free_caps[i]->no);
746
747                     if (t->bound) { t->bound->task->cap = free_caps[i]; }
748                     t->cap = free_caps[i];
749                     i++;
750                 }
751             }
752             cap->run_queue_tl = prev;
753
754             IF_DEBUG(sanity, checkRunQueue(cap));
755         }
756
757 #ifdef SPARK_PUSHING
758         /* JB I left this code in place, it would work but is not necessary */
759
760         // If there are some free capabilities that we didn't push any
761         // threads to, then try to push a spark to each one.
762         if (!pushed_to_all) {
763             StgClosure *spark;
764             // i is the next free capability to push to
765             for (; i < n_free_caps; i++) {
766                 if (emptySparkPoolCap(free_caps[i])) {
767                     spark = tryStealSpark(cap->sparks);
768                     if (spark != NULL) {
769                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
770
771             traceEventStealSpark(free_caps[i], t, cap->no);
772
773                         newSpark(&(free_caps[i]->r), spark);
774                     }
775                 }
776             }
777         }
778 #endif /* SPARK_PUSHING */
779
780         // release the capabilities
781         for (i = 0; i < n_free_caps; i++) {
782             task->cap = free_caps[i];
783             releaseAndWakeupCapability(free_caps[i]);
784         }
785     }
786     task->cap = cap; // reset to point to our Capability.
787
788 #endif /* THREADED_RTS */
789
790 }
791
792 /* ----------------------------------------------------------------------------
793  * Start any pending signal handlers
794  * ------------------------------------------------------------------------- */
795
796 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
797 static void
798 scheduleStartSignalHandlers(Capability *cap)
799 {
800     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
801         // safe outside the lock
802         startSignalHandlers(cap);
803     }
804 }
805 #else
806 static void
807 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
808 {
809 }
810 #endif
811
812 /* ----------------------------------------------------------------------------
813  * Check for blocked threads that can be woken up.
814  * ------------------------------------------------------------------------- */
815
816 static void
817 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
818 {
819 #if !defined(THREADED_RTS)
820     //
821     // Check whether any waiting threads need to be woken up.  If the
822     // run queue is empty, and there are no other tasks running, we
823     // can wait indefinitely for something to happen.
824     //
825     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
826     {
827         awaitEvent (emptyRunQueue(cap));
828     }
829 #endif
830 }
831
832 /* ----------------------------------------------------------------------------
833  * Detect deadlock conditions and attempt to resolve them.
834  * ------------------------------------------------------------------------- */
835
836 static void
837 scheduleDetectDeadlock (Capability *cap, Task *task)
838 {
839     /* 
840      * Detect deadlock: when we have no threads to run, there are no
841      * threads blocked, waiting for I/O, or sleeping, and all the
842      * other tasks are waiting for work, we must have a deadlock of
843      * some description.
844      */
845     if ( emptyThreadQueues(cap) )
846     {
847 #if defined(THREADED_RTS)
848         /* 
849          * In the threaded RTS, we only check for deadlock if there
850          * has been no activity in a complete timeslice.  This means
851          * we won't eagerly start a full GC just because we don't have
852          * any threads to run currently.
853          */
854         if (recent_activity != ACTIVITY_INACTIVE) return;
855 #endif
856
857         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
858
859         // Garbage collection can release some new threads due to
860         // either (a) finalizers or (b) threads resurrected because
861         // they are unreachable and will therefore be sent an
862         // exception.  Any threads thus released will be immediately
863         // runnable.
864         cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
865         // when force_major == rtsTrue. scheduleDoGC sets
866         // recent_activity to ACTIVITY_DONE_GC and turns off the timer
867         // signal.
868
869         if ( !emptyRunQueue(cap) ) return;
870
871 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
872         /* If we have user-installed signal handlers, then wait
873          * for signals to arrive rather then bombing out with a
874          * deadlock.
875          */
876         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
877             debugTrace(DEBUG_sched,
878                        "still deadlocked, waiting for signals...");
879
880             awaitUserSignals();
881
882             if (signals_pending()) {
883                 startSignalHandlers(cap);
884             }
885
886             // either we have threads to run, or we were interrupted:
887             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
888
889             return;
890         }
891 #endif
892
893 #if !defined(THREADED_RTS)
894         /* Probably a real deadlock.  Send the current main thread the
895          * Deadlock exception.
896          */
897         if (task->incall->tso) {
898             switch (task->incall->tso->why_blocked) {
899             case BlockedOnSTM:
900             case BlockedOnBlackHole:
901             case BlockedOnMsgThrowTo:
902             case BlockedOnMVar:
903                 throwToSingleThreaded(cap, task->incall->tso, 
904                                       (StgClosure *)nonTermination_closure);
905                 return;
906             default:
907                 barf("deadlock: main thread blocked in a strange way");
908             }
909         }
910         return;
911 #endif
912     }
913 }
914
915
916 /* ----------------------------------------------------------------------------
917  * Send pending messages (PARALLEL_HASKELL only)
918  * ------------------------------------------------------------------------- */
919
920 #if defined(PARALLEL_HASKELL)
921 static void
922 scheduleSendPendingMessages(void)
923 {
924
925 # if defined(PAR) // global Mem.Mgmt., omit for now
926     if (PendingFetches != END_BF_QUEUE) {
927         processFetches();
928     }
929 # endif
930     
931     if (RtsFlags.ParFlags.BufferTime) {
932         // if we use message buffering, we must send away all message
933         // packets which have become too old...
934         sendOldBuffers(); 
935     }
936 }
937 #endif
938
939 /* ----------------------------------------------------------------------------
940  * Process message in the current Capability's inbox
941  * ------------------------------------------------------------------------- */
942
943 static void
944 scheduleProcessInbox (Capability *cap USED_IF_THREADS)
945 {
946 #if defined(THREADED_RTS)
947     Message *m;
948
949     while (!emptyInbox(cap)) {
950         ACQUIRE_LOCK(&cap->lock);
951         m = cap->inbox;
952         cap->inbox = m->link;
953         RELEASE_LOCK(&cap->lock);
954         executeMessage(cap, (Message *)m);
955     }
956 #endif
957 }
958
959 /* ----------------------------------------------------------------------------
960  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
961  * ------------------------------------------------------------------------- */
962
963 #if defined(THREADED_RTS)
964 static void
965 scheduleActivateSpark(Capability *cap)
966 {
967     if (anySparks())
968     {
969         createSparkThread(cap);
970         debugTrace(DEBUG_sched, "creating a spark thread");
971     }
972 }
973 #endif // PARALLEL_HASKELL || THREADED_RTS
974
975 /* ----------------------------------------------------------------------------
976  * After running a thread...
977  * ------------------------------------------------------------------------- */
978
979 static void
980 schedulePostRunThread (Capability *cap, StgTSO *t)
981 {
982     // We have to be able to catch transactions that are in an
983     // infinite loop as a result of seeing an inconsistent view of
984     // memory, e.g. 
985     //
986     //   atomically $ do
987     //       [a,b] <- mapM readTVar [ta,tb]
988     //       when (a == b) loop
989     //
990     // and a is never equal to b given a consistent view of memory.
991     //
992     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
993         if (!stmValidateNestOfTransactions (t -> trec)) {
994             debugTrace(DEBUG_sched | DEBUG_stm,
995                        "trec %p found wasting its time", t);
996             
997             // strip the stack back to the
998             // ATOMICALLY_FRAME, aborting the (nested)
999             // transaction, and saving the stack of any
1000             // partially-evaluated thunks on the heap.
1001             throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1002             
1003 //            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1004         }
1005     }
1006
1007   /* some statistics gathering in the parallel case */
1008 }
1009
1010 /* -----------------------------------------------------------------------------
1011  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1012  * -------------------------------------------------------------------------- */
1013
1014 static rtsBool
1015 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1016 {
1017     // did the task ask for a large block?
1018     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1019         // if so, get one and push it on the front of the nursery.
1020         bdescr *bd;
1021         lnat blocks;
1022         
1023         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1024         
1025         debugTrace(DEBUG_sched,
1026                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1027                    (long)t->id, what_next_strs[t->what_next], blocks);
1028     
1029         // don't do this if the nursery is (nearly) full, we'll GC first.
1030         if (cap->r.rCurrentNursery->link != NULL ||
1031             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1032                                                // if the nursery has only one block.
1033             
1034             ACQUIRE_SM_LOCK
1035             bd = allocGroup( blocks );
1036             RELEASE_SM_LOCK
1037             cap->r.rNursery->n_blocks += blocks;
1038             
1039             // link the new group into the list
1040             bd->link = cap->r.rCurrentNursery;
1041             bd->u.back = cap->r.rCurrentNursery->u.back;
1042             if (cap->r.rCurrentNursery->u.back != NULL) {
1043                 cap->r.rCurrentNursery->u.back->link = bd;
1044             } else {
1045                 cap->r.rNursery->blocks = bd;
1046             }             
1047             cap->r.rCurrentNursery->u.back = bd;
1048             
1049             // initialise it as a nursery block.  We initialise the
1050             // step, gen_no, and flags field of *every* sub-block in
1051             // this large block, because this is easier than making
1052             // sure that we always find the block head of a large
1053             // block whenever we call Bdescr() (eg. evacuate() and
1054             // isAlive() in the GC would both have to do this, at
1055             // least).
1056             { 
1057                 bdescr *x;
1058                 for (x = bd; x < bd + blocks; x++) {
1059                     initBdescr(x,g0,g0);
1060                     x->free = x->start;
1061                     x->flags = 0;
1062                 }
1063             }
1064             
1065             // This assert can be a killer if the app is doing lots
1066             // of large block allocations.
1067             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1068             
1069             // now update the nursery to point to the new block
1070             cap->r.rCurrentNursery = bd;
1071             
1072             // we might be unlucky and have another thread get on the
1073             // run queue before us and steal the large block, but in that
1074             // case the thread will just end up requesting another large
1075             // block.
1076             pushOnRunQueue(cap,t);
1077             return rtsFalse;  /* not actually GC'ing */
1078         }
1079     }
1080     
1081     if (cap->r.rHpLim == NULL || cap->context_switch) {
1082         // Sometimes we miss a context switch, e.g. when calling
1083         // primitives in a tight loop, MAYBE_GC() doesn't check the
1084         // context switch flag, and we end up waiting for a GC.
1085         // See #1984, and concurrent/should_run/1984
1086         cap->context_switch = 0;
1087         appendToRunQueue(cap,t);
1088     } else {
1089         pushOnRunQueue(cap,t);
1090     }
1091     return rtsTrue;
1092     /* actual GC is done at the end of the while loop in schedule() */
1093 }
1094
1095 /* -----------------------------------------------------------------------------
1096  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1097  * -------------------------------------------------------------------------- */
1098
1099 static void
1100 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1101 {
1102     /* just adjust the stack for this thread, then pop it back
1103      * on the run queue.
1104      */
1105     { 
1106         /* enlarge the stack */
1107         StgTSO *new_t = threadStackOverflow(cap, t);
1108         
1109         /* The TSO attached to this Task may have moved, so update the
1110          * pointer to it.
1111          */
1112         if (task->incall->tso == t) {
1113             task->incall->tso = new_t;
1114         }
1115         pushOnRunQueue(cap,new_t);
1116     }
1117 }
1118
1119 /* -----------------------------------------------------------------------------
1120  * Handle a thread that returned to the scheduler with ThreadYielding
1121  * -------------------------------------------------------------------------- */
1122
1123 static rtsBool
1124 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1125 {
1126     /* put the thread back on the run queue.  Then, if we're ready to
1127      * GC, check whether this is the last task to stop.  If so, wake
1128      * up the GC thread.  getThread will block during a GC until the
1129      * GC is finished.
1130      */
1131
1132     ASSERT(t->_link == END_TSO_QUEUE);
1133     
1134     // Shortcut if we're just switching evaluators: don't bother
1135     // doing stack squeezing (which can be expensive), just run the
1136     // thread.
1137     if (cap->context_switch == 0 && t->what_next != prev_what_next) {
1138         debugTrace(DEBUG_sched,
1139                    "--<< thread %ld (%s) stopped to switch evaluators", 
1140                    (long)t->id, what_next_strs[t->what_next]);
1141         return rtsTrue;
1142     }
1143
1144     // Reset the context switch flag.  We don't do this just before
1145     // running the thread, because that would mean we would lose ticks
1146     // during GC, which can lead to unfair scheduling (a thread hogs
1147     // the CPU because the tick always arrives during GC).  This way
1148     // penalises threads that do a lot of allocation, but that seems
1149     // better than the alternative.
1150     cap->context_switch = 0;
1151     
1152     IF_DEBUG(sanity,
1153              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1154              checkTSO(t));
1155
1156     appendToRunQueue(cap,t);
1157
1158     return rtsFalse;
1159 }
1160
1161 /* -----------------------------------------------------------------------------
1162  * Handle a thread that returned to the scheduler with ThreadBlocked
1163  * -------------------------------------------------------------------------- */
1164
1165 static void
1166 scheduleHandleThreadBlocked( StgTSO *t
1167 #if !defined(DEBUG)
1168     STG_UNUSED
1169 #endif
1170     )
1171 {
1172
1173       // We don't need to do anything.  The thread is blocked, and it
1174       // has tidied up its stack and placed itself on whatever queue
1175       // it needs to be on.
1176
1177     // ASSERT(t->why_blocked != NotBlocked);
1178     // Not true: for example,
1179     //    - the thread may have woken itself up already, because
1180     //      threadPaused() might have raised a blocked throwTo
1181     //      exception, see maybePerformBlockedException().
1182
1183 #ifdef DEBUG
1184     traceThreadStatus(DEBUG_sched, t);
1185 #endif
1186 }
1187
1188 /* -----------------------------------------------------------------------------
1189  * Handle a thread that returned to the scheduler with ThreadFinished
1190  * -------------------------------------------------------------------------- */
1191
1192 static rtsBool
1193 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1194 {
1195     /* Need to check whether this was a main thread, and if so,
1196      * return with the return value.
1197      *
1198      * We also end up here if the thread kills itself with an
1199      * uncaught exception, see Exception.cmm.
1200      */
1201
1202     // blocked exceptions can now complete, even if the thread was in
1203     // blocked mode (see #2910).
1204     awakenBlockedExceptionQueue (cap, t);
1205
1206       //
1207       // Check whether the thread that just completed was a bound
1208       // thread, and if so return with the result.  
1209       //
1210       // There is an assumption here that all thread completion goes
1211       // through this point; we need to make sure that if a thread
1212       // ends up in the ThreadKilled state, that it stays on the run
1213       // queue so it can be dealt with here.
1214       //
1215
1216       if (t->bound) {
1217
1218           if (t->bound != task->incall) {
1219 #if !defined(THREADED_RTS)
1220               // Must be a bound thread that is not the topmost one.  Leave
1221               // it on the run queue until the stack has unwound to the
1222               // point where we can deal with this.  Leaving it on the run
1223               // queue also ensures that the garbage collector knows about
1224               // this thread and its return value (it gets dropped from the
1225               // step->threads list so there's no other way to find it).
1226               appendToRunQueue(cap,t);
1227               return rtsFalse;
1228 #else
1229               // this cannot happen in the threaded RTS, because a
1230               // bound thread can only be run by the appropriate Task.
1231               barf("finished bound thread that isn't mine");
1232 #endif
1233           }
1234
1235           ASSERT(task->incall->tso == t);
1236
1237           if (t->what_next == ThreadComplete) {
1238               if (task->incall->ret) {
1239                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1240                   *(task->incall->ret) = (StgClosure *)task->incall->tso->sp[1]; 
1241               }
1242               task->incall->stat = Success;
1243           } else {
1244               if (task->incall->ret) {
1245                   *(task->incall->ret) = NULL;
1246               }
1247               if (sched_state >= SCHED_INTERRUPTING) {
1248                   if (heap_overflow) {
1249                       task->incall->stat = HeapExhausted;
1250                   } else {
1251                       task->incall->stat = Interrupted;
1252                   }
1253               } else {
1254                   task->incall->stat = Killed;
1255               }
1256           }
1257 #ifdef DEBUG
1258           removeThreadLabel((StgWord)task->incall->tso->id);
1259 #endif
1260
1261           // We no longer consider this thread and task to be bound to
1262           // each other.  The TSO lives on until it is GC'd, but the
1263           // task is about to be released by the caller, and we don't
1264           // want anyone following the pointer from the TSO to the
1265           // defunct task (which might have already been
1266           // re-used). This was a real bug: the GC updated
1267           // tso->bound->tso which lead to a deadlock.
1268           t->bound = NULL;
1269           task->incall->tso = NULL;
1270
1271           return rtsTrue; // tells schedule() to return
1272       }
1273
1274       return rtsFalse;
1275 }
1276
1277 /* -----------------------------------------------------------------------------
1278  * Perform a heap census
1279  * -------------------------------------------------------------------------- */
1280
1281 static rtsBool
1282 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1283 {
1284     // When we have +RTS -i0 and we're heap profiling, do a census at
1285     // every GC.  This lets us get repeatable runs for debugging.
1286     if (performHeapProfile ||
1287         (RtsFlags.ProfFlags.profileInterval==0 &&
1288          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1289         return rtsTrue;
1290     } else {
1291         return rtsFalse;
1292     }
1293 }
1294
1295 /* -----------------------------------------------------------------------------
1296  * Perform a garbage collection if necessary
1297  * -------------------------------------------------------------------------- */
1298
1299 static Capability *
1300 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1301 {
1302     rtsBool heap_census;
1303 #ifdef THREADED_RTS
1304     /* extern static volatile StgWord waiting_for_gc; 
1305        lives inside capability.c */
1306     rtsBool gc_type, prev_pending_gc;
1307     nat i;
1308 #endif
1309
1310     if (sched_state == SCHED_SHUTTING_DOWN) {
1311         // The final GC has already been done, and the system is
1312         // shutting down.  We'll probably deadlock if we try to GC
1313         // now.
1314         return cap;
1315     }
1316
1317 #ifdef THREADED_RTS
1318     if (sched_state < SCHED_INTERRUPTING
1319         && RtsFlags.ParFlags.parGcEnabled
1320         && N >= RtsFlags.ParFlags.parGcGen
1321         && ! oldest_gen->mark)
1322     {
1323         gc_type = PENDING_GC_PAR;
1324     } else {
1325         gc_type = PENDING_GC_SEQ;
1326     }
1327
1328     // In order to GC, there must be no threads running Haskell code.
1329     // Therefore, the GC thread needs to hold *all* the capabilities,
1330     // and release them after the GC has completed.  
1331     //
1332     // This seems to be the simplest way: previous attempts involved
1333     // making all the threads with capabilities give up their
1334     // capabilities and sleep except for the *last* one, which
1335     // actually did the GC.  But it's quite hard to arrange for all
1336     // the other tasks to sleep and stay asleep.
1337     //
1338
1339     /*  Other capabilities are prevented from running yet more Haskell
1340         threads if waiting_for_gc is set. Tested inside
1341         yieldCapability() and releaseCapability() in Capability.c */
1342
1343     prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1344     if (prev_pending_gc) {
1345         do {
1346             debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
1347                        prev_pending_gc);
1348             ASSERT(cap);
1349             yieldCapability(&cap,task);
1350         } while (waiting_for_gc);
1351         return cap;  // NOTE: task->cap might have changed here
1352     }
1353
1354     setContextSwitches();
1355
1356     // The final shutdown GC is always single-threaded, because it's
1357     // possible that some of the Capabilities have no worker threads.
1358     
1359     if (gc_type == PENDING_GC_SEQ)
1360     {
1361         traceEventRequestSeqGc(cap);
1362     }
1363     else
1364     {
1365         traceEventRequestParGc(cap);
1366         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1367     }
1368
1369     if (gc_type == PENDING_GC_SEQ)
1370     {
1371         // single-threaded GC: grab all the capabilities
1372         for (i=0; i < n_capabilities; i++) {
1373             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1374             if (cap != &capabilities[i]) {
1375                 Capability *pcap = &capabilities[i];
1376                 // we better hope this task doesn't get migrated to
1377                 // another Capability while we're waiting for this one.
1378                 // It won't, because load balancing happens while we have
1379                 // all the Capabilities, but even so it's a slightly
1380                 // unsavoury invariant.
1381                 task->cap = pcap;
1382                 waitForReturnCapability(&pcap, task);
1383                 if (pcap != &capabilities[i]) {
1384                     barf("scheduleDoGC: got the wrong capability");
1385                 }
1386             }
1387         }
1388     }
1389     else
1390     {
1391         // multi-threaded GC: make sure all the Capabilities donate one
1392         // GC thread each.
1393         waitForGcThreads(cap);
1394     }
1395
1396 #endif
1397
1398     IF_DEBUG(scheduler, printAllThreads());
1399
1400 delete_threads_and_gc:
1401     /*
1402      * We now have all the capabilities; if we're in an interrupting
1403      * state, then we should take the opportunity to delete all the
1404      * threads in the system.
1405      */
1406     if (sched_state == SCHED_INTERRUPTING) {
1407         deleteAllThreads(cap);
1408         sched_state = SCHED_SHUTTING_DOWN;
1409     }
1410     
1411     heap_census = scheduleNeedHeapProfile(rtsTrue);
1412
1413     traceEventGcStart(cap);
1414 #if defined(THREADED_RTS)
1415     // reset waiting_for_gc *before* GC, so that when the GC threads
1416     // emerge they don't immediately re-enter the GC.
1417     waiting_for_gc = 0;
1418     GarbageCollect(force_major || heap_census, gc_type, cap);
1419 #else
1420     GarbageCollect(force_major || heap_census, 0, cap);
1421 #endif
1422     traceEventGcEnd(cap);
1423
1424     if (recent_activity == ACTIVITY_INACTIVE && force_major)
1425     {
1426         // We are doing a GC because the system has been idle for a
1427         // timeslice and we need to check for deadlock.  Record the
1428         // fact that we've done a GC and turn off the timer signal;
1429         // it will get re-enabled if we run any threads after the GC.
1430         recent_activity = ACTIVITY_DONE_GC;
1431         stopTimer();
1432     }
1433     else
1434     {
1435         // the GC might have taken long enough for the timer to set
1436         // recent_activity = ACTIVITY_INACTIVE, but we aren't
1437         // necessarily deadlocked:
1438         recent_activity = ACTIVITY_YES;
1439     }
1440
1441 #if defined(THREADED_RTS)
1442     if (gc_type == PENDING_GC_PAR)
1443     {
1444         releaseGCThreads(cap);
1445     }
1446 #endif
1447
1448     if (heap_census) {
1449         debugTrace(DEBUG_sched, "performing heap census");
1450         heapCensus();
1451         performHeapProfile = rtsFalse;
1452     }
1453
1454     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1455         // GC set the heap_overflow flag, so we should proceed with
1456         // an orderly shutdown now.  Ultimately we want the main
1457         // thread to return to its caller with HeapExhausted, at which
1458         // point the caller should call hs_exit().  The first step is
1459         // to delete all the threads.
1460         //
1461         // Another way to do this would be to raise an exception in
1462         // the main thread, which we really should do because it gives
1463         // the program a chance to clean up.  But how do we find the
1464         // main thread?  It should presumably be the same one that
1465         // gets ^C exceptions, but that's all done on the Haskell side
1466         // (GHC.TopHandler).
1467         sched_state = SCHED_INTERRUPTING;
1468         goto delete_threads_and_gc;
1469     }
1470
1471 #ifdef SPARKBALANCE
1472     /* JB 
1473        Once we are all together... this would be the place to balance all
1474        spark pools. No concurrent stealing or adding of new sparks can
1475        occur. Should be defined in Sparks.c. */
1476     balanceSparkPoolsCaps(n_capabilities, capabilities);
1477 #endif
1478
1479 #if defined(THREADED_RTS)
1480     if (gc_type == PENDING_GC_SEQ) {
1481         // release our stash of capabilities.
1482         for (i = 0; i < n_capabilities; i++) {
1483             if (cap != &capabilities[i]) {
1484                 task->cap = &capabilities[i];
1485                 releaseCapability(&capabilities[i]);
1486             }
1487         }
1488     }
1489     if (cap) {
1490         task->cap = cap;
1491     } else {
1492         task->cap = NULL;
1493     }
1494 #endif
1495
1496     return cap;
1497 }
1498
1499 /* ---------------------------------------------------------------------------
1500  * Singleton fork(). Do not copy any running threads.
1501  * ------------------------------------------------------------------------- */
1502
1503 pid_t
1504 forkProcess(HsStablePtr *entry
1505 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1506             STG_UNUSED
1507 #endif
1508            )
1509 {
1510 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1511     pid_t pid;
1512     StgTSO* t,*next;
1513     Capability *cap;
1514     nat g;
1515     
1516 #if defined(THREADED_RTS)
1517     if (RtsFlags.ParFlags.nNodes > 1) {
1518         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1519         stg_exit(EXIT_FAILURE);
1520     }
1521 #endif
1522
1523     debugTrace(DEBUG_sched, "forking!");
1524     
1525     // ToDo: for SMP, we should probably acquire *all* the capabilities
1526     cap = rts_lock();
1527     
1528     // no funny business: hold locks while we fork, otherwise if some
1529     // other thread is holding a lock when the fork happens, the data
1530     // structure protected by the lock will forever be in an
1531     // inconsistent state in the child.  See also #1391.
1532     ACQUIRE_LOCK(&sched_mutex);
1533     ACQUIRE_LOCK(&cap->lock);
1534     ACQUIRE_LOCK(&cap->running_task->lock);
1535
1536     stopTimer(); // See #4074
1537
1538     pid = fork();
1539     
1540     if (pid) { // parent
1541         
1542         startTimer(); // #4074
1543
1544         RELEASE_LOCK(&sched_mutex);
1545         RELEASE_LOCK(&cap->lock);
1546         RELEASE_LOCK(&cap->running_task->lock);
1547
1548         // just return the pid
1549         rts_unlock(cap);
1550         return pid;
1551         
1552     } else { // child
1553         
1554 #if defined(THREADED_RTS)
1555         initMutex(&sched_mutex);
1556         initMutex(&cap->lock);
1557         initMutex(&cap->running_task->lock);
1558 #endif
1559
1560         // Now, all OS threads except the thread that forked are
1561         // stopped.  We need to stop all Haskell threads, including
1562         // those involved in foreign calls.  Also we need to delete
1563         // all Tasks, because they correspond to OS threads that are
1564         // now gone.
1565
1566         for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1567           for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1568             if (t->what_next == ThreadRelocated) {
1569                 next = t->_link;
1570             } else {
1571                 next = t->global_link;
1572                 // don't allow threads to catch the ThreadKilled
1573                 // exception, but we do want to raiseAsync() because these
1574                 // threads may be evaluating thunks that we need later.
1575                 deleteThread_(cap,t);
1576
1577                 // stop the GC from updating the InCall to point to
1578                 // the TSO.  This is only necessary because the
1579                 // OSThread bound to the TSO has been killed, and
1580                 // won't get a chance to exit in the usual way (see
1581                 // also scheduleHandleThreadFinished).
1582                 t->bound = NULL;
1583             }
1584           }
1585         }
1586         
1587         // Empty the run queue.  It seems tempting to let all the
1588         // killed threads stay on the run queue as zombies to be
1589         // cleaned up later, but some of them correspond to bound
1590         // threads for which the corresponding Task does not exist.
1591         cap->run_queue_hd = END_TSO_QUEUE;
1592         cap->run_queue_tl = END_TSO_QUEUE;
1593
1594         // Any suspended C-calling Tasks are no more, their OS threads
1595         // don't exist now:
1596         cap->suspended_ccalls = NULL;
1597
1598         // Empty the threads lists.  Otherwise, the garbage
1599         // collector may attempt to resurrect some of these threads.
1600         for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1601             generations[g].threads = END_TSO_QUEUE;
1602         }
1603
1604         discardTasksExcept(cap->running_task);
1605
1606 #if defined(THREADED_RTS)
1607         // Wipe our spare workers list, they no longer exist.  New
1608         // workers will be created if necessary.
1609         cap->spare_workers = NULL;
1610         cap->returning_tasks_hd = NULL;
1611         cap->returning_tasks_tl = NULL;
1612 #endif
1613
1614         // On Unix, all timers are reset in the child, so we need to start
1615         // the timer again.
1616         initTimer();
1617         startTimer();
1618
1619 #if defined(THREADED_RTS)
1620         cap = ioManagerStartCap(cap);
1621 #endif
1622
1623         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1624         rts_checkSchedStatus("forkProcess",cap);
1625         
1626         rts_unlock(cap);
1627         hs_exit();                      // clean up and exit
1628         stg_exit(EXIT_SUCCESS);
1629     }
1630 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1631     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1632 #endif
1633 }
1634
1635 /* ---------------------------------------------------------------------------
1636  * Delete all the threads in the system
1637  * ------------------------------------------------------------------------- */
1638    
1639 static void
1640 deleteAllThreads ( Capability *cap )
1641 {
1642     // NOTE: only safe to call if we own all capabilities.
1643
1644     StgTSO* t, *next;
1645     nat g;
1646
1647     debugTrace(DEBUG_sched,"deleting all threads");
1648     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
1649         for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
1650             if (t->what_next == ThreadRelocated) {
1651                 next = t->_link;
1652             } else {
1653                 next = t->global_link;
1654                 deleteThread(cap,t);
1655             }
1656         }
1657     }
1658
1659     // The run queue now contains a bunch of ThreadKilled threads.  We
1660     // must not throw these away: the main thread(s) will be in there
1661     // somewhere, and the main scheduler loop has to deal with it.
1662     // Also, the run queue is the only thing keeping these threads from
1663     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1664
1665 #if !defined(THREADED_RTS)
1666     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1667     ASSERT(sleeping_queue == END_TSO_QUEUE);
1668 #endif
1669 }
1670
1671 /* -----------------------------------------------------------------------------
1672    Managing the suspended_ccalls list.
1673    Locks required: sched_mutex
1674    -------------------------------------------------------------------------- */
1675
1676 STATIC_INLINE void
1677 suspendTask (Capability *cap, Task *task)
1678 {
1679     InCall *incall;
1680     
1681     incall = task->incall;
1682     ASSERT(incall->next == NULL && incall->prev == NULL);
1683     incall->next = cap->suspended_ccalls;
1684     incall->prev = NULL;
1685     if (cap->suspended_ccalls) {
1686         cap->suspended_ccalls->prev = incall;
1687     }
1688     cap->suspended_ccalls = incall;
1689 }
1690
1691 STATIC_INLINE void
1692 recoverSuspendedTask (Capability *cap, Task *task)
1693 {
1694     InCall *incall;
1695
1696     incall = task->incall;
1697     if (incall->prev) {
1698         incall->prev->next = incall->next;
1699     } else {
1700         ASSERT(cap->suspended_ccalls == incall);
1701         cap->suspended_ccalls = incall->next;
1702     }
1703     if (incall->next) {
1704         incall->next->prev = incall->prev;
1705     }
1706     incall->next = incall->prev = NULL;
1707 }
1708
1709 /* ---------------------------------------------------------------------------
1710  * Suspending & resuming Haskell threads.
1711  * 
1712  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1713  * its capability before calling the C function.  This allows another
1714  * task to pick up the capability and carry on running Haskell
1715  * threads.  It also means that if the C call blocks, it won't lock
1716  * the whole system.
1717  *
1718  * The Haskell thread making the C call is put to sleep for the
1719  * duration of the call, on the suspended_ccalling_threads queue.  We
1720  * give out a token to the task, which it can use to resume the thread
1721  * on return from the C function.
1722  *
1723  * If this is an interruptible C call, this means that the FFI call may be
1724  * unceremoniously terminated and should be scheduled on an
1725  * unbound worker thread.
1726  * ------------------------------------------------------------------------- */
1727    
1728 void *
1729 suspendThread (StgRegTable *reg, rtsBool interruptible)
1730 {
1731   Capability *cap;
1732   int saved_errno;
1733   StgTSO *tso;
1734   Task *task;
1735 #if mingw32_HOST_OS
1736   StgWord32 saved_winerror;
1737 #endif
1738
1739   saved_errno = errno;
1740 #if mingw32_HOST_OS
1741   saved_winerror = GetLastError();
1742 #endif
1743
1744   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1745    */
1746   cap = regTableToCapability(reg);
1747
1748   task = cap->running_task;
1749   tso = cap->r.rCurrentTSO;
1750
1751   traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL);
1752
1753   // XXX this might not be necessary --SDM
1754   tso->what_next = ThreadRunGHC;
1755
1756   threadPaused(cap,tso);
1757
1758   if (interruptible) {
1759     tso->why_blocked = BlockedOnCCall_Interruptible;
1760   } else {
1761     tso->why_blocked = BlockedOnCCall;
1762   }
1763
1764   // Hand back capability
1765   task->incall->suspended_tso = tso;
1766   task->incall->suspended_cap = cap;
1767
1768   ACQUIRE_LOCK(&cap->lock);
1769
1770   suspendTask(cap,task);
1771   cap->in_haskell = rtsFalse;
1772   releaseCapability_(cap,rtsFalse);
1773   
1774   RELEASE_LOCK(&cap->lock);
1775
1776   errno = saved_errno;
1777 #if mingw32_HOST_OS
1778   SetLastError(saved_winerror);
1779 #endif
1780   return task;
1781 }
1782
1783 StgRegTable *
1784 resumeThread (void *task_)
1785 {
1786     StgTSO *tso;
1787     InCall *incall;
1788     Capability *cap;
1789     Task *task = task_;
1790     int saved_errno;
1791 #if mingw32_HOST_OS
1792     StgWord32 saved_winerror;
1793 #endif
1794
1795     saved_errno = errno;
1796 #if mingw32_HOST_OS
1797     saved_winerror = GetLastError();
1798 #endif
1799
1800     incall = task->incall;
1801     cap = incall->suspended_cap;
1802     task->cap = cap;
1803
1804     // Wait for permission to re-enter the RTS with the result.
1805     waitForReturnCapability(&cap,task);
1806     // we might be on a different capability now... but if so, our
1807     // entry on the suspended_ccalls list will also have been
1808     // migrated.
1809
1810     // Remove the thread from the suspended list
1811     recoverSuspendedTask(cap,task);
1812
1813     tso = incall->suspended_tso;
1814     incall->suspended_tso = NULL;
1815     incall->suspended_cap = NULL;
1816     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1817
1818     traceEventRunThread(cap, tso);
1819     
1820     if ((tso->flags & TSO_BLOCKEX) == 0) {
1821         // avoid locking the TSO if we don't have to
1822         if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
1823             awakenBlockedExceptionQueue(cap,tso);
1824         }
1825     }
1826     
1827     /* Reset blocking status */
1828     tso->why_blocked  = NotBlocked;
1829     
1830     cap->r.rCurrentTSO = tso;
1831     cap->in_haskell = rtsTrue;
1832     errno = saved_errno;
1833 #if mingw32_HOST_OS
1834     SetLastError(saved_winerror);
1835 #endif
1836
1837     /* We might have GC'd, mark the TSO dirty again */
1838     dirty_TSO(cap,tso);
1839
1840     IF_DEBUG(sanity, checkTSO(tso));
1841
1842     return &cap->r;
1843 }
1844
1845 /* ---------------------------------------------------------------------------
1846  * scheduleThread()
1847  *
1848  * scheduleThread puts a thread on the end  of the runnable queue.
1849  * This will usually be done immediately after a thread is created.
1850  * The caller of scheduleThread must create the thread using e.g.
1851  * createThread and push an appropriate closure
1852  * on this thread's stack before the scheduler is invoked.
1853  * ------------------------------------------------------------------------ */
1854
1855 void
1856 scheduleThread(Capability *cap, StgTSO *tso)
1857 {
1858     // The thread goes at the *end* of the run-queue, to avoid possible
1859     // starvation of any threads already on the queue.
1860     appendToRunQueue(cap,tso);
1861 }
1862
1863 void
1864 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1865 {
1866 #if defined(THREADED_RTS)
1867     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1868                               // move this thread from now on.
1869     cpu %= RtsFlags.ParFlags.nNodes;
1870     if (cpu == cap->no) {
1871         appendToRunQueue(cap,tso);
1872     } else {
1873         migrateThread(cap, tso, &capabilities[cpu]);
1874     }
1875 #else
1876     appendToRunQueue(cap,tso);
1877 #endif
1878 }
1879
1880 Capability *
1881 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1882 {
1883     Task *task;
1884     StgThreadID id;
1885
1886     // We already created/initialised the Task
1887     task = cap->running_task;
1888
1889     // This TSO is now a bound thread; make the Task and TSO
1890     // point to each other.
1891     tso->bound = task->incall;
1892     tso->cap = cap;
1893
1894     task->incall->tso = tso;
1895     task->incall->ret = ret;
1896     task->incall->stat = NoStatus;
1897
1898     appendToRunQueue(cap,tso);
1899
1900     id = tso->id;
1901     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
1902
1903     cap = schedule(cap,task);
1904
1905     ASSERT(task->incall->stat != NoStatus);
1906     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1907
1908     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
1909     return cap;
1910 }
1911
1912 /* ----------------------------------------------------------------------------
1913  * Starting Tasks
1914  * ------------------------------------------------------------------------- */
1915
1916 #if defined(THREADED_RTS)
1917 void scheduleWorker (Capability *cap, Task *task)
1918 {
1919     // schedule() runs without a lock.
1920     cap = schedule(cap,task);
1921
1922     // On exit from schedule(), we have a Capability, but possibly not
1923     // the same one we started with.
1924
1925     // During shutdown, the requirement is that after all the
1926     // Capabilities are shut down, all workers that are shutting down
1927     // have finished workerTaskStop().  This is why we hold on to
1928     // cap->lock until we've finished workerTaskStop() below.
1929     //
1930     // There may be workers still involved in foreign calls; those
1931     // will just block in waitForReturnCapability() because the
1932     // Capability has been shut down.
1933     //
1934     ACQUIRE_LOCK(&cap->lock);
1935     releaseCapability_(cap,rtsFalse);
1936     workerTaskStop(task);
1937     RELEASE_LOCK(&cap->lock);
1938 }
1939 #endif
1940
1941 /* ---------------------------------------------------------------------------
1942  * initScheduler()
1943  *
1944  * Initialise the scheduler.  This resets all the queues - if the
1945  * queues contained any threads, they'll be garbage collected at the
1946  * next pass.
1947  *
1948  * ------------------------------------------------------------------------ */
1949
1950 void 
1951 initScheduler(void)
1952 {
1953 #if !defined(THREADED_RTS)
1954   blocked_queue_hd  = END_TSO_QUEUE;
1955   blocked_queue_tl  = END_TSO_QUEUE;
1956   sleeping_queue    = END_TSO_QUEUE;
1957 #endif
1958
1959   sched_state    = SCHED_RUNNING;
1960   recent_activity = ACTIVITY_YES;
1961
1962 #if defined(THREADED_RTS)
1963   /* Initialise the mutex and condition variables used by
1964    * the scheduler. */
1965   initMutex(&sched_mutex);
1966 #endif
1967   
1968   ACQUIRE_LOCK(&sched_mutex);
1969
1970   /* A capability holds the state a native thread needs in
1971    * order to execute STG code. At least one capability is
1972    * floating around (only THREADED_RTS builds have more than one).
1973    */
1974   initCapabilities();
1975
1976   initTaskManager();
1977
1978 #if defined(THREADED_RTS)
1979   initSparkPools();
1980 #endif
1981
1982   RELEASE_LOCK(&sched_mutex);
1983
1984 #if defined(THREADED_RTS)
1985   /*
1986    * Eagerly start one worker to run each Capability, except for
1987    * Capability 0.  The idea is that we're probably going to start a
1988    * bound thread on Capability 0 pretty soon, so we don't want a
1989    * worker task hogging it.
1990    */
1991   { 
1992       nat i;
1993       Capability *cap;
1994       for (i = 1; i < n_capabilities; i++) {
1995           cap = &capabilities[i];
1996           ACQUIRE_LOCK(&cap->lock);
1997           startWorkerTask(cap);
1998           RELEASE_LOCK(&cap->lock);
1999       }
2000   }
2001 #endif
2002 }
2003
2004 void
2005 exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
2006                /* see Capability.c, shutdownCapability() */
2007 {
2008     Task *task = NULL;
2009
2010     task = newBoundTask();
2011
2012     // If we haven't killed all the threads yet, do it now.
2013     if (sched_state < SCHED_SHUTTING_DOWN) {
2014         sched_state = SCHED_INTERRUPTING;
2015         waitForReturnCapability(&task->cap,task);
2016         scheduleDoGC(task->cap,task,rtsFalse);
2017         ASSERT(task->incall->tso == NULL);
2018         releaseCapability(task->cap);
2019     }
2020     sched_state = SCHED_SHUTTING_DOWN;
2021
2022 #if defined(THREADED_RTS)
2023     { 
2024         nat i;
2025         
2026         for (i = 0; i < n_capabilities; i++) {
2027             ASSERT(task->incall->tso == NULL);
2028             shutdownCapability(&capabilities[i], task, wait_foreign);
2029         }
2030     }
2031 #endif
2032
2033     boundTaskExiting(task);
2034 }
2035
2036 void
2037 freeScheduler( void )
2038 {
2039     nat still_running;
2040
2041     ACQUIRE_LOCK(&sched_mutex);
2042     still_running = freeTaskManager();
2043     // We can only free the Capabilities if there are no Tasks still
2044     // running.  We might have a Task about to return from a foreign
2045     // call into waitForReturnCapability(), for example (actually,
2046     // this should be the *only* thing that a still-running Task can
2047     // do at this point, and it will block waiting for the
2048     // Capability).
2049     if (still_running == 0) {
2050         freeCapabilities();
2051         if (n_capabilities != 1) {
2052             stgFree(capabilities);
2053         }
2054     }
2055     RELEASE_LOCK(&sched_mutex);
2056 #if defined(THREADED_RTS)
2057     closeMutex(&sched_mutex);
2058 #endif
2059 }
2060
2061 /* -----------------------------------------------------------------------------
2062    performGC
2063
2064    This is the interface to the garbage collector from Haskell land.
2065    We provide this so that external C code can allocate and garbage
2066    collect when called from Haskell via _ccall_GC.
2067    -------------------------------------------------------------------------- */
2068
2069 static void
2070 performGC_(rtsBool force_major)
2071 {
2072     Task *task;
2073
2074     // We must grab a new Task here, because the existing Task may be
2075     // associated with a particular Capability, and chained onto the 
2076     // suspended_ccalls queue.
2077     task = newBoundTask();
2078
2079     waitForReturnCapability(&task->cap,task);
2080     scheduleDoGC(task->cap,task,force_major);
2081     releaseCapability(task->cap);
2082     boundTaskExiting(task);
2083 }
2084
2085 void
2086 performGC(void)
2087 {
2088     performGC_(rtsFalse);
2089 }
2090
2091 void
2092 performMajorGC(void)
2093 {
2094     performGC_(rtsTrue);
2095 }
2096
2097 /* -----------------------------------------------------------------------------
2098    Stack overflow
2099
2100    If the thread has reached its maximum stack size, then raise the
2101    StackOverflow exception in the offending thread.  Otherwise
2102    relocate the TSO into a larger chunk of memory and adjust its stack
2103    size appropriately.
2104    -------------------------------------------------------------------------- */
2105
2106 static StgTSO *
2107 threadStackOverflow(Capability *cap, StgTSO *tso)
2108 {
2109   nat new_stack_size, stack_words;
2110   lnat new_tso_size;
2111   StgPtr new_sp;
2112   StgTSO *dest;
2113
2114   IF_DEBUG(sanity,checkTSO(tso));
2115
2116   if (tso->stack_size >= tso->max_stack_size
2117       && !(tso->flags & TSO_BLOCKEX)) {
2118       // NB. never raise a StackOverflow exception if the thread is
2119       // inside Control.Exceptino.block.  It is impractical to protect
2120       // against stack overflow exceptions, since virtually anything
2121       // can raise one (even 'catch'), so this is the only sensible
2122       // thing to do here.  See bug #767.
2123       //
2124
2125       if (tso->flags & TSO_SQUEEZED) {
2126           return tso;
2127       }
2128       // #3677: In a stack overflow situation, stack squeezing may
2129       // reduce the stack size, but we don't know whether it has been
2130       // reduced enough for the stack check to succeed if we try
2131       // again.  Fortunately stack squeezing is idempotent, so all we
2132       // need to do is record whether *any* squeezing happened.  If we
2133       // are at the stack's absolute -K limit, and stack squeezing
2134       // happened, then we try running the thread again.  The
2135       // TSO_SQUEEZED flag is set by threadPaused() to tell us whether
2136       // squeezing happened or not.
2137
2138       debugTrace(DEBUG_gc,
2139                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2140                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2141       IF_DEBUG(gc,
2142                /* If we're debugging, just print out the top of the stack */
2143                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2144                                                 tso->sp+64)));
2145
2146       // Send this thread the StackOverflow exception
2147       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2148       return tso;
2149   }
2150
2151
2152   // We also want to avoid enlarging the stack if squeezing has
2153   // already released some of it.  However, we don't want to get into
2154   // a pathalogical situation where a thread has a nearly full stack
2155   // (near its current limit, but not near the absolute -K limit),
2156   // keeps allocating a little bit, squeezing removes a little bit,
2157   // and then it runs again.  So to avoid this, if we squeezed *and*
2158   // there is still less than BLOCK_SIZE_W words free, then we enlarge
2159   // the stack anyway.
2160   if ((tso->flags & TSO_SQUEEZED) && 
2161       ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
2162       return tso;
2163   }
2164
2165   /* Try to double the current stack size.  If that takes us over the
2166    * maximum stack size for this thread, then use the maximum instead
2167    * (that is, unless we're already at or over the max size and we
2168    * can't raise the StackOverflow exception (see above), in which
2169    * case just double the size). Finally round up so the TSO ends up as
2170    * a whole number of blocks.
2171    */
2172   if (tso->stack_size >= tso->max_stack_size) {
2173       new_stack_size = tso->stack_size * 2;
2174   } else { 
2175       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2176   }
2177   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2178                                        TSO_STRUCT_SIZE)/sizeof(W_);
2179   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2180   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2181
2182   debugTrace(DEBUG_sched, 
2183              "increasing stack size from %ld words to %d.",
2184              (long)tso->stack_size, new_stack_size);
2185
2186   dest = (StgTSO *)allocate(cap,new_tso_size);
2187   TICK_ALLOC_TSO(new_stack_size,0);
2188
2189   /* copy the TSO block and the old stack into the new area */
2190   memcpy(dest,tso,TSO_STRUCT_SIZE);
2191   stack_words = tso->stack + tso->stack_size - tso->sp;
2192   new_sp = (P_)dest + new_tso_size - stack_words;
2193   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2194
2195   /* relocate the stack pointers... */
2196   dest->sp         = new_sp;
2197   dest->stack_size = new_stack_size;
2198         
2199   /* Mark the old TSO as relocated.  We have to check for relocated
2200    * TSOs in the garbage collector and any primops that deal with TSOs.
2201    *
2202    * It's important to set the sp value to just beyond the end
2203    * of the stack, so we don't attempt to scavenge any part of the
2204    * dead TSO's stack.
2205    */
2206   setTSOLink(cap,tso,dest);
2207   write_barrier(); // other threads seeing ThreadRelocated will look at _link
2208   tso->what_next = ThreadRelocated;
2209   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2210   tso->why_blocked = NotBlocked;
2211
2212   IF_DEBUG(sanity,checkTSO(dest));
2213 #if 0
2214   IF_DEBUG(scheduler,printTSO(dest));
2215 #endif
2216
2217   return dest;
2218 }
2219
2220 static StgTSO *
2221 threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
2222 {
2223     bdescr *bd, *new_bd;
2224     lnat free_w, tso_size_w;
2225     StgTSO *new_tso;
2226
2227     tso_size_w = tso_sizeW(tso);
2228
2229     if (tso_size_w < MBLOCK_SIZE_W ||
2230           // TSO is less than 2 mblocks (since the first mblock is
2231           // shorter than MBLOCK_SIZE_W)
2232         (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
2233           // or TSO is not a whole number of megablocks (ensuring
2234           // precondition of splitLargeBlock() below)
2235         (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
2236           // or TSO is smaller than the minimum stack size (rounded up)
2237         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2238           // or stack is using more than 1/4 of the available space
2239     {
2240         // then do nothing
2241         return tso;
2242     }
2243
2244     // this is the number of words we'll free
2245     free_w = round_to_mblocks(tso_size_w/2);
2246
2247     bd = Bdescr((StgPtr)tso);
2248     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2249     bd->free = bd->start + TSO_STRUCT_SIZEW;
2250
2251     new_tso = (StgTSO *)new_bd->start;
2252     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2253     new_tso->stack_size = new_bd->free - new_tso->stack;
2254
2255     // The original TSO was dirty and probably on the mutable
2256     // list. The new TSO is not yet on the mutable list, so we better
2257     // put it there.
2258     new_tso->dirty = 0;
2259     new_tso->flags &= ~TSO_LINK_DIRTY;
2260     dirty_TSO(cap, new_tso);
2261
2262     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2263                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2264
2265     tso->_link = new_tso; // no write barrier reqd: same generation
2266     write_barrier(); // other threads seeing ThreadRelocated will look at _link
2267     tso->what_next = ThreadRelocated;
2268
2269     // The TSO attached to this Task may have moved, so update the
2270     // pointer to it.
2271     if (task->incall->tso == tso) {
2272         task->incall->tso = new_tso;
2273     }
2274
2275     IF_DEBUG(sanity,checkTSO(new_tso));
2276
2277     return new_tso;
2278 }
2279
2280 /* ---------------------------------------------------------------------------
2281    Interrupt execution
2282    - usually called inside a signal handler so it mustn't do anything fancy.   
2283    ------------------------------------------------------------------------ */
2284
2285 void
2286 interruptStgRts(void)
2287 {
2288     sched_state = SCHED_INTERRUPTING;
2289     setContextSwitches();
2290 #if defined(THREADED_RTS)
2291     wakeUpRts();
2292 #endif
2293 }
2294
2295 /* -----------------------------------------------------------------------------
2296    Wake up the RTS
2297    
2298    This function causes at least one OS thread to wake up and run the
2299    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2300    an external event has arrived that may need servicing (eg. a
2301    keyboard interrupt).
2302
2303    In the single-threaded RTS we don't do anything here; we only have
2304    one thread anyway, and the event that caused us to want to wake up
2305    will have interrupted any blocking system call in progress anyway.
2306    -------------------------------------------------------------------------- */
2307
2308 #if defined(THREADED_RTS)
2309 void wakeUpRts(void)
2310 {
2311     // This forces the IO Manager thread to wakeup, which will
2312     // in turn ensure that some OS thread wakes up and runs the
2313     // scheduler loop, which will cause a GC and deadlock check.
2314     ioManagerWakeup();
2315 }
2316 #endif
2317
2318 /* -----------------------------------------------------------------------------
2319    Deleting threads
2320
2321    This is used for interruption (^C) and forking, and corresponds to
2322    raising an exception but without letting the thread catch the
2323    exception.
2324    -------------------------------------------------------------------------- */
2325
2326 static void 
2327 deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
2328 {
2329     // NOTE: must only be called on a TSO that we have exclusive
2330     // access to, because we will call throwToSingleThreaded() below.
2331     // The TSO must be on the run queue of the Capability we own, or 
2332     // we must own all Capabilities.
2333
2334     if (tso->why_blocked != BlockedOnCCall &&
2335         tso->why_blocked != BlockedOnCCall_Interruptible) {
2336         throwToSingleThreaded(tso->cap,tso,NULL);
2337     }
2338 }
2339
2340 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2341 static void 
2342 deleteThread_(Capability *cap, StgTSO *tso)
2343 { // for forkProcess only:
2344   // like deleteThread(), but we delete threads in foreign calls, too.
2345
2346     if (tso->why_blocked == BlockedOnCCall ||
2347         tso->why_blocked == BlockedOnCCall_Interruptible) {
2348         tso->what_next = ThreadKilled;
2349         appendToRunQueue(tso->cap, tso);
2350     } else {
2351         deleteThread(cap,tso);
2352     }
2353 }
2354 #endif
2355
2356 /* -----------------------------------------------------------------------------
2357    raiseExceptionHelper
2358    
2359    This function is called by the raise# primitve, just so that we can
2360    move some of the tricky bits of raising an exception from C-- into
2361    C.  Who knows, it might be a useful re-useable thing here too.
2362    -------------------------------------------------------------------------- */
2363
2364 StgWord
2365 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2366 {
2367     Capability *cap = regTableToCapability(reg);
2368     StgThunk *raise_closure = NULL;
2369     StgPtr p, next;
2370     StgRetInfoTable *info;
2371     //
2372     // This closure represents the expression 'raise# E' where E
2373     // is the exception raise.  It is used to overwrite all the
2374     // thunks which are currently under evaluataion.
2375     //
2376
2377     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2378     // LDV profiling: stg_raise_info has THUNK as its closure
2379     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2380     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2381     // 1 does not cause any problem unless profiling is performed.
2382     // However, when LDV profiling goes on, we need to linearly scan
2383     // small object pool, where raise_closure is stored, so we should
2384     // use MIN_UPD_SIZE.
2385     //
2386     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2387     //                                 sizeofW(StgClosure)+1);
2388     //
2389
2390     //
2391     // Walk up the stack, looking for the catch frame.  On the way,
2392     // we update any closures pointed to from update frames with the
2393     // raise closure that we just built.
2394     //
2395     p = tso->sp;
2396     while(1) {
2397         info = get_ret_itbl((StgClosure *)p);
2398         next = p + stack_frame_sizeW((StgClosure *)p);
2399         switch (info->i.type) {
2400             
2401         case UPDATE_FRAME:
2402             // Only create raise_closure if we need to.
2403             if (raise_closure == NULL) {
2404                 raise_closure = 
2405                     (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
2406                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2407                 raise_closure->payload[0] = exception;
2408             }
2409             updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
2410                         (StgClosure *)raise_closure);
2411             p = next;
2412             continue;
2413
2414         case ATOMICALLY_FRAME:
2415             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2416             tso->sp = p;
2417             return ATOMICALLY_FRAME;
2418             
2419         case CATCH_FRAME:
2420             tso->sp = p;
2421             return CATCH_FRAME;
2422
2423         case CATCH_STM_FRAME:
2424             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2425             tso->sp = p;
2426             return CATCH_STM_FRAME;
2427             
2428         case STOP_FRAME:
2429             tso->sp = p;
2430             return STOP_FRAME;
2431
2432         case CATCH_RETRY_FRAME:
2433         default:
2434             p = next; 
2435             continue;
2436         }
2437     }
2438 }
2439
2440
2441 /* -----------------------------------------------------------------------------
2442    findRetryFrameHelper
2443
2444    This function is called by the retry# primitive.  It traverses the stack
2445    leaving tso->sp referring to the frame which should handle the retry.  
2446
2447    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2448    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2449
2450    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2451    create) because retries are not considered to be exceptions, despite the
2452    similar implementation.
2453
2454    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2455    not be created within memory transactions.
2456    -------------------------------------------------------------------------- */
2457
2458 StgWord
2459 findRetryFrameHelper (StgTSO *tso)
2460 {
2461   StgPtr           p, next;
2462   StgRetInfoTable *info;
2463
2464   p = tso -> sp;
2465   while (1) {
2466     info = get_ret_itbl((StgClosure *)p);
2467     next = p + stack_frame_sizeW((StgClosure *)p);
2468     switch (info->i.type) {
2469       
2470     case ATOMICALLY_FRAME:
2471         debugTrace(DEBUG_stm,
2472                    "found ATOMICALLY_FRAME at %p during retry", p);
2473         tso->sp = p;
2474         return ATOMICALLY_FRAME;
2475       
2476     case CATCH_RETRY_FRAME:
2477         debugTrace(DEBUG_stm,
2478                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2479         tso->sp = p;
2480         return CATCH_RETRY_FRAME;
2481       
2482     case CATCH_STM_FRAME: {
2483         StgTRecHeader *trec = tso -> trec;
2484         StgTRecHeader *outer = trec -> enclosing_trec;
2485         debugTrace(DEBUG_stm,
2486                    "found CATCH_STM_FRAME at %p during retry", p);
2487         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2488         stmAbortTransaction(tso -> cap, trec);
2489         stmFreeAbortedTRec(tso -> cap, trec);
2490         tso -> trec = outer;
2491         p = next; 
2492         continue;
2493     }
2494       
2495
2496     default:
2497       ASSERT(info->i.type != CATCH_FRAME);
2498       ASSERT(info->i.type != STOP_FRAME);
2499       p = next; 
2500       continue;
2501     }
2502   }
2503 }
2504
2505 /* -----------------------------------------------------------------------------
2506    resurrectThreads is called after garbage collection on the list of
2507    threads found to be garbage.  Each of these threads will be woken
2508    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2509    on an MVar, or NonTermination if the thread was blocked on a Black
2510    Hole.
2511
2512    Locks: assumes we hold *all* the capabilities.
2513    -------------------------------------------------------------------------- */
2514
2515 void
2516 resurrectThreads (StgTSO *threads)
2517 {
2518     StgTSO *tso, *next;
2519     Capability *cap;
2520     generation *gen;
2521
2522     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2523         next = tso->global_link;
2524
2525         gen = Bdescr((P_)tso)->gen;
2526         tso->global_link = gen->threads;
2527         gen->threads = tso;
2528
2529         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2530         
2531         // Wake up the thread on the Capability it was last on
2532         cap = tso->cap;
2533         
2534         switch (tso->why_blocked) {
2535         case BlockedOnMVar:
2536             /* Called by GC - sched_mutex lock is currently held. */
2537             throwToSingleThreaded(cap, tso,
2538                                   (StgClosure *)blockedIndefinitelyOnMVar_closure);
2539             break;
2540         case BlockedOnBlackHole:
2541             throwToSingleThreaded(cap, tso,
2542                                   (StgClosure *)nonTermination_closure);
2543             break;
2544         case BlockedOnSTM:
2545             throwToSingleThreaded(cap, tso,
2546                                   (StgClosure *)blockedIndefinitelyOnSTM_closure);
2547             break;
2548         case NotBlocked:
2549             /* This might happen if the thread was blocked on a black hole
2550              * belonging to a thread that we've just woken up (raiseAsync
2551              * can wake up threads, remember...).
2552              */
2553             continue;
2554         case BlockedOnMsgThrowTo:
2555             // This can happen if the target is masking, blocks on a
2556             // black hole, and then is found to be unreachable.  In
2557             // this case, we want to let the target wake up and carry
2558             // on, and do nothing to this thread.
2559             continue;
2560         default:
2561             barf("resurrectThreads: thread blocked in a strange way: %d",
2562                  tso->why_blocked);
2563         }
2564     }
2565 }