/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * The scheduler and thread-related functionality
 *
 * --------------------------------------------------------------------------*/
#include "PosixSource.h"
#define KEEP_LOCKCLOSURE
#include "OSThreads.h"
#include "StgMiscClosures.h"
#include "Interpreter.h"
#include "RtsSignals.h"
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Proftimer.h"
/* PARALLEL_HASKELL includes go here */
#include "Capability.h"
#include "AwaitEvent.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
#include "RaiseAsync.h"
#include "ThrIOManager.h"

#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
// Turn off inlining when debugging - it obfuscates things
#ifdef DEBUG
# define STATIC_INLINE static
#else
# define STATIC_INLINE static inline
#endif
/* -----------------------------------------------------------------------------
 * Global variables
 * -------------------------------------------------------------------------- */
#if !defined(THREADED_RTS)
// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
StgTSO *blocked_queue_tl = NULL;
StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
#endif
/* Threads blocked on blackholes.
 * LOCK: sched_mutex+capability, or all capabilities
 */
StgTSO *blackhole_queue = NULL;
/* The blackhole_queue should be checked for threads to wake up.  See
 * Schedule.h for a more thorough comment.
 * LOCK: none (doesn't matter if we miss an update)
 */
rtsBool blackholes_need_checking = rtsFalse;
/* Flag that tracks whether we have done any execution in this time slice.
 * LOCK: currently none, perhaps we should lock (but needs to be
 * updated in the fast path of the scheduler).
 */
nat recent_activity = ACTIVITY_YES;
/* If this flag is set as well, give up execution.
 * LOCK: none (changes once, from false->true)
 */
rtsBool sched_state = SCHED_RUNNING;
/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
 * exists - earlier gccs apparently didn't.
 */
StgTSO dummy_tso;
/*
 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
 * in an MT setting, needed to signal that a worker thread shouldn't hang around
 * in the scheduler when it is out of work.
 */
rtsBool shutting_down_scheduler = rtsFalse;
/*
 * This mutex protects most of the global scheduler data in
 * the THREADED_RTS runtime.
 */
#if defined(THREADED_RTS)
Mutex sched_mutex;
#endif
#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
/* -----------------------------------------------------------------------------
 * static function prototypes
 * -------------------------------------------------------------------------- */

static Capability *schedule (Capability *initialCapability, Task *task);
// These functions all encapsulate parts of the scheduler loop, and are
// abstracted only to make the structure and control flow of the
// scheduler clearer.
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(PARALLEL_HASKELL)
static rtsBool scheduleGetRemoteWork(Capability *cap);
static void scheduleSendPendingMessages(void);
#endif
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
                                         StgTSO *t );
static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
                                    nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);

static rtsBool checkBlackHoles(Capability *cap);

static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);

static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);

#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
static char *whatNext_strs[] = {
  [0]               = "(unknown)",
  [ThreadRunGHC]    = "ThreadRunGHC",
  [ThreadInterpret] = "ThreadInterpret",
  [ThreadKilled]    = "ThreadKilled",
  [ThreadRelocated] = "ThreadRelocated",
  [ThreadComplete]  = "ThreadComplete"
};
/* -----------------------------------------------------------------------------
 * Putting a thread on the run queue: different scheduling policies
 * -------------------------------------------------------------------------- */
STATIC_INLINE void
addToRunQueue( Capability *cap, StgTSO *t )
{
#if defined(PARALLEL_HASKELL)
    if (RtsFlags.ParFlags.doFairScheduling) {
        // this does round-robin scheduling; good for concurrency
        appendToRunQueue(cap,t);
    } else {
        // this does unfair scheduling; good for parallelism
        pushOnRunQueue(cap,t);
    }
#else
    // this does round-robin scheduling; good for concurrency
    appendToRunQueue(cap,t);
#endif
}
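
/* Illustration: appendToRunQueue() adds at the tail of the run queue,
 * pushOnRunQueue() at the head.  With a run queue [t1,t2], appending t3
 * gives [t1,t2,t3] (round-robin: fair to the threads already queued),
 * while pushing t3 gives [t3,t1,t2] (unfair, but it runs the fresh
 * thread while its data is still cache-hot, which tends to be better
 * for parallelism).
 */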
/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * stack overflow
      * timer expires (thread yields)
      * thread blocks
      * thread finishes

   In a GranSim setup this loop iterates over the global event queue.
   This revolves around the global event queue, which determines what
   to do next.  Therefore, it's more complicated than either the
   concurrent or the parallel (GUM) setup.
   This version has been entirely removed (JB 2008/08).

   GUM iterates over incoming messages.
   It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
   and sends out a fish whenever it has nothing to do; in-between
   doing the actual reductions (shared code below) it processes the
   incoming messages and deals with delayed operations
   (see PendingFetches).
   This is not the ugliest code you could imagine, but it's bloody close.

   (JB 2008/08) This version was formerly indicated by a PP-flag PAR,
   now by PP-flag PARALLEL_HASKELL.  The Eden RTS (in GHC-6.x) uses it,
   as well as future GUM versions.  This file has been refurbished to
   only contain valid code, which is however incomplete, and refers to
   invalid includes etc.

   ------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
#if defined(PARALLEL_HASKELL)
  rtsBool receivedFinish = rtsFalse;
#endif
  nat prev_what_next;
  rtsBool ready_to_gc;
#if defined(THREADED_RTS)
  rtsBool first = rtsTrue;
#endif

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched,
              "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
              task, initialCapability);
  // -----------------------------------------------------------
  // Scheduler loop starts here:

#if defined(PARALLEL_HASKELL)
#define TERMINATION_CONDITION        (!receivedFinish)
#else
#define TERMINATION_CONDITION        rtsTrue
#endif

  while (TERMINATION_CONDITION) {
    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
        errorBelch("schedule: re-entered unsafely.\n"
                   "   Perhaps a 'foreign import unsafe' should be 'safe'?");
        stg_exit(EXIT_FAILURE);
    }
    // The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence for ^C goes like this:
    //
    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
    //     arranges for some Capability to wake up
    //
    //   * all threads in the system are halted, and the zombies are
    //     placed on the run queue for cleaning up.  We acquire all
    //     the capabilities in order to delete the threads, this is
    //     done by scheduleDoGC() for convenience (because GC already
    //     needs to acquire all the capabilities).  We can't kill
    //     threads involved in foreign calls.
    //
    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
    //
    //   * sched_state := SCHED_SHUTTING_DOWN
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls,
    //     we should really attempt to kill these somehow (TODO).
    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
#if defined(THREADED_RTS)
        discardSparksCap(cap);
#endif
        /* scheduleDoGC() deletes all the threads */
        cap = scheduleDoGC(cap,task,rtsFalse);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (task->tso == NULL && emptyRunQueue(cap)) {
            return cap;
        }
        break;

    default:
        barf("sched_state: %d", sched_state);
    }
    scheduleFindWork(cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);
#if defined(PARALLEL_HASKELL)
    /* since we perform a blocking receive and continue otherwise,
       either we never reach here or we definitely have work! */
    // from here: non-empty run queue
    ASSERT(!emptyRunQueue(cap));

    if (PacketsWaiting()) { /* now process incoming messages, if any
                               pending...

                               CAUTION: scheduleGetRemoteWork called
                               above, waits for messages as well! */
        processMessages(cap, &receivedFinish);
    }
#endif // PARALLEL_HASKELL: non-empty run queue!
    scheduleDetectDeadlock(cap,task);

#if defined(THREADED_RTS)
    cap = task->cap;    // reload cap, it might have changed
#endif
    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the
    // threaded RTS to reach here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.
#if defined(THREADED_RTS)
    if (first)
    {
        // XXX: ToDo
        //     // don't yield the first time, we want a chance to run this
        //     // thread for a bit, even if there are others banging at the
        //     // door.
        //     first = rtsFalse;
        //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    }

    scheduleYield(&cap,task);
    if (emptyRunQueue(cap)) continue; // look for work again
#endif
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif
    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));
#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        Task *bound = t->bound;

        if (bound) {
            if (bound == task) {
                debugTrace(DEBUG_sched,
                           "### Running thread %lu in bound thread", (unsigned long)t->id);
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "### thread %lu bound to another OS thread", (unsigned long)t->id);
                // no, bound to a different Haskell thread: pass to that thread
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->tso) {
                debugTrace(DEBUG_sched,
                           "### this OS thread cannot run thread %lu", (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif
    /* context switches are initiated by the timer signal, unless
     * the user specified "context switch as often as possible", with
     * +RTS -C0
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }
run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so make sure to set CurrentTSO after
    // that.
    cap->r.rCurrentTSO = t;
    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
               (long)t->id, whatNext_strs[t->what_next]);

    startHeapProfTimer();
    // Check for exceptions blocked on this thread
    maybePerformBlockedException (cap, t);
    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->cap == cap : 1);

    prev_what_next = t->what_next;

    errno = t->saved_errno;
#if mingw32_HOST_OS
    SetLastError(t->saved_winerror);
#endif
    cap->in_haskell = rtsTrue;
#if defined(THREADED_RTS)
    if (recent_activity == ACTIVITY_DONE_GC) {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        nat prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            startTimer();
        }
    } else {
        recent_activity = ACTIVITY_YES;
    }
#endif
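    // Note: xchg() swaps in the new value atomically and returns the
    // old one, so if several Capabilities race through here only the
    // one that actually observed ACTIVITY_DONE_GC restarts the timer;
    // the rest see ACTIVITY_YES and do nothing.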
    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid what_next field");
    }
    cap->in_haskell = rtsFalse;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;
    // We have run some Haskell code: there might be blackhole-blocked
    // threads to wake up now.
    // Lock-free test here should be ok, we're just setting a flag.
    if ( blackhole_queue != END_TSO_QUEUE ) {
        blackholes_need_checking = rtsTrue;
    }
    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again, see code below.
    t->saved_errno = errno;
#if mingw32_HOST_OS
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif
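    // Note: errno (and the Windows error code) are per-OS-thread state,
    // but a Haskell thread can be resumed by a different OS thread/Task
    // than the one that suspended it, so each TSO carries its own saved
    // copy across descheduling.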
#if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
    // blocked, we are in limbo - the TSO is now owned by whatever it
    // is blocked on, and may in fact already have been woken up,
    // perhaps even on a different Capability.  It may be the case
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
        debugTrace(DEBUG_sched,
                   "--<< thread %lu (%s) stopped: blocked",
                   (unsigned long)t->id, whatNext_strs[t->what_next]);
        continue;
    }
#endif
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    CCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    t = threadStackUnderflow(task,t);
    ready_to_gc = rtsFalse;

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        scheduleHandleStackOverflow(cap,task,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
        barf("schedule: invalid thread return code %d", (int)ret);
    }

    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
        cap = scheduleDoGC(cap,task,rtsFalse);
    }
  } /* end of while() */
}
/* ----------------------------------------------------------------------------
 * Setting up the scheduler loop
 * ------------------------------------------------------------------------- */

static void
schedulePreLoop(void)
{
  // initialisation for scheduler - what cannot go into initScheduler()
}
/* -----------------------------------------------------------------------------
 * scheduleFindWork()
 *
 * Search for work to do, and handle messages from elsewhere.
 * -------------------------------------------------------------------------- */
static void
scheduleFindWork (Capability *cap)
{
    scheduleStartSignalHandlers(cap);

    // Only check the black holes here if we've nothing else to do.
    // During normal execution, the black hole list only gets checked
    // at GC time, to avoid repeatedly traversing this possibly long
    // list each time around the scheduler.
    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

    scheduleCheckWakeupThreads(cap);

    scheduleCheckBlockedThreads(cap);

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
    // Try to activate one of our own sparks
    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif

#if defined(THREADED_RTS)
    // Try to steal work if we don't have any
    if (emptyRunQueue(cap)) { stealWork(cap); }
#endif

#if defined(PARALLEL_HASKELL)
    // if messages have been buffered...
    scheduleSendPendingMessages();
#endif

#if defined(PARALLEL_HASKELL)
    if (emptyRunQueue(cap)) {
        receivedFinish = scheduleGetRemoteWork(cap);
        continue; // a new round, (hopefully) with new work
        /*
           in GUM, this a) sends out a FISH and returns IF no fish is
                           out already
                        b) (blocking) awaits and receives messages

           in Eden, this is only the blocking receive, as b) in GUM.
        */
    }
#endif
}
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
shouldYieldCapability (Capability *cap, Task *task)
{
    // we need to yield this capability to someone else if..
    //   - another thread is initiating a GC
    //   - another Task is returning from a foreign call
    //   - the thread at the head of the run queue cannot be run
    //     by this Task (it is bound to another Task, or it is unbound
    //     and this task is bound).
    return (waiting_for_gc ||
            cap->returning_tasks_hd != NULL ||
            (!emptyRunQueue(cap) && (task->tso == NULL
                                     ? cap->run_queue_hd->bound != NULL
                                     : cap->run_queue_hd->bound != task)));
}
// This is the single place where a Task goes to sleep.  There are
// two reasons it might need to sleep:
//    - there are no threads to run
//    - we need to yield this Capability to someone else
//      (see shouldYieldCapability())
//
// On return, *pcap is updated to the Capability we now hold.

static void
scheduleYield (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;

    // if we have work, and we don't need to give up the Capability, continue.
    if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
        return;

    // otherwise yield (sleep), and keep yielding if necessary.
    do {
        yieldCapability(&cap,task);
    }
    while (shouldYieldCapability(cap,task));

    // note there may still be no threads on the run queue at this
    // point, the caller has to check.

    *pcap = cap;
}
#endif
/* -----------------------------------------------------------------------------
 * schedulePushWork()
 *
 * Push work to other Capabilities if we have some.
 * -------------------------------------------------------------------------- */
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
                 Task *task      USED_IF_THREADS)
{
  /* following code not for PARALLEL_HASKELL. I kept the call general,
     future GUM versions might use pushing in a distributed setup */
#if defined(THREADED_RTS)

    Capability *free_caps[n_capabilities], *cap0;
    nat i, n_free_caps;

    // migration can be turned off with +RTS -qg
    if (!RtsFlags.ParFlags.migrate) return;

    // Check whether we have more threads on our run queue, or sparks
    // in our pool, that we could hand to another Capability.
    if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
        && sparkPoolSizeCap(cap) < 2) {
        return;
    }
    // First grab as many free Capabilities as we can.
    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
            if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
                // it already has some work, we just grabbed it at
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
            } else {
                free_caps[n_free_caps++] = cap0;
            }
        }
    }
    // we now have n_free_caps free capabilities stashed in
    // free_caps[].  Share our run queue equally with them.  This is
    // probably the simplest thing we could do; improvements we might
    // want to do include:
    //
    //   - giving high priority to moving relatively new threads, on
    //     the grounds that they haven't had time to build up a
    //     working set in the cache on this CPU/Capability.
    //
    //   - giving low priority to moving long-lived threads
    if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;

        debugTrace(DEBUG_sched,
                   "cap %d: %s and %d free capabilities, sharing...",
                   cap->no,
                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
                   "excess threads on run queue":"sparks to share (>=2)",
                   n_free_caps);

        i = 0;
        pushed_to_all = rtsFalse;
        if (cap->run_queue_hd != END_TSO_QUEUE) {
            prev = cap->run_queue_hd;
            t = prev->_link;
            prev->_link = END_TSO_QUEUE;
            for (; t != END_TSO_QUEUE; t = next) {
                next = t->_link;
                t->_link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
                    || t->bound == task // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    setTSOLink(cap, prev, t);
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us
                    setTSOLink(cap, prev, t);
                    prev = t;
                } else {
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
            }
            cap->run_queue_tl = prev;
        }
#ifdef SPARK_PUSHING
        /* JB I left this code in place, it would work but is not necessary */

        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
            StgClosure *spark;
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
                    spark = tryStealSpark(cap->sparks);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
                    }
                }
            }
        }
#endif /* SPARK_PUSHING */
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseAndWakeupCapability(free_caps[i]);
        }
    }
    task->cap = cap; // reset to point to our Capability.

#endif /* THREADED_RTS */
}
/* ----------------------------------------------------------------------------
 * Start any pending signal handlers
 * ------------------------------------------------------------------------- */
#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
static void
scheduleStartSignalHandlers(Capability *cap)
{
    if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
        // safe outside the lock
        startSignalHandlers(cap);
    }
}
#else
static void
scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
{
}
#endif
/* ----------------------------------------------------------------------------
 * Check for blocked threads that can be woken up.
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
    //
    // Check whether any waiting threads need to be woken up.  If the
    // run queue is empty, and there are no other tasks running, we
    // can wait indefinitely for something to happen.
    //
    if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
    {
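        // Note: the argument to awaitEvent() below says whether it may
        // block indefinitely.  We only wait forever when our run queue
        // is empty and no black holes need checking; otherwise we just
        // poll for completed I/O and return.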
        awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads woken up by other Capabilities
 * ------------------------------------------------------------------------- */
static void
scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
    // Any threads that were woken up by other Capabilities get
    // appended to our run queue.
    if (!emptyWakeupQueue(cap)) {
        ACQUIRE_LOCK(&cap->lock);
        if (emptyRunQueue(cap)) {
            cap->run_queue_hd = cap->wakeup_queue_hd;
            cap->run_queue_tl = cap->wakeup_queue_tl;
        } else {
            setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
            cap->run_queue_tl = cap->wakeup_queue_tl;
        }
        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
        RELEASE_LOCK(&cap->lock);
    }
#endif
}
/* ----------------------------------------------------------------------------
 * Check for threads blocked on BLACKHOLEs that can be woken up
 * ------------------------------------------------------------------------- */
static void
scheduleCheckBlackHoles (Capability *cap)
{
    if ( blackholes_need_checking ) // check without the lock first
    {
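        // Note: this is the double-checked locking idiom: test the flag
        // without the lock to keep the common no-work case cheap, then
        // re-test under sched_mutex so that only one Capability walks
        // the blackhole queue.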
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }
}
/* ----------------------------------------------------------------------------
 * Detect deadlock conditions and attempt to resolve them.
 * ------------------------------------------------------------------------- */
static void
scheduleDetectDeadlock (Capability *cap, Task *task)
{

#if defined(PARALLEL_HASKELL)
    // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
    return;
#endif

    /*
     * Detect deadlock: when we have no threads to run, there are no
     * threads blocked, waiting for I/O, or sleeping, and all the
     * other tasks are waiting for work, we must have a deadlock of
     * some description.
     */
    if ( emptyThreadQueues(cap) )
    {
#if defined(THREADED_RTS)
        /*
         * In the threaded RTS, we only check for deadlock if there
         * has been no activity in a complete timeslice.  This means
         * we won't eagerly start a full GC just because we don't have
         * any threads to run currently.
         */
        if (recent_activity != ACTIVITY_INACTIVE) return;
#endif

        debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");

        // Garbage collection can release some new threads due to
        // either (a) finalizers or (b) threads resurrected because
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);

        recent_activity = ACTIVITY_DONE_GC;
        // disable timer signals (see #1623)
        stopTimer();

        if ( !emptyRunQueue(cap) ) return;

#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather than bombing out with a
         * deadlock.
         */
        if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
            debugTrace(DEBUG_sched,
                       "still deadlocked, waiting for signals...");

            awaitUserSignals();

            if (signals_pending()) {
                startSignalHandlers(cap);
            }

            // either we have threads to run, or we were interrupted:
            ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);

            return;
        }
#endif

#if !defined(THREADED_RTS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
        if (task->tso) {
            switch (task->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
                throwToSingleThreaded(cap, task->tso,
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
            }
        }
#endif
    }
}
/* ----------------------------------------------------------------------------
 * Send pending messages (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
static void
scheduleSendPendingMessages(void)
{

# if defined(PAR) // global Mem.Mgmt., omit for now
    if (PendingFetches != END_BF_QUEUE) {
        processFetches();
    }
# endif

    if (RtsFlags.ParFlags.BufferTime) {
        // if we use message buffering, we must send away all message
        // packets which have become too old...
        sendOldBuffers();
    }
}
#endif
/* ----------------------------------------------------------------------------
 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
 * ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
    StgClosure *spark;

/* We only want to stay here if the run queue is empty and we want some
   work. We try to turn a spark into a thread, and add it to the run
   queue, from where it will be picked up in the next iteration of the
   scheduler loop.
*/
    if (!emptyRunQueue(cap))
      /* In the threaded RTS, another task might have pushed a thread
         on our run queue in the meantime ? But would need a lock.. */
      return;

    // Really we should be using reclaimSpark() here, but
    // experimentally it doesn't seem to perform as well as just
    // stealing from our own spark pool:
    // spark = reclaimSpark(cap->sparks);

    spark = tryStealSpark(cap->sparks); // defined in Sparks.c

    if (spark != NULL) {
        debugTrace(DEBUG_sched,
                   "turning spark of closure %p into a thread",
                   (StgClosure *)spark);
        createSparkThread(cap,spark); // defined in Sparks.c
    }
}
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
 * Get work from a remote node (PARALLEL_HASKELL only)
 * ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
static rtsBool /* return value used in PARALLEL_HASKELL only */
scheduleGetRemoteWork (Capability *cap STG_UNUSED)
{
#if defined(PARALLEL_HASKELL)
  rtsBool receivedFinish = rtsFalse;

  // idle() , i.e. send all buffers, wait for work
  if (RtsFlags.ParFlags.BufferTime) {
        IF_PAR_DEBUG(verbose,
                debugBelch("...send all pending data,"));
        {
          nat i;
          for (i=1; i<=nPEs; i++)
            sendImmediately(i); // send all messages away immediately
        }
  }

  /* this would be the place for fishing in GUM...

     if (no-earlier-fish-around)
          sendFish(choosePe());
  */

  // Eden:just look for incoming messages (blocking receive)
  IF_PAR_DEBUG(verbose,
               debugBelch("...wait for incoming messages...\n"));
  processMessages(cap, &receivedFinish); // blocking receive...

  return receivedFinish;
  // reenter scheduling loop after having received something

#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */

  return rtsFalse; /* return value unused in THREADED_RTS */

#endif /* PARALLEL_HASKELL */
}
#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
 * After running a thread...
 * ------------------------------------------------------------------------- */
static void
schedulePostRunThread (Capability *cap, StgTSO *t)
{
    // We have to be able to catch transactions that are in an
    // infinite loop as a result of seeing an inconsistent view of
    // memory, e.g.
    //
    //   atomically $ do
    //       [a,b] <- mapM readTVar [ta,tb]
    //       when (a == b) loop
    //
    // and a is never equal to b given a consistent view of memory.
    //
    if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
        if (!stmValidateNestOfTransactions (t -> trec)) {
            debugTrace(DEBUG_sched | DEBUG_stm,
                       "trec %p found wasting its time", t);

            // strip the stack back to the
            // ATOMICALLY_FRAME, aborting the (nested)
            // transaction, and saving the stack of any
            // partially-evaluated thunks on the heap.
            throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
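            // (Reading of the call above: with a NULL exception and the
            // stop-at-atomically flag set, nothing is actually thrown;
            // the stack is just truncated at the enclosing
            // ATOMICALLY_FRAME so the transaction restarts from
            // scratch.)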
            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
        }
    }

    /* some statistics gathering in the parallel case */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
 * -------------------------------------------------------------------------- */
static rtsBool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
    // did the task ask for a large block?
    if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
        lnat blocks;

        blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
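        // Worked example, assuming the usual 4096-byte BLOCK_SIZE: a
        // request of rHpAlloc = 10000 bytes rounds up to 12288, i.e.
        // blocks = 3.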
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                   (long)t->id, whatNext_strs[t->what_next], blocks);
        // don't do this if the nursery is (nearly) full, we'll GC first.
        if (cap->r.rCurrentNursery->link != NULL ||
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.

            bd = allocGroup( blocks );
            cap->r.rNursery->n_blocks += blocks;

            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
            bd->u.back = cap->r.rCurrentNursery->u.back;
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
#if !defined(THREADED_RTS)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                       g0s0 == cap->r.rNursery);
#endif
                cap->r.rNursery->blocks = bd;
            }
            cap->r.rCurrentNursery->u.back = bd;
            // initialise it as a nursery block.  We initialise the
            // step, gen_no, and flags field of *every* sub-block in
            // this large block, because this is easier than making
            // sure that we always find the block head of a large
            // block whenever we call Bdescr() (eg. evacuate() and
            // isAlive() in the GC would both have to do this, at
            // least).
            {
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
                    x->step = cap->r.rNursery;
                    x->gen_no = 0;
                    x->flags = 0;
                }
            }

            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;

            // we might be unlucky and have another thread get on the
            // run queue before us and steal the large block, but in that
            // case the thread will just end up requesting another large
            // block.
            pushOnRunQueue(cap,t);
            return rtsFalse;  /* not actually GC'ing */
        }
    }
    debugTrace(DEBUG_sched,
               "--<< thread %ld (%s) stopped: HeapOverflow",
               (long)t->id, whatNext_strs[t->what_next]);

    if (cap->context_switch) {
        // Sometimes we miss a context switch, e.g. when calling
        // primitives in a tight loop, MAYBE_GC() doesn't check the
        // context switch flag, and we end up waiting for a GC.
        // See #1984, and concurrent/should_run/1984
        cap->context_switch = 0;
        addToRunQueue(cap,t);
    } else {
        pushOnRunQueue(cap,t);
    }
    return rtsTrue;
    /* actual GC is done at the end of the while loop in schedule() */
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadStackOverflow
 * -------------------------------------------------------------------------- */
static void
scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
{
    debugTrace (DEBUG_sched,
                "--<< thread %ld (%s) stopped, StackOverflow",
                (long)t->id, whatNext_strs[t->what_next]);

    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
    {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);

        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
        if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
    }
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadYielding
 * -------------------------------------------------------------------------- */
static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
    // Reset the context switch flag.  We don't do this just before
    // running the thread, because that would mean we would lose ticks
    // during GC, which can lead to unfair scheduling (a thread hogs
    // the CPU because the tick always arrives during GC).  This way
    // penalises threads that do a lot of allocation, but that seems
    // better than the alternative.
    cap->context_switch = 0;

    /* put the thread back on the run queue.  Then, if we're ready to
     * GC, check whether this is the last task to stop.  If so, wake
     * up the GC thread.  getThread will block during a GC until the
     * GC is finished.
     */
#ifdef DEBUG
    if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators",
                   (long)t->id, whatNext_strs[t->what_next]);
    } else {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped, yielding",
                   (long)t->id, whatNext_strs[t->what_next]);
    }
#endif

    IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));

    ASSERT(t->_link == END_TSO_QUEUE);

    // Shortcut if we're just switching evaluators: don't bother
    // doing stack squeezing (which can be expensive), just run the
    // thread.
    if (t->what_next != prev_what_next) {
        return rtsTrue;
    }

    addToRunQueue(cap,t);

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadBlocked
 * -------------------------------------------------------------------------- */
static void
scheduleHandleThreadBlocked( StgTSO *t
#if !defined(GRAN) && !defined(DEBUG)
    STG_UNUSED
#endif
    )
{
    // We don't need to do anything.  The thread is blocked, and it
    // has tidied up its stack and placed itself on whatever queue
    // it needs to be on.

    // ASSERT(t->why_blocked != NotBlocked);
    // Not true: for example,
    //    - in THREADED_RTS, the thread may already have been woken
    //      up by another Capability.  This actually happens: try
    //      conc023 +RTS -N2.
    //    - the thread may have woken itself up already, because
    //      threadPaused() might have raised a blocked throwTo
    //      exception, see maybePerformBlockedException().

#ifdef DEBUG
    if (traceClass(DEBUG_sched)) {
        debugTraceBegin("--<< thread %lu (%s) stopped: ",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
        printThreadBlockage(t);
        debugTraceEnd();
    }
#endif
}
/* -----------------------------------------------------------------------------
 * Handle a thread that returned to the scheduler with ThreadFinished
 * -------------------------------------------------------------------------- */
static rtsBool
scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
{
    /* Need to check whether this was a main thread, and if so,
     * return with the return value.
     *
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
               (unsigned long)t->id, whatNext_strs[t->what_next]);

    //
    // Check whether the thread that just completed was a bound
    // thread, and if so return with the result.
    //
    // There is an assumption here that all thread completion goes
    // through this point; we need to make sure that if a thread
    // ends up in the ThreadKilled state, that it stays on the run
    // queue so it can be dealt with here.
    //

    if (t->bound) {

        if (t->bound != task) {
#if !defined(THREADED_RTS)
            // Must be a bound thread that is not the topmost one.  Leave
            // it on the run queue until the stack has unwound to the
            // point where we can deal with this.  Leaving it on the run
            // queue also ensures that the garbage collector knows about
            // this thread and its return value (it gets dropped from the
            // step->threads list so there's no other way to find it).
            appendToRunQueue(cap,t);
            return rtsFalse;
#else
            // this cannot happen in the threaded RTS, because a
            // bound thread can only be run by the appropriate Task.
            barf("finished bound thread that isn't mine");
#endif
        }

        ASSERT(task->tso == t);

        if (t->what_next == ThreadComplete) {
            if (task->ret) {
                // NOTE: return val is tso->sp[1] (see StgStartup.hc)
                *(task->ret) = (StgClosure *)task->tso->sp[1];
            }
            task->stat = Success;
        } else {
            if (task->ret) {
                *(task->ret) = NULL;
            }
            if (sched_state >= SCHED_INTERRUPTING) {
                task->stat = Interrupted;
            } else {
                task->stat = Killed;
            }
        }
#ifdef DEBUG
        removeThreadLabel((StgWord)task->tso->id);
#endif
        return rtsTrue; // tells schedule() to return
    }

    return rtsFalse;
}
/* -----------------------------------------------------------------------------
 * Perform a heap census
 * -------------------------------------------------------------------------- */
static rtsBool
scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
    // When we have +RTS -i0 and we're heap profiling, do a census at
    // every GC.  This lets us get repeatable runs for debugging.
    if (performHeapProfile ||
        (RtsFlags.ProfFlags.profileInterval==0 &&
         RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
        return rtsTrue;
    } else {
        return rtsFalse;
    }
}
/* -----------------------------------------------------------------------------
 * Perform a garbage collection if necessary
 * -------------------------------------------------------------------------- */
static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
{
    rtsBool heap_census;
#ifdef THREADED_RTS
    /* extern static volatile StgWord waiting_for_gc;
       lives inside capability.c */
    rtsBool was_waiting;
    nat i;
#endif

#ifdef THREADED_RTS
    // In order to GC, there must be no threads running Haskell code.
    // Therefore, the GC thread needs to hold *all* the capabilities,
    // and release them after the GC has completed.
    //
    // This seems to be the simplest way: previous attempts involved
    // making all the threads with capabilities give up their
    // capabilities and sleep except for the *last* one, which
    // actually did the GC.  But it's quite hard to arrange for all
    // the other tasks to sleep and stay asleep.
    //

    /* Other capabilities are prevented from running yet more Haskell
       threads if waiting_for_gc is set. Tested inside
       yieldCapability() and releaseCapability() in Capability.c */

    was_waiting = cas(&waiting_for_gc, 0, 1);
    if (was_waiting) {
        do {
            debugTrace(DEBUG_sched, "someone else is trying to GC...");
            if (cap) yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return cap;  // NOTE: task->cap might have changed here
    }
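    // Note: cas() above returns the old value of waiting_for_gc, so a
    // zero result means we won the race and own the GC; anything else
    // means some other Task initiated it first, and we merely waited
    // for that GC to finish before returning.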
    setContextSwitches();
    for (i=0; i < n_capabilities; i++) {
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
        if (cap != &capabilities[i]) {
            Capability *pcap = &capabilities[i];
            // we better hope this task doesn't get migrated to
            // another Capability while we're waiting for this one.
            // It won't, because load balancing happens while we have
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
            }
        }
    }

    waiting_for_gc = rtsFalse;
#endif
    // so this happens periodically:
    if (cap) scheduleCheckBlackHoles(cap);

    IF_DEBUG(scheduler, printAllThreads());

    /*
     * We now have all the capabilities; if we're in an interrupting
     * state, then we should take the opportunity to delete all the
     * threads in the system.
     */
    if (sched_state >= SCHED_INTERRUPTING) {
        deleteAllThreads(&capabilities[0]);
        sched_state = SCHED_SHUTTING_DOWN;
    }

    heap_census = scheduleNeedHeapProfile(rtsTrue);
    /* everybody back, start the GC.
     * Could do it in this thread, or signal a condition var
     * to do it in another thread.  Either way, we need to
     * broadcast on gc_pending_cond afterward.
     */
#if defined(THREADED_RTS)
    debugTrace(DEBUG_sched, "doing GC");
#endif
    GarbageCollect(force_major || heap_census);

    if (heap_census) {
        debugTrace(DEBUG_sched, "performing heap census");
        heapCensus();
        performHeapProfile = rtsFalse;
    }

#ifdef SPARKBALANCE
    /* JB
       Once we are all together... this would be the place to balance all
       spark pools. No concurrent stealing or adding of new sparks can
       occur. Should be defined in Sparks.c. */
    balanceSparkPoolsCaps(n_capabilities, capabilities);
#endif
#if defined(THREADED_RTS)
    // release our stash of capabilities.
    for (i = 0; i < n_capabilities; i++) {
        if (cap != &capabilities[i]) {
            task->cap = &capabilities[i];
            releaseCapability(&capabilities[i]);
        }
    }
    if (cap) {
        task->cap = cap;
    } else {
        task->cap = NULL;
    }
#endif

    return cap;
}
/* ---------------------------------------------------------------------------
 * Singleton fork().  Do not copy any running threads.
 * ------------------------------------------------------------------------- */
pid_t
forkProcess(HsStablePtr *entry
#ifndef FORKPROCESS_PRIMOP_SUPPORTED
            STG_UNUSED
#endif
           )
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
    Task *task;
    pid_t pid;
    StgTSO* t,*next;
    Capability *cap;
    nat s;

#if defined(THREADED_RTS)
    if (RtsFlags.ParFlags.nNodes > 1) {
        errorBelch("forking not supported with +RTS -N<n> greater than 1");
        stg_exit(EXIT_FAILURE);
    }
#endif

    debugTrace(DEBUG_sched, "forking!");

    // ToDo: for SMP, we should probably acquire *all* the capabilities
    cap = rts_lock();

    // no funny business: hold locks while we fork, otherwise if some
    // other thread is holding a lock when the fork happens, the data
    // structure protected by the lock will forever be in an
    // inconsistent state in the child.  See also #1391.
    ACQUIRE_LOCK(&sched_mutex);
    ACQUIRE_LOCK(&cap->lock);
    ACQUIRE_LOCK(&cap->running_task->lock);

    pid = fork();
    if (pid) { // parent

        RELEASE_LOCK(&sched_mutex);
        RELEASE_LOCK(&cap->lock);
        RELEASE_LOCK(&cap->running_task->lock);

        // just return the pid
        rts_unlock(cap);
        return pid;

    } else { // child

#if defined(THREADED_RTS)
        initMutex(&sched_mutex);
        initMutex(&cap->lock);
        initMutex(&cap->running_task->lock);
#endif
        // Now, all OS threads except the thread that forked are
        // stopped.  We need to stop all Haskell threads, including
        // those involved in foreign calls.  Also we need to delete
        // all Tasks, because they correspond to OS threads that are
        // now gone.

        for (s = 0; s < total_steps; s++) {
          for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
                next = t->global_link;
                // don't allow threads to catch the ThreadKilled
                // exception, but we do want to raiseAsync() because these
                // threads may be evaluating thunks that we need later.
                deleteThread_(cap,t);
            }
          }
        }
        // Empty the run queue.  It seems tempting to let all the
        // killed threads stay on the run queue as zombies to be
        // cleaned up later, but some of them correspond to bound
        // threads for which the corresponding Task does not exist.
        cap->run_queue_hd = END_TSO_QUEUE;
        cap->run_queue_tl = END_TSO_QUEUE;

        // Any suspended C-calling Tasks are no more, their OS threads
        // don't exist now:
        cap->suspended_ccalling_tasks = NULL;

        // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
        for (s = 0; s < total_steps; s++) {
            all_steps[s].threads = END_TSO_QUEUE;
        }
        // Wipe the task list, except the current Task.
        ACQUIRE_LOCK(&sched_mutex);
        for (task = all_tasks; task != NULL; task=task->all_link) {
            if (task != cap->running_task) {
#if defined(THREADED_RTS)
                initMutex(&task->lock); // see #1391
#endif
                discardTask(task);
            }
        }
        RELEASE_LOCK(&sched_mutex);

#if defined(THREADED_RTS)
        // Wipe our spare workers list, they no longer exist.  New
        // workers will be created if necessary.
        cap->spare_workers = NULL;
        cap->returning_tasks_hd = NULL;
        cap->returning_tasks_tl = NULL;
#endif
        // On Unix, all timers are reset in the child, so we need to start
        // the timer again.
        initTimer();
        startTimer();

        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);

        rts_unlock(cap);
        hs_exit();      // clean up and exit
        stg_exit(EXIT_SUCCESS);
    }
#else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
    return -1;
#endif
}
/* ---------------------------------------------------------------------------
 * Delete all the threads in the system
 * ------------------------------------------------------------------------- */
static void
deleteAllThreads ( Capability *cap )
{
    // NOTE: only safe to call if we own all capabilities.

    StgTSO* t, *next;
    nat s;

    debugTrace(DEBUG_sched,"deleting all threads");
    for (s = 0; s < total_steps; s++) {
      for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
        if (t->what_next == ThreadRelocated) {
            next = t->_link;
        } else {
            next = t->global_link;
            deleteThread(cap,t);
        }
      }
    }

    // The run queue now contains a bunch of ThreadKilled threads.  We
    // must not throw these away: the main thread(s) will be in there
    // somewhere, and the main scheduler loop has to deal with it.
    // Also, the run queue is the only thing keeping these threads from
    // being GC'd, and we don't want the "main thread has been GC'd" panic.

#if !defined(THREADED_RTS)
    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
    ASSERT(sleeping_queue == END_TSO_QUEUE);
#endif
}
/* -----------------------------------------------------------------------------
   Managing the suspended_ccalling_tasks list.
   Locks required: sched_mutex
   -------------------------------------------------------------------------- */
STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
    ASSERT(task->next == NULL && task->prev == NULL);
    task->next = cap->suspended_ccalling_tasks;
    task->prev = NULL;
    if (cap->suspended_ccalling_tasks) {
        cap->suspended_ccalling_tasks->prev = task;
    }
    cap->suspended_ccalling_tasks = task;
}
STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
    if (task->prev) {
        task->prev->next = task->next;
    } else {
        ASSERT(cap->suspended_ccalling_tasks == task);
        cap->suspended_ccalling_tasks = task->next;
    }
    if (task->next) {
        task->next->prev = task->prev;
    }
    task->next = task->prev = NULL;
}
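
/* Illustration: this is a plain O(1) doubly-linked-list removal --
 * unlink from the predecessor (or from the queue head when task->prev
 * is NULL), then fix up the successor's back pointer.  The
 * suspended_ccalling_tasks list can therefore be edited cheaply however
 * deep the task sits in it.
 */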
/* ---------------------------------------------------------------------------
 * Suspending & resuming Haskell threads.
 *
 * When making a "safe" call to C (aka _ccall_GC), the task gives back
 * its capability before calling the C function.  This allows another
 * task to pick up the capability and carry on running Haskell
 * threads.  It also means that if the C call blocks, it won't lock
 * the whole system.
 *
 * The Haskell thread making the C call is put to sleep for the
 * duration of the call, on the suspended_ccalling_threads queue.  We
 * give out a token to the task, which it can use to resume the thread
 * on return from the C function.
 * ------------------------------------------------------------------------- */
void *
suspendThread (StgRegTable *reg)
{
  Capability *cap;
  int saved_errno;
  StgTSO *tso;
  Task *task;
#if mingw32_HOST_OS
  StgWord32 saved_winerror;
#endif

  saved_errno = errno;
#if mingw32_HOST_OS
  saved_winerror = GetLastError();
#endif

  /* assume that *reg is a pointer to the StgRegTable part of a Capability.
   */
  cap = regTableToCapability(reg);

  task = cap->running_task;
  tso = cap->r.rCurrentTSO;

  debugTrace(DEBUG_sched,
             "thread %lu did a safe foreign call",
             (unsigned long)cap->r.rCurrentTSO->id);

  // XXX this might not be necessary --SDM
  tso->what_next = ThreadRunGHC;

  threadPaused(cap,tso);

  if ((tso->flags & TSO_BLOCKEX) == 0) {
      tso->why_blocked = BlockedOnCCall;
      tso->flags |= TSO_BLOCKEX;
      tso->flags &= ~TSO_INTERRUPTIBLE;
  } else {
      tso->why_blocked = BlockedOnCCall_NoUnblockExc;
  }

  // Hand back capability
  task->suspended_tso = tso;

  ACQUIRE_LOCK(&cap->lock);

  suspendTask(cap,task);
  cap->in_haskell = rtsFalse;
  releaseCapability_(cap,rtsFalse);

  RELEASE_LOCK(&cap->lock);

#if defined(THREADED_RTS)
  /* Preparing to leave the RTS, so ensure there's a native thread/task
     waiting to take over.
  */
  debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
#endif

  errno = saved_errno;
#if mingw32_HOST_OS
  SetLastError(saved_winerror);
#endif
  return task;
}
StgRegTable *
resumeThread (void *task_)
{
    StgTSO *tso;
    Capability *cap;
    Task *task = task_;
    int saved_errno;
#if mingw32_HOST_OS
    StgWord32 saved_winerror;
#endif

    saved_errno = errno;
#if mingw32_HOST_OS
    saved_winerror = GetLastError();
#endif

    cap = task->cap;
    // Wait for permission to re-enter the RTS with the result.
    waitForReturnCapability(&cap,task);
    // we might be on a different capability now... but if so, our
    // entry on the suspended_ccalling_tasks list will also have been
    // migrated.

    // Remove the thread from the suspended list
    recoverSuspendedTask(cap,task);

    tso = task->suspended_tso;
    task->suspended_tso = NULL;
    tso->_link = END_TSO_QUEUE; // no write barrier reqd
    debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);

    if (tso->why_blocked == BlockedOnCCall) {
        awakenBlockedExceptionQueue(cap,tso);
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
    }

    /* Reset blocking status */
    tso->why_blocked = NotBlocked;

    cap->r.rCurrentTSO = tso;
    cap->in_haskell = rtsTrue;
    errno = saved_errno;
#if mingw32_HOST_OS
    SetLastError(saved_winerror);
#endif

    /* We might have GC'd, mark the TSO dirty again */
    dirtyTSO(tso);

    IF_DEBUG(sanity, checkTSO(tso));

    return &cap->r;
}
/* ---------------------------------------------------------------------------
 * scheduleThread()
 *
 * scheduleThread puts a thread on the end of the runnable queue.
 * This will usually be done immediately after a thread is created.
 * The caller of scheduleThread must create the thread using e.g.
 * createThread and push an appropriate closure
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */
void
scheduleThread(Capability *cap, StgTSO *tso)
{
    // The thread goes at the *end* of the run-queue, to avoid possible
    // starvation of any threads already on the queue.
    appendToRunQueue(cap,tso);
}
void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
#if defined(THREADED_RTS)
    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
                              // move this thread from now on.
    cpu %= RtsFlags.ParFlags.nNodes;
    if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
    } else {
        wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
    }
#else
    appendToRunQueue(cap,tso);
#endif
}
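
/* Illustrative example: with +RTS -N2 there are two capabilities, so a
 * request for cpu 5 maps to 5 % 2 == 1 and the thread is pinned to
 * capability 1.  The TSO_LOCKED flag set above then stops the
 * work-pushing code (schedulePushWork) from ever migrating it
 * elsewhere.
 */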
Capability *
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
    Task *task;

    // We already created/initialised the Task
    task = cap->running_task;

    // This TSO is now a bound thread; make the Task and TSO
    // point to each other.
    tso->bound = task;
    tso->cap = cap;

    task->tso = tso;
    task->ret = ret;
    task->stat = NoStatus;

    appendToRunQueue(cap,tso);

    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);

    cap = schedule(cap,task);

    ASSERT(task->stat != NoStatus);
    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
    return cap;
}
/* ----------------------------------------------------------------------------
 * Starting Tasks
 * ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
void OSThreadProcAttr
workerStart(Task *task)
{
    Capability *cap;

    // See startWorkerTask().
    ACQUIRE_LOCK(&task->lock);
    cap = task->cap;
    RELEASE_LOCK(&task->lock);

    // set the thread-local pointer to the Task:
    taskEnter(task);

    // schedule() runs without a lock.
    cap = schedule(cap,task);

    // On exit from schedule(), we have a Capability.
    releaseCapability(cap);
    workerTaskStop(task);
}
#endif
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler.  This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 *
 * ------------------------------------------------------------------------ */
void
initScheduler(void)
{
#if !defined(THREADED_RTS)
  blocked_queue_hd  = END_TSO_QUEUE;
  blocked_queue_tl  = END_TSO_QUEUE;
  sleeping_queue    = END_TSO_QUEUE;
#endif

  blackhole_queue   = END_TSO_QUEUE;

  sched_state     = SCHED_RUNNING;
  recent_activity = ACTIVITY_YES;

#if defined(THREADED_RTS)
  /* Initialise the mutex and condition variables used by
   * the scheduler. */
  initMutex(&sched_mutex);
#endif

  ACQUIRE_LOCK(&sched_mutex);

  /* A capability holds the state a native thread needs in
   * order to execute STG code.  At least one capability is
   * floating around (only THREADED_RTS builds have more than one).
   */
  initCapabilities();

  initTaskManager();

#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
  initSparkPools();
#endif

#if defined(THREADED_RTS)
  /*
   * Eagerly start one worker to run each Capability, except for
   * Capability 0.  The idea is that we're probably going to start a
   * bound thread on Capability 0 pretty soon, so we don't want a
   * worker task hogging it.
   */
  {
      nat i;
      Capability *cap;
      for (i = 1; i < n_capabilities; i++) {
          cap = &capabilities[i];
          ACQUIRE_LOCK(&cap->lock);
          startWorkerTask(cap, workerStart);
          RELEASE_LOCK(&cap->lock);
      }
  }
#endif

  trace(TRACE_sched, "start: %d capabilities", n_capabilities);

  RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler(
    rtsBool wait_foreign
#if !defined(THREADED_RTS)
                         __attribute__((unused))
#endif
)
               /* see Capability.c, shutdownCapability() */
{
    Task *task = NULL;

#if defined(THREADED_RTS)
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);
#endif

    // If we haven't killed all the threads yet, do it now.
    if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
        scheduleDoGC(NULL,task,rtsFalse);
    }
    sched_state = SCHED_SHUTTING_DOWN;

#if defined(THREADED_RTS)
    {
        nat i;

        for (i = 0; i < n_capabilities; i++) {
            shutdownCapability(&capabilities[i], task, wait_foreign);
        }
        boundTaskExiting(task);
    }
#else
    freeCapability(&MainCapability);
#endif
}
void
freeScheduler( void )
{
    freeTaskManager();
    if (n_capabilities != 1) {
        stgFree(capabilities);
    }
#if defined(THREADED_RTS)
    closeMutex(&sched_mutex);
#endif
}
/* -----------------------------------------------------------------------------
   performGC

   This is the interface to the garbage collector from Haskell land.
   We provide this so that external C code can allocate and garbage
   collect when called from Haskell via _ccall_GC.
   -------------------------------------------------------------------------- */

static void
performGC_(rtsBool force_major)
{
    Task *task;

    // We must grab a new Task here, because the existing Task may be
    // associated with a particular Capability, and chained onto the
    // suspended_ccalling_tasks queue.
    ACQUIRE_LOCK(&sched_mutex);
    task = newBoundTask();
    RELEASE_LOCK(&sched_mutex);

    scheduleDoGC(NULL,task,force_major);
    boundTaskExiting(task);
}

void
performGC(void)
{
    performGC_(rtsFalse);
}

void
performMajorGC(void)
{
    performGC_(rtsTrue);
}
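
/* Usage sketch (hypothetical foreign code): C code entered from Haskell
 * via a safe foreign call may force a collection with performGC() /
 * performMajorGC(); my_c_service() is invented for illustration.
 */
#if 0
void my_c_service (void)
{
    // ... do some work that churns through memory ...
    performMajorGC();   // force a major collection before returning
}
#endif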
/* -----------------------------------------------------------------------------
   Stack overflow

   If the thread has reached its maximum stack size, then raise the
   StackOverflow exception in the offending thread.  Otherwise
   relocate the TSO into a larger chunk of memory and adjust its stack
   size appropriately.
   -------------------------------------------------------------------------- */
static StgTSO *
threadStackOverflow(Capability *cap, StgTSO *tso)
{
  nat new_stack_size, stack_words;
  lnat new_tso_size;
  StgPtr new_sp;
  StgTSO *dest;

  IF_DEBUG(sanity,checkTSO(tso));

  // don't allow throwTo() to modify the blocked_exceptions queue
  // while we are moving the TSO:
  lockClosure((StgClosure *)tso);

  if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
      // NB. never raise a StackOverflow exception if the thread is
      // inside Control.Exception.block.  It is impractical to protect
      // against stack overflow exceptions, since virtually anything
      // can raise one (even 'catch'), so this is the only sensible
      // thing to do here.  See bug #767.

      debugTrace(DEBUG_gc,
                 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
      IF_DEBUG(gc,
               /* If we're debugging, just print out the top of the stack */
               printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                                tso->sp+64)));

      // Send this thread the StackOverflow exception
      unlockTSO(tso);
      throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
      return tso;
  }
  /* Try to double the current stack size.  If that takes us over the
   * maximum stack size for this thread, then use the maximum instead
   * (that is, unless we're already at or over the max size and we
   * can't raise the StackOverflow exception (see above), in which
   * case just double the size).  Finally round up so the TSO ends up as
   * a whole number of blocks.
   */
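  /* Worked example (illustrative numbers only): with stack_size = 10000
   * words and max_stack_size = 80000 words, the code below computes
   *     new_stack_size = stg_min(2 * 10000, 80000) = 20000 words,
   * which is then padded so that the TSO header plus stack fills a whole
   * number of blocks.  Only a thread already at/over its limit (and unable
   * to receive StackOverflow, see above) skips the stg_min and just doubles.
   */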
  if (tso->stack_size >= tso->max_stack_size) {
      new_stack_size = tso->stack_size * 2;
  } else {
      new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
  }
  new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
                                        TSO_STRUCT_SIZE)/sizeof(W_);
  new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
  new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;

  debugTrace(DEBUG_sched,
             "increasing stack size from %ld words to %d.",
             (long)tso->stack_size, new_stack_size);

  dest = (StgTSO *)allocateLocal(cap,new_tso_size);
  TICK_ALLOC_TSO(new_stack_size,0);

  /* copy the TSO block and the old stack into the new area */
  memcpy(dest,tso,TSO_STRUCT_SIZE);
  stack_words = tso->stack + tso->stack_size - tso->sp;
  new_sp = (P_)dest + new_tso_size - stack_words;
  memcpy(new_sp, tso->sp, stack_words * sizeof(W_));

  /* relocate the stack pointers... */
  dest->sp         = new_sp;
  dest->stack_size = new_stack_size;

  /* Mark the old TSO as relocated.  We have to check for relocated
   * TSOs in the garbage collector and any primops that deal with TSOs.
   *
   * It's important to set the sp value to just beyond the end
   * of the stack, so we don't attempt to scavenge any part of the
   * dead TSO's stack.
   */
  tso->what_next = ThreadRelocated;
  setTSOLink(cap,tso,dest);
  tso->sp = (P_)&(tso->stack[tso->stack_size]);
  tso->why_blocked = NotBlocked;

  IF_PAR_DEBUG(verbose,
               debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
                          tso->id, tso, tso->stack_size);
               /* If we're debugging, just print out the top of the stack */
               printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
                                                tso->sp+64)));

  unlockTSO(dest);
  unlockTSO(tso);

  IF_DEBUG(sanity,checkTSO(dest));
#if 0
  IF_DEBUG(scheduler,printTSO(dest));
#endif

  return dest;
}
static StgTSO *
threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
{
    bdescr *bd, *new_bd;
    lnat free_w, tso_size_w;
    StgTSO *new_tso;

    tso_size_w = tso_sizeW(tso);

    if (tso_size_w < MBLOCK_SIZE_W ||
        (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
    {
        return tso;
    }
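
    /* Illustration of the guard above (assuming a 64-bit machine, where
     * MBLOCK_SIZE_W = 1MB / 8 bytes = 131072 words): a TSO occupying less
     * than one megablock is not worth shrinking, and a stack whose live
     * portion exceeds a quarter of its allocated size is left alone too.
     */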
    // don't allow throwTo() to modify the blocked_exceptions queue
    // while we are moving the TSO:
    lockClosure((StgClosure *)tso);

    // this is the number of words we'll free
    free_w = round_to_mblocks(tso_size_w/2);

    bd = Bdescr((StgPtr)tso);
    new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
    bd->free = bd->start + TSO_STRUCT_SIZEW;

    new_tso = (StgTSO *)new_bd->start;
    memcpy(new_tso,tso,TSO_STRUCT_SIZE);
    new_tso->stack_size = new_bd->free - new_tso->stack;

    debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
               (long)tso->id, tso_size_w, tso_sizeW(new_tso));

    tso->what_next = ThreadRelocated;
    tso->_link = new_tso; // no write barrier reqd: same generation

    // The TSO attached to this Task may have moved, so update the
    // pointer to it.
    if (task->tso == tso) {
        task->tso = new_tso;
    }

    unlockTSO(new_tso);
    unlockTSO(tso);

    IF_DEBUG(sanity,checkTSO(new_tso));

    return new_tso;
}
/* ---------------------------------------------------------------------------
   Interrupt execution
   - usually called inside a signal handler so it mustn't do anything fancy.
   ------------------------------------------------------------------------ */

void
interruptStgRts(void)
{
    sched_state = SCHED_INTERRUPTING;
    setContextSwitches();
    wakeUpRts();
}
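
/* Usage sketch (hypothetical embedder code, not installed by this file):
 * a host application could request an orderly interrupt of the RTS from
 * its own SIGINT handler, since the function above merely sets a flag,
 * requests context switches, and pokes the IO manager.
 */
#if 0
#include <signal.h>

static void my_sigint_handler (int sig)
{
    (void)sig;
    interruptStgRts();
}
#endif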
/* -----------------------------------------------------------------------------
   Wake up the RTS

   This function causes at least one OS thread to wake up and run the
   scheduler loop.  It is invoked when the RTS might be deadlocked, or
   an external event has arrived that may need servicing (eg. a
   keyboard interrupt).

   In the single-threaded RTS we don't do anything here; we only have
   one thread anyway, and the event that caused us to want to wake up
   will have interrupted any blocking system call in progress anyway.
   -------------------------------------------------------------------------- */

void
wakeUpRts(void)
{
#if defined(THREADED_RTS)
    // This forces the IO Manager thread to wake up, which will
    // in turn ensure that some OS thread wakes up and runs the
    // scheduler loop, which will cause a GC and deadlock check.
    ioManagerWakeup();
#endif
}
/* -----------------------------------------------------------------------------
 * checkBlackHoles()
 *
 * Check the blackhole_queue for threads that can be woken up.  We do
 * this periodically: before every GC, and whenever the run queue is
 * empty.
 *
 * An elegant solution might be to just wake up all the blocked
 * threads with awakenBlockedQueue occasionally: they'll go back to
 * sleep again if the object is still a BLACKHOLE.  Unfortunately this
 * doesn't give us a way to tell whether we've actually managed to
 * wake up any threads, so we would be busy-waiting.
 *
 * -------------------------------------------------------------------------- */

static rtsBool
checkBlackHoles (Capability *cap)
{
    StgTSO **prev, *t;
    rtsBool any_woke_up = rtsFalse;
    StgHalfWord type;

    // blackhole_queue is global:
    ASSERT_LOCK_HELD(&sched_mutex);

    debugTrace(DEBUG_sched, "checking threads blocked on black holes");

    // ASSUMES: sched_mutex
    prev = &blackhole_queue;
    t = blackhole_queue;
    while (t != END_TSO_QUEUE) {
        ASSERT(t->why_blocked == BlockedOnBlackHole);
        type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
            IF_DEBUG(sanity,checkTSO(t));
            t = unblockOne(cap, t);
            *prev = t;
            any_woke_up = rtsTrue;
        } else {
            prev = &t->_link;
            t = t->_link;
        }
    }

    return any_woke_up;
}
/* -----------------------------------------------------------------------------
   Deleting threads

   This is used for interruption (^C) and forking, and corresponds to
   raising an exception but without letting the thread catch the
   exception.
   -------------------------------------------------------------------------- */

static void
deleteThread (Capability *cap, StgTSO *tso)
{
    // NOTE: must only be called on a TSO that we have exclusive
    // access to, because we will call throwToSingleThreaded() below.
    // The TSO must be on the run queue of the Capability we own, or
    // we must own all Capabilities.

    if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
        throwToSingleThreaded(cap,tso,NULL);
    }
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
  // like deleteThread(), but we delete threads in foreign calls, too.

    if (tso->why_blocked == BlockedOnCCall ||
        tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
        unblockOne(cap,tso);
        tso->what_next = ThreadKilled;
    } else {
        deleteThread(cap,tso);
    }
}
#endif
/* -----------------------------------------------------------------------------
   raiseExceptionHelper

   This function is called by the raise# primitive, just so that we can
   move some of the tricky bits of raising an exception from C-- into
   C.  Who knows, it might be a useful re-usable thing here too.
   -------------------------------------------------------------------------- */
StgWord
raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
{
    Capability *cap = regTableToCapability(reg);
    StgThunk *raise_closure = NULL;
    StgPtr p, next;
    StgRetInfoTable *info;
    //
    // This closure represents the expression 'raise# E' where E
    // is the exception raised.  It is used to overwrite all the
    // thunks which are currently under evaluation.
    //

    // OLD COMMENT (we don't have MIN_UPD_SIZE now):
    // LDV profiling: stg_raise_info has THUNK as its closure
    // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
    // payload, MIN_UPD_SIZE is more appropriate than 1.  It seems that
    // 1 does not cause any problem unless profiling is performed.
    // However, when LDV profiling goes on, we need to linearly scan
    // small object pool, where raise_closure is stored, so we should
    // use MIN_UPD_SIZE.
    //
    // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
    //                                 sizeofW(StgClosure)+1);
    //

    //
    // Walk up the stack, looking for the catch frame.  On the way,
    // we update any closures pointed to from update frames with the
    // raise closure that we just built.
    //
    p = tso->sp;
    while(1) {
        info = get_ret_itbl((StgClosure *)p);
        next = p + stack_frame_sizeW((StgClosure *)p);
        switch (info->i.type) {

        case UPDATE_FRAME:
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure =
                    (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
            UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
            p = next;
            continue;

        case ATOMICALLY_FRAME:
            debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
            tso->sp = p;
            return ATOMICALLY_FRAME;

        case CATCH_FRAME:
            tso->sp = p;
            return CATCH_FRAME;

        case CATCH_STM_FRAME:
            debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
            tso->sp = p;
            return CATCH_STM_FRAME;

        case STOP_FRAME:
            tso->sp = p;
            return STOP_FRAME;

        case CATCH_RETRY_FRAME:
        default:
            p = next;
            continue;
        }
    }
}
/* -----------------------------------------------------------------------------
   findRetryFrameHelper

   This function is called by the retry# primitive.  It traverses the stack
   leaving tso->sp referring to the frame which should handle the retry.

   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
   or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).

   We skip CATCH_STM_FRAMEs (aborting and rolling back the nested transaction
   that they create) because retries are not considered to be exceptions,
   despite the similar implementation.

   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
   not be created within memory transactions.
   -------------------------------------------------------------------------- */
StgWord
findRetryFrameHelper (StgTSO *tso)
{
  StgPtr           p, next;
  StgRetInfoTable *info;

  p = tso->sp;
  while (1) {
    info = get_ret_itbl((StgClosure *)p);
    next = p + stack_frame_sizeW((StgClosure *)p);
    switch (info->i.type) {

    case ATOMICALLY_FRAME:
        debugTrace(DEBUG_stm,
                   "found ATOMICALLY_FRAME at %p during retry", p);
        tso->sp = p;
        return ATOMICALLY_FRAME;

    case CATCH_RETRY_FRAME:
        debugTrace(DEBUG_stm,
                   "found CATCH_RETRY_FRAME at %p during retry", p);
        tso->sp = p;
        return CATCH_RETRY_FRAME;

    case CATCH_STM_FRAME: {
        StgTRecHeader *trec = tso -> trec;
        StgTRecHeader *outer = stmGetEnclosingTRec(trec);
        debugTrace(DEBUG_stm,
                   "found CATCH_STM_FRAME at %p during retry", p);
        debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
        stmAbortTransaction(tso -> cap, trec);
        stmFreeAbortedTRec(tso -> cap, trec);
        tso -> trec = outer;
        p = next;
        continue;
    }

    default:
      ASSERT(info->i.type != CATCH_FRAME);
      ASSERT(info->i.type != STOP_FRAME);
      p = next;
      continue;
    }
  }
}
/* -----------------------------------------------------------------------------
   resurrectThreads is called after garbage collection on the list of
   threads found to be garbage.  Each of these threads will be woken
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.

   Locks: assumes we hold *all* the capabilities.
   -------------------------------------------------------------------------- */
void
resurrectThreads (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;
    step *step;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;

        step = Bdescr((P_)tso)->step;
        tso->global_link = step->threads;
        step->threads = tso;

        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);

        // Wake up the thread on the Capability it was last on
        cap = tso->cap;

        switch (tso->why_blocked) {
        case BlockedOnMVar:
        case BlockedOnException:
            /* Called by GC - sched_mutex lock is currently held. */
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedOnDeadMVar_closure);
            break;
        case BlockedOnBlackHole:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)nonTermination_closure);
            break;
        case BlockedOnSTM:
            throwToSingleThreaded(cap, tso,
                                  (StgClosure *)blockedIndefinitely_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
             * belonging to a thread that we've just woken up (raiseAsync
             * can wake up threads, remember...).
             */
            continue;
        default:
            barf("resurrectThreads: thread blocked in a strange way");
        }
    }
}
/* -----------------------------------------------------------------------------
   performPendingThrowTos is called after garbage collection, and
   passed a list of threads that were found to have pending throwTos
   (tso->blocked_exceptions was not empty), and were blocked.
   Normally this doesn't happen, because we would deliver the
   exception directly if the target thread is blocked, but there are
   small windows where it might occur on a multiprocessor (see
   throwTo()).

   NB. we must be holding all the capabilities at this point, just
   like resurrectThreads().
   -------------------------------------------------------------------------- */
void
performPendingThrowTos (StgTSO *threads)
{
    StgTSO *tso, *next;
    Capability *cap;
    step *step;

    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;

        step = Bdescr((P_)tso)->step;
        tso->global_link = step->threads;
        step->threads = tso;

        debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);

        cap = tso->cap;
        maybePerformBlockedException(cap, tso);
    }
}