/* ---------------------------------------------------------------------------
 * (c) The GHC Team, 1998-2006
 * The scheduler and thread-related functionality
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
/* PARALLEL_HASKELL includes go here */
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#include "RaiseAsync.h"
#include "ThrIOManager.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>

// Turn off inlining when debugging - it obfuscates things
# define STATIC_INLINE static

/* -----------------------------------------------------------------------------
 * -------------------------------------------------------------------------- */

#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?

/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
StgTSO *blackhole_queue = NULL;

/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
rtsBool blackholes_need_checking = rtsFalse;

/* flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
nat recent_activity = ACTIVITY_YES;
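// recent_activity takes one of three values: ACTIVITY_YES (Haskell code has
// run in this time slice), ACTIVITY_INACTIVE (a whole timeslice passed with
// no activity; used by deadlock detection below), and ACTIVITY_DONE_GC (we
// GC'd after going inactive and turned the timer signal off).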
/* if this flag is set as well, give up execution
 * LOCK: none (changes once, from false->true)
rtsBool sched_state = SCHED_RUNNING;
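// sched_state moves through SCHED_RUNNING -> SCHED_INTERRUPTING (on ^C or
// exitScheduler()) -> SCHED_SHUTTING_DOWN; the scheduler loop below switches
// on it, and scheduleDoGC() advances it once all threads have been deleted.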
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
rtsBool shutting_down_scheduler = rtsFalse;

 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
#if defined(THREADED_RTS)

#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED

/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);

// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.

static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads (Capability *cap USED_IF_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(PARALLEL_HASKELL)
static rtsBool scheduleGetRemoteWork(Capability *cap);
static void scheduleSendPendingMessages(void);
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);
static rtsBool checkBlackHoles(Capability *cap);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);

static char *whatNext_strs[] = {

/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */

addToRunQueue( Capability *cap, StgTSO *t )
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
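    // (appendToRunQueue adds the thread at the tail of the run queue, so
    //  threads already queued run first; pushOnRunQueue adds it at the head,
    //  so it runs next.  The former favours fairness/concurrency, the latter
    //  parallelism, as noted above.)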
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * timer expires (thread yields)

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next.  Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.
   This version has been entirely removed (JB 2008/08).

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
   now by PP-flag PARALLEL_HASKELL.  The Eden RTS (in GHC-6.x) uses it,
   as well as future GUM versions.  This file has been refurbished to
   only contain valid code, which is however incomplete, refers to
   invalid includes etc.
   ------------------------------------------------------------------------ */

schedule (Capability *initialCapability, Task *task)
    StgThreadReturnCode ret;
#if defined(PARALLEL_HASKELL)
    rtsBool receivedFinish = rtsFalse;
#if defined(THREADED_RTS)
    rtsBool first = rtsTrue;

    cap = initialCapability;

    // Pre-condition: this task owns initialCapability.
    // The sched_mutex is *NOT* held
    // NB. on return, we still hold a capability.

    debugTrace (DEBUG_sched,
                "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
                task, initialCapability);

    // -----------------------------------------------------------
    // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#define TERMINATION_CONDITION        rtsTrue

    while (TERMINATION_CONDITION) {

        // Check whether we have re-entered the RTS from Haskell without
        // going via suspendThread()/resumeThread (i.e. a 'safe' foreign call).
        if (cap->in_haskell) {
            errorBelch("schedule: re-entered unsafely.\n"
                       " Perhaps a 'foreign import unsafe' should be 'safe'?");
            stg_exit(EXIT_FAILURE);

        // The interruption / shutdown sequence.
        //
        // In order to cleanly shut down the runtime, we want to:
        //   * make sure that all main threads return to their callers
        //     with the state 'Interrupted'.
        //   * clean up all OS threads associated with the runtime
        //   * free all memory etc.
        //
        // So the sequence for ^C goes like this:
        //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
        //     arranges for some Capability to wake up
        //   * all threads in the system are halted, and the zombies are
        //     placed on the run queue for cleaning up.  We acquire all
        //     the capabilities in order to delete the threads, this is
        //     done by scheduleDoGC() for convenience (because GC already
        //     needs to acquire all the capabilities).  We can't kill
        //     threads involved in foreign calls.
        //   * somebody calls shutdownHaskell(), which calls exitScheduler()
        //   * sched_state := SCHED_SHUTTING_DOWN
        //   * all workers exit when the run queue on their capability
        //     drains.  All main threads will also exit when their TSO
        //     reaches the head of the run queue and they can return.
        //   * eventually all Capabilities will shut down, and the RTS can
        //   * We might be left with threads blocked in foreign calls,
        //     we should really attempt to kill these somehow (TODO);
        switch (sched_state) {
        case SCHED_INTERRUPTING:
            debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
            discardSparksCap(cap);
            /* scheduleDoGC() deletes all the threads */
            cap = scheduleDoGC(cap,task,rtsFalse);
        case SCHED_SHUTTING_DOWN:
            debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
            // If we are a worker, just exit.  If we're a bound thread
            // then we will exit below when we've removed our TSO from
            if (task->tso == NULL && emptyRunQueue(cap)) {
            barf("sched_state: %d", sched_state);

        scheduleFindWork(cap);

        /* work pushing, currently relevant only for THREADED_RTS:
           (pushes threads, wakes up idle capabilities for stealing) */
        schedulePushWork(cap,task);

#if defined(PARALLEL_HASKELL)
        /* since we perform a blocking receive and continue otherwise,
           either we never reach here or we definitely have work! */
        // from here: non-empty run queue
        ASSERT(!emptyRunQueue(cap));
        if (PacketsWaiting()) {  /* now process incoming messages, if any
                                    CAUTION: scheduleGetRemoteWork called
                                    above, waits for messages as well! */
            processMessages(cap, &receivedFinish);
#endif // PARALLEL_HASKELL: non-empty run queue!

        scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
        cap = task->cap;    // reload cap, it might have changed

        // Normally, the only way we can get here with no threads to
        // run is if a keyboard interrupt is received during
        // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
        // Additionally, it is not fatal for the
        // threaded RTS to reach here with no threads to run.
        //
        // win32: might be here due to awaitEvent() being abandoned
        // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
        // // don't yield the first time, we want a chance to run this
        // // thread for a bit, even if there are others banging at the
        // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        scheduleYield(&cap,task);
        if (emptyRunQueue(cap)) continue; // look for work again

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
        if ( emptyRunQueue(cap) ) {
            ASSERT(sched_state >= SCHED_INTERRUPTING);

        // Get a thread to run
        t = popRunQueue(cap);

        // Sanity check the thread we're about to run.  This can be
        // expensive if there is lots of thread switching going on...
        IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
        // Check whether we can run this thread in the current task.
        // If not, we have to pass our capability to the right task.
        Task *bound = t->bound;
            debugTrace(DEBUG_sched,
                       "### Running thread %lu in bound thread", (unsigned long)t->id);
            // yes, the Haskell thread is bound to the current native thread
            debugTrace(DEBUG_sched,
                       "### thread %lu bound to another OS thread", (unsigned long)t->id);
            // no, bound to a different Haskell thread: pass to that thread
            pushOnRunQueue(cap,t);
        // The thread we want to run is unbound.
            debugTrace(DEBUG_sched,
                       "### this OS thread cannot run thread %lu", (unsigned long)t->id);
            // no, the current native thread is bound to a different
            // Haskell thread, so pass it to any worker thread
            pushOnRunQueue(cap,t);

        /* context switches are initiated by the timer signal, unless
         * the user specified "context switch as often as possible", with
        if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
            && !emptyThreadQueues(cap)) {
            cap->context_switch = 1;

        // CurrentTSO is the thread to run.  t might be different if we
        // loop back to run_thread, so make sure to set CurrentTSO after
        cap->r.rCurrentTSO = t;

        debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
                   (long)t->id, whatNext_strs[t->what_next]);

        startHeapProfTimer();

        // Check for exceptions blocked on this thread
        maybePerformBlockedException (cap, t);

        // ----------------------------------------------------------------------
        // Run the current thread

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        ASSERT(t->cap == cap);

        prev_what_next = t->what_next;

        errno = t->saved_errno;
        SetLastError(t->saved_winerror);

        cap->in_haskell = rtsTrue;

#if defined(THREADED_RTS)
        if (recent_activity == ACTIVITY_DONE_GC) {
            // ACTIVITY_DONE_GC means we turned off the timer signal to
            // conserve power (see #1623).  Re-enable it here.
            prev = xchg((P_)&recent_activity, ACTIVITY_YES);
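            // xchg() swaps ACTIVITY_YES in atomically and hands back the old
            // value, so at most one Task sees ACTIVITY_DONE_GC here and is
            // the one that re-enables the timer signal.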
            if (prev == ACTIVITY_DONE_GC) {
            recent_activity = ACTIVITY_YES;

        switch (prev_what_next) {
            /* Thread already finished, return to scheduler. */
            ret = ThreadFinished;
            r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
            cap = regTableToCapability(r);
        case ThreadInterpret:
            cap = interpretBCO(cap);
            barf("schedule: invalid what_next field");

        cap->in_haskell = rtsFalse;

        // The TSO might have moved, eg. if it re-entered the RTS and a GC
        // happened.  So find the new location:
        t = cap->r.rCurrentTSO;

        // We have run some Haskell code: there might be blackhole-blocked
        // threads to wake up now.
        // Lock-free test here should be ok, we're just setting a flag.
        if ( blackhole_queue != END_TSO_QUEUE ) {
            blackholes_need_checking = rtsTrue;

        // And save the current errno in this thread.
        // XXX: possibly bogus for SMP because this thread might already
        // be running again, see code below.
        t->saved_errno = errno;
        // Similarly for Windows error code
        t->saved_winerror = GetLastError();

#if defined(THREADED_RTS)
        // If ret is ThreadBlocked, and this Task is bound to the TSO that
        // blocked, we are in limbo - the TSO is now owned by whatever it
        // is blocked on, and may in fact already have been woken up,
        // perhaps even on a different Capability.  It may be the case
        // that task->cap != cap.  We better yield this Capability
        // immediately and return to normality.
        if (ret == ThreadBlocked) {
            debugTrace(DEBUG_sched,
                       "--<< thread %lu (%s) stopped: blocked",
                       (unsigned long)t->id, whatNext_strs[t->what_next]);

        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        ASSERT(t->cap == cap);

        // ----------------------------------------------------------------------

        // Costs for the scheduler are assigned to CCS_SYSTEM
#if defined(PROFILING)

        schedulePostRunThread(cap,t);

        t = threadStackUnderflow(task,t);

        ready_to_gc = rtsFalse;
            ready_to_gc = scheduleHandleHeapOverflow(cap,t);
            scheduleHandleStackOverflow(cap,task,t);
            if (scheduleHandleYield(cap, t, prev_what_next)) {
                // shortcut for switching between compiler/interpreter:
            scheduleHandleThreadBlocked(t);
            if (scheduleHandleThreadFinished(cap, task, t)) return cap;
            ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
            barf("schedule: invalid thread return code %d", (int)ret);

        if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
            cap = scheduleDoGC(cap,task,rtsFalse);
    } /* end of while() */

/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

schedulePreLoop(void)
    // initialisation for scheduler - what cannot go into initScheduler()

/* -----------------------------------------------------------------------------
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */

scheduleFindWork (Capability *cap)
    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
    // Try to activate one of our own sparks
    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }

#if defined(THREADED_RTS)
    // Try to steal work if we don't have any
    if (emptyRunQueue(cap)) { stealWork(cap); }

#if defined(PARALLEL_HASKELL)
    // if messages have been buffered...
    scheduleSendPendingMessages();

#if defined(PARALLEL_HASKELL)
    if (emptyRunQueue(cap)) {
        receivedFinish = scheduleGetRemoteWork(cap);
        continue; // a new round, (hopefully) with new work
        in GUM, this a) sends out a FISH and returns IF no fish is
                      b) (blocking) awaits and receives messages
        in Eden, this is only the blocking receive, as b) in GUM.

#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
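    // (A Task with task->tso == NULL is a worker, which must never run a
    //  bound thread; a bound Task may only run the thread bound to it.)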
    return (waiting_for_gc ||
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task)));

// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// The return value indicates whether

scheduleYield (Capability **pcap, Task *task)
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))

    // otherwise yield (sleep), and keep yielding if necessary.
        yieldCapability(&cap,task);
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

/* -----------------------------------------------------------------------------
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */

schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
    /* following code not for PARALLEL_HASKELL.  I kept the call general,
       future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)
    Capability *free_caps[n_capabilities], *cap0;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {

    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
                free_caps[n_free_caps++] = cap0;

    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads

    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched,
                   "cap %d: %s and %d free capabilities, sharing...",
                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
                   "excess threads on run queue":"sparks to share (>=2)",

        pushed_to_all = rtsFalse;

        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            prev->_link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                t->_link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    setTSOLink(cap, prev, t);
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    setTSOLink(cap, prev, t);
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
            cap->run_queue_tl = prev;

    /* JB I left this code in place, it would work but is not necessary */

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = tryStealSpark(cap->sparks);
                    debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                    newSpark(&(free_caps[i]->r), spark);
#endif /* SPARK_PUSHING */

        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseAndWakeupCapability(free_caps[i]);
    task->cap = cap; // reset to point to our Capability.
#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
scheduleStartSignalHandlers(Capability *cap)
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);

scheduleStartSignalHandlers(Capability *cap STG_UNUSED)

/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */

scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
#if !defined(THREADED_RTS)
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
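        // The argument tells awaitEvent() whether it may block indefinitely:
        // only when the run queue is empty and no blackhole-blocked threads
        // might need waking up, per the comment above.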
/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */

scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
        ACQUIRE_LOCK(&cap->lock);
        if (emptyRunQueue(cap)) {
            cap->run_queue_hd = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
            setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
            cap->run_queue_tl = cap->wakeup_queue_tl;
        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
        RELEASE_LOCK(&cap->lock);

/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */

scheduleCheckBlackHoles (Capability *cap)
    if ( blackholes_need_checking ) // check without the lock first
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        RELEASE_LOCK(&sched_mutex);

/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */

scheduleDetectDeadlock (Capability *cap, Task *task)
#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL

     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
    if ( emptyThreadQueues(cap) )
#if defined(THREADED_RTS)
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
        if (recent_activity != ACTIVITY_INACTIVE) return;

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);

        recent_activity = ACTIVITY_DONE_GC;
        // disable timer signals (see #1623)

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            if (signals_pending()) {
                startSignalHandlers(cap);

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
        switch (task->tso->why_blocked) {
        case BlockedOnBlackHole:
        case BlockedOnException:
            throwToSingleThreaded(cap, task->tso,
                                  (StgClosure *)nonTermination_closure);
            barf("deadlock: main thread blocked in a strange way");

/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
scheduleSendPendingMessages(void)
# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...

/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
scheduleActivateSpark(Capability *cap)
    /* We only want to stay here if the run queue is empty and we want some
       work.  We try to turn a spark into a thread, and add it to the run
       queue, from where it will be picked up in the next iteration of the
    if (!emptyRunQueue(cap))
        /* In the threaded RTS, another task might have pushed a thread
           on our run queue in the meantime?  But that would need a lock... */

    // Really we should be using reclaimSpark() here, but
    // experimentally it doesn't seem to perform as well as just
    // stealing from our own spark pool:
    // spark = reclaimSpark(cap->sparks);
    spark = tryStealSpark(cap->sparks); // defined in Sparks.c

    if (spark != NULL) {
        debugTrace(DEBUG_sched,
                   "turning spark of closure %p into a thread",
                   (StgClosure *)spark);
        createSparkThread(cap,spark); // defined in Sparks.c
#endif // PARALLEL_HASKELL || THREADED_RTS

/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */

#if defined(PARALLEL_HASKELL)
static rtsBool /* return value used in PARALLEL_HASKELL only */
scheduleGetRemoteWork (Capability *cap STG_UNUSED)
#if defined(PARALLEL_HASKELL)
    rtsBool receivedFinish = rtsFalse;

    // idle(), i.e. send all buffers, wait for work
    if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                     debugBelch("...send all pending data,"));
        for (i=1; i<=nPEs; i++)
            sendImmediately(i); // send all messages away immediately

    /* this would be the place for fishing in GUM...
       if (no-earlier-fish-around)
            sendFish(choosePe());

    // Eden: just look for incoming messages (blocking receive)
    IF_PAR_DEBUG(verbose,
                 debugBelch("...wait for incoming messages...\n"));
    processMessages(cap, &receivedFinish); // blocking receive...

    return receivedFinish;
    // re-enter the scheduling loop after having received something

#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
    return rtsFalse; /* return value unused in THREADED_RTS */
#endif /* PARALLEL_HASKELL */
#endif // PARALLEL_HASKELL || THREADED_RTS

/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */

schedulePostRunThread (Capability *cap, StgTSO *t)
    // We have to be able to catch transactions that are in an
    // infinite loop as a result of seeing an inconsistent view of
    //
    //   [a,b] <- mapM readTVar [ta,tb]
    //   when (a == b) loop
    //
    // and a is never equal to b given a consistent view of memory.
    if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
        if (!stmValidateNestOfTransactions (t -> trec)) {
            debugTrace(DEBUG_sched | DEBUG_stm,
                       "trec %p found wasting its time", t);

            // strip the stack back to the
            // ATOMICALLY_FRAME, aborting the (nested)
            // transaction, and saving the stack of any
            // partially-evaluated thunks on the heap.
            throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);

            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
    /* some statistics gathering in the parallel case */

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */

scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;

        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, whatNext_strs[t->what_next], blocks);

        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.
            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
#if !defined(THREADED_RTS)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
                cap->r.rNursery->blocks = bd;
            cap->r.rCurrentNursery->u.back = bd;

            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            for (x = bd; x < bd + blocks; x++) {
                x->step = cap->r.rNursery;

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));

            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */

    debugTrace(DEBUG_sched,
               "--<< thread %ld (%s) stopped: HeapOverflow",
               (long)t->id, whatNext_strs[t->what_next]);

    if (cap->context_switch) {
        // Sometimes we miss a context switch, e.g. when calling
        // primitives in a tight loop, MAYBE_GC() doesn't check the
        // context switch flag, and we end up waiting for a GC.
        // See #1984, and concurrent/should_run/1984
        cap->context_switch = 0;
        addToRunQueue(cap,t);
        pushOnRunQueue(cap,t);
    /* actual GC is done at the end of the while loop in schedule() */

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */

scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
    debugTrace (DEBUG_sched,
                "--<< thread %ld (%s) stopped, StackOverflow",
                (long)t->id, whatNext_strs[t->what_next]);

    /* just adjust the stack for this thread, then pop it back
    /* enlarge the stack */
    StgTSO *new_t = threadStackOverflow(cap, t);

    /* The TSO attached to this Task may have moved, so update the
    if (task->tso == t) {
    pushOnRunQueue(cap,new_t);

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */

scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    cap->context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
    if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, whatNext_strs[t->what_next]);
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped, yielding",
                   (long)t->id, whatNext_strs[t->what_next]);

    //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
    ASSERT(t->_link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    if (t->what_next != prev_what_next) {

    addToRunQueue(cap,t);

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */

scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    //    - in THREADED_RTS, the thread may already have been woken
    //      up by another Capability.  This actually happens: try
    //      conc023 +RTS -N2.
    //    - the thread may have woken itself up already, because
    //      threadPaused() might have raised a blocked throwTo
    //      exception, see maybePerformBlockedException().

    if (traceClass(DEBUG_sched)) {
        debugTraceBegin("--<< thread %lu (%s) stopped: ",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);

/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */

scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
               (unsigned long)t->id, whatNext_strs[t->what_next]);

    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.

    if (t->bound != task) {
#if !defined(THREADED_RTS)
        // Must be a bound thread that is not the topmost one.  Leave
        // it on the run queue until the stack has unwound to the
        // point where we can deal with this.  Leaving it on the run
        // queue also ensures that the garbage collector knows about
        // this thread and its return value (it gets dropped from the
        // step->threads list so there's no other way to find it).
        appendToRunQueue(cap,t);
        // this cannot happen in the threaded RTS, because a
        // bound thread can only be run by the appropriate Task.
        barf("finished bound thread that isn't mine");

    ASSERT(task->tso == t);

    if (t->what_next == ThreadComplete) {
        // NOTE: return val is tso->sp[1] (see StgStartup.hc)
        *(task->ret) = (StgClosure *)task->tso->sp[1];
        task->stat = Success;
        *(task->ret) = NULL;
        if (sched_state >= SCHED_INTERRUPTING) {
            task->stat = Interrupted;
            task->stat = Killed;

    removeThreadLabel((StgWord)task->tso->id);
    return rtsTrue; // tells schedule() to return

/* -----------------------------------------------------------------------------
 * Perform a heap census
 * -------------------------------------------------------------------------- */

scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {

/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */

scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
    rtsBool heap_census;
    /* extern static volatile StgWord waiting_for_gc;
       lives inside capability.c */
    rtsBool was_waiting;

    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.

    /* Other capabilities are prevented from running yet more Haskell
       threads if waiting_for_gc is set.  Tested inside
       yieldCapability() and releaseCapability() in Capability.c */
    was_waiting = cas(&waiting_for_gc, 0, 1);
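    // cas() returns the previous value of waiting_for_gc: 0 means we won
    // the race and will lead the GC; anything else means another Task is
    // already initiating one, so below we just yield until it has finished.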
        debugTrace(DEBUG_sched, "someone else is trying to GC...");
        if (cap) yieldCapability(&cap,task);
    } while (waiting_for_gc);
    return cap;  // NOTE: task->cap might have changed here

    setContextSwitches();
    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");

    waiting_for_gc = rtsFalse;

    // so this happens periodically:
    if (cap) scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

     * We now have all the capabilities; if we're in an interrupting
     * state, then we should take the opportunity to delete all the
     * threads in the system.
    if (sched_state >= SCHED_INTERRUPTING) {
        deleteAllThreads(&capabilities[0]);
        sched_state = SCHED_SHUTTING_DOWN;

    heap_census = scheduleNeedHeapProfile(rtsTrue);

    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
#if defined(THREADED_RTS)
    debugTrace(DEBUG_sched, "doing GC");
    GarbageCollect(force_major || heap_census);

        debugTrace(DEBUG_sched, "performing heap census");
        performHeapProfile = rtsFalse;

    Once we are all together... this would be the place to balance all
    spark pools.  No concurrent stealing or adding of new sparks can
    occur.  Should be defined in Sparks.c. */
    balanceSparkPoolsCaps(n_capabilities, capabilities);

#if defined(THREADED_RTS)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);

/* ---------------------------------------------------------------------------
 * Singleton fork().  Do not copy any running threads.
 * ------------------------------------------------------------------------- */

forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
#if defined(THREADED_RTS)
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("forking not supported with +RTS -N<n> greater than 1");
        stg_exit(EXIT_FAILURE);

    debugTrace(DEBUG_sched, "forking!");

    // ToDo: for SMP, we should probably acquire *all* the capabilities

    // no funny business: hold locks while we fork, otherwise if some
    // other thread is holding a lock when the fork happens, the data
    // structure protected by the lock will forever be in an
    // inconsistent state in the child.  See also #1391.
    ACQUIRE_LOCK(&sched_mutex);
    ACQUIRE_LOCK(&cap->lock);
    ACQUIRE_LOCK(&cap->running_task->lock);

    if (pid) { // parent
        RELEASE_LOCK(&sched_mutex);
        RELEASE_LOCK(&cap->lock);
        RELEASE_LOCK(&cap->running_task->lock);
        // just return the pid

#if defined(THREADED_RTS)
        initMutex(&sched_mutex);
        initMutex(&cap->lock);
        initMutex(&cap->running_task->lock);

        // Now, all OS threads except the thread that forked are
        // stopped.  We need to stop all Haskell threads, including
        // those involved in foreign calls.  Also we need to delete
        // all Tasks, because they correspond to OS threads that are
        for (s = 0; s < total_steps; s++) {
          for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->global_link;
                // don't allow threads to catch the ThreadKilled
                // exception, but we do want to raiseAsync() because these
                // threads may be evaluating thunks that we need later.
                deleteThread_(cap,t);

        // Empty the run queue.  It seems tempting to let all the
        // killed threads stay on the run queue as zombies to be
        // cleaned up later, but some of them correspond to bound
        // threads for which the corresponding Task does not exist.
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        // Any suspended C-calling Tasks are no more, their OS threads
        cap->suspended_ccalling_tasks = NULL;

        // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
        for (s = 0; s < total_steps; s++) {
            all_steps[s].threads = END_TSO_QUEUE;

        // Wipe the task list, except the current Task.
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) {
#if defined(THREADED_RTS)
                initMutex(&task->lock); // see #1391
        RELEASE_LOCK(&sched_mutex);

#if defined(THREADED_RTS)
        // Wipe our spare workers list, they no longer exist.  New
        // workers will be created if necessary.
        cap->spare_workers = NULL;
        cap->returning_tasks_hd = NULL;
        cap->returning_tasks_tl = NULL;

        // On Unix, all timers are reset in the child, so we need to start
        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        hs_exit();      // clean up and exit
        stg_exit(EXIT_SUCCESS);
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");

/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */

deleteAllThreads ( Capability *cap )
    // NOTE: only safe to call if we own all capabilities.

    debugTrace(DEBUG_sched,"deleting all threads");
    for (s = 0; s < total_steps; s++) {
      for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->global_link;
            deleteThread(cap,t);

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);

/* -----------------------------------------------------------------------------
   Managing the suspended_ccalling_tasks list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */

suspendTask (Capability *cap, Task *task)
    ASSERT(task->next == NULL && task->prev == NULL);
    task->next = cap->suspended_ccalling_tasks;
    if (cap->suspended_ccalling_tasks) {
        cap->suspended_ccalling_tasks->prev = task;
    cap->suspended_ccalling_tasks = task;

recoverSuspendedTask (Capability *cap, Task *task)
        task->prev->next = task->next;
        ASSERT(cap->suspended_ccalling_tasks == task);
        cap->suspended_ccalling_tasks = task->next;
        task->next->prev = task->prev;
    task->next = task->prev = NULL;

/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_tasks queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
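/* Roughly, the sequence around a "safe" foreign call looks like the sketch
 * below (illustrative only; the real code is generated by the compiler in
 * the out-of-line call wrappers, not written here):
 *
 *     token  = suspendThread(reg);   // give up the Capability
 *     result = ccall_fn(args);       // may block; other Tasks keep running
 *     reg    = resumeThread(token);  // wait for a Capability to come back
 */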
suspendThread (StgRegTable *reg)
    StgWord32 saved_winerror;

    saved_errno = errno;
    saved_winerror = GetLastError();

    /* assume that *reg is a pointer to the StgRegTable part of a Capability.
    cap = regTableToCapability(reg);

    task = cap->running_task;
    tso = cap->r.rCurrentTSO;

    debugTrace(DEBUG_sched,
               "thread %lu did a safe foreign call",
               (unsigned long)cap->r.rCurrentTSO->id);

    // XXX this might not be necessary --SDM
    tso->what_next = ThreadRunGHC;

    threadPaused(cap,tso);

    if ((tso->flags & TSO_BLOCKEX) == 0) {
        tso->why_blocked = BlockedOnCCall;
        tso->flags |= TSO_BLOCKEX;
        tso->flags &= ~TSO_INTERRUPTIBLE;
        tso->why_blocked = BlockedOnCCall_NoUnblockExc;

    // Hand back capability
    task->suspended_tso = tso;

    ACQUIRE_LOCK(&cap->lock);
    suspendTask(cap,task);
    cap->in_haskell = rtsFalse;
    releaseCapability_(cap,rtsFalse);
    RELEASE_LOCK(&cap->lock);

#if defined(THREADED_RTS)
    /* Preparing to leave the RTS, so ensure there's a native thread/task
       waiting to take over.
    debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);

    errno = saved_errno;
    SetLastError(saved_winerror);

resumeThread (void *task_)
    StgWord32 saved_winerror;

    saved_errno = errno;
    saved_winerror = GetLastError();

    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalling_tasks list will also have been

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = task->suspended_tso;
    task->suspended_tso = NULL;
    tso->_link = END_TSO_QUEUE; // no write barrier reqd
    debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);

    if (tso->why_blocked == BlockedOnCCall) {
        awakenBlockedExceptionQueue(cap,tso);
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
    SetLastError(saved_winerror);

    /* We might have GC'd, mark the TSO dirty again */
    IF_DEBUG(sanity, checkTSO(tso));

/* ---------------------------------------------------------------------------
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
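/* A sketch of typical usage (the thread-creation helpers in RtsAPI.c do
 * roughly this; the stack-pushing details are illustrative, not exact):
 *
 *     StgTSO *tso = createThread(cap, stack_size);
 *     // push the closure to run, plus a suitable entry frame, on tso's stack
 *     scheduleThread(cap, tso);
 */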
1933 scheduleThread(Capability *cap, StgTSO *tso)
1935 // The thread goes at the *end* of the run-queue, to avoid possible
1936 // starvation of any threads already on the queue.
1937 appendToRunQueue(cap,tso);
1941 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1943 #if defined(THREADED_RTS)
1944 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1945 // move this thread from now on.
1946 cpu %= RtsFlags.ParFlags.nNodes;
1947 if (cpu == cap->no) {
1948 appendToRunQueue(cap,tso);
1950 wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1953 appendToRunQueue(cap,tso);
1958 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1962 // We already created/initialised the Task
1963 task = cap->running_task;
1965 // This TSO is now a bound thread; make the Task and TSO
1966 // point to each other.
1972 task->stat = NoStatus;
1974 appendToRunQueue(cap,tso);
1976 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1978 cap = schedule(cap,task);
1980 ASSERT(task->stat != NoStatus);
1981 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1983 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1987 /* ----------------------------------------------------------------------------
1989 * ------------------------------------------------------------------------- */
1991 #if defined(THREADED_RTS)
1992 void OSThreadProcAttr
1993 workerStart(Task *task)
1997 // See startWorkerTask().
1998 ACQUIRE_LOCK(&task->lock);
2000 RELEASE_LOCK(&task->lock);
2002 // set the thread-local pointer to the Task:
2005 // schedule() runs without a lock.
2006 cap = schedule(cap,task);
2008 // On exit from schedule(), we have a Capability.
2009 releaseCapability(cap);
2010 workerTaskStop(task);
2014 /* ---------------------------------------------------------------------------
2017 * Initialise the scheduler. This resets all the queues - if the
2018 * queues contained any threads, they'll be garbage collected at the
2021 * ------------------------------------------------------------------------ */
2026 #if !defined(THREADED_RTS)
2027 blocked_queue_hd = END_TSO_QUEUE;
2028 blocked_queue_tl = END_TSO_QUEUE;
2029 sleeping_queue = END_TSO_QUEUE;
2032 blackhole_queue = END_TSO_QUEUE;
2034 sched_state = SCHED_RUNNING;
2035 recent_activity = ACTIVITY_YES;
2037 #if defined(THREADED_RTS)
2038 /* Initialise the mutex and condition variables used by
2040 initMutex(&sched_mutex);
2043 ACQUIRE_LOCK(&sched_mutex);
2045 /* A capability holds the state a native thread needs in
2046 * order to execute STG code. At least one capability is
2047 * floating around (only THREADED_RTS builds have more than one).
2053 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2057 #if defined(THREADED_RTS)
2059 * Eagerly start one worker to run each Capability, except for
2060 * Capability 0. The idea is that we're probably going to start a
2061 * bound thread on Capability 0 pretty soon, so we don't want a
2062 * worker task hogging it.
2067 for (i = 1; i < n_capabilities; i++) {
2068 cap = &capabilities[i];
2069 ACQUIRE_LOCK(&cap->lock);
2070 startWorkerTask(cap, workerStart);
2071 RELEASE_LOCK(&cap->lock);
2076 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2078 RELEASE_LOCK(&sched_mutex);
2083 rtsBool wait_foreign
2084 #if !defined(THREADED_RTS)
2085 __attribute__((unused))
2088 /* see Capability.c, shutdownCapability() */
2092 #if defined(THREADED_RTS)
2093 ACQUIRE_LOCK(&sched_mutex);
2094 task = newBoundTask();
2095 RELEASE_LOCK(&sched_mutex);
2098 // If we haven't killed all the threads yet, do it now.
2099 if (sched_state < SCHED_SHUTTING_DOWN) {
2100 sched_state = SCHED_INTERRUPTING;
2101 scheduleDoGC(NULL,task,rtsFalse);
2103 sched_state = SCHED_SHUTTING_DOWN;
2105 #if defined(THREADED_RTS)
2109 for (i = 0; i < n_capabilities; i++) {
2110 shutdownCapability(&capabilities[i], task, wait_foreign);
2112 boundTaskExiting(task);
2116 freeCapability(&MainCapability);
2121 freeScheduler( void )
2124 if (n_capabilities != 1) {
2125 stgFree(capabilities);
2127 #if defined(THREADED_RTS)
2128 closeMutex(&sched_mutex);
2132 /* -----------------------------------------------------------------------------
2135 This is the interface to the garbage collector from Haskell land.
2136 We provide this so that external C code can allocate and garbage
2137 collect when called from Haskell via _ccall_GC.
2138 -------------------------------------------------------------------------- */
2141 performGC_(rtsBool force_major)
2144 // We must grab a new Task here, because the existing Task may be
2145 // associated with a particular Capability, and chained onto the
2146 // suspended_ccalling_tasks queue.
2147 ACQUIRE_LOCK(&sched_mutex);
2148 task = newBoundTask();
2149 RELEASE_LOCK(&sched_mutex);
2150 scheduleDoGC(NULL,task,force_major);
2151 boundTaskExiting(task);
2157 performGC_(rtsFalse);
2161 performMajorGC(void)
2163 performGC_(rtsTrue);
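// Hedged note: performGC/performMajorGC are the entry points that the
// Haskell libraries are expected to foreign-import (System.Mem.performGC
// maps to performMajorGC in the base library of this era); external C
// code may also call them directly, as described above.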
2166 /* -----------------------------------------------------------------------------
2169 If the thread has reached its maximum stack size, then raise the
2170 StackOverflow exception in the offending thread. Otherwise
2171 relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2173 -------------------------------------------------------------------------- */
2176 threadStackOverflow(Capability *cap, StgTSO *tso)
2178 nat new_stack_size, stack_words;
2183 IF_DEBUG(sanity,checkTSO(tso));
2185 // don't allow throwTo() to modify the blocked_exceptions queue
2186 // while we are moving the TSO:
2187 lockClosure((StgClosure *)tso);
2189 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2190 // NB. never raise a StackOverflow exception if the thread is
2191 // inside Control.Exception.block. It is impractical to protect
2192 // against stack overflow exceptions, since virtually anything
2193 // can raise one (even 'catch'), so this is the only sensible
2194 // thing to do here. See bug #767.
2196 debugTrace(DEBUG_gc,
2197 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2198 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2200 /* If we're debugging, just print out the top of the stack */
2201 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2204 // Send this thread the StackOverflow exception
2206 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2210 /* Try to double the current stack size. If that takes us over the
2211 * maximum stack size for this thread, then use the maximum instead
2212 * (that is, unless we're already at or over the max size and we
2213 * can't raise the StackOverflow exception (see above), in which
2214 * case just double the size). Finally round up so the TSO ends up as
2215 * a whole number of blocks.
2217 if (tso->stack_size >= tso->max_stack_size) {
2218 new_stack_size = tso->stack_size * 2;
2220 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2222 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2223 TSO_STRUCT_SIZE)/sizeof(W_);
2224 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2225 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
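    /* Illustrative sizing example (hypothetical numbers, 64-bit words):
     * a thread with stack_size == 1024 words and a much larger
     * max_stack_size gets new_stack_size == stg_min(2048, max) == 2048
     * words.  The allocation is then rounded up to whole blocks by
     * BLOCK_ROUND_UP (and made megablock-friendly by round_to_mblocks),
     * and new_stack_size is recomputed as whatever fits in the rounded
     * allocation once TSO_STRUCT_SIZEW words of header are set aside.
     */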
2227 debugTrace(DEBUG_sched,
2228 "increasing stack size from %ld words to %d.",
2229 (long)tso->stack_size, new_stack_size);
2231 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2232 TICK_ALLOC_TSO(new_stack_size,0);
2234 /* copy the TSO block and the old stack into the new area */
2235 memcpy(dest,tso,TSO_STRUCT_SIZE);
2236 stack_words = tso->stack + tso->stack_size - tso->sp;
2237 new_sp = (P_)dest + new_tso_size - stack_words;
2238 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2240 /* relocate the stack pointers... */
2242 dest->stack_size = new_stack_size;
2244 /* Mark the old TSO as relocated. We have to check for relocated
2245 * TSOs in the garbage collector and any primops that deal with TSOs.
2247 * It's important to set the sp value to just beyond the end
2248 * of the stack, so we don't attempt to scavenge any part of the dead TSO's stack.
2251 tso->what_next = ThreadRelocated;
2252 setTSOLink(cap,tso,dest);
2253 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2254 tso->why_blocked = NotBlocked;
2256 IF_PAR_DEBUG(verbose,
2257 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2258 tso->id, tso, tso->stack_size);
2259 /* If we're debugging, just print out the top of the stack */
2260 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2266 IF_DEBUG(sanity,checkTSO(dest));
2268 IF_DEBUG(scheduler,printTSO(dest));
2275 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2277 bdescr *bd, *new_bd;
2278 lnat free_w, tso_size_w;
2281 tso_size_w = tso_sizeW(tso);
2283 if (tso_size_w < MBLOCK_SIZE_W ||
2284 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2289 // don't allow throwTo() to modify the blocked_exceptions queue
2290 // while we are moving the TSO:
2291 lockClosure((StgClosure *)tso);
2293 // this is the number of words we'll free
2294 free_w = round_to_mblocks(tso_size_w/2);
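    // Illustrative example (hypothetical numbers): a TSO occupying two
    // megablocks of which only a few hundred stack words are still live
    // gets past the early-exit test above, so free_w is roughly one
    // megablock's worth of words; the large block is split below and
    // that half is handed back to the block allocator, while the
    // surviving half keeps the (relocated) TSO and its live stack.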
2296 bd = Bdescr((StgPtr)tso);
2297 new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2298 bd->free = bd->start + TSO_STRUCT_SIZEW;
2300 new_tso = (StgTSO *)new_bd->start;
2301 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2302 new_tso->stack_size = new_bd->free - new_tso->stack;
2304 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2305 (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2307 tso->what_next = ThreadRelocated;
2308 tso->_link = new_tso; // no write barrier reqd: same generation
2310 // The TSO attached to this Task may have moved, so update the pointer to it.
2312 if (task->tso == tso) {
2313 task->tso = new_tso;
2319 IF_DEBUG(sanity,checkTSO(new_tso));
2324 /* ---------------------------------------------------------------------------
2326 - usually called inside a signal handler so it mustn't do anything fancy.
2327 ------------------------------------------------------------------------ */
2330 interruptStgRts(void)
2332 sched_state = SCHED_INTERRUPTING;
2333 setContextSwitches();
2337 /* -----------------------------------------------------------------------------
2340 This function causes at least one OS thread to wake up and run the
2341 scheduler loop. It is invoked when the RTS might be deadlocked, or
2342 an external event has arrived that may need servicing (eg. a
2343 keyboard interrupt).
2345 In the single-threaded RTS we don't do anything here; we only have
2346 one thread anyway, and the event that caused us to want to wake up
2347 will have interrupted any blocking system call in progress.
2348 -------------------------------------------------------------------------- */
2353 #if defined(THREADED_RTS)
2354 // This forces the IO Manager thread to wakeup, which will
2355 // in turn ensure that some OS thread wakes up and runs the
2356 // scheduler loop, which will cause a GC and deadlock check.
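    // Hedged note: the wakeup itself is assumed to be done by poking
    // the IO manager (ioManagerWakeup()), which is a cheap way to get
    // one OS thread back into schedule().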
2361 /* -----------------------------------------------------------------------------
2364 * Check the blackhole_queue for threads that can be woken up. We do
2365 * this periodically: before every GC, and whenever the run queue is empty.
2368 * An elegant solution might be to just wake up all the blocked
2369 * threads with awakenBlockedQueue occasionally: they'll go back to
2370 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2371 * doesn't give us a way to tell whether we've actually managed to
2372 * wake up any threads, so we would be busy-waiting.
2374 * -------------------------------------------------------------------------- */
2377 checkBlackHoles (Capability *cap)
2380 rtsBool any_woke_up = rtsFalse;
2383 // blackhole_queue is global:
2384 ASSERT_LOCK_HELD(&sched_mutex);
2386 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2388 // ASSUMES: sched_mutex
2389 prev = &blackhole_queue;
2390 t = blackhole_queue;
2391 while (t != END_TSO_QUEUE) {
2392 ASSERT(t->why_blocked == BlockedOnBlackHole);
2393 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2394 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2395 IF_DEBUG(sanity,checkTSO(t));
2396 t = unblockOne(cap, t);
2398 any_woke_up = rtsTrue;
2408 /* -----------------------------------------------------------------------------
2411 This is used for interruption (^C) and forking, and corresponds to
2412 raising an exception but without letting the thread catch the exception.
2414 -------------------------------------------------------------------------- */
2417 deleteThread (Capability *cap, StgTSO *tso)
2419 // NOTE: must only be called on a TSO that we have exclusive
2420 // access to, because we will call throwToSingleThreaded() below.
2421 // The TSO must be on the run queue of the Capability we own, or
2422 // we must own all Capabilities.
2424 if (tso->why_blocked != BlockedOnCCall &&
2425 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2426 throwToSingleThreaded(cap,tso,NULL);
2430 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2432 deleteThread_(Capability *cap, StgTSO *tso)
2433 { // for forkProcess only:
2434 // like deleteThread(), but we delete threads in foreign calls, too.
2436 if (tso->why_blocked == BlockedOnCCall ||
2437 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2438 unblockOne(cap,tso);
2439 tso->what_next = ThreadKilled;
2441 deleteThread(cap,tso);
2446 /* -----------------------------------------------------------------------------
2447 raiseExceptionHelper
2449 This function is called by the raise# primitive, just so that we can
2450 move some of the tricky bits of raising an exception from C-- into
2451 C. Who knows, it might be a useful, reusable thing here too.
2452 -------------------------------------------------------------------------- */
2455 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2457 Capability *cap = regTableToCapability(reg);
2458 StgThunk *raise_closure = NULL;
2460 StgRetInfoTable *info;
2462 // This closure represents the expression 'raise# E' where E
2463 // is the exception being raised. It is used to overwrite all the
2464 // thunks which are currently under evaluation.
2467 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2468 // LDV profiling: stg_raise_info has THUNK as its closure
2469 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2470 // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2471 // 1 does not cause any problem unless profiling is performed.
2472 // However, when LDV profiling goes on, we need to linearly scan
2473 // small object pool, where raise_closure is stored, so we should
2474 // use MIN_UPD_SIZE.
2476 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2477 // sizeofW(StgClosure)+1);
2481 // Walk up the stack, looking for the catch frame. On the way,
2482 // we update any closures pointed to from update frames with the
2483 // raise closure that we just built.
2487 info = get_ret_itbl((StgClosure *)p);
2488 next = p + stack_frame_sizeW((StgClosure *)p);
2489 switch (info->i.type) {
2492 // Only create raise_closure if we need to.
2493 if (raise_closure == NULL) {
2495 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2496 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2497 raise_closure->payload[0] = exception;
2499 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2503 case ATOMICALLY_FRAME:
2504 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2506 return ATOMICALLY_FRAME;
2512 case CATCH_STM_FRAME:
2513 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2515 return CATCH_STM_FRAME;
2521 case CATCH_RETRY_FRAME:
2530 /* -----------------------------------------------------------------------------
2531 findRetryFrameHelper
2533 This function is called by the retry# primitive. It traverses the stack
2534 leaving tso->sp referring to the frame which should handle the retry.
2536 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2537 or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2539 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2540 create) because retries are not considered to be exceptions, despite the
2541 similar implementation.
2543 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2544 not be created within memory transactions.
2545 -------------------------------------------------------------------------- */
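/* Example (illustrative): in 'atomically (m `orElse` n)', a retry#
   executed inside m should stop at the CATCH_RETRY_FRAME pushed by
   orElse#, so the alternative n can be run; a retry# with no enclosing
   orElse# unwinds all the way to the ATOMICALLY_FRAME, where the whole
   transaction blocks until one of the TVars it read has changed. */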
2548 findRetryFrameHelper (StgTSO *tso)
2551 StgRetInfoTable *info;
2555 info = get_ret_itbl((StgClosure *)p);
2556 next = p + stack_frame_sizeW((StgClosure *)p);
2557 switch (info->i.type) {
2559 case ATOMICALLY_FRAME:
2560 debugTrace(DEBUG_stm,
2561 "found ATOMICALLY_FRAME at %p during retry", p);
2563 return ATOMICALLY_FRAME;
2565 case CATCH_RETRY_FRAME:
2566 debugTrace(DEBUG_stm,
2567 "found CATCH_RETRY_FRAME at %p during retrry", p);
2569 return CATCH_RETRY_FRAME;
2571 case CATCH_STM_FRAME: {
2572 StgTRecHeader *trec = tso -> trec;
2573 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2574 debugTrace(DEBUG_stm,
2575 "found CATCH_STM_FRAME at %p during retry", p);
2576 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2577 stmAbortTransaction(tso -> cap, trec);
2578 stmFreeAbortedTRec(tso -> cap, trec);
2579 tso -> trec = outer;
2586 ASSERT(info->i.type != CATCH_FRAME);
2587 ASSERT(info->i.type != STOP_FRAME);
2594 /* -----------------------------------------------------------------------------
2595 resurrectThreads is called after garbage collection on the list of
2596 threads found to be garbage. Each of these threads will be woken
2597 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2598 on an MVar, or NonTermination if the thread was blocked on a Black Hole.
2601 Locks: assumes we hold *all* the capabilities.
2602 -------------------------------------------------------------------------- */
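/* Illustrative example: a thread blocked on an MVar that has itself
   become unreachable can never be woken by another thread, so after GC
   it appears on this list and is sent the appropriate exception below
   (BlockedOnDeadMVar, NonTermination, or blockedIndefinitely, depending
   on what it was blocked on). */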
2605 resurrectThreads (StgTSO *threads)
2611 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2612 next = tso->global_link;
2614 step = Bdescr((P_)tso)->step;
2615 tso->global_link = step->threads;
2616 step->threads = tso;
2618 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2620 // Wake up the thread on the Capability it was last on
2623 switch (tso->why_blocked) {
2625 case BlockedOnException:
2626 /* Called by GC - sched_mutex lock is currently held. */
2627 throwToSingleThreaded(cap, tso,
2628 (StgClosure *)blockedOnDeadMVar_closure);
2630 case BlockedOnBlackHole:
2631 throwToSingleThreaded(cap, tso,
2632 (StgClosure *)nonTermination_closure);
2635 throwToSingleThreaded(cap, tso,
2636 (StgClosure *)blockedIndefinitely_closure);
2639 /* This might happen if the thread was blocked on a black hole
2640 * belonging to a thread that we've just woken up (raiseAsync
2641 * can wake up threads, remember...).
2645 barf("resurrectThreads: thread blocked in a strange way");
2650 /* -----------------------------------------------------------------------------
2651 performPendingThrowTos is called after garbage collection, and
2652 passed a list of threads that were found to have pending throwTos
2653 (tso->blocked_exceptions was not empty), and were blocked.
2654 Normally this doesn't happen, because we would deliver the
2655 exception directly if the target thread is blocked, but there are
2656 small windows where it might occur on a multiprocessor (see throwTo()).
2659 NB. we must be holding all the capabilities at this point, just
2660 like resurrectThreads().
2661 -------------------------------------------------------------------------- */
2664 performPendingThrowTos (StgTSO *threads)
2670 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2671 next = tso->global_link;
2673 step = Bdescr((P_)tso)->step;
2674 tso->global_link = step->threads;
2675 step->threads = tso;
2677 debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2680 maybePerformBlockedException(cap, tso);