1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
23 #include "RtsSignals.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
32 #include "Proftimer.h"
37 /* PARALLEL_HASKELL includes go here */
40 #include "Capability.h"
42 #include "AwaitEvent.h"
43 #if defined(mingw32_HOST_OS)
44 #include "win32/IOManager.h"
47 #include "RaiseAsync.h"
49 #include "ThrIOManager.h"
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
66 // Turn off inlining when debugging - it obfuscates things
69 # define STATIC_INLINE static
72 /* -----------------------------------------------------------------------------
74 * -------------------------------------------------------------------------- */
76 #if !defined(THREADED_RTS)
77 // Blocked/sleeping threads
78 StgTSO *blocked_queue_hd = NULL;
79 StgTSO *blocked_queue_tl = NULL;
80 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
83 /* Threads blocked on blackholes.
84 * LOCK: sched_mutex+capability, or all capabilities
86 StgTSO *blackhole_queue = NULL;
88 /* The blackhole_queue should be checked for threads to wake up. See
89 * Schedule.h for a more thorough comment.
90 * LOCK: none (doesn't matter if we miss an update)
92 rtsBool blackholes_need_checking = rtsFalse;
94 /* Set to true when the latest garbage collection failed to reclaim
95 * enough space, and the runtime should proceed to shut itself down in
96 * an orderly fashion (emitting profiling info etc.)
98 rtsBool heap_overflow = rtsFalse;
100 /* flag that tracks whether we have done any execution in this time slice.
101 * LOCK: currently none, perhaps we should lock (but needs to be
102 * updated in the fast path of the scheduler).
104 * NB. must be StgWord, we do xchg() on it.
106 volatile StgWord recent_activity = ACTIVITY_YES;
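// Values used in this file: ACTIVITY_YES (something ran recently),
// ACTIVITY_INACTIVE (the timer saw a complete timeslice with no activity,
// suggesting a possible deadlock), and ACTIVITY_DONE_GC (the idle GC has
// run and the timer signal was turned off; see #1623 below).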
108 /* if this flag is set as well, give up execution
109 * LOCK: none (changes monotonically)
111 volatile StgWord sched_state = SCHED_RUNNING;
113 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
114 * exists - earlier gccs apparently didn't.
120 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
121 * in an MT setting, needed to signal that a worker thread shouldn't hang around
122 * in the scheduler when it is out of work.
124 rtsBool shutting_down_scheduler = rtsFalse;
127 * This mutex protects most of the global scheduler data in
128 * the THREADED_RTS runtime.
130 #if defined(THREADED_RTS)
134 #if !defined(mingw32_HOST_OS)
135 #define FORKPROCESS_PRIMOP_SUPPORTED
138 /* -----------------------------------------------------------------------------
139 * static function prototypes
140 * -------------------------------------------------------------------------- */
142 static Capability *schedule (Capability *initialCapability, Task *task);
145 // These functions all encapsulate parts of the scheduler loop, and are
146 // abstracted only to make the structure and control flow of the
147 // scheduler clearer.
149 static void schedulePreLoop (void);
150 static void scheduleFindWork (Capability *cap);
151 #if defined(THREADED_RTS)
152 static void scheduleYield (Capability **pcap, Task *task);
154 static void scheduleStartSignalHandlers (Capability *cap);
155 static void scheduleCheckBlockedThreads (Capability *cap);
156 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS);
157 static void scheduleCheckBlackHoles (Capability *cap);
158 static void scheduleDetectDeadlock (Capability *cap, Task *task);
159 static void schedulePushWork(Capability *cap, Task *task);
160 #if defined(PARALLEL_HASKELL)
161 static rtsBool scheduleGetRemoteWork(Capability *cap);
162 static void scheduleSendPendingMessages(void);
164 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
165 static void scheduleActivateSpark(Capability *cap);
167 static void schedulePostRunThread(Capability *cap, StgTSO *t);
168 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
169 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
171 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
172 nat prev_what_next );
173 static void scheduleHandleThreadBlocked( StgTSO *t );
174 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
176 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
177 static Capability *scheduleDoGC(Capability *cap, Task *task,
178 rtsBool force_major);
180 static rtsBool checkBlackHoles(Capability *cap);
182 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
183 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
185 static void deleteThread (Capability *cap, StgTSO *tso);
186 static void deleteAllThreads (Capability *cap);
188 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
189 static void deleteThread_(Capability *cap, StgTSO *tso);
193 static char *whatNext_strs[] = {
203 /* -----------------------------------------------------------------------------
204 * Putting a thread on the run queue: different scheduling policies
205 * -------------------------------------------------------------------------- */
208 addToRunQueue( Capability *cap, StgTSO *t )
210 #if defined(PARALLEL_HASKELL)
211 if (RtsFlags.ParFlags.doFairScheduling) {
212 // this does round-robin scheduling; good for concurrency
213 appendToRunQueue(cap,t);
215 // this does unfair scheduling; good for parallelism
216 pushOnRunQueue(cap,t);
219 // this does round-robin scheduling; good for concurrency
220 appendToRunQueue(cap,t);
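/* For reference, a sketch of the two queue operations as defined in
 * Schedule.h (illustrative, not the definitive code):
 *
 *   appendToRunQueue(cap,t):            // tail insert -> round-robin
 *     if (emptyRunQueue(cap)) cap->run_queue_hd = t;
 *     else setTSOLink(cap, cap->run_queue_tl, t);
 *     cap->run_queue_tl = t;
 *
 *   pushOnRunQueue(cap,t):              // head insert -> run next
 *     setTSOLink(cap, t, cap->run_queue_hd);
 *     cap->run_queue_hd = t;
 *     if (cap->run_queue_tl == END_TSO_QUEUE) cap->run_queue_tl = t;
 */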
224 /* ---------------------------------------------------------------------------
225 Main scheduling loop.
227 We use round-robin scheduling, each thread returning to the
228 scheduler loop when one of these conditions is detected:
231 * timer expires (thread yields)
237 In a GranSim setup this loop iterates over the global event queue.
238 This revolves around the global event queue, which determines what
239 to do next. Therefore, it's more complicated than either the
240 concurrent or the parallel (GUM) setup.
241 This version has been entirely removed (JB 2008/08).
244 GUM iterates over incoming messages.
245 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
246 and sends out a fish whenever it has nothing to do; in-between
247 doing the actual reductions (shared code below) it processes the
248 incoming messages and deals with delayed operations
249 (see PendingFetches).
250 This is not the ugliest code you could imagine, but it's bloody close.
252 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
253 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
254 as well as future GUM versions. This file has been refurbished to
255 contain only valid code, which is, however, incomplete and refers to
256 invalid includes etc.
258 ------------------------------------------------------------------------ */
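/* The shape of one iteration of the loop below, roughly:
 *   check sched_state (interrupt/shutdown handling);
 *   scheduleFindWork() / schedulePushWork() / scheduleDetectDeadlock();
 *   yield the Capability if there is nothing to run;
 *   t = popRunQueue(cap);  run t via StgRun() or interpretBCO();
 *   dispatch on the return code (heap/stack overflow, yield, blocked,
 *   finished);  GC if requested.
 */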
261 schedule (Capability *initialCapability, Task *task)
265 StgThreadReturnCode ret;
266 #if defined(PARALLEL_HASKELL)
267 rtsBool receivedFinish = rtsFalse;
271 #if defined(THREADED_RTS)
272 rtsBool first = rtsTrue;
275 cap = initialCapability;
277 // Pre-condition: this task owns initialCapability.
278 // The sched_mutex is *NOT* held
279 // NB. on return, we still hold a capability.
281 debugTrace (DEBUG_sched,
282 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
283 task, initialCapability);
285 if (running_finalizers) {
286 errorBelch("error: a C finalizer called back into Haskell.\n"
287 " use Foreign.Concurrent.newForeignPtr for Haskell finalizers.");
288 stg_exit(EXIT_FAILURE);
293 // -----------------------------------------------------------
294 // Scheduler loop starts here:
296 #if defined(PARALLEL_HASKELL)
297 #define TERMINATION_CONDITION (!receivedFinish)
299 #define TERMINATION_CONDITION rtsTrue
302 while (TERMINATION_CONDITION) {
304 // Check whether we have re-entered the RTS from Haskell without
305 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
306 // call).
307 if (cap->in_haskell) {
308 errorBelch("schedule: re-entered unsafely.\n"
309 " Perhaps a 'foreign import unsafe' should be 'safe'?");
310 stg_exit(EXIT_FAILURE);
313 // The interruption / shutdown sequence.
315 // In order to cleanly shut down the runtime, we want to:
316 // * make sure that all main threads return to their callers
317 // with the state 'Interrupted'.
318 // * clean up all OS threads associated with the runtime
319 // * free all memory etc.
321 // So the sequence for ^C goes like this:
323 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
324 // arranges for some Capability to wake up
326 // * all threads in the system are halted, and the zombies are
327 // placed on the run queue for cleaning up. We acquire all
328 // the capabilities in order to delete the threads, this is
329 // done by scheduleDoGC() for convenience (because GC already
330 // needs to acquire all the capabilities). We can't kill
331 // threads involved in foreign calls.
333 // * somebody calls shutdownHaskell(), which calls exitScheduler()
335 // * sched_state := SCHED_SHUTTING_DOWN
337 // * all workers exit when the run queue on their capability
338 // drains. All main threads will also exit when their TSO
339 // reaches the head of the run queue and they can return.
341 // * eventually all Capabilities will shut down, and the RTS can
344 // * We might be left with threads blocked in foreign calls,
345 // we should really attempt to kill these somehow (TODO);
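// To summarise: sched_state only ever advances, in the order
//   SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_SHUTTING_DOWN
// (it "changes monotonically", as noted on its declaration above).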
347 switch (sched_state) {
350 case SCHED_INTERRUPTING:
351 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
352 #if defined(THREADED_RTS)
353 discardSparksCap(cap);
355 /* scheduleDoGC() deletes all the threads */
356 cap = scheduleDoGC(cap,task,rtsFalse);
358 // after scheduleDoGC(), we must be shutting down. Either some
359 // other Capability did the final GC, or we did it above,
360 // either way we can fall through to the SCHED_SHUTTING_DOWN
362 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
365 case SCHED_SHUTTING_DOWN:
366 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
367 // If we are a worker, just exit. If we're a bound thread
368 // then we will exit below when we've removed our TSO from
369 // the run queue.
370 if (task->tso == NULL && emptyRunQueue(cap)) {
375 barf("sched_state: %d", sched_state);
378 scheduleFindWork(cap);
380 /* work pushing, currently relevant only for THREADED_RTS:
381 (pushes threads, wakes up idle capabilities for stealing) */
382 schedulePushWork(cap,task);
384 #if defined(PARALLEL_HASKELL)
385 /* since we perform a blocking receive and continue otherwise,
386 either we never reach here or we definitely have work! */
387 // from here: non-empty run queue
388 ASSERT(!emptyRunQueue(cap));
390 if (PacketsWaiting()) { /* now process incoming messages, if any
393 CAUTION: scheduleGetRemoteWork called
394 above, waits for messages as well! */
395 processMessages(cap, &receivedFinish);
397 #endif // PARALLEL_HASKELL: non-empty run queue!
399 scheduleDetectDeadlock(cap,task);
401 #if defined(THREADED_RTS)
402 cap = task->cap; // reload cap, it might have changed
405 // Normally, the only way we can get here with no threads to
406 // run is if a keyboard interrupt received during
407 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
408 // Additionally, it is not fatal for the
409 // threaded RTS to reach here with no threads to run.
411 // win32: might be here due to awaitEvent() being abandoned
412 // as a result of a console event having been delivered.
414 #if defined(THREADED_RTS)
418 // // don't yield the first time, we want a chance to run this
419 // // thread for a bit, even if there are others banging at the
422 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
426 scheduleYield(&cap,task);
427 if (emptyRunQueue(cap)) continue; // look for work again
430 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
431 if ( emptyRunQueue(cap) ) {
432 ASSERT(sched_state >= SCHED_INTERRUPTING);
437 // Get a thread to run
439 t = popRunQueue(cap);
441 // Sanity check the thread we're about to run. This can be
442 // expensive if there is lots of thread switching going on...
443 IF_DEBUG(sanity,checkTSO(t));
445 #if defined(THREADED_RTS)
446 // Check whether we can run this thread in the current task.
447 // If not, we have to pass our capability to the right task.
449 Task *bound = t->bound;
453 debugTrace(DEBUG_sched,
454 "### Running thread %lu in bound thread", (unsigned long)t->id);
455 // yes, the Haskell thread is bound to the current native thread
457 debugTrace(DEBUG_sched,
458 "### thread %lu bound to another OS thread", (unsigned long)t->id);
459 // no, bound to a different Haskell thread: pass to that thread
460 pushOnRunQueue(cap,t);
464 // The thread we want to run is unbound.
466 debugTrace(DEBUG_sched,
467 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
468 // no, the current native thread is bound to a different
469 // Haskell thread, so pass it to any worker thread
470 pushOnRunQueue(cap,t);
477 // If we're shutting down, and this thread has not yet been
478 // killed, kill it now. This sometimes happens when a finalizer
479 // thread is created by the final GC, or a thread previously
480 // in a foreign call returns.
481 if (sched_state >= SCHED_INTERRUPTING &&
482 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
486 /* context switches are initiated by the timer signal, unless
487 * the user specified "context switch as often as possible", with
488 * +RTS -C0. */
490 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
491 && !emptyThreadQueues(cap)) {
492 cap->context_switch = 1;
497 // CurrentTSO is the thread to run. t might be different if we
498 // loop back to run_thread, so make sure to set CurrentTSO after
500 cap->r.rCurrentTSO = t;
502 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
503 (long)t->id, whatNext_strs[t->what_next]);
505 startHeapProfTimer();
507 // Check for exceptions blocked on this thread
508 maybePerformBlockedException (cap, t);
510 // ----------------------------------------------------------------------
511 // Run the current thread
513 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
514 ASSERT(t->cap == cap);
515 ASSERT(t->bound ? t->bound->cap == cap : 1);
517 prev_what_next = t->what_next;
519 errno = t->saved_errno;
521 SetLastError(t->saved_winerror);
524 cap->in_haskell = rtsTrue;
528 #if defined(THREADED_RTS)
529 if (recent_activity == ACTIVITY_DONE_GC) {
530 // ACTIVITY_DONE_GC means we turned off the timer signal to
531 // conserve power (see #1623). Re-enable it here.
533 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
534 if (prev == ACTIVITY_DONE_GC) {
538 recent_activity = ACTIVITY_YES;
542 switch (prev_what_next) {
546 /* Thread already finished, return to scheduler. */
547 ret = ThreadFinished;
553 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
554 cap = regTableToCapability(r);
559 case ThreadInterpret:
560 cap = interpretBCO(cap);
565 barf("schedule: invalid what_next field");
568 cap->in_haskell = rtsFalse;
570 // The TSO might have moved, eg. if it re-entered the RTS and a GC
571 // happened. So find the new location:
572 t = cap->r.rCurrentTSO;
574 // We have run some Haskell code: there might be blackhole-blocked
575 // threads to wake up now.
576 // Lock-free test here should be ok, we're just setting a flag.
577 if ( blackhole_queue != END_TSO_QUEUE ) {
578 blackholes_need_checking = rtsTrue;
581 // And save the current errno in this thread.
582 // XXX: possibly bogus for SMP because this thread might already
583 // be running again, see code below.
584 t->saved_errno = errno;
586 // Similarly for Windows error code
587 t->saved_winerror = GetLastError();
590 #if defined(THREADED_RTS)
591 // If ret is ThreadBlocked, and this Task is bound to the TSO that
592 // blocked, we are in limbo - the TSO is now owned by whatever it
593 // is blocked on, and may in fact already have been woken up,
594 // perhaps even on a different Capability. It may be the case
595 // that task->cap != cap. We better yield this Capability
596 // immediately and return to normality.
597 if (ret == ThreadBlocked) {
598 debugTrace(DEBUG_sched,
599 "--<< thread %lu (%s) stopped: blocked",
600 (unsigned long)t->id, whatNext_strs[t->what_next]);
605 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
606 ASSERT(t->cap == cap);
608 // ----------------------------------------------------------------------
610 // Costs for the scheduler are assigned to CCS_SYSTEM
612 #if defined(PROFILING)
616 schedulePostRunThread(cap,t);
618 t = threadStackUnderflow(task,t);
620 ready_to_gc = rtsFalse;
624 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
628 scheduleHandleStackOverflow(cap,task,t);
632 if (scheduleHandleYield(cap, t, prev_what_next)) {
633 // shortcut for switching between compiler/interpreter:
639 scheduleHandleThreadBlocked(t);
643 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
644 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
648 barf("schedule: invalid thread return code %d", (int)ret);
651 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
652 cap = scheduleDoGC(cap,task,rtsFalse);
654 } /* end of while() */
657 /* ----------------------------------------------------------------------------
658 * Setting up the scheduler loop
659 * ------------------------------------------------------------------------- */
662 schedulePreLoop(void)
664 // initialisation for scheduler - what cannot go into initScheduler()
667 /* -----------------------------------------------------------------------------
670 * Search for work to do, and handle messages from elsewhere.
671 * -------------------------------------------------------------------------- */
674 scheduleFindWork (Capability *cap)
676 scheduleStartSignalHandlers(cap);
678 // Only check the black holes here if we've nothing else to do.
679 // During normal execution, the black hole list only gets checked
680 // at GC time, to avoid repeatedly traversing this possibly long
681 // list each time around the scheduler.
682 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
684 scheduleCheckWakeupThreads(cap);
686 scheduleCheckBlockedThreads(cap);
688 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
689 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
692 #if defined(PARALLEL_HASKELL)
693 // if messages have been buffered...
694 scheduleSendPendingMessages();
697 #if defined(PARALLEL_HASKELL)
698 if (emptyRunQueue(cap)) {
699 receivedFinish = scheduleGetRemoteWork(cap);
700 continue; // a new round, (hopefully) with new work
702 in GUM, this a) sends out a FISH and returns IF no fish is
704 b) (blocking) awaits and receives messages
706 in Eden, this is only the blocking receive, as b) in GUM.
712 #if defined(THREADED_RTS)
713 STATIC_INLINE rtsBool
714 shouldYieldCapability (Capability *cap, Task *task)
716 // we need to yield this capability to someone else if..
717 // - another thread is initiating a GC
718 // - another Task is returning from a foreign call
719 // - the thread at the head of the run queue cannot be run
720 // by this Task (it is bound to another Task, or it is unbound
721 // and this Task is bound).
722 return (waiting_for_gc ||
723 cap->returning_tasks_hd != NULL ||
724 (!emptyRunQueue(cap) && (task->tso == NULL
725 ? cap->run_queue_hd->bound != NULL
726 : cap->run_queue_hd->bound != task)));
729 // This is the single place where a Task goes to sleep. There are
730 // two reasons it might need to sleep:
731 // - there are no threads to run
732 // - we need to yield this Capability to someone else
733 // (see shouldYieldCapability())
735 // Careful: the scheduler loop is quite delicate. Make sure you run
736 // the tests in testsuite/concurrent (all ways) after modifying this,
737 // and also check the benchmarks in nofib/parallel for regressions.
740 scheduleYield (Capability **pcap, Task *task)
742 Capability *cap = *pcap;
744 // if we have work, and we don't need to give up the Capability, continue.
745 if (!shouldYieldCapability(cap,task) &&
746 (!emptyRunQueue(cap) ||
747 !emptyWakeupQueue(cap) ||
748 blackholes_need_checking ||
749 sched_state >= SCHED_INTERRUPTING))
752 // otherwise yield (sleep), and keep yielding if necessary.
754 yieldCapability(&cap,task);
756 while (shouldYieldCapability(cap,task));
758 // note there may still be no threads on the run queue at this
759 // point, the caller has to check.
766 /* -----------------------------------------------------------------------------
769 * Push work to other Capabilities if we have some.
770 * -------------------------------------------------------------------------- */
773 schedulePushWork(Capability *cap USED_IF_THREADS,
774 Task *task USED_IF_THREADS)
776 /* following code not for PARALLEL_HASKELL. I kept the call general,
777 future GUM versions might use pushing in a distributed setup */
778 #if defined(THREADED_RTS)
780 Capability *free_caps[n_capabilities], *cap0;
783 // migration can be turned off with +RTS -qg
784 if (!RtsFlags.ParFlags.migrate) return;
786 // Check whether we have more threads on our run queue, or sparks
787 // in our pool, that we could hand to another Capability.
788 if (cap->run_queue_hd == END_TSO_QUEUE) {
789 if (sparkPoolSizeCap(cap) < 2) return;
791 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
792 sparkPoolSizeCap(cap) < 1) return;
795 // First grab as many free Capabilities as we can.
796 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
797 cap0 = &capabilities[i];
798 if (cap != cap0 && tryGrabCapability(cap0,task)) {
799 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
800 // it already has some work, we just grabbed it at
801 // the wrong moment. Or maybe it's deadlocked!
802 releaseCapability(cap0);
804 free_caps[n_free_caps++] = cap0;
809 // we now have n_free_caps free capabilities stashed in
810 // free_caps[]. Share our run queue equally with them. This is
811 // probably the simplest thing we could do; improvements we might
812 // want to do include:
814 // - giving high priority to moving relatively new threads, on
815 // the grounds that they haven't had time to build up a
816 // working set in the cache on this CPU/Capability.
818 // - giving low priority to moving long-lived threads
820 if (n_free_caps > 0) {
821 StgTSO *prev, *t, *next;
822 rtsBool pushed_to_all;
824 debugTrace(DEBUG_sched,
825 "cap %d: %s and %d free capabilities, sharing...",
827 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
828 "excess threads on run queue":"sparks to share (>=2)",
832 pushed_to_all = rtsFalse;
834 if (cap->run_queue_hd != END_TSO_QUEUE) {
835 prev = cap->run_queue_hd;
837 prev->_link = END_TSO_QUEUE;
838 for (; t != END_TSO_QUEUE; t = next) {
840 t->_link = END_TSO_QUEUE;
841 if (t->what_next == ThreadRelocated
842 || t->bound == task // don't move my bound thread
843 || tsoLocked(t)) { // don't move a locked thread
844 setTSOLink(cap, prev, t);
846 } else if (i == n_free_caps) {
847 pushed_to_all = rtsTrue;
850 setTSOLink(cap, prev, t);
853 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
854 appendToRunQueue(free_caps[i],t);
855 if (t->bound) { t->bound->cap = free_caps[i]; }
856 t->cap = free_caps[i];
860 cap->run_queue_tl = prev;
864 /* JB I left this code in place, it would work but is not necessary */
866 // If there are some free capabilities that we didn't push any
867 // threads to, then try to push a spark to each one.
868 if (!pushed_to_all) {
870 // i is the next free capability to push to
871 for (; i < n_free_caps; i++) {
872 if (emptySparkPoolCap(free_caps[i])) {
873 spark = tryStealSpark(cap->sparks);
875 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
876 newSpark(&(free_caps[i]->r), spark);
881 #endif /* SPARK_PUSHING */
883 // release the capabilities
884 for (i = 0; i < n_free_caps; i++) {
885 task->cap = free_caps[i];
886 releaseAndWakeupCapability(free_caps[i]);
889 task->cap = cap; // reset to point to our Capability.
891 #endif /* THREADED_RTS */
895 /* ----------------------------------------------------------------------------
896 * Start any pending signal handlers
897 * ------------------------------------------------------------------------- */
899 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
901 scheduleStartSignalHandlers(Capability *cap)
903 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
904 // safe outside the lock
905 startSignalHandlers(cap);
910 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
915 /* ----------------------------------------------------------------------------
916 * Check for blocked threads that can be woken up.
917 * ------------------------------------------------------------------------- */
920 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
922 #if !defined(THREADED_RTS)
924 // Check whether any waiting threads need to be woken up. If the
925 // run queue is empty, and there are no other tasks running, we
926 // can wait indefinitely for something to happen.
928 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
930 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
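// The argument tells awaitEvent() whether it may block indefinitely:
// only when the run queue is empty and no blackholes need checking is
// there nothing better to do than sleep.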
936 /* ----------------------------------------------------------------------------
937 * Check for threads woken up by other Capabilities
938 * ------------------------------------------------------------------------- */
941 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
943 #if defined(THREADED_RTS)
944 // Any threads that were woken up by other Capabilities get
945 // appended to our run queue.
946 if (!emptyWakeupQueue(cap)) {
947 ACQUIRE_LOCK(&cap->lock);
948 if (emptyRunQueue(cap)) {
949 cap->run_queue_hd = cap->wakeup_queue_hd;
950 cap->run_queue_tl = cap->wakeup_queue_tl;
952 setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
953 cap->run_queue_tl = cap->wakeup_queue_tl;
955 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
956 RELEASE_LOCK(&cap->lock);
961 /* ----------------------------------------------------------------------------
962 * Check for threads blocked on BLACKHOLEs that can be woken up
963 * ------------------------------------------------------------------------- */
965 scheduleCheckBlackHoles (Capability *cap)
967 if ( blackholes_need_checking ) // check without the lock first
969 ACQUIRE_LOCK(&sched_mutex);
970 if ( blackholes_need_checking ) {
971 blackholes_need_checking = rtsFalse;
972 // important that we reset the flag *before* checking the
973 // blackhole queue, otherwise we could get deadlock. This
974 // happens as follows: we wake up a thread that
975 // immediately runs on another Capability, blocks on a
976 // blackhole, and then we reset the blackholes_need_checking flag.
977 checkBlackHoles(cap);
979 RELEASE_LOCK(&sched_mutex);
983 /* ----------------------------------------------------------------------------
984 * Detect deadlock conditions and attempt to resolve them.
985 * ------------------------------------------------------------------------- */
988 scheduleDetectDeadlock (Capability *cap, Task *task)
991 #if defined(PARALLEL_HASKELL)
992 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
997 * Detect deadlock: when we have no threads to run, there are no
998 * threads blocked, waiting for I/O, or sleeping, and all the
999 * other tasks are waiting for work, we must have a deadlock of
1000 * some kind.
1001 */
1002 if ( emptyThreadQueues(cap) )
1004 #if defined(THREADED_RTS)
1006 * In the threaded RTS, we only check for deadlock if there
1007 * has been no activity in a complete timeslice. This means
1008 * we won't eagerly start a full GC just because we don't have
1009 * any threads to run currently.
1011 if (recent_activity != ACTIVITY_INACTIVE) return;
1014 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1016 // Garbage collection can release some new threads due to
1017 // either (a) finalizers or (b) threads resurrected because
1018 // they are unreachable and will therefore be sent an
1019 // exception. Any threads thus released will be immediately
1020 // runnable.
1021 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1022 // when force_major == rtsTrue. scheduleDoGC sets
1023 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1026 if ( !emptyRunQueue(cap) ) return;
1028 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1029 /* If we have user-installed signal handlers, then wait
1030 * for signals to arrive rather than bombing out with a
1033 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1034 debugTrace(DEBUG_sched,
1035 "still deadlocked, waiting for signals...");
1039 if (signals_pending()) {
1040 startSignalHandlers(cap);
1043 // either we have threads to run, or we were interrupted:
1044 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1050 #if !defined(THREADED_RTS)
1051 /* Probably a real deadlock. Send the current main thread the
1052 * Deadlock exception.
1055 switch (task->tso->why_blocked) {
1057 case BlockedOnBlackHole:
1058 case BlockedOnException:
1060 throwToSingleThreaded(cap, task->tso,
1061 (StgClosure *)nonTermination_closure);
1064 barf("deadlock: main thread blocked in a strange way");
1073 /* ----------------------------------------------------------------------------
1074 * Send pending messages (PARALLEL_HASKELL only)
1075 * ------------------------------------------------------------------------- */
1077 #if defined(PARALLEL_HASKELL)
1079 scheduleSendPendingMessages(void)
1082 # if defined(PAR) // global Mem.Mgmt., omit for now
1083 if (PendingFetches != END_BF_QUEUE) {
1088 if (RtsFlags.ParFlags.BufferTime) {
1089 // if we use message buffering, we must send away all message
1090 // packets which have become too old...
1096 /* ----------------------------------------------------------------------------
1097 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1098 * ------------------------------------------------------------------------- */
1100 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1102 scheduleActivateSpark(Capability *cap)
1106 createSparkThread(cap);
1107 debugTrace(DEBUG_sched, "creating a spark thread");
1110 #endif // PARALLEL_HASKELL || THREADED_RTS
1112 /* ----------------------------------------------------------------------------
1113 * Get work from a remote node (PARALLEL_HASKELL only)
1114 * ------------------------------------------------------------------------- */
1116 #if defined(PARALLEL_HASKELL)
1117 static rtsBool /* return value used in PARALLEL_HASKELL only */
1118 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1120 #if defined(PARALLEL_HASKELL)
1121 rtsBool receivedFinish = rtsFalse;
1123 // idle() , i.e. send all buffers, wait for work
1124 if (RtsFlags.ParFlags.BufferTime) {
1125 IF_PAR_DEBUG(verbose,
1126 debugBelch("...send all pending data,"));
1129 for (i=1; i<=nPEs; i++)
1130 sendImmediately(i); // send all messages away immediately
1134 /* this would be the place for fishing in GUM...
1136 if (no-earlier-fish-around)
1137 sendFish(choosePe());
1140 // Eden:just look for incoming messages (blocking receive)
1141 IF_PAR_DEBUG(verbose,
1142 debugBelch("...wait for incoming messages...\n"));
1143 processMessages(cap, &receivedFinish); // blocking receive...
1146 return receivedFinish;
1147 // re-enter the scheduling loop after having received something
1149 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1151 return rtsFalse; /* return value unused in THREADED_RTS */
1153 #endif /* PARALLEL_HASKELL */
1155 #endif // PARALLEL_HASKELL || THREADED_RTS
1157 /* ----------------------------------------------------------------------------
1158 * After running a thread...
1159 * ------------------------------------------------------------------------- */
1162 schedulePostRunThread (Capability *cap, StgTSO *t)
1164 // We have to be able to catch transactions that are in an
1165 // infinite loop as a result of seeing an inconsistent view of
1169 // [a,b] <- mapM readTVar [ta,tb]
1170 // when (a == b) loop
1172 // and a is never equal to b given a consistent view of memory.
1174 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1175 if (!stmValidateNestOfTransactions (t -> trec)) {
1176 debugTrace(DEBUG_sched | DEBUG_stm,
1177 "trec %p found wasting its time", t);
1179 // strip the stack back to the
1180 // ATOMICALLY_FRAME, aborting the (nested)
1181 // transaction, and saving the stack of any
1182 // partially-evaluated thunks on the heap.
1183 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1185 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1189 /* some statistics gathering in the parallel case */
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1194 * -------------------------------------------------------------------------- */
1197 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1199 // did the task ask for a large block?
1200 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1201 // if so, get one and push it on the front of the nursery.
1205 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
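// e.g. with the usual 4k block size, a 10000-byte rHpAlloc rounds up
// (via BLOCK_ROUND_UP) to 12288 bytes, i.e. blocks = 3.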
1207 debugTrace(DEBUG_sched,
1208 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1209 (long)t->id, whatNext_strs[t->what_next], blocks);
1211 // don't do this if the nursery is (nearly) full, we'll GC first.
1212 if (cap->r.rCurrentNursery->link != NULL ||
1213 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1214 // if the nursery has only one block.
1217 bd = allocGroup( blocks );
1219 cap->r.rNursery->n_blocks += blocks;
1221 // link the new group into the list
1222 bd->link = cap->r.rCurrentNursery;
1223 bd->u.back = cap->r.rCurrentNursery->u.back;
1224 if (cap->r.rCurrentNursery->u.back != NULL) {
1225 cap->r.rCurrentNursery->u.back->link = bd;
1227 #if !defined(THREADED_RTS)
1228 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1229 g0s0 == cap->r.rNursery);
1231 cap->r.rNursery->blocks = bd;
1233 cap->r.rCurrentNursery->u.back = bd;
1235 // initialise it as a nursery block. We initialise the
1236 // step, gen_no, and flags field of *every* sub-block in
1237 // this large block, because this is easier than making
1238 // sure that we always find the block head of a large
1239 // block whenever we call Bdescr() (eg. evacuate() and
1240 // isAlive() in the GC would both have to do this, at
1244 for (x = bd; x < bd + blocks; x++) {
1245 x->step = cap->r.rNursery;
1251 // This assert can be a killer if the app is doing lots
1252 // of large block allocations.
1253 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1255 // now update the nursery to point to the new block
1256 cap->r.rCurrentNursery = bd;
1258 // we might be unlucky and have another thread get on the
1259 // run queue before us and steal the large block, but in that
1260 // case the thread will just end up requesting another large
1261 // block anyway.
1262 pushOnRunQueue(cap,t);
1263 return rtsFalse; /* not actually GC'ing */
1267 debugTrace(DEBUG_sched,
1268 "--<< thread %ld (%s) stopped: HeapOverflow",
1269 (long)t->id, whatNext_strs[t->what_next]);
1271 if (cap->r.rHpLim == NULL || cap->context_switch) {
1272 // Sometimes we miss a context switch, e.g. when calling
1273 // primitives in a tight loop, MAYBE_GC() doesn't check the
1274 // context switch flag, and we end up waiting for a GC.
1275 // See #1984, and concurrent/should_run/1984
1276 cap->context_switch = 0;
1277 addToRunQueue(cap,t);
1279 pushOnRunQueue(cap,t);
1282 /* actual GC is done at the end of the while loop in schedule() */
1285 /* -----------------------------------------------------------------------------
1286 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1287 * -------------------------------------------------------------------------- */
1290 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1292 debugTrace (DEBUG_sched,
1293 "--<< thread %ld (%s) stopped, StackOverflow",
1294 (long)t->id, whatNext_strs[t->what_next]);
1296 /* just adjust the stack for this thread, then pop it back
1297 * on the run queue.
1298 */
1300 /* enlarge the stack */
1301 StgTSO *new_t = threadStackOverflow(cap, t);
1303 /* The TSO attached to this Task may have moved, so update the
1306 if (task->tso == t) {
1309 pushOnRunQueue(cap,new_t);
1313 /* -----------------------------------------------------------------------------
1314 * Handle a thread that returned to the scheduler with ThreadYielding
1315 * -------------------------------------------------------------------------- */
1318 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1320 // Reset the context switch flag. We don't do this just before
1321 // running the thread, because that would mean we would lose ticks
1322 // during GC, which can lead to unfair scheduling (a thread hogs
1323 // the CPU because the tick always arrives during GC). This way
1324 // penalises threads that do a lot of allocation, but that seems
1325 // better than the alternative.
1326 cap->context_switch = 0;
1328 /* put the thread back on the run queue. Then, if we're ready to
1329 * GC, check whether this is the last task to stop. If so, wake
1330 * up the GC thread. getThread will block during a GC until the
1331 * GC is finished.
1332 */
1334 if (t->what_next != prev_what_next) {
1335 debugTrace(DEBUG_sched,
1336 "--<< thread %ld (%s) stopped to switch evaluators",
1337 (long)t->id, whatNext_strs[t->what_next]);
1339 debugTrace(DEBUG_sched,
1340 "--<< thread %ld (%s) stopped, yielding",
1341 (long)t->id, whatNext_strs[t->what_next]);
1346 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1348 ASSERT(t->_link == END_TSO_QUEUE);
1350 // Shortcut if we're just switching evaluators: don't bother
1351 // doing stack squeezing (which can be expensive), just run the
1353 if (t->what_next != prev_what_next) {
1357 addToRunQueue(cap,t);
1362 /* -----------------------------------------------------------------------------
1363 * Handle a thread that returned to the scheduler with ThreadBlocked
1364 * -------------------------------------------------------------------------- */
1367 scheduleHandleThreadBlocked( StgTSO *t
1368 #if !defined(GRAN) && !defined(DEBUG)
1374 // We don't need to do anything. The thread is blocked, and it
1375 // has tidied up its stack and placed itself on whatever queue
1376 // it needs to be on.
1378 // ASSERT(t->why_blocked != NotBlocked);
1379 // Not true: for example,
1380 // - in THREADED_RTS, the thread may already have been woken
1381 // up by another Capability. This actually happens: try
1382 // conc023 +RTS -N2.
1383 // - the thread may have woken itself up already, because
1384 // threadPaused() might have raised a blocked throwTo
1385 // exception, see maybePerformBlockedException().
1388 if (traceClass(DEBUG_sched)) {
1389 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1390 (unsigned long)t->id, whatNext_strs[t->what_next]);
1391 printThreadBlockage(t);
1397 /* -----------------------------------------------------------------------------
1398 * Handle a thread that returned to the scheduler with ThreadFinished
1399 * -------------------------------------------------------------------------- */
1402 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1404 /* Need to check whether this was a main thread, and if so,
1405 * return with the return value.
1407 * We also end up here if the thread kills itself with an
1408 * uncaught exception, see Exception.cmm.
1410 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1411 (unsigned long)t->id, whatNext_strs[t->what_next]);
1413 // blocked exceptions can now complete, even if the thread was in
1414 // blocked mode (see #2910). This unconditionally calls
1415 // lockTSO(), which ensures that we don't miss any threads that
1416 // are engaged in throwTo() with this thread as a target.
1417 awakenBlockedExceptionQueue (cap, t);
1420 // Check whether the thread that just completed was a bound
1421 // thread, and if so return with the result.
1423 // There is an assumption here that all thread completion goes
1424 // through this point; we need to make sure that if a thread
1425 // ends up in the ThreadKilled state, that it stays on the run
1426 // queue so it can be dealt with here.
1431 if (t->bound != task) {
1432 #if !defined(THREADED_RTS)
1433 // Must be a bound thread that is not the topmost one. Leave
1434 // it on the run queue until the stack has unwound to the
1435 // point where we can deal with this. Leaving it on the run
1436 // queue also ensures that the garbage collector knows about
1437 // this thread and its return value (it gets dropped from the
1438 // step->threads list so there's no other way to find it).
1439 appendToRunQueue(cap,t);
1442 // this cannot happen in the threaded RTS, because a
1443 // bound thread can only be run by the appropriate Task.
1444 barf("finished bound thread that isn't mine");
1448 ASSERT(task->tso == t);
1450 if (t->what_next == ThreadComplete) {
1452 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1453 *(task->ret) = (StgClosure *)task->tso->sp[1];
1455 task->stat = Success;
1458 *(task->ret) = NULL;
1460 if (sched_state >= SCHED_INTERRUPTING) {
1461 if (heap_overflow) {
1462 task->stat = HeapExhausted;
1464 task->stat = Interrupted;
1467 task->stat = Killed;
1471 removeThreadLabel((StgWord)task->tso->id);
1473 return rtsTrue; // tells schedule() to return
1479 /* -----------------------------------------------------------------------------
1480 * Perform a heap census
1481 * -------------------------------------------------------------------------- */
1484 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1486 // When we have +RTS -i0 and we're heap profiling, do a census at
1487 // every GC. This lets us get repeatable runs for debugging.
1488 if (performHeapProfile ||
1489 (RtsFlags.ProfFlags.profileInterval==0 &&
1490 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1497 /* -----------------------------------------------------------------------------
1498 * Perform a garbage collection if necessary
1499 * -------------------------------------------------------------------------- */
1502 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1504 rtsBool heap_census;
1506 /* extern static volatile StgWord waiting_for_gc;
1507 lives inside capability.c */
1508 rtsBool gc_type, prev_pending_gc;
1512 if (sched_state == SCHED_SHUTTING_DOWN) {
1513 // The final GC has already been done, and the system is
1514 // shutting down. We'll probably deadlock if we try to GC
1515 // now.
1520 if (sched_state < SCHED_INTERRUPTING
1521 && RtsFlags.ParFlags.parGcEnabled
1522 && N >= RtsFlags.ParFlags.parGcGen
1523 && ! oldest_gen->steps[0].mark)
1525 gc_type = PENDING_GC_PAR;
1527 gc_type = PENDING_GC_SEQ;
1530 // In order to GC, there must be no threads running Haskell code.
1531 // Therefore, the GC thread needs to hold *all* the capabilities,
1532 // and release them after the GC has completed.
1534 // This seems to be the simplest way: previous attempts involved
1535 // making all the threads with capabilities give up their
1536 // capabilities and sleep except for the *last* one, which
1537 // actually did the GC. But it's quite hard to arrange for all
1538 // the other tasks to sleep and stay asleep.
1541 /* Other capabilities are prevented from running yet more Haskell
1542 threads if waiting_for_gc is set. Tested inside
1543 yieldCapability() and releaseCapability() in Capability.c */
1545 prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
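    // cas() returns the previous value of waiting_for_gc: zero means we
    // won the race and our gc_type is now advertised; nonzero means
    // another Task already announced a GC, so wait for it instead.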
1546 if (prev_pending_gc) {
1548 debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
1551 yieldCapability(&cap,task);
1552 } while (waiting_for_gc);
1553 return cap; // NOTE: task->cap might have changed here
1556 setContextSwitches();
1558 // The final shutdown GC is always single-threaded, because it's
1559 // possible that some of the Capabilities have no worker threads.
1561 if (gc_type == PENDING_GC_SEQ)
1563 // single-threaded GC: grab all the capabilities
1564 for (i=0; i < n_capabilities; i++) {
1565 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1566 if (cap != &capabilities[i]) {
1567 Capability *pcap = &capabilities[i];
1568 // we better hope this task doesn't get migrated to
1569 // another Capability while we're waiting for this one.
1570 // It won't, because load balancing happens while we have
1571 // all the Capabilities, but even so it's a slightly
1572 // unsavoury invariant.
1574 waitForReturnCapability(&pcap, task);
1575 if (pcap != &capabilities[i]) {
1576 barf("scheduleDoGC: got the wrong capability");
1583 // multi-threaded GC: make sure all the Capabilities donate one
1584 // GC thread each.
1585 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1587 waitForGcThreads(cap);
1591 // so this happens periodically:
1592 if (cap) scheduleCheckBlackHoles(cap);
1594 IF_DEBUG(scheduler, printAllThreads());
1596 delete_threads_and_gc:
1598 * We now have all the capabilities; if we're in an interrupting
1599 * state, then we should take the opportunity to delete all the
1600 * threads in the system.
1602 if (sched_state == SCHED_INTERRUPTING) {
1603 deleteAllThreads(cap);
1604 sched_state = SCHED_SHUTTING_DOWN;
1607 heap_census = scheduleNeedHeapProfile(rtsTrue);
1609 #if defined(THREADED_RTS)
1610 debugTrace(DEBUG_sched, "doing GC");
1611 // reset waiting_for_gc *before* GC, so that when the GC threads
1612 // emerge they don't immediately re-enter the GC.
1614 GarbageCollect(force_major || heap_census, gc_type, cap);
1616 GarbageCollect(force_major || heap_census, 0, cap);
1619 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1621 // We are doing a GC because the system has been idle for a
1622 // timeslice and we need to check for deadlock. Record the
1623 // fact that we've done a GC and turn off the timer signal;
1624 // it will get re-enabled if we run any threads after the GC.
1625 recent_activity = ACTIVITY_DONE_GC;
1630 // the GC might have taken long enough for the timer to set
1631 // recent_activity = ACTIVITY_INACTIVE, but we aren't
1632 // necessarily deadlocked:
1633 recent_activity = ACTIVITY_YES;
1636 #if defined(THREADED_RTS)
1637 if (gc_type == PENDING_GC_PAR)
1639 releaseGCThreads(cap);
1644 debugTrace(DEBUG_sched, "performing heap census");
1646 performHeapProfile = rtsFalse;
1649 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1650 // GC set the heap_overflow flag, so we should proceed with
1651 // an orderly shutdown now. Ultimately we want the main
1652 // thread to return to its caller with HeapExhausted, at which
1653 // point the caller should call hs_exit(). The first step is
1654 // to delete all the threads.
1656 // Another way to do this would be to raise an exception in
1657 // the main thread, which we really should do because it gives
1658 // the program a chance to clean up. But how do we find the
1659 // main thread? It should presumably be the same one that
1660 // gets ^C exceptions, but that's all done on the Haskell side
1661 // (GHC.TopHandler).
1662 sched_state = SCHED_INTERRUPTING;
1663 goto delete_threads_and_gc;
1668 Once we are all together... this would be the place to balance all
1669 spark pools. No concurrent stealing or adding of new sparks can
1670 occur. Should be defined in Sparks.c. */
1671 balanceSparkPoolsCaps(n_capabilities, capabilities);
1674 #if defined(THREADED_RTS)
1675 if (gc_type == PENDING_GC_SEQ) {
1676 // release our stash of capabilities.
1677 for (i = 0; i < n_capabilities; i++) {
1678 if (cap != &capabilities[i]) {
1679 task->cap = &capabilities[i];
1680 releaseCapability(&capabilities[i]);
1694 /* ---------------------------------------------------------------------------
1695 * Singleton fork(). Do not copy any running threads.
1696 * ------------------------------------------------------------------------- */
1699 forkProcess(HsStablePtr *entry
1700 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1705 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1712 #if defined(THREADED_RTS)
1713 if (RtsFlags.ParFlags.nNodes > 1) {
1714 errorBelch("forking not supported with +RTS -N<n> greater than 1");
1715 stg_exit(EXIT_FAILURE);
1719 debugTrace(DEBUG_sched, "forking!");
1721 // ToDo: for SMP, we should probably acquire *all* the capabilities
1724 // no funny business: hold locks while we fork, otherwise if some
1725 // other thread is holding a lock when the fork happens, the data
1726 // structure protected by the lock will forever be in an
1727 // inconsistent state in the child. See also #1391.
1728 ACQUIRE_LOCK(&sched_mutex);
1729 ACQUIRE_LOCK(&cap->lock);
1730 ACQUIRE_LOCK(&cap->running_task->lock);
1734 if (pid) { // parent
1736 RELEASE_LOCK(&sched_mutex);
1737 RELEASE_LOCK(&cap->lock);
1738 RELEASE_LOCK(&cap->running_task->lock);
1740 // just return the pid
1746 #if defined(THREADED_RTS)
1747 initMutex(&sched_mutex);
1748 initMutex(&cap->lock);
1749 initMutex(&cap->running_task->lock);
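    // Rather than unlocking the mutexes taken above, re-initialise them
    // in the child: after fork() only the forking thread survives, and
    // re-initialising is the robust way to return the locks to a known
    // state (cf. #1391 and the per-Task initMutex below).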
1752 // Now, all OS threads except the thread that forked are
1753 // stopped. We need to stop all Haskell threads, including
1754 // those involved in foreign calls. Also we need to delete
1755 // all Tasks, because they correspond to OS threads that are
1756 // now gone.
1758 for (s = 0; s < total_steps; s++) {
1759 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1760 if (t->what_next == ThreadRelocated) {
1763 next = t->global_link;
1764 // don't allow threads to catch the ThreadKilled
1765 // exception, but we do want to raiseAsync() because these
1766 // threads may be evaluating thunks that we need later.
1767 deleteThread_(cap,t);
1772 // Empty the run queue. It seems tempting to let all the
1773 // killed threads stay on the run queue as zombies to be
1774 // cleaned up later, but some of them correspond to bound
1775 // threads for which the corresponding Task does not exist.
1776 cap->run_queue_hd = END_TSO_QUEUE;
1777 cap->run_queue_tl = END_TSO_QUEUE;
1779 // Any suspended C-calling Tasks are no more, their OS threads
1780 // don't exist now:
1781 cap->suspended_ccalling_tasks = NULL;
1783 // Empty the threads lists. Otherwise, the garbage
1784 // collector may attempt to resurrect some of these threads.
1785 for (s = 0; s < total_steps; s++) {
1786 all_steps[s].threads = END_TSO_QUEUE;
1789 // Wipe the task list, except the current Task.
1790 ACQUIRE_LOCK(&sched_mutex);
1791 for (task = all_tasks; task != NULL; task=task->all_link) {
1792 if (task != cap->running_task) {
1793 #if defined(THREADED_RTS)
1794 initMutex(&task->lock); // see #1391
1799 RELEASE_LOCK(&sched_mutex);
1801 #if defined(THREADED_RTS)
1802 // Wipe our spare workers list, they no longer exist. New
1803 // workers will be created if necessary.
1804 cap->spare_workers = NULL;
1805 cap->returning_tasks_hd = NULL;
1806 cap->returning_tasks_tl = NULL;
1809 // On Unix, all timers are reset in the child, so we need to start
1810 // the timer again.
1814 cap = rts_evalStableIO(cap, entry, NULL); // run the action
1815 rts_checkSchedStatus("forkProcess",cap);
1818 hs_exit(); // clean up and exit
1819 stg_exit(EXIT_SUCCESS);
1821 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1822 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1827 /* ---------------------------------------------------------------------------
1828 * Delete all the threads in the system
1829 * ------------------------------------------------------------------------- */
1832 deleteAllThreads ( Capability *cap )
1834 // NOTE: only safe to call if we own all capabilities.
1839 debugTrace(DEBUG_sched,"deleting all threads");
1840 for (s = 0; s < total_steps; s++) {
1841 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1842 if (t->what_next == ThreadRelocated) {
1845 next = t->global_link;
1846 deleteThread(cap,t);
1851 // The run queue now contains a bunch of ThreadKilled threads. We
1852 // must not throw these away: the main thread(s) will be in there
1853 // somewhere, and the main scheduler loop has to deal with it.
1854 // Also, the run queue is the only thing keeping these threads from
1855 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1857 #if !defined(THREADED_RTS)
1858 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1859 ASSERT(sleeping_queue == END_TSO_QUEUE);
1863 /* -----------------------------------------------------------------------------
1864 Managing the suspended_ccalling_tasks list.
1865 Locks required: sched_mutex
1866 -------------------------------------------------------------------------- */
1869 suspendTask (Capability *cap, Task *task)
1871 ASSERT(task->next == NULL && task->prev == NULL);
1872 task->next = cap->suspended_ccalling_tasks;
1874 if (cap->suspended_ccalling_tasks) {
1875 cap->suspended_ccalling_tasks->prev = task;
1877 cap->suspended_ccalling_tasks = task;
1881 recoverSuspendedTask (Capability *cap, Task *task)
1884 task->prev->next = task->next;
1886 ASSERT(cap->suspended_ccalling_tasks == task);
1887 cap->suspended_ccalling_tasks = task->next;
1890 task->next->prev = task->prev;
1892 task->next = task->prev = NULL;
1895 /* ---------------------------------------------------------------------------
1896 * Suspending & resuming Haskell threads.
1898 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1899 * its capability before calling the C function. This allows another
1900 * task to pick up the capability and carry on running Haskell
1901 * threads. It also means that if the C call blocks, it won't lock
1902 * the whole system.
1904 * The Haskell thread making the C call is put to sleep for the
1905 * duration of the call, on the suspended_ccalling_threads queue. We
1906 * give out a token to the task, which it can use to resume the thread
1907 * on return from the C function.
1908 * ------------------------------------------------------------------------- */
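/* Roughly, the compiler wraps a safe foreign call "r = foo(x)" like so
 * (a sketch; the real code is generated, and the token is the Task):
 *
 *   token = suspendThread(BaseReg);  // give up the Capability
 *   r = foo(x);                      // may block, or call back in
 *   BaseReg = resumeThread(token);   // reacquire a Capability
 */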
1911 suspendThread (StgRegTable *reg)
1918 StgWord32 saved_winerror;
1921 saved_errno = errno;
1923 saved_winerror = GetLastError();
1926 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1927 */
1928 cap = regTableToCapability(reg);
1930 task = cap->running_task;
1931 tso = cap->r.rCurrentTSO;
1933 debugTrace(DEBUG_sched,
1934 "thread %lu did a safe foreign call",
1935 (unsigned long)cap->r.rCurrentTSO->id);
1937 // XXX this might not be necessary --SDM
1938 tso->what_next = ThreadRunGHC;
1940 threadPaused(cap,tso);
1942 if ((tso->flags & TSO_BLOCKEX) == 0) {
1943 tso->why_blocked = BlockedOnCCall;
1944 tso->flags |= TSO_BLOCKEX;
1945 tso->flags &= ~TSO_INTERRUPTIBLE;
1947 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1950 // Hand back capability
1951 task->suspended_tso = tso;
1953 ACQUIRE_LOCK(&cap->lock);
1955 suspendTask(cap,task);
1956 cap->in_haskell = rtsFalse;
1957 releaseCapability_(cap,rtsFalse);
1959 RELEASE_LOCK(&cap->lock);
1961 #if defined(THREADED_RTS)
1962 /* Preparing to leave the RTS, so ensure there's a native thread/task
1963 waiting to take over.
1964 */
1965 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1968 errno = saved_errno;
1970 SetLastError(saved_winerror);
1976 resumeThread (void *task_)
1983 StgWord32 saved_winerror;
1986 saved_errno = errno;
1988 saved_winerror = GetLastError();
1992 // Wait for permission to re-enter the RTS with the result.
1993 waitForReturnCapability(&cap,task);
1994 // we might be on a different capability now... but if so, our
1995 // entry on the suspended_ccalling_tasks list will also have been
1996 // migrated.
1998 // Remove the thread from the suspended list
1999 recoverSuspendedTask(cap,task);
2001 tso = task->suspended_tso;
2002 task->suspended_tso = NULL;
2003 tso->_link = END_TSO_QUEUE; // no write barrier reqd
2004 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2006 if (tso->why_blocked == BlockedOnCCall) {
2007 // avoid locking the TSO if we don't have to
2008 if (tso->blocked_exceptions != END_TSO_QUEUE) {
2009 awakenBlockedExceptionQueue(cap,tso);
2011 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2014 /* Reset blocking status */
2015 tso->why_blocked = NotBlocked;
2017 cap->r.rCurrentTSO = tso;
2018 cap->in_haskell = rtsTrue;
2019 errno = saved_errno;
2021 SetLastError(saved_winerror);
2024 /* We might have GC'd, mark the TSO dirty again */
2027 IF_DEBUG(sanity, checkTSO(tso));
2032 /* ---------------------------------------------------------------------------
2035 * scheduleThread puts a thread on the end of the runnable queue.
2036 * This will usually be done immediately after a thread is created.
2037 * The caller of scheduleThread must create the thread using e.g.
2038 * createThread and push an appropriate closure
2039 * on this thread's stack before the scheduler is invoked.
2040 * ------------------------------------------------------------------------ */
2043 scheduleThread(Capability *cap, StgTSO *tso)
2045 // The thread goes at the *end* of the run-queue, to avoid possible
2046 // starvation of any threads already on the queue.
2047 appendToRunQueue(cap,tso);
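/* A sketch of typical use, following the pattern of the rts_eval*
 * entry points in RtsAPI.c: make a thread for an IO closure and put
 * it on the run queue. 'closure' stands for some closure of type IO a.
 */
#if 0
static void exampleFork (Capability *cap, StgClosure *closure)
{
    StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, closure);
    scheduleThread(cap, tso);  /* runnable, at the back of cap's queue */
}
#endif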
2051 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2053 #if defined(THREADED_RTS)
2054 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2055 // move this thread from now on.
2056 cpu %= RtsFlags.ParFlags.nNodes;
2057 if (cpu == cap->no) {
2058 appendToRunQueue(cap,tso);
2060 wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2063 appendToRunQueue(cap,tso);
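/* A worked example of the wrap-around above: with 4 capabilities
 * (+RTS -N4), scheduleThreadOn(cap, 6, tso) pins tso to capability
 * 6 % 4 == 2, and TSO_LOCKED stops the load balancer from migrating
 * it afterwards. This is the RTS half of GHC.Conc.forkOnIO.
 */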
2068 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2072 // We already created/initialised the Task
2073 task = cap->running_task;
2075 // This TSO is now a bound thread; make the Task and TSO
2076 // point to each other.
2082 task->stat = NoStatus;
2084 appendToRunQueue(cap,tso);
2086 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2088 cap = schedule(cap,task);
2090 ASSERT(task->stat != NoStatus);
2091 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2093 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
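/* A sketch of how the RTS API uses this, in the style of rts_evalIO():
 * run an IO computation to completion on a bound thread. Error
 * handling and the exact signatures vary between RTS versions, so
 * treat this as illustrative only.
 */
#if 0
static void exampleEval (Capability *cap, StgClosure *p, HaskellObj *ret)
{
    StgTSO *tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
    scheduleWaitThread(tso, ret, cap);  /* blocks until tso finishes */
}
#endif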
2097 /* ----------------------------------------------------------------------------
2098 * Starting Tasks
2099 * ------------------------------------------------------------------------- */
2101 #if defined(THREADED_RTS)
2102 void OSThreadProcAttr
2103 workerStart(Task *task)
2107 // See startWorkerTask().
2108 ACQUIRE_LOCK(&task->lock);
2110 RELEASE_LOCK(&task->lock);
2112 // set the thread-local pointer to the Task:
2115 // schedule() runs without a lock.
2116 cap = schedule(cap,task);
2118 // On exit from schedule(), we have a Capability, but possibly not
2119 // the same one we started with.
2121 // During shutdown, the requirement is that after all the
2122 // Capabilities are shut down, all workers that are shutting down
2123 // have finished workerTaskStop(). This is why we hold on to
2124 // cap->lock until we've finished workerTaskStop() below.
2126 // There may be workers still involved in foreign calls; those
2127 // will just block in waitForReturnCapability() because the
2128 // Capability has been shut down.
2130 ACQUIRE_LOCK(&cap->lock);
2131 releaseCapability_(cap,rtsFalse);
2132 workerTaskStop(task);
2133 RELEASE_LOCK(&cap->lock);
2137 /* ---------------------------------------------------------------------------
2140 * Initialise the scheduler. This resets all the queues - if the
2141 * queues contained any threads, they'll be garbage collected at the
2142 * next garbage collection.
2144 * ------------------------------------------------------------------------ */
2149 #if !defined(THREADED_RTS)
2150 blocked_queue_hd = END_TSO_QUEUE;
2151 blocked_queue_tl = END_TSO_QUEUE;
2152 sleeping_queue = END_TSO_QUEUE;
2155 blackhole_queue = END_TSO_QUEUE;
2157 sched_state = SCHED_RUNNING;
2158 recent_activity = ACTIVITY_YES;
2160 #if defined(THREADED_RTS)
2161 /* Initialise the mutex and condition variables used by
2162 * the scheduler. */
2163 initMutex(&sched_mutex);
2166 ACQUIRE_LOCK(&sched_mutex);
2168 /* A capability holds the state a native thread needs in
2169 * order to execute STG code. At least one capability is
2170 * floating around (only THREADED_RTS builds have more than one).
2176 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2180 #if defined(THREADED_RTS)
2182 * Eagerly start one worker to run each Capability, except for
2183 * Capability 0. The idea is that we're probably going to start a
2184 * bound thread on Capability 0 pretty soon, so we don't want a
2185 * worker task hogging it.
2190 for (i = 1; i < n_capabilities; i++) {
2191 cap = &capabilities[i];
2192 ACQUIRE_LOCK(&cap->lock);
2193 startWorkerTask(cap, workerStart);
2194 RELEASE_LOCK(&cap->lock);
2199 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2201 RELEASE_LOCK(&sched_mutex);
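/* initScheduler() is not called by client code directly; it runs as
 * part of RTS startup. From C, the whole lifecycle is driven by the
 * standard FFI entry points, roughly:
 */
#if 0
#include "HsFFI.h"

int main (int argc, char *argv[])
{
    hs_init(&argc, &argv);  /* sets up scheduler, capabilities, workers */
    /* ... call exported Haskell functions ... */
    hs_exit();              /* eventually runs exitScheduler() */
    return 0;
}
#endif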
2206 rtsBool wait_foreign
2207 #if !defined(THREADED_RTS)
2208 __attribute__((unused))
2211 /* see Capability.c, shutdownCapability() */
2215 ACQUIRE_LOCK(&sched_mutex);
2216 task = newBoundTask();
2217 RELEASE_LOCK(&sched_mutex);
2219 // If we haven't killed all the threads yet, do it now.
2220 if (sched_state < SCHED_SHUTTING_DOWN) {
2221 sched_state = SCHED_INTERRUPTING;
2222 waitForReturnCapability(&task->cap,task);
2223 scheduleDoGC(task->cap,task,rtsFalse);
2224 releaseCapability(task->cap);
2226 sched_state = SCHED_SHUTTING_DOWN;
2228 #if defined(THREADED_RTS)
2232 for (i = 0; i < n_capabilities; i++) {
2233 shutdownCapability(&capabilities[i], task, wait_foreign);
2235 boundTaskExiting(task);
2241 freeScheduler( void )
2245 ACQUIRE_LOCK(&sched_mutex);
2246 still_running = freeTaskManager();
2247 // We can only free the Capabilities if there are no Tasks still
2248 // running. We might have a Task about to return from a foreign
2249 // call into waitForReturnCapability(), for example (actually,
2250 // this should be the *only* thing that a still-running Task can
2251 // do at this point, and it will block waiting for the
2252 // Capability).
2253 if (still_running == 0) {
2255 if (n_capabilities != 1) {
2256 stgFree(capabilities);
2259 RELEASE_LOCK(&sched_mutex);
2260 #if defined(THREADED_RTS)
2261 closeMutex(&sched_mutex);
2265 /* -----------------------------------------------------------------------------
2268 This is the interface to the garbage collector from Haskell land.
2269 We provide this so that external C code can allocate and garbage
2270 collect when called from Haskell via _ccall_GC.
2271 -------------------------------------------------------------------------- */
2274 performGC_(rtsBool force_major)
2278 // We must grab a new Task here, because the existing Task may be
2279 // associated with a particular Capability, and chained onto the
2280 // suspended_ccalling_tasks queue.
2281 ACQUIRE_LOCK(&sched_mutex);
2282 task = newBoundTask();
2283 RELEASE_LOCK(&sched_mutex);
2285 waitForReturnCapability(&task->cap,task);
2286 scheduleDoGC(task->cap,task,force_major);
2287 releaseCapability(task->cap);
2288 boundTaskExiting(task);
2294 performGC_(rtsFalse);
2298 performMajorGC(void)
2300 performGC_(rtsTrue);
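/* Because performGC_() makes its own bound Task and acquires a
 * Capability itself, these entry points are safe to call from foreign
 * code that holds no Capability. For example, a C callback that wants
 * a full collection can simply call:
 */
#if 0
static void exampleForceGC (void)
{
    performMajorGC();  /* what System.Mem.performGC uses underneath */
}
#endif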
2303 /* -----------------------------------------------------------------------------
2304 Stack overflow
2306 If the thread has reached its maximum stack size, then raise the
2307 StackOverflow exception in the offending thread. Otherwise
2308 relocate the TSO into a larger chunk of memory and adjust its stack
2309 size appropriately.
2310 -------------------------------------------------------------------------- */
2313 threadStackOverflow(Capability *cap, StgTSO *tso)
2315 nat new_stack_size, stack_words;
2320 IF_DEBUG(sanity,checkTSO(tso));
2322 // don't allow throwTo() to modify the blocked_exceptions queue
2323 // while we are moving the TSO:
2324 lockClosure((StgClosure *)tso);
2326 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2327 // NB. never raise a StackOverflow exception if the thread is
2328 // inside Control.Exception.block. It is impractical to protect
2329 // against stack overflow exceptions, since virtually anything
2330 // can raise one (even 'catch'), so this is the only sensible
2331 // thing to do here. See bug #767.
2333 debugTrace(DEBUG_gc,
2334 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2335 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2337 /* If we're debugging, just print out the top of the stack */
2338 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2339 tso->sp+64)));
2341 // Send this thread the StackOverflow exception
2343 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2347 /* Try to double the current stack size. If that takes us over the
2348 * maximum stack size for this thread, then use the maximum instead
2349 * (that is, unless we're already at or over the max size and we
2350 * can't raise the StackOverflow exception (see above), in which
2351 * case just double the size). Finally round up so the TSO ends up as
2352 * a whole number of blocks.
2354 if (tso->stack_size >= tso->max_stack_size) {
2355 new_stack_size = tso->stack_size * 2;
2357 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2359 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2360 TSO_STRUCT_SIZE)/sizeof(W_);
2361 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2362 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
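/* A worked example of the policy above: with stack_size == 2048 words
 * and max_stack_size == 3000 words, the doubled size 4096 is clamped
 * to 3000; the rounding just performed may then grow it slightly so
 * that the new TSO occupies a whole number of blocks.
 */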
2364 debugTrace(DEBUG_sched,
2365 "increasing stack size from %ld words to %d.",
2366 (long)tso->stack_size, new_stack_size);
2368 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2369 TICK_ALLOC_TSO(new_stack_size,0);
2371 /* copy the TSO block and the old stack into the new area */
2372 memcpy(dest,tso,TSO_STRUCT_SIZE);
2373 stack_words = tso->stack + tso->stack_size - tso->sp;
2374 new_sp = (P_)dest + new_tso_size - stack_words;
2375 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2377 /* relocate the stack pointers... */
2379 dest->stack_size = new_stack_size;
2381 /* Mark the old TSO as relocated. We have to check for relocated
2382 * TSOs in the garbage collector and any primops that deal with TSOs.
2384 * It's important to set the sp value to just beyond the end
2385 * of the stack, so we don't attempt to scavenge any part of the
2386 * dead TSO's stack.
2388 tso->what_next = ThreadRelocated;
2389 setTSOLink(cap,tso,dest);
2390 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2391 tso->why_blocked = NotBlocked;
2393 IF_PAR_DEBUG(verbose,
2394 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2395 tso->id, tso, tso->stack_size);
2396 /* If we're debugging, just print out the top of the stack */
2397 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2398 tso->sp+64)));
2403 IF_DEBUG(sanity,checkTSO(dest));
2405 IF_DEBUG(scheduler,printTSO(dest));
2412 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2414 bdescr *bd, *new_bd;
2415 lnat free_w, tso_size_w;
2418 tso_size_w = tso_sizeW(tso);
2420 if (tso_size_w < MBLOCK_SIZE_W ||
2421 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2426 // don't allow throwTo() to modify the blocked_exceptions queue
2427 // while we are moving the TSO:
2428 lockClosure((StgClosure *)tso);
2430 // this is the number of words we'll free
2431 free_w = round_to_mblocks(tso_size_w/2);
2433 bd = Bdescr((StgPtr)tso);
2434 new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2435 bd->free = bd->start + TSO_STRUCT_SIZEW;
2437 new_tso = (StgTSO *)new_bd->start;
2438 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2439 new_tso->stack_size = new_bd->free - new_tso->stack;
2441 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2442 (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2444 tso->what_next = ThreadRelocated;
2445 tso->_link = new_tso; // no write barrier reqd: same generation
2447 // The TSO attached to this Task may have moved, so update the
2448 // pointer to it.
2449 if (task->tso == tso) {
2450 task->tso = new_tso;
2456 IF_DEBUG(sanity,checkTSO(new_tso));
2461 /* ---------------------------------------------------------------------------
2462 Interrupt execution
2463 - usually called inside a signal handler so it mustn't do anything fancy.
2464 ------------------------------------------------------------------------ */
2467 interruptStgRts(void)
2469 sched_state = SCHED_INTERRUPTING;
2470 setContextSwitches();
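/* A sketch of a console-interrupt hook using this function; the real
 * handlers live in posix/Signals.c and win32/ConsoleHandler.c. Being
 * flag-writes only, interruptStgRts() is safe in this context.
 */
#if 0
static void exampleSigintHandler (int sig STG_UNUSED)
{
    interruptStgRts();  /* ask the scheduler to interrupt all threads */
}
#endif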
2474 /* -----------------------------------------------------------------------------
2475 Wake up the RTS
2477 This function causes at least one OS thread to wake up and run the
2478 scheduler loop. It is invoked when the RTS might be deadlocked, or
2479 an external event has arrived that may need servicing (eg. a
2480 keyboard interrupt).
2482 In the single-threaded RTS we don't do anything here; we only have
2483 one thread anyway, and the event that caused us to want to wake up
2484 will have interrupted any blocking system call in progress.
2485 -------------------------------------------------------------------------- */
2490 #if defined(THREADED_RTS)
2491 // This forces the IO Manager thread to wake up, which will
2492 // in turn ensure that some OS thread wakes up and runs the
2493 // scheduler loop, which will cause a GC and deadlock check.
2498 /* -----------------------------------------------------------------------------
2499 * checkBlackHoles()
2501 * Check the blackhole_queue for threads that can be woken up. We do
2502 * this periodically: before every GC, and whenever the run queue is
2505 * An elegant solution might be to just wake up all the blocked
2506 * threads with awakenBlockedQueue occasionally: they'll go back to
2507 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2508 * doesn't give us a way to tell whether we've actually managed to
2509 * wake up any threads, so we would be busy-waiting.
2511 * -------------------------------------------------------------------------- */
2514 checkBlackHoles (Capability *cap)
2517 rtsBool any_woke_up = rtsFalse;
2520 // blackhole_queue is global:
2521 ASSERT_LOCK_HELD(&sched_mutex);
2523 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2525 // ASSUMES: sched_mutex
2526 prev = &blackhole_queue;
2527 t = blackhole_queue;
2528 while (t != END_TSO_QUEUE) {
2529 ASSERT(t->why_blocked == BlockedOnBlackHole);
2530 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2531 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2532 IF_DEBUG(sanity,checkTSO(t));
2533 t = unblockOne(cap, t);
2535 any_woke_up = rtsTrue;
2545 /* -----------------------------------------------------------------------------
2546 Deleting threads
2548 This is used for interruption (^C) and forking, and corresponds to
2549 raising an exception but without letting the thread catch the
2550 exception.
2551 -------------------------------------------------------------------------- */
2554 deleteThread (Capability *cap, StgTSO *tso)
2556 // NOTE: must only be called on a TSO that we have exclusive
2557 // access to, because we will call throwToSingleThreaded() below.
2558 // The TSO must be on the run queue of the Capability we own, or
2559 // we must own all Capabilities.
2561 if (tso->why_blocked != BlockedOnCCall &&
2562 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2563 throwToSingleThreaded(cap,tso,NULL);
2567 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2569 deleteThread_(Capability *cap, StgTSO *tso)
2570 { // for forkProcess only:
2571 // like deleteThread(), but we delete threads in foreign calls, too.
2573 if (tso->why_blocked == BlockedOnCCall ||
2574 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2575 unblockOne(cap,tso);
2576 tso->what_next = ThreadKilled;
2578 deleteThread(cap,tso);
2583 /* -----------------------------------------------------------------------------
2584 raiseExceptionHelper
2586 This function is called by the raise# primitive, just so that we can
2587 move some of the tricky bits of raising an exception from C-- into
2588 C. Who knows, it might be a useful re-usable thing here too.
2589 -------------------------------------------------------------------------- */
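/* To see why the update frames matter below, consider
 *
 *     let x = 1 `div` 0 in x + x
 *
 * The first demand on x raises the exception while x's update frame is
 * on the stack; overwriting x's thunk with a 'raise# E' closure means
 * the second demand re-raises the same exception instead of entering a
 * half-evaluated thunk.
 */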
2592 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2594 Capability *cap = regTableToCapability(reg);
2595 StgThunk *raise_closure = NULL;
2597 StgRetInfoTable *info;
2599 // This closure represents the expression 'raise# E' where E
2600 // is the exception raised. It is used to overwrite all the
2601 // thunks which are currently under evaluation.
2604 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2605 // LDV profiling: stg_raise_info has THUNK as its closure
2606 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2607 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2608 // 1 does not cause any problem unless profiling is performed.
2609 // However, when LDV profiling goes on, we need to linearly scan
2610 // the small object pool, where raise_closure is stored, so we should
2611 // use MIN_UPD_SIZE.
2613 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2614 // sizeofW(StgClosure)+1);
2618 // Walk up the stack, looking for the catch frame. On the way,
2619 // we update any closures pointed to from update frames with the
2620 // raise closure that we just built.
2624 info = get_ret_itbl((StgClosure *)p);
2625 next = p + stack_frame_sizeW((StgClosure *)p);
2626 switch (info->i.type) {
2629 // Only create raise_closure if we need to.
2630 if (raise_closure == NULL) {
2631 raise_closure =
2632 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2633 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2634 raise_closure->payload[0] = exception;
2636 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2640 case ATOMICALLY_FRAME:
2641 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2643 return ATOMICALLY_FRAME;
2649 case CATCH_STM_FRAME:
2650 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2652 return CATCH_STM_FRAME;
2658 case CATCH_RETRY_FRAME:
2667 /* -----------------------------------------------------------------------------
2668 findRetryFrameHelper
2670 This function is called by the retry# primitive. It traverses the stack
2671 leaving tso->sp referring to the frame which should handle the retry.
2673 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2674 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2676 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2677 create) because retries are not considered to be exceptions, despite the
2678 similar implementation.
2680 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2681 not be created within memory transactions.
2682 -------------------------------------------------------------------------- */
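/* A sketch of the stack this walks for 'atomically (e1 `orElse` e2)'
 * with a retry# inside e1 (frames listed from the top):
 *
 *     ... frames of e1 ...
 *     CATCH_RETRY_FRAME    -- pushed by orElse#; retry# stops here and
 *                             the alternative e2 is run instead
 *     ATOMICALLY_FRAME     -- pushed by atomically#; a retry# reaching
 *                             this frame blocks the thread until some
 *                             TVar in its read set changes
 *     ... rest of the stack ...
 */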
2685 findRetryFrameHelper (StgTSO *tso)
2688 StgRetInfoTable *info;
2692 info = get_ret_itbl((StgClosure *)p);
2693 next = p + stack_frame_sizeW((StgClosure *)p);
2694 switch (info->i.type) {
2696 case ATOMICALLY_FRAME:
2697 debugTrace(DEBUG_stm,
2698 "found ATOMICALLY_FRAME at %p during retry", p);
2700 return ATOMICALLY_FRAME;
2702 case CATCH_RETRY_FRAME:
2703 debugTrace(DEBUG_stm,
2704 "found CATCH_RETRY_FRAME at %p during retry", p);
2706 return CATCH_RETRY_FRAME;
2708 case CATCH_STM_FRAME: {
2709 StgTRecHeader *trec = tso -> trec;
2710 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2711 debugTrace(DEBUG_stm,
2712 "found CATCH_STM_FRAME at %p during retry", p);
2713 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2714 stmAbortTransaction(tso -> cap, trec);
2715 stmFreeAbortedTRec(tso -> cap, trec);
2716 tso -> trec = outer;
2723 ASSERT(info->i.type != CATCH_FRAME);
2724 ASSERT(info->i.type != STOP_FRAME);
2731 /* -----------------------------------------------------------------------------
2732 resurrectThreads is called after garbage collection on the list of
2733 threads found to be garbage. Each of these threads will be woken
2734 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2735 on an MVar, or NonTermination if the thread was blocked on a Black
2736 Hole.
2738 Locks: assumes we hold *all* the capabilities.
2739 -------------------------------------------------------------------------- */
2742 resurrectThreads (StgTSO *threads)
2748 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2749 next = tso->global_link;
2751 step = Bdescr((P_)tso)->step;
2752 tso->global_link = step->threads;
2753 step->threads = tso;
2755 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2757 // Wake up the thread on the Capability it was last on
2760 switch (tso->why_blocked) {
2762 case BlockedOnException:
2763 /* Called by GC - sched_mutex lock is currently held. */
2764 throwToSingleThreaded(cap, tso,
2765 (StgClosure *)blockedOnDeadMVar_closure);
2767 case BlockedOnBlackHole:
2768 throwToSingleThreaded(cap, tso,
2769 (StgClosure *)nonTermination_closure);
2772 throwToSingleThreaded(cap, tso,
2773 (StgClosure *)blockedIndefinitely_closure);
2776 /* This might happen if the thread was blocked on a black hole
2777 * belonging to a thread that we've just woken up (raiseAsync
2778 * can wake up threads, remember...).
2782 barf("resurrectThreads: thread blocked in a strange way");
2787 /* -----------------------------------------------------------------------------
2788 performPendingThrowTos is called after garbage collection, and
2789 passed a list of threads that were found to have pending throwTos
2790 (tso->blocked_exceptions was not empty), and were blocked.
2791 Normally this doesn't happen, because we would deliver the
2792 exception directly if the target thread is blocked, but there are
2793 small windows where it might occur on a multiprocessor (see
2794 throwTo()).
2796 NB. we must be holding all the capabilities at this point, just
2797 like resurrectThreads().
2798 -------------------------------------------------------------------------- */
2801 performPendingThrowTos (StgTSO *threads)
2807 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2808 next = tso->global_link;
2810 step = Bdescr((P_)tso)->step;
2811 tso->global_link = step->threads;
2812 step->threads = tso;
2814 debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2817 maybePerformBlockedException(cap, tso);