1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
23 #include "RtsSignals.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
32 #include "Proftimer.h"
37 /* PARALLEL_HASKELL includes go here */
40 #include "Capability.h"
42 #include "AwaitEvent.h"
43 #if defined(mingw32_HOST_OS)
44 #include "win32/IOManager.h"
47 #include "RaiseAsync.h"
49 #include "ThrIOManager.h"
51 #ifdef HAVE_SYS_TYPES_H
52 #include <sys/types.h>
66 // Turn off inlining when debugging - it obfuscates things
69 # define STATIC_INLINE static
72 /* -----------------------------------------------------------------------------
74 * -------------------------------------------------------------------------- */
76 #if !defined(THREADED_RTS)
77 // Blocked/sleeping threads
78 StgTSO *blocked_queue_hd = NULL;
79 StgTSO *blocked_queue_tl = NULL;
80 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
83 /* Threads blocked on blackholes.
84 * LOCK: sched_mutex+capability, or all capabilities
86 StgTSO *blackhole_queue = NULL;
88 /* The blackhole_queue should be checked for threads to wake up. See
89 * Schedule.h for a more thorough comment.
90 * LOCK: none (doesn't matter if we miss an update)
92 rtsBool blackholes_need_checking = rtsFalse;
94 /* Set to true when the latest garbage collection failed to reclaim
95 * enough space, and the runtime should proceed to shut itself down in
96 * an orderly fashion (emitting profiling info etc.)
98 rtsBool heap_overflow = rtsFalse;
100 /* flag that tracks whether we have done any execution in this time slice.
101 * LOCK: currently none, perhaps we should lock (but needs to be
102 * updated in the fast path of the scheduler).
104 * NB. must be StgWord, we do xchg() on it.
106 volatile StgWord recent_activity = ACTIVITY_YES;
108 /* if this flag is set as well, give up execution
109 * LOCK: none (changes monotonically)
111 volatile StgWord sched_state = SCHED_RUNNING;
113 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
114 * exists - earlier gccs apparently didn't.
120 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
121 * in an MT setting, needed to signal that a worker thread shouldn't hang around
122 * in the scheduler when it is out of work.
124 rtsBool shutting_down_scheduler = rtsFalse;
127 * This mutex protects most of the global scheduler data in
128 * the THREADED_RTS runtime.
130 #if defined(THREADED_RTS)
134 #if !defined(mingw32_HOST_OS)
135 #define FORKPROCESS_PRIMOP_SUPPORTED
138 /* -----------------------------------------------------------------------------
139 * static function prototypes
140 * -------------------------------------------------------------------------- */
142 static Capability *schedule (Capability *initialCapability, Task *task);
145 // These functions all encapsulate parts of the scheduler loop, and are
146 // abstracted only to make the structure and control flow of the
147 // scheduler clearer.
149 static void schedulePreLoop (void);
150 static void scheduleFindWork (Capability *cap);
151 #if defined(THREADED_RTS)
152 static void scheduleYield (Capability **pcap, Task *task);
154 static void scheduleStartSignalHandlers (Capability *cap);
155 static void scheduleCheckBlockedThreads (Capability *cap);
156 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
157 static void scheduleCheckBlackHoles (Capability *cap);
158 static void scheduleDetectDeadlock (Capability *cap, Task *task);
159 static void schedulePushWork(Capability *cap, Task *task);
160 #if defined(PARALLEL_HASKELL)
161 static rtsBool scheduleGetRemoteWork(Capability *cap);
162 static void scheduleSendPendingMessages(void);
164 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
165 static void scheduleActivateSpark(Capability *cap);
167 static void schedulePostRunThread(Capability *cap, StgTSO *t);
168 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
169 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
171 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
172 nat prev_what_next );
173 static void scheduleHandleThreadBlocked( StgTSO *t );
174 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
176 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
177 static Capability *scheduleDoGC(Capability *cap, Task *task,
178 rtsBool force_major);
180 static rtsBool checkBlackHoles(Capability *cap);
182 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
183 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
185 static void deleteThread (Capability *cap, StgTSO *tso);
186 static void deleteAllThreads (Capability *cap);
188 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
189 static void deleteThread_(Capability *cap, StgTSO *tso);
193 static char *whatNext_strs[] = {
203 /* -----------------------------------------------------------------------------
204 * Putting a thread on the run queue: different scheduling policies
205 * -------------------------------------------------------------------------- */
208 addToRunQueue( Capability *cap, StgTSO *t )
210 #if defined(PARALLEL_HASKELL)
211 if (RtsFlags.ParFlags.doFairScheduling) {
212 // this does round-robin scheduling; good for concurrency
213 appendToRunQueue(cap,t);
215 // this does unfair scheduling; good for parallelism
216 pushOnRunQueue(cap,t);
219 // this does round-robin scheduling; good for concurrency
220 appendToRunQueue(cap,t);
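// For reference, a sketch of the two queue primitives used above (they are
// inline functions in Schedule.h in this tree; this is not their actual
// definition, just their effect):
//
//     appendToRunQueue(cap,t);  // t goes on the tail: round-robin, "fair"
//     pushOnRunQueue(cap,t);    // t goes on the head: run-next, "unfair"
//
// so fair scheduling simply means newly runnable work queues up behind
// work that is already waiting.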
224 /* ---------------------------------------------------------------------------
225 Main scheduling loop.
227 We use round-robin scheduling, each thread returning to the
228 scheduler loop when one of these conditions is detected:
231 * timer expires (thread yields)
237 In a GranSim setup this loop iterates over the global event
238 queue, which determines what to do next. Therefore, it's more
239 complicated than either the concurrent or the parallel (GUM)
240 setup.
241 This version has been entirely removed (JB 2008/08).
244 GUM iterates over incoming messages.
245 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
246 and sends out a fish whenever it has nothing to do; in-between
247 doing the actual reductions (shared code below) it processes the
248 incoming messages and deals with delayed operations
249 (see PendingFetches).
250 This is not the ugliest code you could imagine, but it's bloody close.
252 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
253 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
254 as well as future GUM versions. This file has been refurbished to
255 contain only valid code, which is however incomplete and refers
256 to invalid includes, etc.
258 ------------------------------------------------------------------------ */
261 schedule (Capability *initialCapability, Task *task)
265 StgThreadReturnCode ret;
266 #if defined(PARALLEL_HASKELL)
267 rtsBool receivedFinish = rtsFalse;
271 #if defined(THREADED_RTS)
272 rtsBool first = rtsTrue;
275 cap = initialCapability;
277 // Pre-condition: this task owns initialCapability.
278 // The sched_mutex is *NOT* held
279 // NB. on return, we still hold a capability.
281 debugTrace (DEBUG_sched,
282 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
283 task, initialCapability);
285 if (running_finalizers) {
286 errorBelch("error: a C finalizer called back into Haskell.\n"
287 " use Foreign.Concurrent.newForeignPtr for Haskell finalizers.");
288 stg_exit(EXIT_FAILURE);
293 // -----------------------------------------------------------
294 // Scheduler loop starts here:
296 #if defined(PARALLEL_HASKELL)
297 #define TERMINATION_CONDITION (!receivedFinish)
299 #define TERMINATION_CONDITION rtsTrue
302 while (TERMINATION_CONDITION) {
304 // Check whether we have re-entered the RTS from Haskell without
305 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
307 if (cap->in_haskell) {
308 errorBelch("schedule: re-entered unsafely.\n"
309 " Perhaps a 'foreign import unsafe' should be 'safe'?");
310 stg_exit(EXIT_FAILURE);
313 // The interruption / shutdown sequence.
315 // In order to cleanly shut down the runtime, we want to:
316 // * make sure that all main threads return to their callers
317 // with the state 'Interrupted'.
318 // * clean up all OS threads associated with the runtime
319 // * free all memory etc.
321 // So the sequence for ^C goes like this:
323 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
324 // arranges for some Capability to wake up
326 // * all threads in the system are halted, and the zombies are
327 // placed on the run queue for cleaning up. We acquire all
328 // the capabilities in order to delete the threads, this is
329 // done by scheduleDoGC() for convenience (because GC already
330 // needs to acquire all the capabilities). We can't kill
331 // threads involved in foreign calls.
333 // * somebody calls shutdownHaskell(), which calls exitScheduler()
335 // * sched_state := SCHED_SHUTTING_DOWN
337 // * all workers exit when the run queue on their capability
338 // drains. All main threads will also exit when their TSO
339 // reaches the head of the run queue and they can return.
341 // * eventually all Capabilities will shut down, and the RTS can
344 // * We might be left with threads blocked in foreign calls,
345 // we should really attempt to kill these somehow (TODO);
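// For orientation, the interrupting side of this handshake looks roughly
// like the sketch below. The real code lives elsewhere in the RTS (the
// signal handler and interruptStgRts()); names are quoted from memory and
// may differ slightly in this tree:
//
//     sched_state = SCHED_INTERRUPTING;  // monotonic, so no lock needed
//     setContextSwitches();              // make running threads yield soon
//     wakeUpRts();                       // ensure some Capability wakes up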
347 switch (sched_state) {
350 case SCHED_INTERRUPTING:
351 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
352 #if defined(THREADED_RTS)
353 discardSparksCap(cap);
355 /* scheduleDoGC() deletes all the threads */
356 cap = scheduleDoGC(cap,task,rtsFalse);
358 // after scheduleDoGC(), we must be shutting down. Either some
359 // other Capability did the final GC, or we did it above,
360 // either way we can fall through to the SCHED_SHUTTING_DOWN
362 ASSERT(sched_state == SCHED_SHUTTING_DOWN);
365 case SCHED_SHUTTING_DOWN:
366 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
367 // If we are a worker, just exit. If we're a bound thread
368 // then we will exit below when we've removed our TSO from
370 if (task->tso == NULL && emptyRunQueue(cap)) {
375 barf("sched_state: %d", sched_state);
378 scheduleFindWork(cap);
380 /* work pushing, currently relevant only for THREADED_RTS:
381 (pushes threads, wakes up idle capabilities for stealing) */
382 schedulePushWork(cap,task);
384 #if defined(PARALLEL_HASKELL)
385 /* since we perform a blocking receive and continue otherwise,
386 either we never reach here or we definitely have work! */
387 // from here: non-empty run queue
388 ASSERT(!emptyRunQueue(cap));
390 if (PacketsWaiting()) { /* now process incoming messages, if any
393 CAUTION: scheduleGetRemoteWork called
394 above, waits for messages as well! */
395 processMessages(cap, &receivedFinish);
397 #endif // PARALLEL_HASKELL: non-empty run queue!
399 scheduleDetectDeadlock(cap,task);
401 #if defined(THREADED_RTS)
402 cap = task->cap; // reload cap, it might have changed
405 // Normally, the only way we can get here with no threads to
406 // run is if a keyboard interrupt was received during
407 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
408 // Additionally, it is not fatal for the
409 // threaded RTS to reach here with no threads to run.
411 // win32: might be here due to awaitEvent() being abandoned
412 // as a result of a console event having been delivered.
414 #if defined(THREADED_RTS)
418 // // don't yield the first time, we want a chance to run this
419 // // thread for a bit, even if there are others banging at the
422 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
426 scheduleYield(&cap,task);
427 if (emptyRunQueue(cap)) continue; // look for work again
430 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
431 if ( emptyRunQueue(cap) ) {
432 ASSERT(sched_state >= SCHED_INTERRUPTING);
437 // Get a thread to run
439 t = popRunQueue(cap);
441 // Sanity check the thread we're about to run. This can be
442 // expensive if there is lots of thread switching going on...
443 IF_DEBUG(sanity,checkTSO(t));
445 #if defined(THREADED_RTS)
446 // Check whether we can run this thread in the current task.
447 // If not, we have to pass our capability to the right task.
449 Task *bound = t->bound;
453 debugTrace(DEBUG_sched,
454 "### Running thread %lu in bound thread", (unsigned long)t->id);
455 // yes, the Haskell thread is bound to the current native thread
457 debugTrace(DEBUG_sched,
458 "### thread %lu bound to another OS thread", (unsigned long)t->id);
459 // no, bound to a different Haskell thread: pass to that thread
460 pushOnRunQueue(cap,t);
464 // The thread we want to run is unbound.
466 debugTrace(DEBUG_sched,
467 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
468 // no, the current native thread is bound to a different
469 // Haskell thread, so pass it to any worker thread
470 pushOnRunQueue(cap,t);
477 // If we're shutting down, and this thread has not yet been
478 // killed, kill it now. This sometimes happens when a finalizer
479 // thread is created by the final GC, or a thread previously
480 // in a foreign call returns.
481 if (sched_state >= SCHED_INTERRUPTING &&
482 !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
486 /* context switches are initiated by the timer signal, unless
487 * the user specified "context switch as often as possible", with
490 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
491 && !emptyThreadQueues(cap)) {
492 cap->context_switch = 1;
497 // CurrentTSO is the thread to run. t might be different if we
498 // loop back to run_thread, so make sure to set CurrentTSO after
500 cap->r.rCurrentTSO = t;
502 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
503 (long)t->id, whatNext_strs[t->what_next]);
505 startHeapProfTimer();
507 // Check for exceptions blocked on this thread
508 maybePerformBlockedException (cap, t);
510 // ----------------------------------------------------------------------
511 // Run the current thread
513 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
514 ASSERT(t->cap == cap);
515 ASSERT(t->bound ? t->bound->cap == cap : 1);
517 prev_what_next = t->what_next;
519 errno = t->saved_errno;
521 SetLastError(t->saved_winerror);
524 cap->in_haskell = rtsTrue;
528 #if defined(THREADED_RTS)
529 if (recent_activity == ACTIVITY_DONE_GC) {
530 // ACTIVITY_DONE_GC means we turned off the timer signal to
531 // conserve power (see #1623). Re-enable it here.
533 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
534 if (prev == ACTIVITY_DONE_GC) {
538 recent_activity = ACTIVITY_YES;
542 switch (prev_what_next) {
546 /* Thread already finished, return to scheduler. */
547 ret = ThreadFinished;
553 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
554 cap = regTableToCapability(r);
559 case ThreadInterpret:
560 cap = interpretBCO(cap);
565 barf("schedule: invalid what_next field");
568 cap->in_haskell = rtsFalse;
570 // The TSO might have moved, eg. if it re-entered the RTS and a GC
571 // happened. So find the new location:
572 t = cap->r.rCurrentTSO;
574 // We have run some Haskell code: there might be blackhole-blocked
575 // threads to wake up now.
576 // Lock-free test here should be ok, we're just setting a flag.
577 if ( blackhole_queue != END_TSO_QUEUE ) {
578 blackholes_need_checking = rtsTrue;
581 // And save the current errno in this thread.
582 // XXX: possibly bogus for SMP because this thread might already
583 // be running again, see code below.
584 t->saved_errno = errno;
586 // Similarly for Windows error code
587 t->saved_winerror = GetLastError();
590 #if defined(THREADED_RTS)
591 // If ret is ThreadBlocked, and this Task is bound to the TSO that
592 // blocked, we are in limbo - the TSO is now owned by whatever it
593 // is blocked on, and may in fact already have been woken up,
594 // perhaps even on a different Capability. It may be the case
595 // that task->cap != cap. We had better yield this Capability
596 // immediately and return to normality.
597 if (ret == ThreadBlocked) {
598 debugTrace(DEBUG_sched,
599 "--<< thread %lu (%s) stopped: blocked",
600 (unsigned long)t->id, whatNext_strs[t->what_next]);
605 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
606 ASSERT(t->cap == cap);
608 // ----------------------------------------------------------------------
610 // Costs for the scheduler are assigned to CCS_SYSTEM
612 #if defined(PROFILING)
616 schedulePostRunThread(cap,t);
618 t = threadStackUnderflow(task,t);
620 ready_to_gc = rtsFalse;
624 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
628 scheduleHandleStackOverflow(cap,task,t);
632 if (scheduleHandleYield(cap, t, prev_what_next)) {
633 // shortcut for switching between compiler/interpreter:
639 scheduleHandleThreadBlocked(t);
643 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
644 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
648 barf("schedule: invalid thread return code %d", (int)ret);
651 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
652 cap = scheduleDoGC(cap,task,rtsFalse);
654 } /* end of while() */
657 /* ----------------------------------------------------------------------------
658 * Setting up the scheduler loop
659 * ------------------------------------------------------------------------- */
662 schedulePreLoop(void)
664 // initialisation for scheduler - what cannot go into initScheduler()
667 /* -----------------------------------------------------------------------------
670 * Search for work to do, and handle messages from elsewhere.
671 * -------------------------------------------------------------------------- */
674 scheduleFindWork (Capability *cap)
676 scheduleStartSignalHandlers(cap);
678 // Only check the black holes here if we've nothing else to do.
679 // During normal execution, the black hole list only gets checked
680 // at GC time, to avoid repeatedly traversing this possibly long
681 // list each time around the scheduler.
682 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
684 scheduleCheckWakeupThreads(cap);
686 scheduleCheckBlockedThreads(cap);
688 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
689 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
692 #if defined(PARALLEL_HASKELL)
693 // if messages have been buffered...
694 scheduleSendPendingMessages();
697 #if defined(PARALLEL_HASKELL)
698 if (emptyRunQueue(cap)) {
699 receivedFinish = scheduleGetRemoteWork(cap);
700 continue; // a new round, (hopefully) with new work
702 in GUM, this a) sends out a FISH and returns IF no fish is
704 b) (blocking) awaits and receives messages
706 in Eden, this is only the blocking receive, as b) in GUM.
712 #if defined(THREADED_RTS)
713 STATIC_INLINE rtsBool
714 shouldYieldCapability (Capability *cap, Task *task)
716 // we need to yield this capability to someone else if..
717 // - another thread is initiating a GC
718 // - another Task is returning from a foreign call
719 // - the thread at the head of the run queue cannot be run
720 // by this Task (it is bound to another Task, or it is unbound
721 // and this task is bound).
722 return (waiting_for_gc ||
723 cap->returning_tasks_hd != NULL ||
724 (!emptyRunQueue(cap) && (task->tso == NULL
725 ? cap->run_queue_hd->bound != NULL
726 : cap->run_queue_hd->bound != task)));
729 // This is the single place where a Task goes to sleep. There are
730 // two reasons it might need to sleep:
731 // - there are no threads to run
732 // - we need to yield this Capability to someone else
733 // (see shouldYieldCapability())
735 // Careful: the scheduler loop is quite delicate. Make sure you run
736 // the tests in testsuite/concurrent (all ways) after modifying this,
737 // and also check the benchmarks in nofib/parallel for regressions.
740 scheduleYield (Capability **pcap, Task *task)
742 Capability *cap = *pcap;
744 // if we have work, and we don't need to give up the Capability, continue.
745 if (!shouldYieldCapability(cap,task) &&
746 (!emptyRunQueue(cap) ||
747 !emptyWakeupQueue(cap) ||
748 blackholes_need_checking ||
749 sched_state >= SCHED_INTERRUPTING))
752 // otherwise yield (sleep), and keep yielding if necessary.
754 yieldCapability(&cap,task);
756 while (shouldYieldCapability(cap,task));
758 // note there may still be no threads on the run queue at this
759 // point, the caller has to check.
766 /* -----------------------------------------------------------------------------
769 * Push work to other Capabilities if we have some.
770 * -------------------------------------------------------------------------- */
773 schedulePushWork(Capability *cap USED_IF_THREADS,
774 Task *task USED_IF_THREADS)
776 /* The following code is not for PARALLEL_HASKELL. The call has been kept
777 general, as future GUM versions might use pushing in a distributed setup */
778 #if defined(THREADED_RTS)
780 Capability *free_caps[n_capabilities], *cap0;
783 // migration can be turned off with +RTS -qg
784 if (!RtsFlags.ParFlags.migrate) return;
786 // Check whether we have more threads on our run queue, or sparks
787 // in our pool, that we could hand to another Capability.
788 if (cap->run_queue_hd == END_TSO_QUEUE) {
789 if (sparkPoolSizeCap(cap) < 2) return;
791 if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
792 sparkPoolSizeCap(cap) < 1) return;
795 // First grab as many free Capabilities as we can.
796 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
797 cap0 = &capabilities[i];
798 if (cap != cap0 && tryGrabCapability(cap0,task)) {
799 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
800 // it already has some work, we just grabbed it at
801 // the wrong moment. Or maybe it's deadlocked!
802 releaseCapability(cap0);
804 free_caps[n_free_caps++] = cap0;
809 // we now have n_free_caps free capabilities stashed in
810 // free_caps[]. Share our run queue equally with them. This is
811 // probably the simplest thing we could do; improvements we might
812 // want to do include:
814 // - giving high priority to moving relatively new threads, on
815 // the grounds that they haven't had time to build up a
816 // working set in the cache on this CPU/Capability.
818 // - giving low priority to moving long-lived threads
820 if (n_free_caps > 0) {
821 StgTSO *prev, *t, *next;
822 rtsBool pushed_to_all;
824 debugTrace(DEBUG_sched,
825 "cap %d: %s and %d free capabilities, sharing...",
827 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
828 "excess threads on run queue":"sparks to share (>=2)",
832 pushed_to_all = rtsFalse;
834 if (cap->run_queue_hd != END_TSO_QUEUE) {
835 prev = cap->run_queue_hd;
837 prev->_link = END_TSO_QUEUE;
838 for (; t != END_TSO_QUEUE; t = next) {
840 t->_link = END_TSO_QUEUE;
841 if (t->what_next == ThreadRelocated
842 || t->bound == task // don't move my bound thread
843 || tsoLocked(t)) { // don't move a locked thread
844 setTSOLink(cap, prev, t);
846 } else if (i == n_free_caps) {
847 pushed_to_all = rtsTrue;
850 setTSOLink(cap, prev, t);
853 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
854 appendToRunQueue(free_caps[i],t);
855 if (t->bound) { t->bound->cap = free_caps[i]; }
856 t->cap = free_caps[i];
860 cap->run_queue_tl = prev;
864 /* JB: I left this code in place; it would work but is not necessary */
866 // If there are some free capabilities that we didn't push any
867 // threads to, then try to push a spark to each one.
868 if (!pushed_to_all) {
870 // i is the next free capability to push to
871 for (; i < n_free_caps; i++) {
872 if (emptySparkPoolCap(free_caps[i])) {
873 spark = tryStealSpark(cap->sparks);
875 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
876 newSpark(&(free_caps[i]->r), spark);
881 #endif /* SPARK_PUSHING */
883 // release the capabilities
884 for (i = 0; i < n_free_caps; i++) {
885 task->cap = free_caps[i];
886 releaseAndWakeupCapability(free_caps[i]);
889 task->cap = cap; // reset to point to our Capability.
891 #endif /* THREADED_RTS */
895 /* ----------------------------------------------------------------------------
896 * Start any pending signal handlers
897 * ------------------------------------------------------------------------- */
899 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
901 scheduleStartSignalHandlers(Capability *cap)
903 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
904 // safe outside the lock
905 startSignalHandlers(cap);
910 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
915 /* ----------------------------------------------------------------------------
916 * Check for blocked threads that can be woken up.
917 * ------------------------------------------------------------------------- */
920 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
922 #if !defined(THREADED_RTS)
924 // Check whether any waiting threads need to be woken up. If the
925 // run queue is empty, and there are no other tasks running, we
926 // can wait indefinitely for something to happen.
928 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
930 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
936 /* ----------------------------------------------------------------------------
937 * Check for threads woken up by other Capabilities
938 * ------------------------------------------------------------------------- */
941 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
943 #if defined(THREADED_RTS)
944 // Any threads that were woken up by other Capabilities get
945 // appended to our run queue.
946 if (!emptyWakeupQueue(cap)) {
947 ACQUIRE_LOCK(&cap->lock);
948 if (emptyRunQueue(cap)) {
949 cap->run_queue_hd = cap->wakeup_queue_hd;
950 cap->run_queue_tl = cap->wakeup_queue_tl;
952 setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
953 cap->run_queue_tl = cap->wakeup_queue_tl;
955 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
956 RELEASE_LOCK(&cap->lock);
961 /* ----------------------------------------------------------------------------
962 * Check for threads blocked on BLACKHOLEs that can be woken up
963 * ------------------------------------------------------------------------- */
965 scheduleCheckBlackHoles (Capability *cap)
967 if ( blackholes_need_checking ) // check without the lock first
969 ACQUIRE_LOCK(&sched_mutex);
970 if ( blackholes_need_checking ) {
971 blackholes_need_checking = rtsFalse;
972 // important that we reset the flag *before* checking the
973 // blackhole queue, otherwise we could get deadlock: if we reset
974 // it afterwards, a thread we wake up might immediately run on
975 // another Capability and block on a blackhole (setting the flag),
976 // and our reset would then lose that notification.
977 checkBlackHoles(cap);
979 RELEASE_LOCK(&sched_mutex);
983 /* ----------------------------------------------------------------------------
984 * Detect deadlock conditions and attempt to resolve them.
985 * ------------------------------------------------------------------------- */
988 scheduleDetectDeadlock (Capability *cap, Task *task)
991 #if defined(PARALLEL_HASKELL)
992 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
997 * Detect deadlock: when we have no threads to run, there are no
998 * threads blocked, waiting for I/O, or sleeping, and all the
999 * other tasks are waiting for work, we must have a deadlock of
1002 if ( emptyThreadQueues(cap) )
1004 #if defined(THREADED_RTS)
1006 * In the threaded RTS, we only check for deadlock if there
1007 * has been no activity in a complete timeslice. This means
1008 * we won't eagerly start a full GC just because we don't have
1009 * any threads to run currently.
1011 if (recent_activity != ACTIVITY_INACTIVE) return;
1014 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
1016 // Garbage collection can release some new threads due to
1017 // either (a) finalizers or (b) threads resurrected because
1018 // they are unreachable and will therefore be sent an
1019 // exception. Any threads thus released will be immediately
1021 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
1022 // when force_major == rtsTrue. scheduleDoGC sets
1023 // recent_activity to ACTIVITY_DONE_GC and turns off the timer
1026 if ( !emptyRunQueue(cap) ) return;
1028 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
1029 /* If we have user-installed signal handlers, then wait
1030 * for signals to arrive rather than bombing out with a
1033 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1034 debugTrace(DEBUG_sched,
1035 "still deadlocked, waiting for signals...");
1039 if (signals_pending()) {
1040 startSignalHandlers(cap);
1043 // either we have threads to run, or we were interrupted:
1044 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1050 #if !defined(THREADED_RTS)
1051 /* Probably a real deadlock. Send the current main thread the
1052 * Deadlock exception.
1055 switch (task->tso->why_blocked) {
1057 case BlockedOnBlackHole:
1058 case BlockedOnException:
1060 throwToSingleThreaded(cap, task->tso,
1061 (StgClosure *)nonTermination_closure);
1064 barf("deadlock: main thread blocked in a strange way");
1073 /* ----------------------------------------------------------------------------
1074 * Send pending messages (PARALLEL_HASKELL only)
1075 * ------------------------------------------------------------------------- */
1077 #if defined(PARALLEL_HASKELL)
1079 scheduleSendPendingMessages(void)
1082 # if defined(PAR) // global Mem.Mgmt., omit for now
1083 if (PendingFetches != END_BF_QUEUE) {
1088 if (RtsFlags.ParFlags.BufferTime) {
1089 // if we use message buffering, we must send away all message
1090 // packets which have become too old...
1096 /* ----------------------------------------------------------------------------
1097 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1098 * ------------------------------------------------------------------------- */
1100 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1102 scheduleActivateSpark(Capability *cap)
1106 createSparkThread(cap);
1107 debugTrace(DEBUG_sched, "creating a spark thread");
1110 #endif // PARALLEL_HASKELL || THREADED_RTS
1112 /* ----------------------------------------------------------------------------
1113 * Get work from a remote node (PARALLEL_HASKELL only)
1114 * ------------------------------------------------------------------------- */
1116 #if defined(PARALLEL_HASKELL)
1117 static rtsBool /* return value used in PARALLEL_HASKELL only */
1118 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1120 #if defined(PARALLEL_HASKELL)
1121 rtsBool receivedFinish = rtsFalse;
1123 // idle(), i.e. send all buffers, wait for work
1124 if (RtsFlags.ParFlags.BufferTime) {
1125 IF_PAR_DEBUG(verbose,
1126 debugBelch("...send all pending data,"));
1129 for (i=1; i<=nPEs; i++)
1130 sendImmediately(i); // send all messages away immediately
1134 /* this would be the place for fishing in GUM...
1136 if (no-earlier-fish-around)
1137 sendFish(choosePe());
1140 // Eden: just look for incoming messages (blocking receive)
1141 IF_PAR_DEBUG(verbose,
1142 debugBelch("...wait for incoming messages...\n"));
1143 processMessages(cap, &receivedFinish); // blocking receive...
1146 return receivedFinish;
1147 // re-enter the scheduling loop after having received something
1149 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1151 return rtsFalse; /* return value unused in THREADED_RTS */
1153 #endif /* PARALLEL_HASKELL */
1155 #endif // PARALLEL_HASKELL || THREADED_RTS
1157 /* ----------------------------------------------------------------------------
1158 * After running a thread...
1159 * ------------------------------------------------------------------------- */
1162 schedulePostRunThread (Capability *cap, StgTSO *t)
1164 // We have to be able to catch transactions that are in an
1165 // infinite loop as a result of seeing an inconsistent view of
1169 // [a,b] <- mapM readTVar [ta,tb]
1170 // when (a == b) loop
1172 // and a is never equal to b given a consistent view of memory.
1174 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1175 if (!stmValidateNestOfTransactions (t -> trec)) {
1176 debugTrace(DEBUG_sched | DEBUG_stm,
1177 "trec %p found wasting its time", t);
1179 // strip the stack back to the
1180 // ATOMICALLY_FRAME, aborting the (nested)
1181 // transaction, and saving the stack of any
1182 // partially-evaluated thunks on the heap.
1183 throwToSingleThreaded_(cap, t, NULL, rtsTrue);
1185 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1189 /* some statistics gathering in the parallel case */
1192 /* -----------------------------------------------------------------------------
1193 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1194 * -------------------------------------------------------------------------- */
1197 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1199 // did the task ask for a large block?
1200 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1201 // if so, get one and push it on the front of the nursery.
1205 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1207 debugTrace(DEBUG_sched,
1208 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1209 (long)t->id, whatNext_strs[t->what_next], blocks);
1211 // don't do this if the nursery is (nearly) full, we'll GC first.
1212 if (cap->r.rCurrentNursery->link != NULL ||
1213 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1214 // if the nursery has only one block.
1217 bd = allocGroup( blocks );
1219 cap->r.rNursery->n_blocks += blocks;
1221 // link the new group into the list
1222 bd->link = cap->r.rCurrentNursery;
1223 bd->u.back = cap->r.rCurrentNursery->u.back;
1224 if (cap->r.rCurrentNursery->u.back != NULL) {
1225 cap->r.rCurrentNursery->u.back->link = bd;
1227 #if !defined(THREADED_RTS)
1228 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1229 g0s0 == cap->r.rNursery);
1231 cap->r.rNursery->blocks = bd;
1233 cap->r.rCurrentNursery->u.back = bd;
1235 // initialise it as a nursery block. We initialise the
1236 // step, gen_no, and flags field of *every* sub-block in
1237 // this large block, because this is easier than making
1238 // sure that we always find the block head of a large
1239 // block whenever we call Bdescr() (eg. evacuate() and
1240 // isAlive() in the GC would both have to do this, at
1244 for (x = bd; x < bd + blocks; x++) {
1245 x->step = cap->r.rNursery;
1251 // This assert can be a killer if the app is doing lots
1252 // of large block allocations.
1253 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1255 // now update the nursery to point to the new block
1256 cap->r.rCurrentNursery = bd;
1258 // we might be unlucky and have another thread get on the
1259 // run queue before us and steal the large block, but in that
1260 // case the thread will just end up requesting another large
1262 pushOnRunQueue(cap,t);
1263 return rtsFalse; /* not actually GC'ing */
1267 debugTrace(DEBUG_sched,
1268 "--<< thread %ld (%s) stopped: HeapOverflow",
1269 (long)t->id, whatNext_strs[t->what_next]);
1271 if (cap->context_switch) {
1272 // Sometimes we miss a context switch, e.g. when calling
1273 // primitives in a tight loop, MAYBE_GC() doesn't check the
1274 // context switch flag, and we end up waiting for a GC.
1275 // See #1984, and concurrent/should_run/1984
1276 cap->context_switch = 0;
1277 addToRunQueue(cap,t);
1279 pushOnRunQueue(cap,t);
1282 /* actual GC is done at the end of the while loop in schedule() */
1285 /* -----------------------------------------------------------------------------
1286 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1287 * -------------------------------------------------------------------------- */
1290 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1292 debugTrace (DEBUG_sched,
1293 "--<< thread %ld (%s) stopped, StackOverflow",
1294 (long)t->id, whatNext_strs[t->what_next]);
1296 /* just adjust the stack for this thread, then pop it back
1300 /* enlarge the stack */
1301 StgTSO *new_t = threadStackOverflow(cap, t);
1303 /* The TSO attached to this Task may have moved, so update the
1306 if (task->tso == t) {
1309 pushOnRunQueue(cap,new_t);
1313 /* -----------------------------------------------------------------------------
1314 * Handle a thread that returned to the scheduler with ThreadYielding
1315 * -------------------------------------------------------------------------- */
1318 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1320 // Reset the context switch flag. We don't do this just before
1321 // running the thread, because that would mean we would lose ticks
1322 // during GC, which can lead to unfair scheduling (a thread hogs
1323 // the CPU because the tick always arrives during GC). This way
1324 // penalises threads that do a lot of allocation, but that seems
1325 // better than the alternative.
1326 cap->context_switch = 0;
1328 /* put the thread back on the run queue. Then, if we're ready to
1329 * GC, check whether this is the last task to stop. If so, wake
1330 * up the GC thread. getThread will block during a GC until the
1334 if (t->what_next != prev_what_next) {
1335 debugTrace(DEBUG_sched,
1336 "--<< thread %ld (%s) stopped to switch evaluators",
1337 (long)t->id, whatNext_strs[t->what_next]);
1339 debugTrace(DEBUG_sched,
1340 "--<< thread %ld (%s) stopped, yielding",
1341 (long)t->id, whatNext_strs[t->what_next]);
1346 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1348 ASSERT(t->_link == END_TSO_QUEUE);
1350 // Shortcut if we're just switching evaluators: don't bother
1351 // doing stack squeezing (which can be expensive), just run the
1353 if (t->what_next != prev_what_next) {
1357 addToRunQueue(cap,t);
1362 /* -----------------------------------------------------------------------------
1363 * Handle a thread that returned to the scheduler with ThreadBlocked
1364 * -------------------------------------------------------------------------- */
1367 scheduleHandleThreadBlocked( StgTSO *t
1368 #if !defined(GRAN) && !defined(DEBUG)
1374 // We don't need to do anything. The thread is blocked, and it
1375 // has tidied up its stack and placed itself on whatever queue
1376 // it needs to be on.
1378 // ASSERT(t->why_blocked != NotBlocked);
1379 // Not true: for example,
1380 // - in THREADED_RTS, the thread may already have been woken
1381 // up by another Capability. This actually happens: try
1382 // conc023 +RTS -N2.
1383 // - the thread may have woken itself up already, because
1384 // threadPaused() might have raised a blocked throwTo
1385 // exception, see maybePerformBlockedException().
1388 if (traceClass(DEBUG_sched)) {
1389 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1390 (unsigned long)t->id, whatNext_strs[t->what_next]);
1391 printThreadBlockage(t);
1397 /* -----------------------------------------------------------------------------
1398 * Handle a thread that returned to the scheduler with ThreadFinished
1399 * -------------------------------------------------------------------------- */
1402 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1404 /* Need to check whether this was a main thread, and if so,
1405 * return with the return value.
1407 * We also end up here if the thread kills itself with an
1408 * uncaught exception, see Exception.cmm.
1410 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1411 (unsigned long)t->id, whatNext_strs[t->what_next]);
1413 // blocked exceptions can now complete, even if the thread was in
1414 // blocked mode (see #2910). This unconditionally calls
1415 // lockTSO(), which ensures that we don't miss any threads that
1416 // are engaged in throwTo() with this thread as a target.
1417 awakenBlockedExceptionQueue (cap, t);
1420 // Check whether the thread that just completed was a bound
1421 // thread, and if so return with the result.
1423 // There is an assumption here that all thread completion goes
1424 // through this point; we need to make sure that if a thread
1425 // ends up in the ThreadKilled state, that it stays on the run
1426 // queue so it can be dealt with here.
1431 if (t->bound != task) {
1432 #if !defined(THREADED_RTS)
1433 // Must be a bound thread that is not the topmost one. Leave
1434 // it on the run queue until the stack has unwound to the
1435 // point where we can deal with this. Leaving it on the run
1436 // queue also ensures that the garbage collector knows about
1437 // this thread and its return value (it gets dropped from the
1438 // step->threads list so there's no other way to find it).
1439 appendToRunQueue(cap,t);
1442 // this cannot happen in the threaded RTS, because a
1443 // bound thread can only be run by the appropriate Task.
1444 barf("finished bound thread that isn't mine");
1448 ASSERT(task->tso == t);
1450 if (t->what_next == ThreadComplete) {
1452 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1453 *(task->ret) = (StgClosure *)task->tso->sp[1];
1455 task->stat = Success;
1458 *(task->ret) = NULL;
1460 if (sched_state >= SCHED_INTERRUPTING) {
1461 if (heap_overflow) {
1462 task->stat = HeapExhausted;
1464 task->stat = Interrupted;
1467 task->stat = Killed;
1471 removeThreadLabel((StgWord)task->tso->id);
1473 return rtsTrue; // tells schedule() to return
1479 /* -----------------------------------------------------------------------------
1480 * Perform a heap census
1481 * -------------------------------------------------------------------------- */
1484 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1486 // When we have +RTS -i0 and we're heap profiling, do a census at
1487 // every GC. This lets us get repeatable runs for debugging.
1488 if (performHeapProfile ||
1489 (RtsFlags.ProfFlags.profileInterval==0 &&
1490 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1497 /* -----------------------------------------------------------------------------
1498 * Perform a garbage collection if necessary
1499 * -------------------------------------------------------------------------- */
1502 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1504 rtsBool heap_census;
1506 /* extern static volatile StgWord waiting_for_gc;
1507 lives inside Capability.c */
1508 rtsBool gc_type, prev_pending_gc;
1512 if (sched_state == SCHED_SHUTTING_DOWN) {
1513 // The final GC has already been done, and the system is
1514 // shutting down. We'll probably deadlock if we try to GC
1520 if (sched_state < SCHED_INTERRUPTING
1521 && RtsFlags.ParFlags.parGcEnabled
1522 && N >= RtsFlags.ParFlags.parGcGen
1523 && ! oldest_gen->steps[0].mark)
1525 gc_type = PENDING_GC_PAR;
1527 gc_type = PENDING_GC_SEQ;
1530 // In order to GC, there must be no threads running Haskell code.
1531 // Therefore, the GC thread needs to hold *all* the capabilities,
1532 // and release them after the GC has completed.
1534 // This seems to be the simplest way: previous attempts involved
1535 // making all the threads with capabilities give up their
1536 // capabilities and sleep except for the *last* one, which
1537 // actually did the GC. But it's quite hard to arrange for all
1538 // the other tasks to sleep and stay asleep.
1541 /* Other capabilities are prevented from running yet more Haskell
1542 threads if waiting_for_gc is set. Tested inside
1543 yieldCapability() and releaseCapability() in Capability.c */
1545 prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
1546 if (prev_pending_gc) {
1548 debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
1551 yieldCapability(&cap,task);
1552 } while (waiting_for_gc);
1553 return cap; // NOTE: task->cap might have changed here
1556 setContextSwitches();
1558 // The final shutdown GC is always single-threaded, because it's
1559 // possible that some of the Capabilities have no worker threads.
1561 if (gc_type == PENDING_GC_SEQ)
1563 // single-threaded GC: grab all the capabilities
1564 for (i=0; i < n_capabilities; i++) {
1565 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1566 if (cap != &capabilities[i]) {
1567 Capability *pcap = &capabilities[i];
1568 // we better hope this task doesn't get migrated to
1569 // another Capability while we're waiting for this one.
1570 // It won't, because load balancing happens while we have
1571 // all the Capabilities, but even so it's a slightly
1572 // unsavoury invariant.
1574 waitForReturnCapability(&pcap, task);
1575 if (pcap != &capabilities[i]) {
1576 barf("scheduleDoGC: got the wrong capability");
1583 // multi-threaded GC: make sure all the Capabilities donate one
1585 debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
1587 waitForGcThreads(cap);
1591 // so this happens periodically:
1592 if (cap) scheduleCheckBlackHoles(cap);
1594 IF_DEBUG(scheduler, printAllThreads());
1596 delete_threads_and_gc:
1598 * We now have all the capabilities; if we're in an interrupting
1599 * state, then we should take the opportunity to delete all the
1600 * threads in the system.
1602 if (sched_state == SCHED_INTERRUPTING) {
1603 deleteAllThreads(cap);
1604 sched_state = SCHED_SHUTTING_DOWN;
1607 heap_census = scheduleNeedHeapProfile(rtsTrue);
1609 if (recent_activity == ACTIVITY_INACTIVE && force_major)
1611 // We are doing a GC because the system has been idle for a
1612 // timeslice and we need to check for deadlock. Record the
1613 // fact that we've done a GC and turn off the timer signal;
1614 // it will get re-enabled if we run any threads after the GC.
1616 // Note: this is done before GC, because after GC there might
1617 // be threads already running (GarbageCollect() releases the
1618 // GC threads when it completes), so we risk turning off the
1619 // timer signal when it should really be on.
1620 recent_activity = ACTIVITY_DONE_GC;
1624 #if defined(THREADED_RTS)
1625 debugTrace(DEBUG_sched, "doing GC");
1626 // reset waiting_for_gc *before* GC, so that when the GC threads
1627 // emerge they don't immediately re-enter the GC.
1629 GarbageCollect(force_major || heap_census, gc_type, cap);
1631 GarbageCollect(force_major || heap_census, 0, cap);
1635 debugTrace(DEBUG_sched, "performing heap census");
1637 performHeapProfile = rtsFalse;
1640 if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
1641 // GC set the heap_overflow flag, so we should proceed with
1642 // an orderly shutdown now. Ultimately we want the main
1643 // thread to return to its caller with HeapExhausted, at which
1644 // point the caller should call hs_exit(). The first step is
1645 // to delete all the threads.
1647 // Another way to do this would be to raise an exception in
1648 // the main thread, which we really should do because it gives
1649 // the program a chance to clean up. But how do we find the
1650 // main thread? It should presumably be the same one that
1651 // gets ^C exceptions, but that's all done on the Haskell side
1652 // (GHC.TopHandler).
1653 sched_state = SCHED_INTERRUPTING;
1654 goto delete_threads_and_gc;
1659 Once we are all together... this would be the place to balance all
1660 spark pools. No concurrent stealing or adding of new sparks can
1661 occur. Should be defined in Sparks.c. */
1662 balanceSparkPoolsCaps(n_capabilities, capabilities);
1665 #if defined(THREADED_RTS)
1666 if (gc_type == PENDING_GC_SEQ) {
1667 // release our stash of capabilities.
1668 for (i = 0; i < n_capabilities; i++) {
1669 if (cap != &capabilities[i]) {
1670 task->cap = &capabilities[i];
1671 releaseCapability(&capabilities[i]);
1685 /* ---------------------------------------------------------------------------
1686 * Singleton fork(). Do not copy any running threads.
1687 * ------------------------------------------------------------------------- */
1690 forkProcess(HsStablePtr *entry
1691 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1696 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1703 #if defined(THREADED_RTS)
1704 if (RtsFlags.ParFlags.nNodes > 1) {
1705 errorBelch("forking not supported with +RTS -N<n> greater than 1");
1706 stg_exit(EXIT_FAILURE);
1710 debugTrace(DEBUG_sched, "forking!");
1712 // ToDo: for SMP, we should probably acquire *all* the capabilities
1715 // no funny business: hold locks while we fork, otherwise if some
1716 // other thread is holding a lock when the fork happens, the data
1717 // structure protected by the lock will forever be in an
1718 // inconsistent state in the child. See also #1391.
1719 ACQUIRE_LOCK(&sched_mutex);
1720 ACQUIRE_LOCK(&cap->lock);
1721 ACQUIRE_LOCK(&cap->running_task->lock);
1725 if (pid) { // parent
1727 RELEASE_LOCK(&sched_mutex);
1728 RELEASE_LOCK(&cap->lock);
1729 RELEASE_LOCK(&cap->running_task->lock);
1731 // just return the pid
1737 #if defined(THREADED_RTS)
1738 initMutex(&sched_mutex);
1739 initMutex(&cap->lock);
1740 initMutex(&cap->running_task->lock);
1743 // Now, all OS threads except the thread that forked are
1744 // stopped. We need to stop all Haskell threads, including
1745 // those involved in foreign calls. Also we need to delete
1746 // all Tasks, because they correspond to OS threads that are
1749 for (s = 0; s < total_steps; s++) {
1750 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1751 if (t->what_next == ThreadRelocated) {
1754 next = t->global_link;
1755 // don't allow threads to catch the ThreadKilled
1756 // exception, but we do want to raiseAsync() because these
1757 // threads may be evaluating thunks that we need later.
1758 deleteThread_(cap,t);
1763 // Empty the run queue. It seems tempting to let all the
1764 // killed threads stay on the run queue as zombies to be
1765 // cleaned up later, but some of them correspond to bound
1766 // threads for which the corresponding Task does not exist.
1767 cap->run_queue_hd = END_TSO_QUEUE;
1768 cap->run_queue_tl = END_TSO_QUEUE;
1770 // Any suspended C-calling Tasks are no more, their OS threads
1772 cap->suspended_ccalling_tasks = NULL;
1774 // Empty the threads lists. Otherwise, the garbage
1775 // collector may attempt to resurrect some of these threads.
1776 for (s = 0; s < total_steps; s++) {
1777 all_steps[s].threads = END_TSO_QUEUE;
1780 // Wipe the task list, except the current Task.
1781 ACQUIRE_LOCK(&sched_mutex);
1782 for (task = all_tasks; task != NULL; task=task->all_link) {
1783 if (task != cap->running_task) {
1784 #if defined(THREADED_RTS)
1785 initMutex(&task->lock); // see #1391
1790 RELEASE_LOCK(&sched_mutex);
1792 #if defined(THREADED_RTS)
1793 // Wipe our spare workers list, they no longer exist. New
1794 // workers will be created if necessary.
1795 cap->spare_workers = NULL;
1796 cap->returning_tasks_hd = NULL;
1797 cap->returning_tasks_tl = NULL;
1800 // On Unix, all timers are reset in the child, so we need to start
1805 cap = rts_evalStableIO(cap, entry, NULL); // run the action
1806 rts_checkSchedStatus("forkProcess",cap);
1809 hs_exit(); // clean up and exit
1810 stg_exit(EXIT_SUCCESS);
1812 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1813 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1818 /* ---------------------------------------------------------------------------
1819 * Delete all the threads in the system
1820 * ------------------------------------------------------------------------- */
1823 deleteAllThreads ( Capability *cap )
1825 // NOTE: only safe to call if we own all capabilities.
1830 debugTrace(DEBUG_sched,"deleting all threads");
1831 for (s = 0; s < total_steps; s++) {
1832 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1833 if (t->what_next == ThreadRelocated) {
1836 next = t->global_link;
1837 deleteThread(cap,t);
1842 // The run queue now contains a bunch of ThreadKilled threads. We
1843 // must not throw these away: the main thread(s) will be in there
1844 // somewhere, and the main scheduler loop has to deal with it.
1845 // Also, the run queue is the only thing keeping these threads from
1846 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1848 #if !defined(THREADED_RTS)
1849 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1850 ASSERT(sleeping_queue == END_TSO_QUEUE);
1854 /* -----------------------------------------------------------------------------
1855 Managing the suspended_ccalling_tasks list.
1856 Locks required: sched_mutex
1857 -------------------------------------------------------------------------- */
1860 suspendTask (Capability *cap, Task *task)
1862 ASSERT(task->next == NULL && task->prev == NULL);
1863 task->next = cap->suspended_ccalling_tasks;
1865 if (cap->suspended_ccalling_tasks) {
1866 cap->suspended_ccalling_tasks->prev = task;
1868 cap->suspended_ccalling_tasks = task;
1872 recoverSuspendedTask (Capability *cap, Task *task)
1875 task->prev->next = task->next;
1877 ASSERT(cap->suspended_ccalling_tasks == task);
1878 cap->suspended_ccalling_tasks = task->next;
1881 task->next->prev = task->prev;
1883 task->next = task->prev = NULL;
1886 /* ---------------------------------------------------------------------------
1887 * Suspending & resuming Haskell threads.
1889 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1890 * its capability before calling the C function. This allows another
1891 * task to pick up the capability and carry on running Haskell
1892 * threads. It also means that if the C call blocks, it won't lock
1895 * The Haskell thread making the C call is put to sleep for the
1896 * duration of the call, on the suspended_ccalling_threads queue. We
1897 * give out a token to the task, which it can use to resume the thread
1898 * on return from the C function.
1899 * ------------------------------------------------------------------------- */
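/* For illustration, a "safe" foreign call generated by the compiler pairs
 * these two functions roughly as follows. This is only a sketch; the exact
 * calling convention is produced by the code generator and may differ:
 *
 *     void *tok = suspendThread(&cap->r);   // release the Capability
 *     result = some_c_function(args);       // may block; RTS keeps running
 *     reg = resumeThread(tok);              // wait to re-acquire a Capability
 */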
1902 suspendThread (StgRegTable *reg)
1909 StgWord32 saved_winerror;
1912 saved_errno = errno;
1914 saved_winerror = GetLastError();
1917 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1919 cap = regTableToCapability(reg);
1921 task = cap->running_task;
1922 tso = cap->r.rCurrentTSO;
1924 debugTrace(DEBUG_sched,
1925 "thread %lu did a safe foreign call",
1926 (unsigned long)cap->r.rCurrentTSO->id);
1928 // XXX this might not be necessary --SDM
1929 tso->what_next = ThreadRunGHC;
1931 threadPaused(cap,tso);
1933 if ((tso->flags & TSO_BLOCKEX) == 0) {
1934 tso->why_blocked = BlockedOnCCall;
1935 tso->flags |= TSO_BLOCKEX;
1936 tso->flags &= ~TSO_INTERRUPTIBLE;
1938 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1941 // Hand back capability
1942 task->suspended_tso = tso;
1944 ACQUIRE_LOCK(&cap->lock);
1946 suspendTask(cap,task);
1947 cap->in_haskell = rtsFalse;
1948 releaseCapability_(cap,rtsFalse);
1950 RELEASE_LOCK(&cap->lock);
1952 #if defined(THREADED_RTS)
1953 /* Preparing to leave the RTS, so ensure there's a native thread/task
1954 waiting to take over.
1956 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1959 errno = saved_errno;
1961 SetLastError(saved_winerror);
1967 resumeThread (void *task_)
1974 StgWord32 saved_winerror;
1977 saved_errno = errno;
1979 saved_winerror = GetLastError();
1983 // Wait for permission to re-enter the RTS with the result.
1984 waitForReturnCapability(&cap,task);
1985 // we might be on a different capability now... but if so, our
1986 // entry on the suspended_ccalling_tasks list will also have been
1989 // Remove the thread from the suspended list
1990 recoverSuspendedTask(cap,task);
1992 tso = task->suspended_tso;
1993 task->suspended_tso = NULL;
1994 tso->_link = END_TSO_QUEUE; // no write barrier reqd
1995 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1997 if (tso->why_blocked == BlockedOnCCall) {
1998 // avoid locking the TSO if we don't have to
1999 if (tso->blocked_exceptions != END_TSO_QUEUE) {
2000 awakenBlockedExceptionQueue(cap,tso);
2002 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2005 /* Reset blocking status */
2006 tso->why_blocked = NotBlocked;
2008 cap->r.rCurrentTSO = tso;
2009 cap->in_haskell = rtsTrue;
2010 errno = saved_errno;
2012 SetLastError(saved_winerror);
2015 /* We might have GC'd, mark the TSO dirty again */
2018 IF_DEBUG(sanity, checkTSO(tso));
2023 /* ---------------------------------------------------------------------------
2026 * scheduleThread puts a thread on the end of the runnable queue.
2027 * This will usually be done immediately after a thread is created.
2028 * The caller of scheduleThread must create the thread using e.g.
2029 * createThread and push an appropriate closure
2030 * on this thread's stack before the scheduler is invoked.
2031 * ------------------------------------------------------------------------ */
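/* A rough sketch of the expected calling pattern (illustrative only;
 * 'pushTheClosure' is a hypothetical stand-in for the code that pushes
 * the closure and a suitable return frame, cf. the helpers in RtsAPI.c):
 *
 *     tso = createThread(cap, RtsFlags.GcFlags.initialStkSize);
 *     pushTheClosure(tso, closure);
 *     scheduleThread(cap, tso);          // thread is now on the run queue
 */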
2034 scheduleThread(Capability *cap, StgTSO *tso)
2036 // The thread goes at the *end* of the run-queue, to avoid possible
2037 // starvation of any threads already on the queue.
2038 appendToRunQueue(cap,tso);
2042 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2044 #if defined(THREADED_RTS)
2045 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2046 // move this thread from now on.
2047 cpu %= RtsFlags.ParFlags.nNodes;
2048 if (cpu == cap->no) {
2049 appendToRunQueue(cap,tso);
2051 wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
2054 appendToRunQueue(cap,tso);
2059 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2063 // We already created/initialised the Task
2064 task = cap->running_task;
2066 // This TSO is now a bound thread; make the Task and TSO
2067 // point to each other.
2073 task->stat = NoStatus;
2075 appendToRunQueue(cap,tso);
2077 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2079 cap = schedule(cap,task);
2081 ASSERT(task->stat != NoStatus);
2082 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2084 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2088 /* ----------------------------------------------------------------------------
2090 * ------------------------------------------------------------------------- */
2092 #if defined(THREADED_RTS)
2093 void OSThreadProcAttr
2094 workerStart(Task *task)
2098 // See startWorkerTask().
2099 ACQUIRE_LOCK(&task->lock);
2101 RELEASE_LOCK(&task->lock);
2103 // set the thread-local pointer to the Task:
2106 // schedule() runs without a lock.
2107 cap = schedule(cap,task);
2109 // On exit from schedule(), we have a Capability, but possibly not
2110 // the same one we started with.
2112 // During shutdown, the requirement is that after all the
2113 // Capabilities are shut down, all workers that are shutting down
2114 // have finished workerTaskStop(). This is why we hold on to
2115 // cap->lock until we've finished workerTaskStop() below.
2117 // There may be workers still involved in foreign calls; those
2118 // will just block in waitForReturnCapability() because the
2119 // Capability has been shut down.
2121 ACQUIRE_LOCK(&cap->lock);
2122 releaseCapability_(cap,rtsFalse);
2123 workerTaskStop(task);
2124 RELEASE_LOCK(&cap->lock);
2128 /* ---------------------------------------------------------------------------
2131 * Initialise the scheduler. This resets all the queues - if the
2132 * queues contained any threads, they'll be garbage collected at the next GC.
2135 * ------------------------------------------------------------------------ */
2140 #if !defined(THREADED_RTS)
2141 blocked_queue_hd = END_TSO_QUEUE;
2142 blocked_queue_tl = END_TSO_QUEUE;
2143 sleeping_queue = END_TSO_QUEUE;
2146 blackhole_queue = END_TSO_QUEUE;
2148 sched_state = SCHED_RUNNING;
2149 recent_activity = ACTIVITY_YES;
2151 #if defined(THREADED_RTS)
2152 /* Initialise the mutex and condition variables used by the scheduler. */
2154 initMutex(&sched_mutex);
2157 ACQUIRE_LOCK(&sched_mutex);
2159 /* A capability holds the state a native thread needs in
2160 * order to execute STG code. At least one capability is
2161 * floating around (only THREADED_RTS builds have more than one).
2167 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2171 #if defined(THREADED_RTS)
2173 * Eagerly start one worker to run each Capability, except for
2174 * Capability 0. The idea is that we're probably going to start a
2175 * bound thread on Capability 0 pretty soon, so we don't want a
2176 * worker task hogging it.
2181 for (i = 1; i < n_capabilities; i++) {
2182 cap = &capabilities[i];
2183 ACQUIRE_LOCK(&cap->lock);
2184 startWorkerTask(cap, workerStart);
2185 RELEASE_LOCK(&cap->lock);
2190 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2192 RELEASE_LOCK(&sched_mutex);
2197 rtsBool wait_foreign
2198 #if !defined(THREADED_RTS)
2199 __attribute__((unused))
2202 /* see Capability.c, shutdownCapability() */
2206 #if defined(THREADED_RTS)
2207 ACQUIRE_LOCK(&sched_mutex);
2208 task = newBoundTask();
2209 RELEASE_LOCK(&sched_mutex);
2212 // If we haven't killed all the threads yet, do it now.
2213 if (sched_state < SCHED_SHUTTING_DOWN) {
2214 sched_state = SCHED_INTERRUPTING;
2215 #if defined(THREADED_RTS)
2216 waitForReturnCapability(&task->cap,task);
2217 scheduleDoGC(task->cap,task,rtsFalse);
2218 releaseCapability(task->cap);
2220 scheduleDoGC(&MainCapability,task,rtsFalse);
2223 sched_state = SCHED_SHUTTING_DOWN;
2225 #if defined(THREADED_RTS)
2229 for (i = 0; i < n_capabilities; i++) {
2230 shutdownCapability(&capabilities[i], task, wait_foreign);
2232 boundTaskExiting(task);
2238 freeScheduler( void )
2242 ACQUIRE_LOCK(&sched_mutex);
2243 still_running = freeTaskManager();
2244 // We can only free the Capabilities if there are no Tasks still
2245 // running. We might have a Task about to return from a foreign
2246 // call into waitForReturnCapability(), for example (actually,
2247 // this should be the *only* thing that a still-running Task can
2248 // do at this point, and it will block waiting for the Capability).
2250 if (still_running == 0) {
2252 if (n_capabilities != 1) {
2253 stgFree(capabilities);
2256 RELEASE_LOCK(&sched_mutex);
2257 #if defined(THREADED_RTS)
2258 closeMutex(&sched_mutex);
2262 /* -----------------------------------------------------------------------------
2265 This is the interface to the garbage collector from Haskell land.
2266 We provide this so that external C code can allocate and garbage
2267 collect when called from Haskell via _ccall_GC.
2268 -------------------------------------------------------------------------- */
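/* For example, C code entered from Haskell via a safe foreign call can
 * simply do (sketch):
 *
 *     performGC();        // request a collection
 *     performMajorGC();   // or force a full, major collection
 */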
2271 performGC_(rtsBool force_major)
2275 // We must grab a new Task here, because the existing Task may be
2276 // associated with a particular Capability, and chained onto the
2277 // suspended_ccalling_tasks queue.
2278 ACQUIRE_LOCK(&sched_mutex);
2279 task = newBoundTask();
2280 RELEASE_LOCK(&sched_mutex);
2282 waitForReturnCapability(&task->cap,task);
2283 scheduleDoGC(task->cap,task,force_major);
2284 releaseCapability(task->cap);
2285 boundTaskExiting(task);
2291 performGC_(rtsFalse);
2295 performMajorGC(void)
2297 performGC_(rtsTrue);
2300 /* -----------------------------------------------------------------------------
2303 If the thread has reached its maximum stack size, then raise the
2304 StackOverflow exception in the offending thread. Otherwise
2305 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2307 -------------------------------------------------------------------------- */
2310 threadStackOverflow(Capability *cap, StgTSO *tso)
2312 nat new_stack_size, stack_words;
2317 IF_DEBUG(sanity,checkTSO(tso));
2319 // don't allow throwTo() to modify the blocked_exceptions queue
2320 // while we are moving the TSO:
2321 lockClosure((StgClosure *)tso);
2323 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2324 // NB. never raise a StackOverflow exception if the thread is
2325 // inside Control.Exception.block. It is impractical to protect
2326 // against stack overflow exceptions, since virtually anything
2327 // can raise one (even 'catch'), so this is the only sensible
2328 // thing to do here. See bug #767.
2330 debugTrace(DEBUG_gc,
2331 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2332 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2334 /* If we're debugging, just print out the top of the stack */
2335 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2338 // Send this thread the StackOverflow exception
2340 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2344 /* Try to double the current stack size. If that takes us over the
2345 * maximum stack size for this thread, then use the maximum instead
2346 * (that is, unless we're already at or over the max size and we
2347 * can't raise the StackOverflow exception (see above), in which
2348 * case just double the size). Finally round up so the TSO ends up as
2349 * a whole number of blocks. */
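/* Worked example (illustrative): a 2048-word stack with an 8192-word
 * maximum overflows; 2048*2 = 4096 is still under the maximum, so the
 * new stack is 4096 words plus the TSO header, rounded up to a whole
 * number of blocks (round_to_mblocks additionally keeps very large
 * TSOs megablock-friendly). */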
2351 if (tso->stack_size >= tso->max_stack_size) {
2352 new_stack_size = tso->stack_size * 2;
2354 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2356 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2357 TSO_STRUCT_SIZE)/sizeof(W_);
2358 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2359 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2361 debugTrace(DEBUG_sched,
2362 "increasing stack size from %ld words to %d.",
2363 (long)tso->stack_size, new_stack_size);
2365 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2366 TICK_ALLOC_TSO(new_stack_size,0);
2368 /* copy the TSO block and the old stack into the new area */
2369 memcpy(dest,tso,TSO_STRUCT_SIZE);
2370 stack_words = tso->stack + tso->stack_size - tso->sp;
2371 new_sp = (P_)dest + new_tso_size - stack_words;
2372 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2374 /* relocate the stack pointers... */
2376 dest->stack_size = new_stack_size;
2378 /* Mark the old TSO as relocated. We have to check for relocated
2379 * TSOs in the garbage collector and any primops that deal with TSOs.
2381 * It's important to set the sp value to just beyond the end
2382 * of the stack, so we don't attempt to scavenge any part of the old stack. */
2385 tso->what_next = ThreadRelocated;
2386 setTSOLink(cap,tso,dest);
2387 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2388 tso->why_blocked = NotBlocked;
2390 IF_PAR_DEBUG(verbose,
2391 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2392 tso->id, tso, tso->stack_size);
2393 /* If we're debugging, just print out the top of the stack */
2394 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2400 IF_DEBUG(sanity,checkTSO(dest));
2402 IF_DEBUG(scheduler,printTSO(dest));
2409 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2411 bdescr *bd, *new_bd;
2412 lnat free_w, tso_size_w;
2415 tso_size_w = tso_sizeW(tso);
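// Descriptive note: the test below skips the shrink unless the TSO
// occupies at least a whole megablock and no more than a quarter of
// the stack space is currently in use.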
2417 if (tso_size_w < MBLOCK_SIZE_W ||
2418 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2423 // don't allow throwTo() to modify the blocked_exceptions queue
2424 // while we are moving the TSO:
2425 lockClosure((StgClosure *)tso);
2427 // this is the number of words we'll free
2428 free_w = round_to_mblocks(tso_size_w/2);
2430 bd = Bdescr((StgPtr)tso);
2431 new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2432 bd->free = bd->start + TSO_STRUCT_SIZEW;
2434 new_tso = (StgTSO *)new_bd->start;
2435 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2436 new_tso->stack_size = new_bd->free - new_tso->stack;
2438 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2439 (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2441 tso->what_next = ThreadRelocated;
2442 tso->_link = new_tso; // no write barrier reqd: same generation
2444 // The TSO attached to this Task may have moved, so update the pointer to it.
2446 if (task->tso == tso) {
2447 task->tso = new_tso;
2453 IF_DEBUG(sanity,checkTSO(new_tso));
2458 /* ---------------------------------------------------------------------------
2460 - usually called inside a signal handler so it mustn't do anything fancy.
2461 ------------------------------------------------------------------------ */
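/* A hedged sketch of a handler that might invoke it (hypothetical handler
 * name; the RTS's real signal plumbing lives in posix/Signals.c and
 * win32/ConsoleHandler.c):
 *
 *     static void ctrlc_handler (int sig)
 *     {
 *         (void)sig;
 *         interruptStgRts();   // keeps to signal-safe work, per the note above
 *     }
 */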
2464 interruptStgRts(void)
2466 sched_state = SCHED_INTERRUPTING;
2467 setContextSwitches();
2471 /* -----------------------------------------------------------------------------
2474 This function causes at least one OS thread to wake up and run the
2475 scheduler loop. It is invoked when the RTS might be deadlocked, or
2476 an external event has arrived that may need servicing (eg. a
2477 keyboard interrupt).
2479 In the single-threaded RTS we don't do anything here; we only have
2480 one thread anyway, and the event that caused us to want to wake up
2481 will have interrupted any blocking system call in progress anyway.
2482 -------------------------------------------------------------------------- */
2487 #if defined(THREADED_RTS)
2488 // This forces the IO Manager thread to wake up, which will
2489 // in turn ensure that some OS thread wakes up and runs the
2490 // scheduler loop, which will cause a GC and deadlock check.
2495 /* -----------------------------------------------------------------------------
2498 * Check the blackhole_queue for threads that can be woken up. We do
2499 * this periodically: before every GC, and whenever the run queue is empty.
2502 * An elegant solution might be to just wake up all the blocked
2503 * threads with awakenBlockedQueue occasionally: they'll go back to
2504 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2505 * doesn't give us a way to tell whether we've actually managed to
2506 * wake up any threads, so we would be busy-waiting.
2508 * -------------------------------------------------------------------------- */
2511 checkBlackHoles (Capability *cap)
2514 rtsBool any_woke_up = rtsFalse;
2517 // blackhole_queue is global:
2518 ASSERT_LOCK_HELD(&sched_mutex);
2520 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2522 // ASSUMES: sched_mutex
2523 prev = &blackhole_queue;
2524 t = blackhole_queue;
2525 while (t != END_TSO_QUEUE) {
2526 ASSERT(t->why_blocked == BlockedOnBlackHole);
2527 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2528 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2529 IF_DEBUG(sanity,checkTSO(t));
2530 t = unblockOne(cap, t);
2532 any_woke_up = rtsTrue;
2542 /* -----------------------------------------------------------------------------
2545 This is used for interruption (^C) and forking, and corresponds to
2546 raising an exception but without letting the thread catch the exception.
2548 -------------------------------------------------------------------------- */
2551 deleteThread (Capability *cap, StgTSO *tso)
2553 // NOTE: must only be called on a TSO that we have exclusive
2554 // access to, because we will call throwToSingleThreaded() below.
2555 // The TSO must be on the run queue of the Capability we own, or
2556 // we must own all Capabilities.
2558 if (tso->why_blocked != BlockedOnCCall &&
2559 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2560 throwToSingleThreaded(cap,tso,NULL);
2564 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2566 deleteThread_(Capability *cap, StgTSO *tso)
2567 { // for forkProcess only:
2568 // like deleteThread(), but we delete threads in foreign calls, too.
2570 if (tso->why_blocked == BlockedOnCCall ||
2571 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2572 unblockOne(cap,tso);
2573 tso->what_next = ThreadKilled;
2575 deleteThread(cap,tso);
2580 /* -----------------------------------------------------------------------------
2581 raiseExceptionHelper
2583 This function is called by the raise# primitive, just so that we can
2584 move some of the tricky bits of raising an exception from C-- into
2585 C. Who knows, it might be a useful, reusable thing here too.
2586 -------------------------------------------------------------------------- */
2589 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2591 Capability *cap = regTableToCapability(reg);
2592 StgThunk *raise_closure = NULL;
2594 StgRetInfoTable *info;
2596 // This closure represents the expression 'raise# E' where E
2597 // is the exception raised. It is used to overwrite all the
2598 // thunks which are currently under evaluation.
2601 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2602 // LDV profiling: stg_raise_info has THUNK as its closure
2603 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2604 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2605 // 1 does not cause any problem unless profiling is performed.
2606 // However, when LDV profiling goes on, we need to linearly scan
2607 // small object pool, where raise_closure is stored, so we should
2608 // use MIN_UPD_SIZE.
2610 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2611 // sizeofW(StgClosure)+1);
2615 // Walk up the stack, looking for the catch frame. On the way,
2616 // we update any closures pointed to from update frames with the
2617 // raise closure that we just built.
2621 info = get_ret_itbl((StgClosure *)p);
2622 next = p + stack_frame_sizeW((StgClosure *)p);
2623 switch (info->i.type) {
2626 // Only create raise_closure if we need to.
2627 if (raise_closure == NULL) {
2629 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2630 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2631 raise_closure->payload[0] = exception;
2633 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2637 case ATOMICALLY_FRAME:
2638 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2640 return ATOMICALLY_FRAME;
2646 case CATCH_STM_FRAME:
2647 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2649 return CATCH_STM_FRAME;
2655 case CATCH_RETRY_FRAME:
2664 /* -----------------------------------------------------------------------------
2665 findRetryFrameHelper
2667 This function is called by the retry# primitive. It traverses the stack
2668 leaving tso->sp referring to the frame which should handle the retry.
2670 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2671 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2673 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2674 create) because retries are not considered to be exceptions, despite the
2675 similar implementation.
2677 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2678 not be created within memory transactions.
2679 -------------------------------------------------------------------------- */
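/* Illustrative example: in 'atomically (e1 `orElse` e2)', a retry# raised
 * inside e1 stops at the CATCH_RETRY_FRAME pushed by orElse#, which then
 * runs the alternative e2; a retry# outside any orElse# walks all the way
 * up to the ATOMICALLY_FRAME, where the transaction blocks until one of
 * the TVars it read has been written.
 */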
2682 findRetryFrameHelper (StgTSO *tso)
2685 StgRetInfoTable *info;
2689 info = get_ret_itbl((StgClosure *)p);
2690 next = p + stack_frame_sizeW((StgClosure *)p);
2691 switch (info->i.type) {
2693 case ATOMICALLY_FRAME:
2694 debugTrace(DEBUG_stm,
2695 "found ATOMICALLY_FRAME at %p during retry", p);
2697 return ATOMICALLY_FRAME;
2699 case CATCH_RETRY_FRAME:
2700 debugTrace(DEBUG_stm,
2701 "found CATCH_RETRY_FRAME at %p during retrry", p);
2703 return CATCH_RETRY_FRAME;
2705 case CATCH_STM_FRAME: {
2706 StgTRecHeader *trec = tso -> trec;
2707 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2708 debugTrace(DEBUG_stm,
2709 "found CATCH_STM_FRAME at %p during retry", p);
2710 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2711 stmAbortTransaction(tso -> cap, trec);
2712 stmFreeAbortedTRec(tso -> cap, trec);
2713 tso -> trec = outer;
2720 ASSERT(info->i.type != CATCH_FRAME);
2721 ASSERT(info->i.type != STOP_FRAME);
2728 /* -----------------------------------------------------------------------------
2729 resurrectThreads is called after garbage collection on the list of
2730 threads found to be garbage. Each of these threads will be woken
2731 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2732 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
2735 Locks: assumes we hold *all* the capabilities.
2736 -------------------------------------------------------------------------- */
2739 resurrectThreads (StgTSO *threads)
2745 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2746 next = tso->global_link;
2748 step = Bdescr((P_)tso)->step;
2749 tso->global_link = step->threads;
2750 step->threads = tso;
2752 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2754 // Wake up the thread on the Capability it was last on
2757 switch (tso->why_blocked) {
2759 case BlockedOnException:
2760 /* Called by GC - sched_mutex lock is currently held. */
2761 throwToSingleThreaded(cap, tso,
2762 (StgClosure *)blockedOnDeadMVar_closure);
2764 case BlockedOnBlackHole:
2765 throwToSingleThreaded(cap, tso,
2766 (StgClosure *)nonTermination_closure);
2769 throwToSingleThreaded(cap, tso,
2770 (StgClosure *)blockedIndefinitely_closure);
2773 /* This might happen if the thread was blocked on a black hole
2774 * belonging to a thread that we've just woken up (raiseAsync
2775 * can wake up threads, remember...).
2779 barf("resurrectThreads: thread blocked in a strange way");
2784 /* -----------------------------------------------------------------------------
2785 performPendingThrowTos is called after garbage collection, and
2786 passed a list of threads that were found to have pending throwTos
2787 (tso->blocked_exceptions was not empty), and were blocked.
2788 Normally this doesn't happen, because we would deliver the
2789 exception directly if the target thread is blocked, but there are
2790 small windows where it might occur on a multiprocessor (see throwTo()).
2793 NB. we must be holding all the capabilities at this point, just
2794 like resurrectThreads().
2795 -------------------------------------------------------------------------- */
2798 performPendingThrowTos (StgTSO *threads)
2804 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2805 next = tso->global_link;
2807 step = Bdescr((P_)tso)->step;
2808 tso->global_link = step->threads;
2809 step->threads = tso;
2811 debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2814 maybePerformBlockedException(cap, tso);