1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 1998-2006
5 * The scheduler and thread-related functionality
7 * --------------------------------------------------------------------------*/
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
15 #include "OSThreads.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
23 #include "RtsSignals.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
32 #include "Proftimer.h"
35 /* PARALLEL_HASKELL includes go here */
38 #include "Capability.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
45 #include "RaiseAsync.h"
47 #include "ThrIOManager.h"
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
64 // Turn off inlining when debugging - it obfuscates things
67 # define STATIC_INLINE static
70 /* -----------------------------------------------------------------------------
72 * -------------------------------------------------------------------------- */
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping threads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
81 /* Threads blocked on blackholes.
82 * LOCK: sched_mutex+capability, or all capabilities
84 StgTSO *blackhole_queue = NULL;
86 /* The blackhole_queue should be checked for threads to wake up. See
87 * Schedule.h for more thorough comment.
88 * LOCK: none (doesn't matter if we miss an update)
90 rtsBool blackholes_need_checking = rtsFalse;
92 /* flag that tracks whether we have done any execution in this time slice.
93 * LOCK: currently none, perhaps we should lock (but needs to be
94 * updated in the fast path of the scheduler).
96 nat recent_activity = ACTIVITY_YES;
98 /* if this flag is set as well, give up execution
99 * LOCK: none (changes once, from false->true)
101 rtsBool sched_state = SCHED_RUNNING;
103 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
104 * exists - earlier gccs apparently didn't.
110 * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111 * in an MT setting, needed to signal that a worker thread shouldn't hang around
112 * in the scheduler when it is out of work.
114 rtsBool shutting_down_scheduler = rtsFalse;
117 * This mutex protects most of the global scheduler data in
118 * the THREADED_RTS runtime.
120 #if defined(THREADED_RTS)
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
128 /* -----------------------------------------------------------------------------
129 * static function prototypes
130 * -------------------------------------------------------------------------- */
132 static Capability *schedule (Capability *initialCapability, Task *task);
135 // These functions all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task,
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
162 nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168 rtsBool force_major);
170 static rtsBool checkBlackHoles(Capability *cap);
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
183 static char *whatNext_strs[] = {
193 /* -----------------------------------------------------------------------------
194 * Putting a thread on the run queue: different scheduling policies
195 * -------------------------------------------------------------------------- */
198 addToRunQueue( Capability *cap, StgTSO *t )
200 #if defined(PARALLEL_HASKELL)
201 if (RtsFlags.ParFlags.doFairScheduling) {
202 // this does round-robin scheduling; good for concurrency
203 appendToRunQueue(cap,t);
205 // this does unfair scheduling; good for parallelism
206 pushOnRunQueue(cap,t);
209 // this does round-robin scheduling; good for concurrency
210 appendToRunQueue(cap,t);
214 /* ---------------------------------------------------------------------------
215 Main scheduling loop.
217 We use round-robin scheduling, each thread returning to the
218 scheduler loop when one of these conditions is detected:
221 * timer expires (thread yields)
227 In a GranSim setup this loop iterates over the global event queue.
228 This revolves around the global event queue, which determines what
229 to do next. Therefore, it's more complicated than either the
230 concurrent or the parallel (GUM) setup.
231 This version has been entirely removed (JB 2008/08).
234 GUM iterates over incoming messages.
235 It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236 and sends out a fish whenever it has nothing to do; in-between
237 doing the actual reductions (shared code below) it processes the
238 incoming messages and deals with delayed operations
239 (see PendingFetches).
240 This is not the ugliest code you could imagine, but it's bloody close.
242 (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243 now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244 as well as future GUM versions. This file has been refurbished to
245 only contain valid code, which is however incomplete, refers to
246 invalid includes etc.
248 ------------------------------------------------------------------------ */
251 schedule (Capability *initialCapability, Task *task)
255 StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257 rtsBool receivedFinish = rtsFalse;
261 #if defined(THREADED_RTS)
262 rtsBool first = rtsTrue;
265 cap = initialCapability;
267 // Pre-condition: this task owns initialCapability.
268 // The sched_mutex is *NOT* held
269 // NB. on return, we still hold a capability.
271 debugTrace (DEBUG_sched,
272 "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273 task, initialCapability);
277 // -----------------------------------------------------------
278 // Scheduler loop starts here:
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION (!receivedFinish)
283 #define TERMINATION_CONDITION rtsTrue
286 while (TERMINATION_CONDITION) {
288 // Check whether we have re-entered the RTS from Haskell without
289 // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
291 if (cap->in_haskell) {
292 errorBelch("schedule: re-entered unsafely.\n"
293 " Perhaps a 'foreign import unsafe' should be 'safe'?");
294 stg_exit(EXIT_FAILURE);
297 // The interruption / shutdown sequence.
299 // In order to cleanly shut down the runtime, we want to:
300 // * make sure that all main threads return to their callers
301 // with the state 'Interrupted'.
302 //   * clean up all OS threads associated with the runtime
303 // * free all memory etc.
305 // So the sequence for ^C goes like this:
307 // * ^C handler sets sched_state := SCHED_INTERRUPTING and
308 // arranges for some Capability to wake up
310 // * all threads in the system are halted, and the zombies are
311 // placed on the run queue for cleaning up. We acquire all
312 // the capabilities in order to delete the threads, this is
313 // done by scheduleDoGC() for convenience (because GC already
314 // needs to acquire all the capabilities). We can't kill
315 // threads involved in foreign calls.
317 // * somebody calls shutdownHaskell(), which calls exitScheduler()
319 // * sched_state := SCHED_SHUTTING_DOWN
321 // * all workers exit when the run queue on their capability
322 // drains. All main threads will also exit when their TSO
323 // reaches the head of the run queue and they can return.
325 // * eventually all Capabilities will shut down, and the RTS can
328 // * We might be left with threads blocked in foreign calls,
329 // we should really attempt to kill these somehow (TODO);
331 switch (sched_state) {
334 case SCHED_INTERRUPTING:
335 debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337 discardSparksCap(cap);
339 /* scheduleDoGC() deletes all the threads */
340 cap = scheduleDoGC(cap,task,rtsFalse);
342 case SCHED_SHUTTING_DOWN:
343 debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344 // If we are a worker, just exit. If we're a bound thread
345 // then we will exit below when we've removed our TSO from
347 if (task->tso == NULL && emptyRunQueue(cap)) {
352 barf("sched_state: %d", sched_state);
355 scheduleFindWork(cap);
357 /* work pushing, currently relevant only for THREADED_RTS:
358 (pushes threads, wakes up idle capabilities for stealing) */
359 schedulePushWork(cap,task);
361 #if defined(PARALLEL_HASKELL)
362 /* since we perform a blocking receive and continue otherwise,
363 either we never reach here or we definitely have work! */
364 // from here: non-empty run queue
365 ASSERT(!emptyRunQueue(cap));
367 if (PacketsWaiting()) { /* now process incoming messages, if any
370 CAUTION: scheduleGetRemoteWork called
371 above, waits for messages as well! */
372 processMessages(cap, &receivedFinish);
374 #endif // PARALLEL_HASKELL: non-empty run queue!
376 scheduleDetectDeadlock(cap,task);
378 #if defined(THREADED_RTS)
379 cap = task->cap; // reload cap, it might have changed
382 // Normally, the only way we can get here with no threads to
383 // run is if a keyboard interrupt was received during
384 // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385 // Additionally, it is not fatal for the
386 // threaded RTS to reach here with no threads to run.
388 // win32: might be here due to awaitEvent() being abandoned
389 // as a result of a console event having been delivered.
391 #if defined(THREADED_RTS)
395 // // don't yield the first time, we want a chance to run this
396 // // thread for a bit, even if there are others banging at the
399 // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
403 scheduleYield(&cap,task);
404 if (emptyRunQueue(cap)) continue; // look for work again
407 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
408 if ( emptyRunQueue(cap) ) {
409 ASSERT(sched_state >= SCHED_INTERRUPTING);
414 // Get a thread to run
416 t = popRunQueue(cap);
418 // Sanity check the thread we're about to run. This can be
419 // expensive if there is lots of thread switching going on...
420 IF_DEBUG(sanity,checkTSO(t));
422 #if defined(THREADED_RTS)
423 // Check whether we can run this thread in the current task.
424 // If not, we have to pass our capability to the right task.
426 Task *bound = t->bound;
430 debugTrace(DEBUG_sched,
431 "### Running thread %lu in bound thread", (unsigned long)t->id);
432 // yes, the Haskell thread is bound to the current native thread
434 debugTrace(DEBUG_sched,
435 "### thread %lu bound to another OS thread", (unsigned long)t->id);
436 // no, bound to a different Haskell thread: pass to that thread
437 pushOnRunQueue(cap,t);
441 // The thread we want to run is unbound.
443 debugTrace(DEBUG_sched,
444 "### this OS thread cannot run thread %lu", (unsigned long)t->id);
445 // no, the current native thread is bound to a different
446 // Haskell thread, so pass it to any worker thread
447 pushOnRunQueue(cap,t);
454 /* context switches are initiated by the timer signal, unless
455 * the user specified "context switch as often as possible", with
458 if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
459 && !emptyThreadQueues(cap)) {
460 cap->context_switch = 1;
465 // CurrentTSO is the thread to run. t might be different if we
466 // loop back to run_thread, so make sure to set CurrentTSO after
468 cap->r.rCurrentTSO = t;
470 debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
471 (long)t->id, whatNext_strs[t->what_next]);
473 startHeapProfTimer();
475 // Check for exceptions blocked on this thread
476 maybePerformBlockedException (cap, t);
478 // ----------------------------------------------------------------------
479 // Run the current thread
481 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
482 ASSERT(t->cap == cap);
483 ASSERT(t->bound ? t->bound->cap == cap : 1);
485 prev_what_next = t->what_next;
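    // Restore the thread's saved errno (and, on Windows, its saved
    // error code) before running it; they are saved again below when
    // the thread returns to the scheduler.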
487 errno = t->saved_errno;
489 SetLastError(t->saved_winerror);
492 cap->in_haskell = rtsTrue;
496 #if defined(THREADED_RTS)
497 if (recent_activity == ACTIVITY_DONE_GC) {
498 // ACTIVITY_DONE_GC means we turned off the timer signal to
499 // conserve power (see #1623). Re-enable it here.
501 prev = xchg((P_)&recent_activity, ACTIVITY_YES);
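        // xchg() returns the previous value, so only the Capability
        // that actually flips the flag away from ACTIVITY_DONE_GC
        // goes on to re-enable the timer.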
502 if (prev == ACTIVITY_DONE_GC) {
506 recent_activity = ACTIVITY_YES;
510 switch (prev_what_next) {
514 /* Thread already finished, return to scheduler. */
515 ret = ThreadFinished;
521 r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
522 cap = regTableToCapability(r);
527 case ThreadInterpret:
528 cap = interpretBCO(cap);
533 barf("schedule: invalid what_next field");
536 cap->in_haskell = rtsFalse;
538 // The TSO might have moved, eg. if it re-entered the RTS and a GC
539 // happened. So find the new location:
540 t = cap->r.rCurrentTSO;
542 // We have run some Haskell code: there might be blackhole-blocked
543 // threads to wake up now.
544 // Lock-free test here should be ok, we're just setting a flag.
545 if ( blackhole_queue != END_TSO_QUEUE ) {
546 blackholes_need_checking = rtsTrue;
549 // And save the current errno in this thread.
550 // XXX: possibly bogus for SMP because this thread might already
551 // be running again, see code below.
552 t->saved_errno = errno;
554 // Similarly for Windows error code
555 t->saved_winerror = GetLastError();
558 #if defined(THREADED_RTS)
559 // If ret is ThreadBlocked, and this Task is bound to the TSO that
560 // blocked, we are in limbo - the TSO is now owned by whatever it
561 // is blocked on, and may in fact already have been woken up,
562 // perhaps even on a different Capability. It may be the case
563 // that task->cap != cap. We better yield this Capability
564 // immediately and return to normality.
565 if (ret == ThreadBlocked) {
566 debugTrace(DEBUG_sched,
567 "--<< thread %lu (%s) stopped: blocked",
568 (unsigned long)t->id, whatNext_strs[t->what_next]);
573 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
574 ASSERT(t->cap == cap);
576 // ----------------------------------------------------------------------
578 // Costs for the scheduler are assigned to CCS_SYSTEM
580 #if defined(PROFILING)
584 schedulePostRunThread(cap,t);
586 t = threadStackUnderflow(task,t);
588 ready_to_gc = rtsFalse;
592 ready_to_gc = scheduleHandleHeapOverflow(cap,t);
596 scheduleHandleStackOverflow(cap,task,t);
600 if (scheduleHandleYield(cap, t, prev_what_next)) {
601 // shortcut for switching between compiler/interpreter:
607 scheduleHandleThreadBlocked(t);
611 if (scheduleHandleThreadFinished(cap, task, t)) return cap;
612 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
616 barf("schedule: invalid thread return code %d", (int)ret);
619 if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
620 cap = scheduleDoGC(cap,task,rtsFalse);
622 } /* end of while() */
625 /* ----------------------------------------------------------------------------
626 * Setting up the scheduler loop
627 * ------------------------------------------------------------------------- */
630 schedulePreLoop(void)
632 // initialisation for scheduler - what cannot go into initScheduler()
635 /* -----------------------------------------------------------------------------
638 * Search for work to do, and handle messages from elsewhere.
639 * -------------------------------------------------------------------------- */
642 scheduleFindWork (Capability *cap)
644 scheduleStartSignalHandlers(cap);
646 // Only check the black holes here if we've nothing else to do.
647 // During normal execution, the black hole list only gets checked
648 // at GC time, to avoid repeatedly traversing this possibly long
649 // list each time around the scheduler.
650 if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
652 scheduleCheckWakeupThreads(cap);
654 scheduleCheckBlockedThreads(cap);
656 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
657 // Try to activate one of our own sparks
658 if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
661 #if defined(THREADED_RTS)
662 // Try to steal work if we don't have any
663 if (emptyRunQueue(cap)) { stealWork(cap); }
666 #if defined(PARALLEL_HASKELL)
667 // if messages have been buffered...
668 scheduleSendPendingMessages();
671 #if defined(PARALLEL_HASKELL)
672 if (emptyRunQueue(cap)) {
673 receivedFinish = scheduleGetRemoteWork(cap);
674 continue; // a new round, (hopefully) with new work
676 in GUM, this a) sends out a FISH and returns IF no fish is
678 b) (blocking) awaits and receives messages
680 in Eden, this is only the blocking receive, as b) in GUM.
686 #if defined(THREADED_RTS)
687 STATIC_INLINE rtsBool
688 shouldYieldCapability (Capability *cap, Task *task)
690 // we need to yield this capability to someone else if..
691 // - another thread is initiating a GC
692 // - another Task is returning from a foreign call
693 // - the thread at the head of the run queue cannot be run
694 // by this Task (it is bound to another Task, or it is unbound
695 // and this Task is bound).
696 return (waiting_for_gc ||
697 cap->returning_tasks_hd != NULL ||
698 (!emptyRunQueue(cap) && (task->tso == NULL
699 ? cap->run_queue_hd->bound != NULL
700 : cap->run_queue_hd->bound != task)));
703 // This is the single place where a Task goes to sleep. There are
704 // two reasons it might need to sleep:
705 // - there are no threads to run
706 // - we need to yield this Capability to someone else
707 // (see shouldYieldCapability())
709 // The return value indicates whether
712 scheduleYield (Capability **pcap, Task *task)
714 Capability *cap = *pcap;
716 // if we have work, and we don't need to give up the Capability, continue.
717 if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
720 // otherwise yield (sleep), and keep yielding if necessary.
722 yieldCapability(&cap,task);
724 while (shouldYieldCapability(cap,task));
726 // note there may still be no threads on the run queue at this
727 // point, the caller has to check.
734 /* -----------------------------------------------------------------------------
737 * Push work to other Capabilities if we have some.
738 * -------------------------------------------------------------------------- */
741 schedulePushWork(Capability *cap USED_IF_THREADS,
742 Task *task USED_IF_THREADS)
744 /* following code not for PARALLEL_HASKELL. I kept the call general,
745 future GUM versions might use pushing in a distributed setup */
746 #if defined(THREADED_RTS)
748 Capability *free_caps[n_capabilities], *cap0;
751 // migration can be turned off with +RTS -qg
752 if (!RtsFlags.ParFlags.migrate) return;
754 // Check whether we have more threads on our run queue, or sparks
755 // in our pool, that we could hand to another Capability.
756 if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
757 && sparkPoolSizeCap(cap) < 2) {
761 // First grab as many free Capabilities as we can.
762 for (i=0, n_free_caps=0; i < n_capabilities; i++) {
763 cap0 = &capabilities[i];
764 if (cap != cap0 && tryGrabCapability(cap0,task)) {
765 if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
766 // it already has some work, we just grabbed it at
767 // the wrong moment. Or maybe it's deadlocked!
768 releaseCapability(cap0);
770 free_caps[n_free_caps++] = cap0;
775 // we now have n_free_caps free capabilities stashed in
776 // free_caps[]. Share our run queue equally with them. This is
777 // probably the simplest thing we could do; improvements we might
778 // want to do include:
780 // - giving high priority to moving relatively new threads, on
781 // the grounds that they haven't had time to build up a
782 // working set in the cache on this CPU/Capability.
784 // - giving low priority to moving long-lived threads
786 if (n_free_caps > 0) {
787 StgTSO *prev, *t, *next;
788 rtsBool pushed_to_all;
790 debugTrace(DEBUG_sched,
791 "cap %d: %s and %d free capabilities, sharing...",
793 (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
794 "excess threads on run queue":"sparks to share (>=2)",
798 pushed_to_all = rtsFalse;
800 if (cap->run_queue_hd != END_TSO_QUEUE) {
801 prev = cap->run_queue_hd;
803 prev->_link = END_TSO_QUEUE;
804 for (; t != END_TSO_QUEUE; t = next) {
806 t->_link = END_TSO_QUEUE;
807 if (t->what_next == ThreadRelocated
808 || t->bound == task // don't move my bound thread
809 || tsoLocked(t)) { // don't move a locked thread
810 setTSOLink(cap, prev, t);
812 } else if (i == n_free_caps) {
813 pushed_to_all = rtsTrue;
816 setTSOLink(cap, prev, t);
819 debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
820 appendToRunQueue(free_caps[i],t);
821 if (t->bound) { t->bound->cap = free_caps[i]; }
822 t->cap = free_caps[i];
826 cap->run_queue_tl = prev;
830 /* JB I left this code in place, it would work but is not necessary */
832 // If there are some free capabilities that we didn't push any
833 // threads to, then try to push a spark to each one.
834 if (!pushed_to_all) {
836 // i is the next free capability to push to
837 for (; i < n_free_caps; i++) {
838 if (emptySparkPoolCap(free_caps[i])) {
839 spark = tryStealSpark(cap->sparks);
841 debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
842 newSpark(&(free_caps[i]->r), spark);
847 #endif /* SPARK_PUSHING */
849 // release the capabilities
850 for (i = 0; i < n_free_caps; i++) {
851 task->cap = free_caps[i];
852 releaseAndWakeupCapability(free_caps[i]);
855 task->cap = cap; // reset to point to our Capability.
857 #endif /* THREADED_RTS */
861 /* ----------------------------------------------------------------------------
862 * Start any pending signal handlers
863 * ------------------------------------------------------------------------- */
865 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 scheduleStartSignalHandlers(Capability *cap)
869 if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
870 // safe outside the lock
871 startSignalHandlers(cap);
876 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
881 /* ----------------------------------------------------------------------------
882 * Check for blocked threads that can be woken up.
883 * ------------------------------------------------------------------------- */
886 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 #if !defined(THREADED_RTS)
890 // Check whether any waiting threads need to be woken up. If the
891 // run queue is empty, and there are no other tasks running, we
892 // can wait indefinitely for something to happen.
894 if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896 awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
902 /* ----------------------------------------------------------------------------
903 * Check for threads woken up by other Capabilities
904 * ------------------------------------------------------------------------- */
907 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
909 #if defined(THREADED_RTS)
910 // Any threads that were woken up by other Capabilities get
911 // appended to our run queue.
912 if (!emptyWakeupQueue(cap)) {
913 ACQUIRE_LOCK(&cap->lock);
914 if (emptyRunQueue(cap)) {
915 cap->run_queue_hd = cap->wakeup_queue_hd;
916 cap->run_queue_tl = cap->wakeup_queue_tl;
918 setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
919 cap->run_queue_tl = cap->wakeup_queue_tl;
921 cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
922 RELEASE_LOCK(&cap->lock);
927 /* ----------------------------------------------------------------------------
928 * Check for threads blocked on BLACKHOLEs that can be woken up
929 * ------------------------------------------------------------------------- */
931 scheduleCheckBlackHoles (Capability *cap)
933 if ( blackholes_need_checking ) // check without the lock first
935 ACQUIRE_LOCK(&sched_mutex);
936 if ( blackholes_need_checking ) {
937 checkBlackHoles(cap);
938 blackholes_need_checking = rtsFalse;
940 RELEASE_LOCK(&sched_mutex);
944 /* ----------------------------------------------------------------------------
945 * Detect deadlock conditions and attempt to resolve them.
946 * ------------------------------------------------------------------------- */
949 scheduleDetectDeadlock (Capability *cap, Task *task)
952 #if defined(PARALLEL_HASKELL)
953 // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
958 * Detect deadlock: when we have no threads to run, there are no
959 * threads blocked, waiting for I/O, or sleeping, and all the
960 * other tasks are waiting for work, we must have a deadlock of
963 if ( emptyThreadQueues(cap) )
965 #if defined(THREADED_RTS)
967 * In the threaded RTS, we only check for deadlock if there
968 * has been no activity in a complete timeslice. This means
969 * we won't eagerly start a full GC just because we don't have
970 * any threads to run currently.
972 if (recent_activity != ACTIVITY_INACTIVE) return;
975 debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
977 // Garbage collection can release some new threads due to
978 // either (a) finalizers or (b) threads resurrected because
979 // they are unreachable and will therefore be sent an
980 // exception. Any threads thus released will be immediately
982 cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
984 recent_activity = ACTIVITY_DONE_GC;
985 // disable timer signals (see #1623)
988 if ( !emptyRunQueue(cap) ) return;
990 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
991 /* If we have user-installed signal handlers, then wait
992 * for signals to arrive rather than bombing out with a
995 if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
996 debugTrace(DEBUG_sched,
997 "still deadlocked, waiting for signals...");
1001 if (signals_pending()) {
1002 startSignalHandlers(cap);
1005 // either we have threads to run, or we were interrupted:
1006 ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1012 #if !defined(THREADED_RTS)
1013 /* Probably a real deadlock. Send the current main thread the
1014 * Deadlock exception.
1017 switch (task->tso->why_blocked) {
1019 case BlockedOnBlackHole:
1020 case BlockedOnException:
1022 throwToSingleThreaded(cap, task->tso,
1023 (StgClosure *)nonTermination_closure);
1026 barf("deadlock: main thread blocked in a strange way");
1035 /* ----------------------------------------------------------------------------
1036 * Send pending messages (PARALLEL_HASKELL only)
1037 * ------------------------------------------------------------------------- */
1039 #if defined(PARALLEL_HASKELL)
1041 scheduleSendPendingMessages(void)
1044 # if defined(PAR) // global Mem.Mgmt., omit for now
1045 if (PendingFetches != END_BF_QUEUE) {
1050 if (RtsFlags.ParFlags.BufferTime) {
1051 // if we use message buffering, we must send away all message
1052 // packets which have become too old...
1058 /* ----------------------------------------------------------------------------
1059 * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1060 * ------------------------------------------------------------------------- */
1062 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1064 scheduleActivateSpark(Capability *cap)
1068 /* We only want to stay here if the run queue is empty and we want some
1069 work. We try to turn a spark into a thread, and add it to the run
1070 queue, from where it will be picked up in the next iteration of the
1073 if (!emptyRunQueue(cap))
1074 /* In the threaded RTS, another task might have pushed a thread
1075 on our run queue in the meantime ? But would need a lock.. */
1079 // Really we should be using reclaimSpark() here, but
1080 // experimentally it doesn't seem to perform as well as just
1081 // stealing from our own spark pool:
1082 // spark = reclaimSpark(cap->sparks);
1083 spark = tryStealSpark(cap->sparks); // defined in Sparks.c
1085 if (spark != NULL) {
1086 debugTrace(DEBUG_sched,
1087 "turning spark of closure %p into a thread",
1088 (StgClosure *)spark);
1089 createSparkThread(cap,spark); // defined in Sparks.c
1092 #endif // PARALLEL_HASKELL || THREADED_RTS
1094 /* ----------------------------------------------------------------------------
1095 * Get work from a remote node (PARALLEL_HASKELL only)
1096 * ------------------------------------------------------------------------- */
1098 #if defined(PARALLEL_HASKELL)
1099 static rtsBool /* return value used in PARALLEL_HASKELL only */
1100 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1102 #if defined(PARALLEL_HASKELL)
1103 rtsBool receivedFinish = rtsFalse;
1105 // idle() , i.e. send all buffers, wait for work
1106 if (RtsFlags.ParFlags.BufferTime) {
1107 IF_PAR_DEBUG(verbose,
1108 debugBelch("...send all pending data,"));
1111 for (i=1; i<=nPEs; i++)
1112 sendImmediately(i); // send all messages away immediately
1116 /* this would be the place for fishing in GUM...
1118 if (no-earlier-fish-around)
1119 sendFish(choosePe());
1122 // Eden:just look for incoming messages (blocking receive)
1123 IF_PAR_DEBUG(verbose,
1124 debugBelch("...wait for incoming messages...\n"));
1125 processMessages(cap, &receivedFinish); // blocking receive...
1128 return receivedFinish;
1129 // re-enter the scheduling loop after having received something
1131 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1133 return rtsFalse; /* return value unused in THREADED_RTS */
1135 #endif /* PARALLEL_HASKELL */
1137 #endif // PARALLEL_HASKELL || THREADED_RTS
1139 /* ----------------------------------------------------------------------------
1140 * After running a thread...
1141 * ------------------------------------------------------------------------- */
1144 schedulePostRunThread (Capability *cap, StgTSO *t)
1146 // We have to be able to catch transactions that are in an
1147 // infinite loop as a result of seeing an inconsistent view of
1151 // [a,b] <- mapM readTVar [ta,tb]
1152 // when (a == b) loop
1154 // and a is never equal to b given a consistent view of memory.
1156 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1157 if (!stmValidateNestOfTransactions (t -> trec)) {
1158 debugTrace(DEBUG_sched | DEBUG_stm,
1159 "trec %p found wasting its time", t);
1161 // strip the stack back to the
1162 // ATOMICALLY_FRAME, aborting the (nested)
1163 // transaction, and saving the stack of any
1164 // partially-evaluated thunks on the heap.
1165 throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1167 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1171 /* some statistics gathering in the parallel case */
1174 /* -----------------------------------------------------------------------------
1175 * Handle a thread that returned to the scheduler with ThreadHeapOverflow
1176 * -------------------------------------------------------------------------- */
1179 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1181 // did the task ask for a large block?
1182 if (cap->r.rHpAlloc > BLOCK_SIZE) {
1183 // if so, get one and push it on the front of the nursery.
1187 blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
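        // For illustration: with the usual 4k blocks, a 10000-byte
        // request rounds up to 12288 bytes, i.e. 3 blocks (the exact
        // BLOCK_SIZE is a build-time constant).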
1189 debugTrace(DEBUG_sched,
1190 "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
1191 (long)t->id, whatNext_strs[t->what_next], blocks);
1193 // don't do this if the nursery is (nearly) full, we'll GC first.
1194 if (cap->r.rCurrentNursery->link != NULL ||
1195 cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
1196 // if the nursery has only one block.
1199 bd = allocGroup( blocks );
1201 cap->r.rNursery->n_blocks += blocks;
1203 // link the new group into the list
1204 bd->link = cap->r.rCurrentNursery;
1205 bd->u.back = cap->r.rCurrentNursery->u.back;
1206 if (cap->r.rCurrentNursery->u.back != NULL) {
1207 cap->r.rCurrentNursery->u.back->link = bd;
1209 #if !defined(THREADED_RTS)
1210 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1211 g0s0 == cap->r.rNursery);
1213 cap->r.rNursery->blocks = bd;
1215 cap->r.rCurrentNursery->u.back = bd;
1217 // initialise it as a nursery block. We initialise the
1218 // step, gen_no, and flags field of *every* sub-block in
1219 // this large block, because this is easier than making
1220 // sure that we always find the block head of a large
1221 // block whenever we call Bdescr() (eg. evacuate() and
1222 // isAlive() in the GC would both have to do this, at
1226 for (x = bd; x < bd + blocks; x++) {
1227 x->step = cap->r.rNursery;
1233 // This assert can be a killer if the app is doing lots
1234 // of large block allocations.
1235 IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1237 // now update the nursery to point to the new block
1238 cap->r.rCurrentNursery = bd;
1240 // we might be unlucky and have another thread get on the
1241 // run queue before us and steal the large block, but in that
1242 // case the thread will just end up requesting another large
1244 pushOnRunQueue(cap,t);
1245 return rtsFalse; /* not actually GC'ing */
1249 debugTrace(DEBUG_sched,
1250 "--<< thread %ld (%s) stopped: HeapOverflow",
1251 (long)t->id, whatNext_strs[t->what_next]);
1253 if (cap->context_switch) {
1254 // Sometimes we miss a context switch, e.g. when calling
1255 // primitives in a tight loop, MAYBE_GC() doesn't check the
1256 // context switch flag, and we end up waiting for a GC.
1257 // See #1984, and concurrent/should_run/1984
1258 cap->context_switch = 0;
1259 addToRunQueue(cap,t);
1261 pushOnRunQueue(cap,t);
1264 /* actual GC is done at the end of the while loop in schedule() */
1267 /* -----------------------------------------------------------------------------
1268 * Handle a thread that returned to the scheduler with ThreadStackOverflow
1269 * -------------------------------------------------------------------------- */
1272 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1274 debugTrace (DEBUG_sched,
1275 "--<< thread %ld (%s) stopped, StackOverflow",
1276 (long)t->id, whatNext_strs[t->what_next]);
1278 /* just adjust the stack for this thread, then pop it back
1282 /* enlarge the stack */
1283 StgTSO *new_t = threadStackOverflow(cap, t);
1285 /* The TSO attached to this Task may have moved, so update the
1288 if (task->tso == t) {
1291 pushOnRunQueue(cap,new_t);
1295 /* -----------------------------------------------------------------------------
1296 * Handle a thread that returned to the scheduler with ThreadYielding
1297 * -------------------------------------------------------------------------- */
1300 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1302 // Reset the context switch flag. We don't do this just before
1303 // running the thread, because that would mean we would lose ticks
1304 // during GC, which can lead to unfair scheduling (a thread hogs
1305 // the CPU because the tick always arrives during GC). This way
1306 // penalises threads that do a lot of allocation, but that seems
1307 // better than the alternative.
1308 cap->context_switch = 0;
1310 /* put the thread back on the run queue. Then, if we're ready to
1311 * GC, check whether this is the last task to stop. If so, wake
1312 * up the GC thread. getThread will block during a GC until the
1316 if (t->what_next != prev_what_next) {
1317 debugTrace(DEBUG_sched,
1318 "--<< thread %ld (%s) stopped to switch evaluators",
1319 (long)t->id, whatNext_strs[t->what_next]);
1321 debugTrace(DEBUG_sched,
1322 "--<< thread %ld (%s) stopped, yielding",
1323 (long)t->id, whatNext_strs[t->what_next]);
1328 //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1330 ASSERT(t->_link == END_TSO_QUEUE);
1332 // Shortcut if we're just switching evaluators: don't bother
1333 // doing stack squeezing (which can be expensive), just run the
1335 if (t->what_next != prev_what_next) {
1339 addToRunQueue(cap,t);
1344 /* -----------------------------------------------------------------------------
1345 * Handle a thread that returned to the scheduler with ThreadBlocked
1346 * -------------------------------------------------------------------------- */
1349 scheduleHandleThreadBlocked( StgTSO *t
1350 #if !defined(GRAN) && !defined(DEBUG)
1356 // We don't need to do anything. The thread is blocked, and it
1357 // has tidied up its stack and placed itself on whatever queue
1358 // it needs to be on.
1360 // ASSERT(t->why_blocked != NotBlocked);
1361 // Not true: for example,
1362 // - in THREADED_RTS, the thread may already have been woken
1363 // up by another Capability. This actually happens: try
1364 // conc023 +RTS -N2.
1365 // - the thread may have woken itself up already, because
1366 // threadPaused() might have raised a blocked throwTo
1367 // exception, see maybePerformBlockedException().
1370 if (traceClass(DEBUG_sched)) {
1371 debugTraceBegin("--<< thread %lu (%s) stopped: ",
1372 (unsigned long)t->id, whatNext_strs[t->what_next]);
1373 printThreadBlockage(t);
1379 /* -----------------------------------------------------------------------------
1380 * Handle a thread that returned to the scheduler with ThreadFinished
1381 * -------------------------------------------------------------------------- */
1384 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1386 /* Need to check whether this was a main thread, and if so,
1387 * return with the return value.
1389 * We also end up here if the thread kills itself with an
1390 * uncaught exception, see Exception.cmm.
1392 debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
1393 (unsigned long)t->id, whatNext_strs[t->what_next]);
1396 // Check whether the thread that just completed was a bound
1397 // thread, and if so return with the result.
1399 // There is an assumption here that all thread completion goes
1400 // through this point; we need to make sure that if a thread
1401 // ends up in the ThreadKilled state, that it stays on the run
1402 // queue so it can be dealt with here.
1407 if (t->bound != task) {
1408 #if !defined(THREADED_RTS)
1409 // Must be a bound thread that is not the topmost one. Leave
1410 // it on the run queue until the stack has unwound to the
1411 // point where we can deal with this. Leaving it on the run
1412 // queue also ensures that the garbage collector knows about
1413 // this thread and its return value (it gets dropped from the
1414 // step->threads list so there's no other way to find it).
1415 appendToRunQueue(cap,t);
1418 // this cannot happen in the threaded RTS, because a
1419 // bound thread can only be run by the appropriate Task.
1420 barf("finished bound thread that isn't mine");
1424 ASSERT(task->tso == t);
1426 if (t->what_next == ThreadComplete) {
1428 // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1429 *(task->ret) = (StgClosure *)task->tso->sp[1];
1431 task->stat = Success;
1434 *(task->ret) = NULL;
1436 if (sched_state >= SCHED_INTERRUPTING) {
1437 task->stat = Interrupted;
1439 task->stat = Killed;
1443 removeThreadLabel((StgWord)task->tso->id);
1445 return rtsTrue; // tells schedule() to return
1451 /* -----------------------------------------------------------------------------
1452 * Perform a heap census
1453 * -------------------------------------------------------------------------- */
1456 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1458 // When we have +RTS -i0 and we're heap profiling, do a census at
1459 // every GC. This lets us get repeatable runs for debugging.
1460 if (performHeapProfile ||
1461 (RtsFlags.ProfFlags.profileInterval==0 &&
1462 RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1469 /* -----------------------------------------------------------------------------
1470 * Perform a garbage collection if necessary
1471 * -------------------------------------------------------------------------- */
1474 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1476 rtsBool heap_census;
1478 /* extern static volatile StgWord waiting_for_gc;
1479 lives inside capability.c */
1480 rtsBool was_waiting;
1485 // In order to GC, there must be no threads running Haskell code.
1486 // Therefore, the GC thread needs to hold *all* the capabilities,
1487 // and release them after the GC has completed.
1489 // This seems to be the simplest way: previous attempts involved
1490 // making all the threads with capabilities give up their
1491 // capabilities and sleep except for the *last* one, which
1492 // actually did the GC. But it's quite hard to arrange for all
1493 // the other tasks to sleep and stay asleep.
1496 /* Other capabilities are prevented from running yet more Haskell
1497 threads if waiting_for_gc is set. Tested inside
1498 yieldCapability() and releaseCapability() in Capability.c */
1500 was_waiting = cas(&waiting_for_gc, 0, 1);
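    // cas() returns the value previously in waiting_for_gc: zero means
    // we won the race and will organise the GC ourselves; non-zero
    // means another Task got there first, so we just yield until its
    // GC has finished.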
1503 debugTrace(DEBUG_sched, "someone else is trying to GC...");
1504 if (cap) yieldCapability(&cap,task);
1505 } while (waiting_for_gc);
1506 return cap; // NOTE: task->cap might have changed here
1509 setContextSwitches();
1510 for (i=0; i < n_capabilities; i++) {
1511 debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
1512 if (cap != &capabilities[i]) {
1513 Capability *pcap = &capabilities[i];
1514 // we better hope this task doesn't get migrated to
1515 // another Capability while we're waiting for this one.
1516 // It won't, because load balancing happens while we have
1517 // all the Capabilities, but even so it's a slightly
1518 // unsavoury invariant.
1520 waitForReturnCapability(&pcap, task);
1521 if (pcap != &capabilities[i]) {
1522 barf("scheduleDoGC: got the wrong capability");
1527 waiting_for_gc = rtsFalse;
1530 // so this happens periodically:
1531 if (cap) scheduleCheckBlackHoles(cap);
1533 IF_DEBUG(scheduler, printAllThreads());
1536 * We now have all the capabilities; if we're in an interrupting
1537 * state, then we should take the opportunity to delete all the
1538 * threads in the system.
1540 if (sched_state >= SCHED_INTERRUPTING) {
1541 deleteAllThreads(&capabilities[0]);
1542 sched_state = SCHED_SHUTTING_DOWN;
1545 heap_census = scheduleNeedHeapProfile(rtsTrue);
1547 /* everybody back, start the GC.
1548 * Could do it in this thread, or signal a condition var
1549 * to do it in another thread. Either way, we need to
1550 * broadcast on gc_pending_cond afterward.
1552 #if defined(THREADED_RTS)
1553 debugTrace(DEBUG_sched, "doing GC");
1555 GarbageCollect(force_major || heap_census);
1558 debugTrace(DEBUG_sched, "performing heap census");
1560 performHeapProfile = rtsFalse;
1565 Once we are all together... this would be the place to balance all
1566 spark pools. No concurrent stealing or adding of new sparks can
1567 occur. Should be defined in Sparks.c. */
1568 balanceSparkPoolsCaps(n_capabilities, capabilities);
1571 #if defined(THREADED_RTS)
1572 // release our stash of capabilities.
1573 for (i = 0; i < n_capabilities; i++) {
1574 if (cap != &capabilities[i]) {
1575 task->cap = &capabilities[i];
1576 releaseCapability(&capabilities[i]);
1589 /* ---------------------------------------------------------------------------
1590 * Singleton fork(). Do not copy any running threads.
1591 * ------------------------------------------------------------------------- */
1594 forkProcess(HsStablePtr *entry
1595 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1600 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1607 #if defined(THREADED_RTS)
1608 if (RtsFlags.ParFlags.nNodes > 1) {
1609 errorBelch("forking not supported with +RTS -N<n> greater than 1");
1610 stg_exit(EXIT_FAILURE);
1614 debugTrace(DEBUG_sched, "forking!");
1616 // ToDo: for SMP, we should probably acquire *all* the capabilities
1619 // no funny business: hold locks while we fork, otherwise if some
1620 // other thread is holding a lock when the fork happens, the data
1621 // structure protected by the lock will forever be in an
1622 // inconsistent state in the child. See also #1391.
1623 ACQUIRE_LOCK(&sched_mutex);
1624 ACQUIRE_LOCK(&cap->lock);
1625 ACQUIRE_LOCK(&cap->running_task->lock);
1629 if (pid) { // parent
1631 RELEASE_LOCK(&sched_mutex);
1632 RELEASE_LOCK(&cap->lock);
1633 RELEASE_LOCK(&cap->running_task->lock);
1635 // just return the pid
1641 #if defined(THREADED_RTS)
1642 initMutex(&sched_mutex);
1643 initMutex(&cap->lock);
1644 initMutex(&cap->running_task->lock);
1647 // Now, all OS threads except the thread that forked are
1648 // stopped. We need to stop all Haskell threads, including
1649 // those involved in foreign calls. Also we need to delete
1650 // all Tasks, because they correspond to OS threads that are
1653 for (s = 0; s < total_steps; s++) {
1654 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1655 if (t->what_next == ThreadRelocated) {
1658 next = t->global_link;
1659 // don't allow threads to catch the ThreadKilled
1660 // exception, but we do want to raiseAsync() because these
1661 // threads may be evaluating thunks that we need later.
1662 deleteThread_(cap,t);
1667 // Empty the run queue. It seems tempting to let all the
1668 // killed threads stay on the run queue as zombies to be
1669 // cleaned up later, but some of them correspond to bound
1670 // threads for which the corresponding Task does not exist.
1671 cap->run_queue_hd = END_TSO_QUEUE;
1672 cap->run_queue_tl = END_TSO_QUEUE;
1674 // Any suspended C-calling Tasks are no more, their OS threads
1676 cap->suspended_ccalling_tasks = NULL;
1678 // Empty the threads lists. Otherwise, the garbage
1679 // collector may attempt to resurrect some of these threads.
1680 for (s = 0; s < total_steps; s++) {
1681 all_steps[s].threads = END_TSO_QUEUE;
1684 // Wipe the task list, except the current Task.
1685 ACQUIRE_LOCK(&sched_mutex);
1686 for (task = all_tasks; task != NULL; task=task->all_link) {
1687 if (task != cap->running_task) {
1688 #if defined(THREADED_RTS)
1689 initMutex(&task->lock); // see #1391
1694 RELEASE_LOCK(&sched_mutex);
1696 #if defined(THREADED_RTS)
1697 // Wipe our spare workers list, they no longer exist. New
1698 // workers will be created if necessary.
1699 cap->spare_workers = NULL;
1700 cap->returning_tasks_hd = NULL;
1701 cap->returning_tasks_tl = NULL;
1704 // On Unix, all timers are reset in the child, so we need to start
1709 cap = rts_evalStableIO(cap, entry, NULL); // run the action
1710 rts_checkSchedStatus("forkProcess",cap);
1713 hs_exit(); // clean up and exit
1714 stg_exit(EXIT_SUCCESS);
1716 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1717 barf("forkProcess#: primop not supported on this platform, sorry!\n");
1722 /* ---------------------------------------------------------------------------
1723 * Delete all the threads in the system
1724 * ------------------------------------------------------------------------- */
1727 deleteAllThreads ( Capability *cap )
1729 // NOTE: only safe to call if we own all capabilities.
1734 debugTrace(DEBUG_sched,"deleting all threads");
1735 for (s = 0; s < total_steps; s++) {
1736 for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1737 if (t->what_next == ThreadRelocated) {
1740 next = t->global_link;
1741 deleteThread(cap,t);
1746 // The run queue now contains a bunch of ThreadKilled threads. We
1747 // must not throw these away: the main thread(s) will be in there
1748 // somewhere, and the main scheduler loop has to deal with it.
1749 // Also, the run queue is the only thing keeping these threads from
1750 // being GC'd, and we don't want the "main thread has been GC'd" panic.
1752 #if !defined(THREADED_RTS)
1753 ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1754 ASSERT(sleeping_queue == END_TSO_QUEUE);
1758 /* -----------------------------------------------------------------------------
1759 Managing the suspended_ccalling_tasks list.
1760 Locks required: sched_mutex
1761 -------------------------------------------------------------------------- */
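// suspendTask() pushes a Task onto the front of the (doubly-linked)
// cap->suspended_ccalling_tasks list when its thread makes a safe
// foreign call; recoverSuspendedTask() unlinks it again on return.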
1764 suspendTask (Capability *cap, Task *task)
1766 ASSERT(task->next == NULL && task->prev == NULL);
1767 task->next = cap->suspended_ccalling_tasks;
1769 if (cap->suspended_ccalling_tasks) {
1770 cap->suspended_ccalling_tasks->prev = task;
1772 cap->suspended_ccalling_tasks = task;
1776 recoverSuspendedTask (Capability *cap, Task *task)
1779 task->prev->next = task->next;
1781 ASSERT(cap->suspended_ccalling_tasks == task);
1782 cap->suspended_ccalling_tasks = task->next;
1785 task->next->prev = task->prev;
1787 task->next = task->prev = NULL;
1790 /* ---------------------------------------------------------------------------
1791 * Suspending & resuming Haskell threads.
1793 * When making a "safe" call to C (aka _ccall_GC), the task gives back
1794 * its capability before calling the C function. This allows another
1795 * task to pick up the capability and carry on running Haskell
1796 * threads. It also means that if the C call blocks, it won't lock
1799 * The Haskell thread making the C call is put to sleep for the
1800 * duration of the call, on the suspended_ccalling_tasks list. We
1801 * give out a token to the task, which it can use to resume the thread
1802 * on return from the C function.
1803 * ------------------------------------------------------------------------- */
1806 suspendThread (StgRegTable *reg)
1813 StgWord32 saved_winerror;
1816 saved_errno = errno;
1818 saved_winerror = GetLastError();
1821 /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1823 cap = regTableToCapability(reg);
1825 task = cap->running_task;
1826 tso = cap->r.rCurrentTSO;
1828 debugTrace(DEBUG_sched,
1829 "thread %lu did a safe foreign call",
1830 (unsigned long)cap->r.rCurrentTSO->id);
1832 // XXX this might not be necessary --SDM
1833 tso->what_next = ThreadRunGHC;
1835 threadPaused(cap,tso);
1837 if ((tso->flags & TSO_BLOCKEX) == 0) {
1838 tso->why_blocked = BlockedOnCCall;
1839 tso->flags |= TSO_BLOCKEX;
1840 tso->flags &= ~TSO_INTERRUPTIBLE;
1842 tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1845 // Hand back capability
1846 task->suspended_tso = tso;
1848 ACQUIRE_LOCK(&cap->lock);
1850 suspendTask(cap,task);
1851 cap->in_haskell = rtsFalse;
1852 releaseCapability_(cap,rtsFalse);
1854 RELEASE_LOCK(&cap->lock);
1856 #if defined(THREADED_RTS)
1857 /* Preparing to leave the RTS, so ensure there's a native thread/task
1858 waiting to take over.
1860 debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1863 errno = saved_errno;
1865 SetLastError(saved_winerror);
1871 resumeThread (void *task_)
1878 StgWord32 saved_winerror;
1881 saved_errno = errno;
1883 saved_winerror = GetLastError();
1887 // Wait for permission to re-enter the RTS with the result.
1888 waitForReturnCapability(&cap,task);
1889 // we might be on a different capability now... but if so, our
1890 // entry on the suspended_ccalling_tasks list will also have been
1893 // Remove the thread from the suspended list
1894 recoverSuspendedTask(cap,task);
1896 tso = task->suspended_tso;
1897 task->suspended_tso = NULL;
1898 tso->_link = END_TSO_QUEUE; // no write barrier reqd
1899 debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1901 if (tso->why_blocked == BlockedOnCCall) {
1902 awakenBlockedExceptionQueue(cap,tso);
1903 tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1906 /* Reset blocking status */
1907 tso->why_blocked = NotBlocked;
1909 cap->r.rCurrentTSO = tso;
1910 cap->in_haskell = rtsTrue;
1911 errno = saved_errno;
1913 SetLastError(saved_winerror);
1916 /* We might have GC'd, mark the TSO dirty again */
1919 IF_DEBUG(sanity, checkTSO(tso));
1924 /* ---------------------------------------------------------------------------
1927 * scheduleThread puts a thread on the end of the runnable queue.
1928 * This will usually be done immediately after a thread is created.
1929 * The caller of scheduleThread must create the thread using e.g.
1930 * createThread and push an appropriate closure
1931 * on this thread's stack before the scheduler is invoked.
1932 * ------------------------------------------------------------------------ */
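/* A minimal usage sketch, for orientation only; the helper names and
 * the stack size below are assumptions based on the RtsAPI, not taken
 * from this file:
 *
 *     Capability *cap = rts_lock();
 *     StgTSO *tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize,
 *                                  (StgClosure *)action);
 *     scheduleThread(cap, tso);   // or scheduleWaitThread() to block
 *     rts_unlock(cap);            // until the thread has finished
 */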
1935 scheduleThread(Capability *cap, StgTSO *tso)
1937 // The thread goes at the *end* of the run-queue, to avoid possible
1938 // starvation of any threads already on the queue.
1939 appendToRunQueue(cap,tso);
1943 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1945 #if defined(THREADED_RTS)
1946 tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1947 // move this thread from now on.
1948 cpu %= RtsFlags.ParFlags.nNodes;
1949 if (cpu == cap->no) {
1950 appendToRunQueue(cap,tso);
1952 wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1955 appendToRunQueue(cap,tso);
1960 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1964 // We already created/initialised the Task
1965 task = cap->running_task;
1967 // This TSO is now a bound thread; make the Task and TSO
1968 // point to each other.
1974 task->stat = NoStatus;
1976 appendToRunQueue(cap,tso);
1978 debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1980 cap = schedule(cap,task);
1982 ASSERT(task->stat != NoStatus);
1983 ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1985 debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1989 /* ----------------------------------------------------------------------------
1991 * ------------------------------------------------------------------------- */
1993 #if defined(THREADED_RTS)
1994 void OSThreadProcAttr
1995 workerStart(Task *task)
1999 // See startWorkerTask().
2000 ACQUIRE_LOCK(&task->lock);
2002 RELEASE_LOCK(&task->lock);
2004 // set the thread-local pointer to the Task:
2007 // schedule() runs without a lock.
2008 cap = schedule(cap,task);
2010 // On exit from schedule(), we have a Capability.
2011 releaseCapability(cap);
2012 workerTaskStop(task);
2016 /* ---------------------------------------------------------------------------
2019 * Initialise the scheduler. This resets all the queues - if the
2020 * queues contained any threads, they'll be garbage collected at the
2023 * ------------------------------------------------------------------------ */
2028 #if !defined(THREADED_RTS)
2029 blocked_queue_hd = END_TSO_QUEUE;
2030 blocked_queue_tl = END_TSO_QUEUE;
2031 sleeping_queue = END_TSO_QUEUE;
2034 blackhole_queue = END_TSO_QUEUE;
2036 sched_state = SCHED_RUNNING;
2037 recent_activity = ACTIVITY_YES;
2039 #if defined(THREADED_RTS)
2040 /* Initialise the mutex and condition variables used by
2042 initMutex(&sched_mutex);
2045 ACQUIRE_LOCK(&sched_mutex);
2047 /* A capability holds the state a native thread needs in
2048 * order to execute STG code. At least one capability is
2049 * floating around (only THREADED_RTS builds have more than one).
2055 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2059 #if defined(THREADED_RTS)
2061 * Eagerly start one worker to run each Capability, except for
2062 * Capability 0. The idea is that we're probably going to start a
2063 * bound thread on Capability 0 pretty soon, so we don't want a
2064 * worker task hogging it.
2069 for (i = 1; i < n_capabilities; i++) {
2070 cap = &capabilities[i];
2071 ACQUIRE_LOCK(&cap->lock);
2072 startWorkerTask(cap, workerStart);
2073 RELEASE_LOCK(&cap->lock);
2078 trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2080 RELEASE_LOCK(&sched_mutex);
2085 rtsBool wait_foreign
2086 #if !defined(THREADED_RTS)
2087 __attribute__((unused))
2090 /* see Capability.c, shutdownCapability() */
2094 #if defined(THREADED_RTS)
2095 ACQUIRE_LOCK(&sched_mutex);
2096 task = newBoundTask();
2097 RELEASE_LOCK(&sched_mutex);
2100 // If we haven't killed all the threads yet, do it now.
2101 if (sched_state < SCHED_SHUTTING_DOWN) {
2102 sched_state = SCHED_INTERRUPTING;
2103 scheduleDoGC(NULL,task,rtsFalse);
2105 sched_state = SCHED_SHUTTING_DOWN;
2107 #if defined(THREADED_RTS)
2111 for (i = 0; i < n_capabilities; i++) {
2112 shutdownCapability(&capabilities[i], task, wait_foreign);
2114 boundTaskExiting(task);
2118 freeCapability(&MainCapability);
2123 freeScheduler( void )
2126 if (n_capabilities != 1) {
2127 stgFree(capabilities);
2129 #if defined(THREADED_RTS)
2130 closeMutex(&sched_mutex);
2134 /* -----------------------------------------------------------------------------
2137 This is the interface to the garbage collector from Haskell land.
2138 We provide this so that external C code can allocate and garbage
2139 collect when called from Haskell via _ccall_GC.
2140 -------------------------------------------------------------------------- */
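/* Illustrative sketch (not part of the RTS sources): external C code that
 * has been entered from Haskell can force a collection through this
 * interface, e.g.
 *
 *     extern void performGC(void);        // prototypes normally come from
 *     extern void performMajorGC(void);   // the RTS headers
 *
 *     void my_c_hook(void)                // hypothetical function called
 *     {                                   // from Haskell via the FFI
 *         performMajorGC();               // force a major collection
 *     }
 */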
2143 performGC_(rtsBool force_major)
2146 // We must grab a new Task here, because the existing Task may be
2147 // associated with a particular Capability, and chained onto the
2148 // suspended_ccalling_tasks queue.
2149 ACQUIRE_LOCK(&sched_mutex);
2150 task = newBoundTask();
2151 RELEASE_LOCK(&sched_mutex);
2152 scheduleDoGC(NULL,task,force_major);
2153 boundTaskExiting(task);
2159 performGC_(rtsFalse);
2163 performMajorGC(void)
2165 performGC_(rtsTrue);
2168 /* -----------------------------------------------------------------------------
2171 If the thread has reached its maximum stack size, then raise the
2172 StackOverflow exception in the offending thread. Otherwise
2173    relocate the TSO into a larger chunk of memory and adjust its stack size appropriately.
2175 -------------------------------------------------------------------------- */
2178 threadStackOverflow(Capability *cap, StgTSO *tso)
2180 nat new_stack_size, stack_words;
2185 IF_DEBUG(sanity,checkTSO(tso));
2187 // don't allow throwTo() to modify the blocked_exceptions queue
2188 // while we are moving the TSO:
2189 lockClosure((StgClosure *)tso);
2191 if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2192 // NB. never raise a StackOverflow exception if the thread is
2193         // inside Control.Exception.block. It is impractical to protect
2194 // against stack overflow exceptions, since virtually anything
2195 // can raise one (even 'catch'), so this is the only sensible
2196 // thing to do here. See bug #767.
2198 debugTrace(DEBUG_gc,
2199 "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2200 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2202 /* If we're debugging, just print out the top of the stack */
2203 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2206 // Send this thread the StackOverflow exception
2208 throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2212 /* Try to double the current stack size. If that takes us over the
2213 * maximum stack size for this thread, then use the maximum instead
2214 * (that is, unless we're already at or over the max size and we
2215 * can't raise the StackOverflow exception (see above), in which
2216 * case just double the size). Finally round up so the TSO ends up as
2217 * a whole number of blocks.
2219 if (tso->stack_size >= tso->max_stack_size) {
2220 new_stack_size = tso->stack_size * 2;
2222 new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2224 new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
2225 TSO_STRUCT_SIZE)/sizeof(W_);
2226 new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
2227 new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
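    /* Worked example (hypothetical numbers, for illustration only): a thread
     * with stack_size = 4096 words and max_stack_size = 80000 words is not
     * yet at its limit, so the doubling above gives
     * stg_min(2*4096, 80000) = 8192 words, which is then padded out to a
     * whole number of blocks (and made MBLOCK-friendly) by the rounding
     * just above.
     */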
2229 debugTrace(DEBUG_sched,
2230 "increasing stack size from %ld words to %d.",
2231 (long)tso->stack_size, new_stack_size);
2233 dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2234 TICK_ALLOC_TSO(new_stack_size,0);
2236 /* copy the TSO block and the old stack into the new area */
2237 memcpy(dest,tso,TSO_STRUCT_SIZE);
2238 stack_words = tso->stack + tso->stack_size - tso->sp;
2239 new_sp = (P_)dest + new_tso_size - stack_words;
2240 memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
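    // (stacks grow downwards, so the live stack_words end up at the very end
    //  of the new, larger TSO and new_sp becomes the new top of stack)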
2242 /* relocate the stack pointers... */
2244 dest->stack_size = new_stack_size;
2246 /* Mark the old TSO as relocated. We have to check for relocated
2247 * TSOs in the garbage collector and any primops that deal with TSOs.
2249 * It's important to set the sp value to just beyond the end
2250      * of the stack, so we don't attempt to scavenge any part of the dead TSO's stack. */
2253 tso->what_next = ThreadRelocated;
2254 setTSOLink(cap,tso,dest);
2255 tso->sp = (P_)&(tso->stack[tso->stack_size]);
2256 tso->why_blocked = NotBlocked;
2258 IF_PAR_DEBUG(verbose,
2259 debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2260 tso->id, tso, tso->stack_size);
2261 /* If we're debugging, just print out the top of the stack */
2262 printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
2268 IF_DEBUG(sanity,checkTSO(dest));
2270 IF_DEBUG(scheduler,printTSO(dest));
2277 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2279 bdescr *bd, *new_bd;
2280 lnat free_w, tso_size_w;
2283 tso_size_w = tso_sizeW(tso);
2285 if (tso_size_w < MBLOCK_SIZE_W ||
2286 (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
2291 // don't allow throwTo() to modify the blocked_exceptions queue
2292 // while we are moving the TSO:
2293 lockClosure((StgClosure *)tso);
2295 // this is the number of words we'll free
2296 free_w = round_to_mblocks(tso_size_w/2);
2298 bd = Bdescr((StgPtr)tso);
2299 new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2300 bd->free = bd->start + TSO_STRUCT_SIZEW;
2302 new_tso = (StgTSO *)new_bd->start;
2303 memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2304 new_tso->stack_size = new_bd->free - new_tso->stack;
2306 debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2307 (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2309 tso->what_next = ThreadRelocated;
2310 tso->_link = new_tso; // no write barrier reqd: same generation
2312 // The TSO attached to this Task may have moved, so update the
2314 if (task->tso == tso) {
2315 task->tso = new_tso;
2321 IF_DEBUG(sanity,checkTSO(new_tso));
2326 /* ---------------------------------------------------------------------------
2328 - usually called inside a signal handler so it mustn't do anything fancy.
2329 ------------------------------------------------------------------------ */
2332 interruptStgRts(void)
2334 sched_state = SCHED_INTERRUPTING;
2335 setContextSwitches();
2339 /* -----------------------------------------------------------------------------
2342 This function causes at least one OS thread to wake up and run the
2343 scheduler loop. It is invoked when the RTS might be deadlocked, or
2344 an external event has arrived that may need servicing (eg. a
2345 keyboard interrupt).
2347 In the single-threaded RTS we don't do anything here; we only have
2348 one thread anyway, and the event that caused us to want to wake up
2349 will have interrupted any blocking system call in progress anyway.
2350 -------------------------------------------------------------------------- */
2355 #if defined(THREADED_RTS)
2356 // This forces the IO Manager thread to wakeup, which will
2357 // in turn ensure that some OS thread wakes up and runs the
2358 // scheduler loop, which will cause a GC and deadlock check.
2363 /* -----------------------------------------------------------------------------
2366 * Check the blackhole_queue for threads that can be woken up. We do
2367  * this periodically: before every GC, and whenever the run queue is empty.
2370 * An elegant solution might be to just wake up all the blocked
2371 * threads with awakenBlockedQueue occasionally: they'll go back to
2372 * sleep again if the object is still a BLACKHOLE. Unfortunately this
2373 * doesn't give us a way to tell whether we've actually managed to
2374 * wake up any threads, so we would be busy-waiting.
2376 * -------------------------------------------------------------------------- */
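/* Usage sketch (illustrative; see the locking assertion below): callers are
 * expected to hold sched_mutex, roughly
 *
 *     ACQUIRE_LOCK(&sched_mutex);
 *     if (checkBlackHoles(cap)) {
 *         // at least one blocked thread was woken up
 *     }
 *     RELEASE_LOCK(&sched_mutex);
 */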
2379 checkBlackHoles (Capability *cap)
2382 rtsBool any_woke_up = rtsFalse;
2385 // blackhole_queue is global:
2386 ASSERT_LOCK_HELD(&sched_mutex);
2388 debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2390 // ASSUMES: sched_mutex
2391 prev = &blackhole_queue;
2392 t = blackhole_queue;
2393 while (t != END_TSO_QUEUE) {
2394 ASSERT(t->why_blocked == BlockedOnBlackHole);
2395 type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2396 if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2397 IF_DEBUG(sanity,checkTSO(t));
2398 t = unblockOne(cap, t);
2400 any_woke_up = rtsTrue;
2410 /* -----------------------------------------------------------------------------
2413 This is used for interruption (^C) and forking, and corresponds to
2414 raising an exception but without letting the thread catch the
2416 -------------------------------------------------------------------------- */
2419 deleteThread (Capability *cap, StgTSO *tso)
2421 // NOTE: must only be called on a TSO that we have exclusive
2422 // access to, because we will call throwToSingleThreaded() below.
2423 // The TSO must be on the run queue of the Capability we own, or
2424 // we must own all Capabilities.
2426 if (tso->why_blocked != BlockedOnCCall &&
2427 tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2428 throwToSingleThreaded(cap,tso,NULL);
2432 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2434 deleteThread_(Capability *cap, StgTSO *tso)
2435 { // for forkProcess only:
2436 // like deleteThread(), but we delete threads in foreign calls, too.
2438 if (tso->why_blocked == BlockedOnCCall ||
2439 tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2440 unblockOne(cap,tso);
2441 tso->what_next = ThreadKilled;
2443 deleteThread(cap,tso);
2448 /* -----------------------------------------------------------------------------
2449 raiseExceptionHelper
2451    This function is called by the raise# primitive, just so that we can
2452 move some of the tricky bits of raising an exception from C-- into
2453    C. Who knows, it might be a useful re-usable thing here too.
2454 -------------------------------------------------------------------------- */
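/* Sketch of the calling contract (illustrative; the real caller is the C--
 * code for raise#): the caller inspects the kind of frame we stopped at,
 * roughly
 *
 *     frame_type = raiseExceptionHelper(reg, tso, exception);
 *     switch (frame_type) {
 *     case CATCH_FRAME:      // tso->sp points at the catch frame; run the
 *                            // handler with the exception
 *     case ATOMICALLY_FRAME: // the exception escaped an atomically# block
 *     case CATCH_STM_FRAME:  // the exception hit an STM catch frame
 *     case STOP_FRAME:       // uncaught; the thread is finished
 *     }
 */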
2457 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2459 Capability *cap = regTableToCapability(reg);
2460 StgThunk *raise_closure = NULL;
2462 StgRetInfoTable *info;
2464 // This closure represents the expression 'raise# E' where E
2465     // is the exception raised. It is used to overwrite all the
2466     // thunks which are currently under evaluation.
2469 // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2470 // LDV profiling: stg_raise_info has THUNK as its closure
2471 // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2472     // payload, MIN_UPD_SIZE is more appropriate than 1. It seems that
2473 // 1 does not cause any problem unless profiling is performed.
2474 // However, when LDV profiling goes on, we need to linearly scan
2475     // the small object pool, where raise_closure is stored, so we should
2476 // use MIN_UPD_SIZE.
2478 // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2479 // sizeofW(StgClosure)+1);
2483 // Walk up the stack, looking for the catch frame. On the way,
2484 // we update any closures pointed to from update frames with the
2485 // raise closure that we just built.
2489 info = get_ret_itbl((StgClosure *)p);
2490 next = p + stack_frame_sizeW((StgClosure *)p);
2491 switch (info->i.type) {
2494 // Only create raise_closure if we need to.
2495 if (raise_closure == NULL) {
2497 (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2498 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2499 raise_closure->payload[0] = exception;
2501 UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2505 case ATOMICALLY_FRAME:
2506 debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2508 return ATOMICALLY_FRAME;
2514 case CATCH_STM_FRAME:
2515 debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2517 return CATCH_STM_FRAME;
2523 case CATCH_RETRY_FRAME:
2532 /* -----------------------------------------------------------------------------
2533 findRetryFrameHelper
2535 This function is called by the retry# primitive. It traverses the stack
2536 leaving tso->sp referring to the frame which should handle the retry.
2538 This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#)
2539    or should be an ATOMICALLY_FRAME (if the retry# reaches the top level).
2541 We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2542 create) because retries are not considered to be exceptions, despite the
2543 similar implementation.
2545 We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2546 not be created within memory transactions.
2547 -------------------------------------------------------------------------- */
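/* Sketch of the calling contract (illustrative; the real caller is the C--
 * code for retry#):
 *
 *     frame_type = findRetryFrameHelper(tso);
 *     if (frame_type == CATCH_RETRY_FRAME) {
 *         // retry# happened inside orElse#: run the other alternative
 *     } else {
 *         // ATOMICALLY_FRAME: wait on the TVars read so far, then re-run
 *         // the whole transaction
 *     }
 */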
2550 findRetryFrameHelper (StgTSO *tso)
2553 StgRetInfoTable *info;
2557 info = get_ret_itbl((StgClosure *)p);
2558 next = p + stack_frame_sizeW((StgClosure *)p);
2559 switch (info->i.type) {
2561 case ATOMICALLY_FRAME:
2562 debugTrace(DEBUG_stm,
2563 "found ATOMICALLY_FRAME at %p during retry", p);
2565 return ATOMICALLY_FRAME;
2567 case CATCH_RETRY_FRAME:
2568 debugTrace(DEBUG_stm,
2569                "found CATCH_RETRY_FRAME at %p during retry", p);
2571 return CATCH_RETRY_FRAME;
2573 case CATCH_STM_FRAME: {
2574 StgTRecHeader *trec = tso -> trec;
2575 StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2576 debugTrace(DEBUG_stm,
2577 "found CATCH_STM_FRAME at %p during retry", p);
2578 debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2579 stmAbortTransaction(tso -> cap, trec);
2580 stmFreeAbortedTRec(tso -> cap, trec);
2581 tso -> trec = outer;
2588 ASSERT(info->i.type != CATCH_FRAME);
2589 ASSERT(info->i.type != STOP_FRAME);
2596 /* -----------------------------------------------------------------------------
2597 resurrectThreads is called after garbage collection on the list of
2598 threads found to be garbage. Each of these threads will be woken
2599 up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2600    on an MVar, or NonTermination if the thread was blocked on a Black Hole.
2603 Locks: assumes we hold *all* the capabilities.
2604 -------------------------------------------------------------------------- */
2607 resurrectThreads (StgTSO *threads)
2613 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2614 next = tso->global_link;
2616 step = Bdescr((P_)tso)->step;
2617 tso->global_link = step->threads;
2618 step->threads = tso;
2620 debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2622 // Wake up the thread on the Capability it was last on
2625 switch (tso->why_blocked) {
2627 case BlockedOnException:
2628 /* Called by GC - sched_mutex lock is currently held. */
2629 throwToSingleThreaded(cap, tso,
2630 (StgClosure *)blockedOnDeadMVar_closure);
2632 case BlockedOnBlackHole:
2633 throwToSingleThreaded(cap, tso,
2634 (StgClosure *)nonTermination_closure);
2637 throwToSingleThreaded(cap, tso,
2638 (StgClosure *)blockedIndefinitely_closure);
2641 /* This might happen if the thread was blocked on a black hole
2642 * belonging to a thread that we've just woken up (raiseAsync
2643 * can wake up threads, remember...).
2647 barf("resurrectThreads: thread blocked in a strange way");
2652 /* -----------------------------------------------------------------------------
2653 performPendingThrowTos is called after garbage collection, and
2654 passed a list of threads that were found to have pending throwTos
2655 (tso->blocked_exceptions was not empty), and were blocked.
2656 Normally this doesn't happen, because we would deliver the
2657 exception directly if the target thread is blocked, but there are
2658    small windows where it might occur on a multiprocessor (see throwTo()).
2661 NB. we must be holding all the capabilities at this point, just
2662 like resurrectThreads().
2663 -------------------------------------------------------------------------- */
2666 performPendingThrowTos (StgTSO *threads)
2672 for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2673 next = tso->global_link;
2675 step = Bdescr((P_)tso)->step;
2676 tso->global_link = step->threads;
2677 step->threads = tso;
2679 debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2682 maybePerformBlockedException(cap, tso);