X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=66af8be7cb5b6e02c03700102d102cd10b639aa3;hb=763598e44760872c7ca68aca72457b7267a42947;hp=dd33f6f2d24d60f729d1960fb6325254573e53d5;hpb=c123d6799b1f777a2c19ff2e10b93191d2ad7797;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index dd33f6f..66af8be 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -17,7 +17,7 @@ #include "Interpreter.h" #include "Printer.h" #include "RtsSignals.h" -#include "Sanity.h" +#include "sm/Sanity.h" #include "Stats.h" #include "STM.h" #include "Prelude.h" @@ -26,7 +26,6 @@ #include "Proftimer.h" #include "ProfHeap.h" #include "Weak.h" -#include "eventlog/EventLog.h" #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N #include "Sparks.h" #include "Capability.h" @@ -136,7 +135,7 @@ static Capability *schedule (Capability *initialCapability, Task *task); static void schedulePreLoop (void); static void scheduleFindWork (Capability *cap); #if defined(THREADED_RTS) -static void scheduleYield (Capability **pcap, Task *task); +static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); @@ -163,7 +162,7 @@ static Capability *scheduleDoGC(Capability *cap, Task *task, static rtsBool checkBlackHoles(Capability *cap); static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); -static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso); +static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso); static void deleteThread (Capability *cap, StgTSO *tso); static void deleteAllThreads (Capability *cap); @@ -172,17 +171,6 @@ static void deleteAllThreads (Capability *cap); static void deleteThread_(Capability *cap, StgTSO *tso); #endif -#ifdef DEBUG -static char *whatNext_strs[] = { - [0] = "(unknown)", - [ThreadRunGHC] = "ThreadRunGHC", - [ThreadInterpret] = "ThreadInterpret", - [ThreadKilled] = "ThreadKilled", - [ThreadRelocated] = "ThreadRelocated", - [ThreadComplete] = "ThreadComplete" -}; -#endif - /* ----------------------------------------------------------------------------- * Putting a thread on the run queue: different scheduling policies * -------------------------------------------------------------------------- */ @@ -240,6 +228,7 @@ schedule (Capability *initialCapability, Task *task) rtsBool ready_to_gc; #if defined(THREADED_RTS) rtsBool first = rtsTrue; + rtsBool force_yield = rtsFalse; #endif cap = initialCapability; @@ -248,17 +237,7 @@ schedule (Capability *initialCapability, Task *task) // The sched_mutex is *NOT* held // NB. on return, we still hold a capability. - debugTrace (DEBUG_sched, - "### NEW SCHEDULER LOOP (task: %p, cap: %p)", - task, initialCapability); - - if (running_finalizers) { - errorBelch("error: a C finalizer called back into Haskell.\n" - " This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n" - " To create finalizers that may call back into Haskll, use\n" - " Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr."); - stg_exit(EXIT_FAILURE); - } + debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no); schedulePreLoop(); @@ -374,7 +353,9 @@ schedule (Capability *initialCapability, Task *task) } yield: - scheduleYield(&cap,task); + scheduleYield(&cap,task,force_yield); + force_yield = rtsFalse; + if (emptyRunQueue(cap)) continue; // look for work again #endif @@ -401,12 +382,11 @@ schedule (Capability *initialCapability, Task *task) if (bound) { if (bound == task) { - debugTrace(DEBUG_sched, - "### Running thread %lu in bound thread", (unsigned long)t->id); // yes, the Haskell thread is bound to the current native thread } else { debugTrace(DEBUG_sched, - "### thread %lu bound to another OS thread", (unsigned long)t->id); + "thread %lu bound to another OS thread", + (unsigned long)t->id); // no, bound to a different Haskell thread: pass to that thread pushOnRunQueue(cap,t); continue; @@ -415,7 +395,8 @@ schedule (Capability *initialCapability, Task *task) // The thread we want to run is unbound. if (task->tso) { debugTrace(DEBUG_sched, - "### this OS thread cannot run thread %lu", (unsigned long)t->id); + "this OS thread cannot run thread %lu", + (unsigned long)t->id); // no, the current native thread is bound to a different // Haskell thread, so pass it to any worker thread pushOnRunQueue(cap,t); @@ -450,9 +431,6 @@ run_thread: // that. cap->r.rCurrentTSO = t; - debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", - (long)t->id, whatNext_strs[t->what_next]); - startHeapProfTimer(); // Check for exceptions blocked on this thread @@ -485,12 +463,16 @@ run_thread: if (prev == ACTIVITY_DONE_GC) { startTimer(); } - } else { + } else if (recent_activity != ACTIVITY_INACTIVE) { + // If we reached ACTIVITY_INACTIVE, then don't reset it until + // we've done the GC. The thread running here might just be + // the IO manager thread that handle_tick() woke up via + // wakeUpRts(). recent_activity = ACTIVITY_YES; } #endif - postEvent(cap, EVENT_RUN_THREAD, t->id, 0); + traceEventRunThread(cap, t); switch (prev_what_next) { @@ -540,7 +522,7 @@ run_thread: t->saved_winerror = GetLastError(); #endif - postEvent (cap, EVENT_STOP_THREAD, t->id, ret); + traceEventStopThread(cap, t, ret); #if defined(THREADED_RTS) // If ret is ThreadBlocked, and this Task is bound to the TSO that @@ -550,9 +532,7 @@ run_thread: // that task->cap != cap. We better yield this Capability // immediately and return to normaility. if (ret == ThreadBlocked) { - debugTrace(DEBUG_sched, - "--<< thread %lu (%s) stopped: blocked", - (unsigned long)t->id, whatNext_strs[t->what_next]); + force_yield = rtsTrue; goto yield; } #endif @@ -571,7 +551,7 @@ run_thread: schedulePostRunThread(cap,t); if (ret != StackOverflow) { - t = threadStackUnderflow(task,t); + t = threadStackUnderflow(cap,task,t); } ready_to_gc = rtsFalse; @@ -675,12 +655,23 @@ shouldYieldCapability (Capability *cap, Task *task) // and also check the benchmarks in nofib/parallel for regressions. static void -scheduleYield (Capability **pcap, Task *task) +scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) { Capability *cap = *pcap; // if we have work, and we don't need to give up the Capability, continue. - if (!shouldYieldCapability(cap,task) && + // + // The force_yield flag is used when a bound thread blocks. This + // is a particularly tricky situation: the current Task does not + // own the TSO any more, since it is on some queue somewhere, and + // might be woken up or manipulated by another thread at any time. + // The TSO and Task might be migrated to another Capability. + // Certain invariants might be in doubt, such as task->bound->cap + // == cap. We have to yield the current Capability immediately, + // no messing around. + // + if (!force_yield && + !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || !emptyWakeupQueue(cap) || blackholes_need_checking || @@ -718,7 +709,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, Capability *free_caps[n_capabilities], *cap0; nat i, n_free_caps; - // migration can be turned off with +RTS -qg + // migration can be turned off with +RTS -qm if (!RtsFlags.ParFlags.migrate) return; // Check whether we have more threads on our run queue, or sparks @@ -788,10 +779,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, setTSOLink(cap, prev, t); prev = t; } else { - debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); appendToRunQueue(free_caps[i],t); - postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no); + traceEventMigrateThread (cap, t, free_caps[i]->no); if (t->bound) { t->bound->cap = free_caps[i]; } t->cap = free_caps[i]; @@ -815,7 +805,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (spark != NULL) { debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no); - postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no); + traceEventStealSpark(free_caps[i], t, cap->no); newSpark(&(free_caps[i]->r), spark); } @@ -1075,7 +1065,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t) // partially-evaluated thunks on the heap. throwToSingleThreaded_(cap, t, NULL, rtsTrue); - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); +// ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); } } @@ -1099,7 +1089,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) debugTrace(DEBUG_sched, "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", - (long)t->id, whatNext_strs[t->what_next], blocks); + (long)t->id, what_next_strs[t->what_next], blocks); // don't do this if the nursery is (nearly) full, we'll GC first. if (cap->r.rCurrentNursery->link != NULL || @@ -1117,10 +1107,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { -#if !defined(THREADED_RTS) - ASSERT(g0s0->blocks == cap->r.rCurrentNursery && - g0s0 == cap->r.rNursery); -#endif cap->r.rNursery->blocks = bd; } cap->r.rCurrentNursery->u.back = bd; @@ -1135,8 +1121,8 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) { bdescr *x; for (x = bd; x < bd + blocks; x++) { - x->step = cap->r.rNursery; - x->gen_no = 0; + initBdescr(x,g0,g0); + x->free = x->start; x->flags = 0; } } @@ -1157,10 +1143,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } } - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped: HeapOverflow", - (long)t->id, whatNext_strs[t->what_next]); - if (cap->r.rHpLim == NULL || cap->context_switch) { // Sometimes we miss a context switch, e.g. when calling // primitives in a tight loop, MAYBE_GC() doesn't check the @@ -1182,10 +1164,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) static void scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) { - debugTrace (DEBUG_sched, - "--<< thread %ld (%s) stopped, StackOverflow", - (long)t->id, whatNext_strs[t->what_next]); - /* just adjust the stack for this thread, then pop it back * on the run queue. */ @@ -1210,43 +1188,36 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) { - // Reset the context switch flag. We don't do this just before - // running the thread, because that would mean we would lose ticks - // during GC, which can lead to unfair scheduling (a thread hogs - // the CPU because the tick always arrives during GC). This way - // penalises threads that do a lot of allocation, but that seems - // better than the alternative. - cap->context_switch = 0; - /* put the thread back on the run queue. Then, if we're ready to * GC, check whether this is the last task to stop. If so, wake * up the GC thread. getThread will block during a GC until the * GC is finished. */ -#ifdef DEBUG - if (t->what_next != prev_what_next) { - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped to switch evaluators", - (long)t->id, whatNext_strs[t->what_next]); - } else { - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped, yielding", - (long)t->id, whatNext_strs[t->what_next]); - } -#endif - - IF_DEBUG(sanity, - //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); - checkTSO(t)); + ASSERT(t->_link == END_TSO_QUEUE); // Shortcut if we're just switching evaluators: don't bother // doing stack squeezing (which can be expensive), just run the // thread. - if (t->what_next != prev_what_next) { + if (cap->context_switch == 0 && t->what_next != prev_what_next) { + debugTrace(DEBUG_sched, + "--<< thread %ld (%s) stopped to switch evaluators", + (long)t->id, what_next_strs[t->what_next]); return rtsTrue; } + // Reset the context switch flag. We don't do this just before + // running the thread, because that would mean we would lose ticks + // during GC, which can lead to unfair scheduling (a thread hogs + // the CPU because the tick always arrives during GC). This way + // penalises threads that do a lot of allocation, but that seems + // better than the alternative. + cap->context_switch = 0; + + IF_DEBUG(sanity, + //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); + checkTSO(t)); + addToRunQueue(cap,t); return rtsFalse; @@ -1278,12 +1249,7 @@ scheduleHandleThreadBlocked( StgTSO *t // exception, see maybePerformBlockedException(). #ifdef DEBUG - if (traceClass(DEBUG_sched)) { - debugTraceBegin("--<< thread %lu (%s) stopped: ", - (unsigned long)t->id, whatNext_strs[t->what_next]); - printThreadBlockage(t); - debugTraceEnd(); - } + traceThreadStatus(DEBUG_sched, t); #endif } @@ -1300,8 +1266,6 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) * We also end up here if the thread kills itself with an * uncaught exception, see Exception.cmm. */ - debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", - (unsigned long)t->id, whatNext_strs[t->what_next]); // blocked exceptions can now complete, even if the thread was in // blocked mode (see #2910). This unconditionally calls @@ -1363,6 +1327,17 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) #ifdef DEBUG removeThreadLabel((StgWord)task->tso->id); #endif + + // We no longer consider this thread and task to be bound to + // each other. The TSO lives on until it is GC'd, but the + // task is about to be released by the caller, and we don't + // want anyone following the pointer from the TSO to the + // defunct task (which might have already been + // re-used). This was a real bug: the GC updated + // tso->bound->tso which lead to a deadlock. + t->bound = NULL; + task->tso = NULL; + return rtsTrue; // tells schedule() to return } @@ -1413,7 +1388,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) if (sched_state < SCHED_INTERRUPTING && RtsFlags.ParFlags.parGcEnabled && N >= RtsFlags.ParFlags.parGcGen - && ! oldest_gen->steps[0].mark) + && ! oldest_gen->mark) { gc_type = PENDING_GC_PAR; } else { @@ -1453,7 +1428,19 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) if (gc_type == PENDING_GC_SEQ) { - postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0); + traceEventRequestSeqGc(cap); + } + else + { + traceEventRequestParGc(cap); + debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); + } + + // do this while the other Capabilities stop: + if (cap) scheduleCheckBlackHoles(cap); + + if (gc_type == PENDING_GC_SEQ) + { // single-threaded GC: grab all the capabilities for (i=0; i < n_capabilities; i++) { debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); @@ -1476,16 +1463,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) { // multi-threaded GC: make sure all the Capabilities donate one // GC thread each. - postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0); - debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); - waitForGcThreads(cap); } -#endif - // so this happens periodically: +#else /* !THREADED_RTS */ + + // do this while the other Capabilities stop: if (cap) scheduleCheckBlackHoles(cap); - + +#endif + IF_DEBUG(scheduler, printAllThreads()); delete_threads_and_gc: @@ -1501,9 +1488,8 @@ delete_threads_and_gc: heap_census = scheduleNeedHeapProfile(rtsTrue); + traceEventGcStart(cap); #if defined(THREADED_RTS) - postEvent(cap, EVENT_GC_START, 0, 0); - debugTrace(DEBUG_sched, "doing GC"); // reset waiting_for_gc *before* GC, so that when the GC threads // emerge they don't immediately re-enter the GC. waiting_for_gc = 0; @@ -1511,7 +1497,7 @@ delete_threads_and_gc: #else GarbageCollect(force_major || heap_census, 0, cap); #endif - postEvent(cap, EVENT_GC_END, 0, 0); + traceEventGcEnd(cap); if (recent_activity == ACTIVITY_INACTIVE && force_major) { @@ -1604,7 +1590,7 @@ forkProcess(HsStablePtr *entry pid_t pid; StgTSO* t,*next; Capability *cap; - nat s; + nat g; #if defined(THREADED_RTS) if (RtsFlags.ParFlags.nNodes > 1) { @@ -1652,8 +1638,8 @@ forkProcess(HsStablePtr *entry // all Tasks, because they correspond to OS threads that are // now gone. - for (s = 0; s < total_steps; s++) { - for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { if (t->what_next == ThreadRelocated) { next = t->_link; } else { @@ -1679,8 +1665,8 @@ forkProcess(HsStablePtr *entry // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. - for (s = 0; s < total_steps; s++) { - all_steps[s].threads = END_TSO_QUEUE; + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + generations[g].threads = END_TSO_QUEUE; } // Wipe the task list, except the current Task. @@ -1708,6 +1694,10 @@ forkProcess(HsStablePtr *entry initTimer(); startTimer(); +#if defined(THREADED_RTS) + cap = ioManagerStartCap(cap); +#endif + cap = rts_evalStableIO(cap, entry, NULL); // run the action rts_checkSchedStatus("forkProcess",cap); @@ -1730,19 +1720,19 @@ deleteAllThreads ( Capability *cap ) // NOTE: only safe to call if we own all capabilities. StgTSO* t, *next; - nat s; + nat g; debugTrace(DEBUG_sched,"deleting all threads"); - for (s = 0; s < total_steps; s++) { - for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->_link; - } else { - next = t->global_link; - deleteThread(cap,t); - } - } - } + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->_link; + } else { + next = t->global_link; + deleteThread(cap,t); + } + } + } // The run queue now contains a bunch of ThreadKilled threads. We // must not throw these away: the main thread(s) will be in there @@ -1826,10 +1816,7 @@ suspendThread (StgRegTable *reg) task = cap->running_task; tso = cap->r.rCurrentTSO; - postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL); - debugTrace(DEBUG_sched, - "thread %lu did a safe foreign call", - (unsigned long)cap->r.rCurrentTSO->id); + traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL); // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; @@ -1855,13 +1842,6 @@ suspendThread (StgRegTable *reg) RELEASE_LOCK(&cap->lock); -#if defined(THREADED_RTS) - /* Preparing to leave the RTS, so ensure there's a native thread/task - waiting to take over. - */ - debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id); -#endif - errno = saved_errno; #if mingw32_HOST_OS SetLastError(saved_winerror); @@ -1899,8 +1879,7 @@ resumeThread (void *task_) task->suspended_tso = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd - postEvent(cap, EVENT_RUN_THREAD, tso->id, 0); - debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); + traceEventRunThread(cap, tso); if (tso->why_blocked == BlockedOnCCall) { // avoid locking the TSO if we don't have to @@ -1956,7 +1935,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no); + traceEventMigrateThread (cap, tso, capabilities[cpu].no); wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else @@ -1968,6 +1947,7 @@ Capability * scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) { Task *task; + StgThreadID id; // We already created/initialised the Task task = cap->running_task; @@ -1983,14 +1963,15 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) appendToRunQueue(cap,tso); - debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id); + id = tso->id; + debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id); cap = schedule(cap,task); ASSERT(task->stat != NoStatus); ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id); + debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id); return cap; } @@ -2120,7 +2101,8 @@ exitScheduler( if (sched_state < SCHED_SHUTTING_DOWN) { sched_state = SCHED_INTERRUPTING; waitForReturnCapability(&task->cap,task); - scheduleDoGC(task->cap,task,rtsFalse); + scheduleDoGC(task->cap,task,rtsFalse); + ASSERT(task->tso == NULL); releaseCapability(task->cap); } sched_state = SCHED_SHUTTING_DOWN; @@ -2130,11 +2112,13 @@ exitScheduler( nat i; for (i = 0; i < n_capabilities; i++) { + ASSERT(task->tso == NULL); shutdownCapability(&capabilities[i], task, wait_foreign); } - boundTaskExiting(task); } #endif + + boundTaskExiting(task); } void @@ -2221,12 +2205,28 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // while we are moving the TSO: lockClosure((StgClosure *)tso); - if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) { + if (tso->stack_size >= tso->max_stack_size + && !(tso->flags & TSO_BLOCKEX)) { // NB. never raise a StackOverflow exception if the thread is // inside Control.Exceptino.block. It is impractical to protect // against stack overflow exceptions, since virtually anything // can raise one (even 'catch'), so this is the only sensible // thing to do here. See bug #767. + // + + if (tso->flags & TSO_SQUEEZED) { + unlockTSO(tso); + return tso; + } + // #3677: In a stack overflow situation, stack squeezing may + // reduce the stack size, but we don't know whether it has been + // reduced enough for the stack check to succeed if we try + // again. Fortunately stack squeezing is idempotent, so all we + // need to do is record whether *any* squeezing happened. If we + // are at the stack's absolute -K limit, and stack squeezing + // happened, then we try running the thread again. The + // TSO_SQUEEZED flag is set by threadPaused() to tell us whether + // squeezing happened or not. debugTrace(DEBUG_gc, "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)", @@ -2242,6 +2242,21 @@ threadStackOverflow(Capability *cap, StgTSO *tso) return tso; } + + // We also want to avoid enlarging the stack if squeezing has + // already released some of it. However, we don't want to get into + // a pathalogical situation where a thread has a nearly full stack + // (near its current limit, but not near the absolute -K limit), + // keeps allocating a little bit, squeezing removes a little bit, + // and then it runs again. So to avoid this, if we squeezed *and* + // there is still less than BLOCK_SIZE_W words free, then we enlarge + // the stack anyway. + if ((tso->flags & TSO_SQUEEZED) && + ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) { + unlockTSO(tso); + return tso; + } + /* Try to double the current stack size. If that takes us over the * maximum stack size for this thread, then use the maximum instead * (that is, unless we're already at or over the max size and we @@ -2263,7 +2278,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) "increasing stack size from %ld words to %d.", (long)tso->stack_size, new_stack_size); - dest = (StgTSO *)allocateLocal(cap,new_tso_size); + dest = (StgTSO *)allocate(cap,new_tso_size); TICK_ALLOC_TSO(new_stack_size,0); /* copy the TSO block and the old stack into the new area */ @@ -2300,7 +2315,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) } static StgTSO * -threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) +threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) { bdescr *bd, *new_bd; lnat free_w, tso_size_w; @@ -2338,6 +2353,13 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) memcpy(new_tso,tso,TSO_STRUCT_SIZE); new_tso->stack_size = new_bd->free - new_tso->stack; + // The original TSO was dirty and probably on the mutable + // list. The new TSO is not yet on the mutable list, so we better + // put it there. + new_tso->dirty = 0; + new_tso->flags &= ~TSO_LINK_DIRTY; + dirty_TSO(cap, new_tso); + debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu", (long)tso->id, tso_size_w, tso_sizeW(new_tso)); @@ -2534,11 +2556,12 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); + (StgThunk *)allocate(cap,sizeofW(StgThunk)+1); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure); + UPD_IND(cap, ((StgUpdateFrame *)p)->updatee, + (StgClosure *)raise_closure); p = next; continue; @@ -2612,7 +2635,7 @@ findRetryFrameHelper (StgTSO *tso) case CATCH_STM_FRAME: { StgTRecHeader *trec = tso -> trec; - StgTRecHeader *outer = stmGetEnclosingTRec(trec); + StgTRecHeader *outer = trec -> enclosing_trec; debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p during retry", p); debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer); @@ -2648,14 +2671,14 @@ resurrectThreads (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; - step *step; + generation *gen; for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; - step = Bdescr((P_)tso)->step; - tso->global_link = step->threads; - step->threads = tso; + gen = Bdescr((P_)tso)->gen; + tso->global_link = gen->threads; + gen->threads = tso; debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id); @@ -2664,10 +2687,9 @@ resurrectThreads (StgTSO *threads) switch (tso->why_blocked) { case BlockedOnMVar: - case BlockedOnException: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, - (StgClosure *)blockedOnDeadMVar_closure); + (StgClosure *)blockedIndefinitelyOnMVar_closure); break; case BlockedOnBlackHole: throwToSingleThreaded(cap, tso, @@ -2675,7 +2697,7 @@ resurrectThreads (StgTSO *threads) break; case BlockedOnSTM: throwToSingleThreaded(cap, tso, - (StgClosure *)blockedIndefinitely_closure); + (StgClosure *)blockedIndefinitelyOnSTM_closure); break; case NotBlocked: /* This might happen if the thread was blocked on a black hole @@ -2683,6 +2705,11 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; + case BlockedOnException: + // throwTo should never block indefinitely: if the target + // thread dies or completes, throwTo returns. + barf("resurrectThreads: thread BlockedOnException"); + break; default: barf("resurrectThreads: thread blocked in a strange way"); } @@ -2707,18 +2734,32 @@ performPendingThrowTos (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; - step *step; + Task *task, *saved_task;; + generation *gen; + + task = myTask(); + cap = task->cap; for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; - step = Bdescr((P_)tso)->step; - tso->global_link = step->threads; - step->threads = tso; + gen = Bdescr((P_)tso)->gen; + tso->global_link = gen->threads; + gen->threads = tso; debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - cap = tso->cap; - maybePerformBlockedException(cap, tso); - } + // We must pretend this Capability belongs to the current Task + // for the time being, as invariants will be broken otherwise. + // In fact the current Task has exclusive access to the systme + // at this point, so this is just bookkeeping: + task->cap = tso->cap; + saved_task = tso->cap->running_task; + tso->cap->running_task = task; + maybePerformBlockedException(tso->cap, tso); + tso->cap->running_task = saved_task; + } + + // Restore our original Capability: + task->cap = cap; }