X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=a9ea81bbe44f75aad7d945d03c7d81d1a707d463;hb=27de38efce6d73d2a0209f803cfa98c82773e773;hp=11b9f87d592579dfc4ffda7f23d7f1874abb84e4;hpb=b1953bbb1ed3cb16497e5447db7487f0c2d9e41a;p=ghc-hetmet.git

diff --git a/rts/Schedule.c b/rts/Schedule.c
index 11b9f87..a9ea81b 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -7,11 +7,11 @@
* --------------------------------------------------------------------------*/
#include "PosixSource.h"
+#define KEEP_LOCKCLOSURE
#include "Rts.h"
#include "SchedAPI.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
-#include "BlockAlloc.h"
#include "OSThreads.h"
#include "Storage.h"
#include "StgRun.h"
@@ -29,10 +29,8 @@
#include "ThreadLabels.h"
#include "LdvProfile.h"
#include "Updates.h"
-#ifdef PROFILING
#include "Proftimer.h"
#include "ProfHeap.h"
-#endif
#if defined(GRAN) || defined(PARALLEL_HASKELL)
# include "GranSimRts.h"
# include "GranSim.h"
@@ -52,6 +50,7 @@
#include "Trace.h"
#include "RaiseAsync.h"
#include "Threads.h"
+#include "ThrIOManager.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
@@ -119,12 +118,6 @@ StgTSO *blackhole_queue = NULL;
*/
rtsBool blackholes_need_checking = rtsFalse;
-/* Linked list of all threads.
- * Used for detecting garbage collected threads.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *all_threads = NULL;
-
/* flag set by signal handler to precipitate a context switch
* LOCK: none (just an advisory flag)
*/
@@ -207,7 +200,7 @@ static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
#if defined(PAR) || defined(GRAN)
static void scheduleGranParReport(void);
#endif
-static void schedulePostRunThread(void);
+static void schedulePostRunThread(StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static void scheduleHandleStackOverflow( Capability *cap, Task *task,
StgTSO *t);
@@ -216,15 +209,14 @@ static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
-static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
+static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
static Capability *scheduleDoGC(Capability *cap, Task *task,
- rtsBool force_major,
- void (*get_roots)(evac_fn));
+ rtsBool force_major);
static rtsBool checkBlackHoles(Capability *cap);
-static void AllRoots(evac_fn evac);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
+static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
@@ -421,7 +413,7 @@ schedule (Capability *initialCapability, Task *task)
discardSparksCap(cap);
#endif
/* scheduleDoGC() deletes all the threads */
- cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
+ cap = scheduleDoGC(cap,task,rtsFalse);
break;
case SCHED_SHUTTING_DOWN:
debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
@@ -535,11 +527,11 @@ schedule (Capability *initialCapability, Task *task)
if (bound) {
if (bound == task) {
debugTrace(DEBUG_sched,
- "### Running thread %d in bound thread", t->id);
+ "### Running thread %lu in bound thread", (unsigned long)t->id);
// yes, the Haskell thread is bound to the current native thread
} else {
debugTrace(DEBUG_sched,
- "### thread %d bound to another OS thread", t->id);
+ "### thread %lu bound to another OS thread", (unsigned long)t->id);
// no, bound to a different Haskell thread: pass to that thread
pushOnRunQueue(cap,t);
continue;
@@ -548,7 +540,7 @@ schedule (Capability *initialCapability, Task *task)
// The thread we want to run is unbound.
if (task->tso) {
debugTrace(DEBUG_sched,
- "### this OS thread cannot run thread %d", t->id);
+ "### this OS thread cannot run thread %lu", (unsigned long)t->id);
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
pushOnRunQueue(cap,t);
@@ -558,8 +550,6 @@
}
#endif
- cap->r.rCurrentTSO = t;
-
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
@@ -571,12 +561,15 @@
run_thread:
+ // CurrentTSO is the thread to run. t might be different if we
+ // loop back to run_thread, so make sure to set CurrentTSO after
+ // that.
+ cap->r.rCurrentTSO = t;
+
debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
(long)t->id, whatNext_strs[t->what_next]);
-#if defined(PROFILING)
startHeapProfTimer();
-#endif
// Check for exceptions blocked on this thread
maybePerformBlockedException (cap, t);
@@ -590,11 +583,27 @@ run_thread:
prev_what_next = t->what_next;
errno = t->saved_errno;
+#if mingw32_HOST_OS
+ SetLastError(t->saved_winerror);
+#endif
+
cap->in_haskell = rtsTrue;
- dirtyTSO(t);
+ dirty_TSO(cap,t);
- recent_activity = ACTIVITY_YES;
+#if defined(THREADED_RTS)
+ if (recent_activity == ACTIVITY_DONE_GC) {
+ // ACTIVITY_DONE_GC means we turned off the timer signal to
+ // conserve power (see #1623). Re-enable it here.
+ nat prev;
+ prev = xchg((P_)&recent_activity, ACTIVITY_YES);
+ if (prev == ACTIVITY_DONE_GC) {
+ startTimer();
+ }
+ } else {
+ recent_activity = ACTIVITY_YES;
+ }
+#endif
switch (prev_what_next) {
@@ -639,6 +648,10 @@ run_thread:
// XXX: possibly bogus for SMP because this thread might already
// be running again, see code below.
t->saved_errno = errno;
+#if mingw32_HOST_OS
+ // Similarly for Windows error code
+ t->saved_winerror = GetLastError();
+#endif
#if defined(THREADED_RTS)
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// blocked, we are in limbo - the TSO is now owned by whatever it
// is blocked on, and may in fact already have been woken up,
// perhaps even on a different Capability. It may be the case
// that task->cap != cap. We better yield this Capability
// immediately and return to normality.
if (ret == ThreadBlocked) { debugTrace(DEBUG_sched, - "--<< thread %d (%s) stopped: blocked", - t->id, whatNext_strs[t->what_next]); + "--<< thread %lu (%s) stopped: blocked", + (unsigned long)t->id, whatNext_strs[t->what_next]); continue; } #endif @@ -661,12 +674,14 @@ run_thread: // ---------------------------------------------------------------------- // Costs for the scheduler are assigned to CCS_SYSTEM -#if defined(PROFILING) stopHeapProfTimer(); +#if defined(PROFILING) CCCS = CCS_SYSTEM; #endif - schedulePostRunThread(); + schedulePostRunThread(t); + + t = threadStackUnderflow(task,t); ready_to_gc = rtsFalse; @@ -699,14 +714,10 @@ run_thread: barf("schedule: invalid thread return code %d", (int)ret); } - if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } - if (ready_to_gc) { - cap = scheduleDoGC(cap,task,rtsFalse,GetRoots); + if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) { + cap = scheduleDoGC(cap,task,rtsFalse); } } /* end of while() */ - - debugTrace(PAR_DEBUG_verbose, - "== Leaving schedule() after having received Finish"); } /* ---------------------------------------------------------------------------- @@ -754,7 +765,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // Check whether we have more threads on our run queue, or sparks // in our pool, that we could hand to another Capability. - if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) + if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE) && sparkPoolSizeCap(cap) < 2) { return; } @@ -795,24 +806,24 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (cap->run_queue_hd != END_TSO_QUEUE) { prev = cap->run_queue_hd; - t = prev->link; - prev->link = END_TSO_QUEUE; + t = prev->_link; + prev->_link = END_TSO_QUEUE; for (; t != END_TSO_QUEUE; t = next) { - next = t->link; - t->link = END_TSO_QUEUE; + next = t->_link; + t->_link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated || t->bound == task // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread - prev->link = t; + setTSOLink(cap, prev, t); prev = t; } else if (i == n_free_caps) { pushed_to_all = rtsTrue; i = 0; // keep one for us - prev->link = t; + setTSOLink(cap, prev, t); prev = t; } else { - debugTrace(DEBUG_sched, "pushing thread %d to capability %d", t->id, free_caps[i]->no); + debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); appendToRunQueue(free_caps[i],t); if (t->bound) { t->bound->cap = free_caps[i]; } t->cap = free_caps[i]; @@ -852,11 +863,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS, * Start any pending signal handlers * ------------------------------------------------------------------------- */ -#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS)) +#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS) static void scheduleStartSignalHandlers(Capability *cap) { - if (signals_pending()) { // safe outside the lock + if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) { + // safe outside the lock startSignalHandlers(cap); } } @@ -904,7 +916,7 @@ scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) cap->run_queue_hd = cap->wakeup_queue_hd; cap->run_queue_tl = cap->wakeup_queue_tl; } else { - cap->run_queue_tl->link = cap->wakeup_queue_hd; + setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); cap->run_queue_tl = cap->wakeup_queue_tl; } cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; @@ -968,18 +980,20 @@ scheduleDetectDeadlock 
(Capability *cap, Task *task)
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
+ cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
recent_activity = ACTIVITY_DONE_GC;
+ // disable timer signals (see #1623)
+ stopTimer();
if ( !emptyRunQueue(cap) ) return;
-#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
+#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather than bombing out with a
* deadlock.
*/
- if ( anyUserHandlers() ) {
+ if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
debugTrace(DEBUG_sched,
"still deadlocked, waiting for signals...");
@@ -991,6 +1005,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
// either we have threads to run, or we were interrupted:
ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
+
+ return;
}
#endif
@@ -1005,7 +1021,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
case BlockedOnException:
case BlockedOnMVar:
throwToSingleThreaded(cap, task->tso,
- (StgClosure *)NonTermination_closure);
+ (StgClosure *)nonTermination_closure);
return;
default:
barf("deadlock: main thread blocked in a strange way");
@@ -1439,8 +1455,34 @@ JB: TODO: investigate whether state change field could be nuked
* ------------------------------------------------------------------------- */
static void
-schedulePostRunThread(void)
+schedulePostRunThread (StgTSO *t)
{
+ // We have to be able to catch transactions that are in an
+ // infinite loop as a result of seeing an inconsistent view of
+ // memory, e.g.
+ //
+ // atomically $ do
+ // [a,b] <- mapM readTVar [ta,tb]
+ // when (a == b) loop
+ //
+ // and a is never equal to b given a consistent view of memory.
+ //
+ if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
+ debugTrace(DEBUG_sched | DEBUG_stm,
+ "trec %p found wasting its time", t);
+
+ // strip the stack back to the
+ // ATOMICALLY_FRAME, aborting the (nested)
+ // transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ throwToSingleThreaded_(&capabilities[0], t,
+ NULL, rtsTrue, NULL);
+
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ }
+ }
+
#if defined(PAR)
/* HACK 675: if the last thread didn't yield, make sure to print a
SCHEDULE event to the log file when StgRunning the next thread, even
@@ -1601,7 +1643,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
}
debugTrace(DEBUG_sched,
- "--<< thread %ld (%s) stopped: HeapOverflow\n",
+ "--<< thread %ld (%s) stopped: HeapOverflow",
(long)t->id, whatNext_strs[t->what_next]);
#if defined(GRAN)
@@ -1617,7 +1659,16 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
}
#endif
- pushOnRunQueue(cap,t);
+ if (context_switch) {
+ // Sometimes we miss a context switch, e.g. when calling
+ // primitives in a tight loop, MAYBE_GC() doesn't check the
+ // context switch flag, and we end up waiting for a GC.
+ // See #1984, and concurrent/should_run/1984 + context_switch = 0; + addToRunQueue(cap,t); + } else { + pushOnRunQueue(cap,t); + } return rtsTrue; /* actual GC is done at the end of the while loop in schedule() */ } @@ -1685,7 +1736,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) IF_DEBUG(sanity, //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); - ASSERT(t->link == END_TSO_QUEUE); + ASSERT(t->_link == END_TSO_QUEUE); // Shortcut if we're just switching evaluators: don't bother // doing stack squeezing (which can be expensive), just run the @@ -1770,18 +1821,19 @@ scheduleHandleThreadBlocked( StgTSO *t // has tidied up its stack and placed itself on whatever queue // it needs to be on. -#if !defined(THREADED_RTS) - ASSERT(t->why_blocked != NotBlocked); - // This might not be true under THREADED_RTS: we don't have - // exclusive access to this TSO, so someone might have - // woken it up by now. This actually happens: try - // conc023 +RTS -N2. -#endif + // ASSERT(t->why_blocked != NotBlocked); + // Not true: for example, + // - in THREADED_RTS, the thread may already have been woken + // up by another Capability. This actually happens: try + // conc023 +RTS -N2. + // - the thread may have woken itself up already, because + // threadPaused() might have raised a blocked throwTo + // exception, see maybePerformBlockedException(). #ifdef DEBUG if (traceClass(DEBUG_sched)) { - debugTraceBegin("--<< thread %d (%s) stopped: ", - t->id, whatNext_strs[t->what_next]); + debugTraceBegin("--<< thread %lu (%s) stopped: ", + (unsigned long)t->id, whatNext_strs[t->what_next]); printThreadBlockage(t); debugTraceEnd(); } @@ -1807,8 +1859,8 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) * We also end up here if the thread kills itself with an * uncaught exception, see Exception.cmm. */ - debugTrace(DEBUG_sched, "--++ thread %d (%s) finished", - t->id, whatNext_strs[t->what_next]); + debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", + (unsigned long)t->id, whatNext_strs[t->what_next]); #if defined(GRAN) endThread(t, CurrentProc); // clean-up the thread @@ -1871,7 +1923,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) // point where we can deal with this. Leaving it on the run // queue also ensures that the garbage collector knows about // this thread and its return value (it gets dropped from the - // all_threads list so there's no other way to find it). + // step->threads list so there's no other way to find it). appendToRunQueue(cap,t); return rtsFalse; #else @@ -1909,36 +1961,21 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) } /* ----------------------------------------------------------------------------- - * Perform a heap census, if PROFILING + * Perform a heap census * -------------------------------------------------------------------------- */ static rtsBool -scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) +scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED ) { -#if defined(PROFILING) // When we have +RTS -i0 and we're heap profiling, do a census at // every GC. This lets us get repeatable runs for debugging. 
if (performHeapProfile || (RtsFlags.ProfFlags.profileInterval==0 && RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) { - - // checking black holes is necessary before GC, otherwise - // there may be threads that are unreachable except by the - // blackhole queue, which the GC will consider to be - // deadlocked. - scheduleCheckBlackHoles(&MainCapability); - - debugTrace(DEBUG_sched, "garbage collecting before heap census"); - GarbageCollect(GetRoots, rtsTrue); - - debugTrace(DEBUG_sched, "performing heap census"); - heapCensus(); - - performHeapProfile = rtsFalse; - return rtsTrue; // true <=> we already GC'd + return rtsTrue; + } else { + return rtsFalse; } -#endif - return rtsFalse; } /* ----------------------------------------------------------------------------- @@ -1946,10 +1983,10 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) * -------------------------------------------------------------------------- */ static Capability * -scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, - rtsBool force_major, void (*get_roots)(evac_fn)) +scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) { StgTSO *t; + rtsBool heap_census; #ifdef THREADED_RTS static volatile StgWord waiting_for_gc; rtsBool was_waiting; @@ -1998,51 +2035,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, waiting_for_gc = rtsFalse; #endif - /* Kick any transactions which are invalid back to their - * atomically frames. When next scheduled they will try to - * commit, this commit will fail and they will retry. - */ - { - StgTSO *next; - - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->link; - } else { - next = t->global_link; - - // This is a good place to check for blocked - // exceptions. It might be the case that a thread is - // blocked on delivering an exception to a thread that - // is also blocked - we try to ensure that this - // doesn't happen in throwTo(), but it's too hard (or - // impossible) to close all the race holes, so we - // accept that some might get through and deal with - // them here. A GC will always happen at some point, - // even if the system is otherwise deadlocked. - maybePerformBlockedException (&capabilities[0], t); - - if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { - if (!stmValidateNestOfTransactions (t -> trec)) { - debugTrace(DEBUG_sched | DEBUG_stm, - "trec %p found wasting its time", t); - - // strip the stack back to the - // ATOMICALLY_FRAME, aborting the (nested) - // transaction, and saving the stack of any - // partially-evaluated thunks on the heap. - throwToSingleThreaded_(&capabilities[0], t, - NULL, rtsTrue, NULL); - -#ifdef REG_R1 - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); -#endif - } - } - } - } - } - // so this happens periodically: if (cap) scheduleCheckBlackHoles(cap); @@ -2057,6 +2049,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, deleteAllThreads(&capabilities[0]); sched_state = SCHED_SHUTTING_DOWN; } + + heap_census = scheduleNeedHeapProfile(rtsTrue); /* everybody back, start the GC. 
* Could do it in this thread, or signal a condition var @@ -2066,8 +2060,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, #if defined(THREADED_RTS) debugTrace(DEBUG_sched, "doing GC"); #endif - GarbageCollect(get_roots, force_major); + GarbageCollect(force_major || heap_census); + if (heap_census) { + debugTrace(DEBUG_sched, "performing heap census"); + heapCensus(); + performHeapProfile = rtsFalse; + } + #if defined(THREADED_RTS) // release our stash of capabilities. for (i = 0; i < n_capabilities; i++) { @@ -2101,7 +2101,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, * Singleton fork(). Do not copy any running threads. * ------------------------------------------------------------------------- */ -StgInt +pid_t forkProcess(HsStablePtr *entry #ifndef FORKPROCESS_PRIMOP_SUPPORTED STG_UNUSED @@ -2113,6 +2113,7 @@ forkProcess(HsStablePtr *entry pid_t pid; StgTSO* t,*next; Capability *cap; + nat s; #if defined(THREADED_RTS) if (RtsFlags.ParFlags.nNodes > 1) { @@ -2126,25 +2127,44 @@ forkProcess(HsStablePtr *entry // ToDo: for SMP, we should probably acquire *all* the capabilities cap = rts_lock(); + // no funny business: hold locks while we fork, otherwise if some + // other thread is holding a lock when the fork happens, the data + // structure protected by the lock will forever be in an + // inconsistent state in the child. See also #1391. + ACQUIRE_LOCK(&sched_mutex); + ACQUIRE_LOCK(&cap->lock); + ACQUIRE_LOCK(&cap->running_task->lock); + pid = fork(); if (pid) { // parent + RELEASE_LOCK(&sched_mutex); + RELEASE_LOCK(&cap->lock); + RELEASE_LOCK(&cap->running_task->lock); + // just return the pid rts_unlock(cap); return pid; } else { // child +#if defined(THREADED_RTS) + initMutex(&sched_mutex); + initMutex(&cap->lock); + initMutex(&cap->running_task->lock); +#endif + // Now, all OS threads except the thread that forked are // stopped. We need to stop all Haskell threads, including // those involved in foreign calls. Also we need to delete // all Tasks, because they correspond to OS threads that are // now gone. - for (t = all_threads; t != END_TSO_QUEUE; t = next) { + for (s = 0; s < total_steps; s++) { + for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { if (t->what_next == ThreadRelocated) { - next = t->link; + next = t->_link; } else { next = t->global_link; // don't allow threads to catch the ThreadKilled @@ -2152,6 +2172,7 @@ forkProcess(HsStablePtr *entry // threads may be evaluating thunks that we need later. deleteThread_(cap,t); } + } } // Empty the run queue. It seems tempting to let all the @@ -2165,14 +2186,19 @@ forkProcess(HsStablePtr *entry // don't exist now: cap->suspended_ccalling_tasks = NULL; - // Empty the all_threads list. Otherwise, the garbage + // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. - all_threads = END_TSO_QUEUE; + for (s = 0; s < total_steps; s++) { + all_steps[s].threads = END_TSO_QUEUE; + } // Wipe the task list, except the current Task. ACQUIRE_LOCK(&sched_mutex); for (task = all_tasks; task != NULL; task=task->all_link) { if (task != cap->running_task) { +#if defined(THREADED_RTS) + initMutex(&task->lock); // see #1391 +#endif discardTask(task); } } @@ -2186,6 +2212,11 @@ forkProcess(HsStablePtr *entry cap->returning_tasks_tl = NULL; #endif + // On Unix, all timers are reset in the child, so we need to start + // the timer again. 
+ initTimer(); + startTimer(); + cap = rts_evalStableIO(cap, entry, NULL); // run the action rts_checkSchedStatus("forkProcess",cap); @@ -2209,14 +2240,18 @@ deleteAllThreads ( Capability *cap ) // NOTE: only safe to call if we own all capabilities. StgTSO* t, *next; + nat s; + debugTrace(DEBUG_sched,"deleting all threads"); - for (t = all_threads; t != END_TSO_QUEUE; t = next) { + for (s = 0; s < total_steps; s++) { + for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { if (t->what_next == ThreadRelocated) { - next = t->link; + next = t->_link; } else { next = t->global_link; deleteThread(cap,t); } + } } // The run queue now contains a bunch of ThreadKilled threads. We @@ -2282,9 +2317,17 @@ void * suspendThread (StgRegTable *reg) { Capability *cap; - int saved_errno = errno; + int saved_errno; StgTSO *tso; Task *task; +#if mingw32_HOST_OS + StgWord32 saved_winerror; +#endif + + saved_errno = errno; +#if mingw32_HOST_OS + saved_winerror = GetLastError(); +#endif /* assume that *reg is a pointer to the StgRegTable part of a Capability. */ @@ -2294,8 +2337,8 @@ suspendThread (StgRegTable *reg) tso = cap->r.rCurrentTSO; debugTrace(DEBUG_sched, - "thread %d did a safe foreign call", - cap->r.rCurrentTSO->id); + "thread %lu did a safe foreign call", + (unsigned long)cap->r.rCurrentTSO->id); // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; @@ -2325,10 +2368,13 @@ suspendThread (StgRegTable *reg) /* Preparing to leave the RTS, so ensure there's a native thread/task waiting to take over. */ - debugTrace(DEBUG_sched, "thread %d: leaving RTS", tso->id); + debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id); #endif errno = saved_errno; +#if mingw32_HOST_OS + SetLastError(saved_winerror); +#endif return task; } @@ -2337,8 +2383,16 @@ resumeThread (void *task_) { StgTSO *tso; Capability *cap; - int saved_errno = errno; Task *task = task_; + int saved_errno; +#if mingw32_HOST_OS + StgWord32 saved_winerror; +#endif + + saved_errno = errno; +#if mingw32_HOST_OS + saved_winerror = GetLastError(); +#endif cap = task->cap; // Wait for permission to re-enter the RTS with the result. 
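Both suspendThread() and resumeThread() in the hunks around here bracket their bookkeeping with a save and restore of errno (and, on Windows, of GetLastError()/SetLastError()), so the error code produced by a safe foreign call is not clobbered while the RTS shuffles Tasks and Capabilities. A minimal standalone sketch of that idiom, with a hypothetical rts_bookkeeping() standing in for the real scheduler work -- an illustration, not RTS code:

#include <errno.h>
#include <stdio.h>

/* Stand-in for RTS work that may clobber errno behind our back. */
static void rts_bookkeeping(void) { errno = 0; }

/* The suspend/resume idiom: capture errno on entry, do the
 * bookkeeping, then put errno back before handing control on. */
static void rts_crossing(void) {
    int saved_errno = errno;   /* capture the caller's errno first */
    rts_bookkeeping();         /* may clobber errno */
    errno = saved_errno;       /* restore it intact */
}

int main(void) {
    errno = ERANGE;            /* what the foreign call left behind */
    rts_crossing();
    printf("errno survived the crossing: %s\n",
           errno == ERANGE ? "yes" : "no");
    return 0;
}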
@@ -2352,8 +2406,8 @@ resumeThread (void *task_) tso = task->suspended_tso; task->suspended_tso = NULL; - tso->link = END_TSO_QUEUE; - debugTrace(DEBUG_sched, "thread %d: re-entering RTS", tso->id); + tso->_link = END_TSO_QUEUE; // no write barrier reqd + debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); if (tso->why_blocked == BlockedOnCCall) { awakenBlockedExceptionQueue(cap,tso); @@ -2366,9 +2420,12 @@ resumeThread (void *task_) cap->r.rCurrentTSO = tso; cap->in_haskell = rtsTrue; errno = saved_errno; +#if mingw32_HOST_OS + SetLastError(saved_winerror); +#endif /* We might have GC'd, mark the TSO dirty again */ - dirtyTSO(tso); + dirty_TSO(cap,tso); IF_DEBUG(sanity, checkTSO(tso)); @@ -2429,7 +2486,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) appendToRunQueue(cap,tso); - debugTrace(DEBUG_sched, "new bound thread (%d)", tso->id); + debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id); #if defined(GRAN) /* GranSim specific init */ @@ -2443,7 +2500,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) ASSERT(task->stat != NoStatus); ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - debugTrace(DEBUG_sched, "bound thread (%d) finished", task->tso->id); + debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id); return cap; } @@ -2504,14 +2561,11 @@ initScheduler(void) #endif blackhole_queue = END_TSO_QUEUE; - all_threads = END_TSO_QUEUE; context_switch = 0; sched_state = SCHED_RUNNING; + recent_activity = ACTIVITY_YES; - RtsFlags.ConcFlags.ctxtSwitchTicks = - RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; - #if defined(THREADED_RTS) /* Initialise the mutex and condition variables used by * the scheduler. */ @@ -2557,7 +2611,13 @@ initScheduler(void) } void -exitScheduler( void ) +exitScheduler( + rtsBool wait_foreign +#if !defined(THREADED_RTS) + __attribute__((unused)) +#endif +) + /* see Capability.c, shutdownCapability() */ { Task *task = NULL; @@ -2570,7 +2630,7 @@ exitScheduler( void ) // If we haven't killed all the threads yet, do it now. if (sched_state < SCHED_SHUTTING_DOWN) { sched_state = SCHED_INTERRUPTING; - scheduleDoGC(NULL,task,rtsFalse,GetRoots); + scheduleDoGC(NULL,task,rtsFalse); } sched_state = SCHED_SHUTTING_DOWN; @@ -2579,90 +2639,25 @@ exitScheduler( void ) nat i; for (i = 0; i < n_capabilities; i++) { - shutdownCapability(&capabilities[i], task); + shutdownCapability(&capabilities[i], task, wait_foreign); } boundTaskExiting(task); stopTaskManager(); } +#else + freeCapability(&MainCapability); #endif } -/* --------------------------------------------------------------------------- - Where are the roots that we know about? - - - all the threads on the runnable queue - - all the threads on the blocked queue - - all the threads on the sleeping queue - - all the thread currently executing a _ccall_GC - - all the "main threads" - - ------------------------------------------------------------------------ */ - -/* This has to be protected either by the scheduler monitor, or by the - garbage collection monitor (probably the latter). 
- KH @ 25/10/99 -*/ - void -GetRoots( evac_fn evac ) +freeScheduler( void ) { - nat i; - Capability *cap; - Task *task; - -#if defined(GRAN) - for (i=0; i<=RtsFlags.GranFlags.proc; i++) { - if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) - evac((StgClosure **)&run_queue_hds[i]); - if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) - evac((StgClosure **)&run_queue_tls[i]); - - if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL))) - evac((StgClosure **)&blocked_queue_hds[i]); - if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) - evac((StgClosure **)&blocked_queue_tls[i]); - if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) - evac((StgClosure **)&ccalling_threads[i]); + freeTaskManager(); + if (n_capabilities != 1) { + stgFree(capabilities); } - - markEventQueue(); - -#else /* !GRAN */ - - for (i = 0; i < n_capabilities; i++) { - cap = &capabilities[i]; - evac((StgClosure **)(void *)&cap->run_queue_hd); - evac((StgClosure **)(void *)&cap->run_queue_tl); #if defined(THREADED_RTS) - evac((StgClosure **)(void *)&cap->wakeup_queue_hd); - evac((StgClosure **)(void *)&cap->wakeup_queue_tl); -#endif - for (task = cap->suspended_ccalling_tasks; task != NULL; - task=task->next) { - debugTrace(DEBUG_sched, - "evac'ing suspended TSO %d", task->suspended_tso->id); - evac((StgClosure **)(void *)&task->suspended_tso); - } - - } - - -#if !defined(THREADED_RTS) - evac((StgClosure **)(void *)&blocked_queue_hd); - evac((StgClosure **)(void *)&blocked_queue_tl); - evac((StgClosure **)(void *)&sleeping_queue); -#endif -#endif - - // evac((StgClosure **)&blackhole_queue); - -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN) - markSparkQueue(evac); -#endif - -#if defined(RTS_USER_SIGNALS) - // mark the signal handlers (signals should be already blocked) - markSignalHandlers(evac); + closeMutex(&sched_mutex); #endif } @@ -2672,17 +2667,10 @@ GetRoots( evac_fn evac ) This is the interface to the garbage collector from Haskell land. We provide this so that external C code can allocate and garbage collect when called from Haskell via _ccall_GC. - - It might be useful to provide an interface whereby the programmer - can specify more roots (ToDo). - - This needs to be protected by the GC condition variable above. KH. 
-------------------------------------------------------------------------- */
-static void (*extra_roots)(evac_fn);
-
static void
-performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
+performGC_(rtsBool force_major)
{
Task *task;
// We must grab a new Task here, because the existing Task may be
@@ -2691,34 +2679,20 @@ performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
RELEASE_LOCK(&sched_mutex);
- scheduleDoGC(NULL,task,force_major, get_roots);
+ scheduleDoGC(NULL,task,force_major);
boundTaskExiting(task);
}
void
performGC(void)
{
- performGC_(rtsFalse, GetRoots);
+ performGC_(rtsFalse);
}
void
performMajorGC(void)
{
- performGC_(rtsTrue, GetRoots);
-}
-
-static void
-AllRoots(evac_fn evac)
-{
- GetRoots(evac); // the scheduler's roots
- extra_roots(evac); // the user's roots
-}
-
-void
-performGCWithRoots(void (*get_roots)(evac_fn))
-{
- extra_roots = get_roots;
- performGC_(rtsFalse, AllRoots);
+ performGC_(rtsTrue);
}
/* -----------------------------------------------------------------------------
@@ -2744,7 +2718,12 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
// while we are moving the TSO:
lockClosure((StgClosure *)tso);
- if (tso->stack_size >= tso->max_stack_size) {
+ if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
+ // NB. never raise a StackOverflow exception if the thread is
+ // inside Control.Exception.block. It is impractical to protect
+ // against stack overflow exceptions, since virtually anything
+ // can raise one (even 'catch'), so this is the only sensible
+ // thing to do here. See bug #767.
debugTrace(DEBUG_gc,
"threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
@@ -2774,7 +2753,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
"increasing stack size from %ld words to %d.",
(long)tso->stack_size, new_stack_size);
- dest = (StgTSO *)allocate(new_tso_size);
+ dest = (StgTSO *)allocateLocal(cap,new_tso_size);
TICK_ALLOC_TSO(new_stack_size,0);
/* copy the TSO block and the old stack into the new area */
@@ -2795,7 +2774,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
* dead TSO's stack.
*/ tso->what_next = ThreadRelocated; - tso->link = dest; + setTSOLink(cap,tso,dest); tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; @@ -2817,6 +2796,56 @@ threadStackOverflow(Capability *cap, StgTSO *tso) return dest; } +static StgTSO * +threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) +{ + bdescr *bd, *new_bd; + lnat new_tso_size_w, tso_size_w; + StgTSO *new_tso; + + tso_size_w = tso_sizeW(tso); + + if (tso_size_w < MBLOCK_SIZE_W || + (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) + { + return tso; + } + + // don't allow throwTo() to modify the blocked_exceptions queue + // while we are moving the TSO: + lockClosure((StgClosure *)tso); + + new_tso_size_w = round_to_mblocks(tso_size_w/2); + + debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu", + tso->id, tso_size_w, new_tso_size_w); + + bd = Bdescr((StgPtr)tso); + new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W); + new_bd->free = bd->free; + bd->free = bd->start + TSO_STRUCT_SIZEW; + + new_tso = (StgTSO *)new_bd->start; + memcpy(new_tso,tso,TSO_STRUCT_SIZE); + new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW; + + tso->what_next = ThreadRelocated; + tso->_link = new_tso; // no write barrier reqd: same generation + + // The TSO attached to this Task may have moved, so update the + // pointer to it. + if (task->tso == tso) { + task->tso = new_tso; + } + + unlockTSO(new_tso); + unlockTSO(tso); + + IF_DEBUG(sanity,checkTSO(new_tso)); + + return new_tso; +} + /* --------------------------------------------------------------------------- Interrupt execution - usually called inside a signal handler so it mustn't do anything fancy. @@ -2847,17 +2876,10 @@ void wakeUpRts(void) { #if defined(THREADED_RTS) -#if !defined(mingw32_HOST_OS) // This forces the IO Manager thread to wakeup, which will // in turn ensure that some OS thread wakes up and runs the // scheduler loop, which will cause a GC and deadlock check. ioManagerWakeup(); -#else - // On Windows this might be safe enough, because we aren't - // in a signal handler. Later we should use the IO Manager, - // though. - prodOneCapability(); -#endif #endif } @@ -2893,7 +2915,7 @@ checkBlackHoles (Capability *cap) t = blackhole_queue; while (t != END_TSO_QUEUE) { ASSERT(t->why_blocked == BlockedOnBlackHole); - type = get_itbl(t->block_info.closure)->type; + type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) { IF_DEBUG(sanity,checkTSO(t)); t = unblockOne(cap, t); @@ -2902,8 +2924,8 @@ checkBlackHoles (Capability *cap) *prev = t; any_woke_up = rtsTrue; } else { - prev = &t->link; - t = t->link; + prev = &t->_link; + t = t->_link; } } @@ -3041,8 +3063,9 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) or should be a ATOMICALLY_FRAME (if the retry# reaches the top level). - We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions, - despite the similar implementation. + We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they + create) because retries are not considered to be exceptions, despite the + similar implementation. We should not expect to see CATCH_FRAME or STOP_FRAME because those should not be created within memory transactions. 
@@ -3062,7 +3085,7 @@ findRetryFrameHelper (StgTSO *tso) case ATOMICALLY_FRAME: debugTrace(DEBUG_stm, - "found ATOMICALLY_FRAME at %p during retrry", p); + "found ATOMICALLY_FRAME at %p during retry", p); tso->sp = p; return ATOMICALLY_FRAME; @@ -3072,7 +3095,20 @@ findRetryFrameHelper (StgTSO *tso) tso->sp = p; return CATCH_RETRY_FRAME; - case CATCH_STM_FRAME: + case CATCH_STM_FRAME: { + StgTRecHeader *trec = tso -> trec; + StgTRecHeader *outer = stmGetEnclosingTRec(trec); + debugTrace(DEBUG_stm, + "found CATCH_STM_FRAME at %p during retry", p); + debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer); + stmAbortTransaction(tso -> cap, trec); + stmFreeAbortedTRec(tso -> cap, trec); + tso -> trec = outer; + p = next; + continue; + } + + default: ASSERT(info->i.type != CATCH_FRAME); ASSERT(info->i.type != STOP_FRAME); @@ -3097,12 +3133,16 @@ resurrectThreads (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; + step *step; for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; - tso->global_link = all_threads; - all_threads = tso; - debugTrace(DEBUG_sched, "resurrecting thread %d", tso->id); + + step = Bdescr((P_)tso)->step; + tso->global_link = step->threads; + step->threads = tso; + + debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id); // Wake up the thread on the Capability it was last on cap = tso->cap; @@ -3116,7 +3156,7 @@ resurrectThreads (StgTSO *threads) break; case BlockedOnBlackHole: throwToSingleThreaded(cap, tso, - (StgClosure *)NonTermination_closure); + (StgClosure *)nonTermination_closure); break; case BlockedOnSTM: throwToSingleThreaded(cap, tso, @@ -3133,3 +3173,37 @@ resurrectThreads (StgTSO *threads) } } } + +/* ----------------------------------------------------------------------------- + performPendingThrowTos is called after garbage collection, and + passed a list of threads that were found to have pending throwTos + (tso->blocked_exceptions was not empty), and were blocked. + Normally this doesn't happen, because we would deliver the + exception directly if the target thread is blocked, but there are + small windows where it might occur on a multiprocessor (see + throwTo()). + + NB. we must be holding all the capabilities at this point, just + like resurrectThreads(). + -------------------------------------------------------------------------- */ + +void +performPendingThrowTos (StgTSO *threads) +{ + StgTSO *tso, *next; + Capability *cap; + step *step; + + for (tso = threads; tso != END_TSO_QUEUE; tso = next) { + next = tso->global_link; + + step = Bdescr((P_)tso)->step; + tso->global_link = step->threads; + step->threads = tso; + + debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); + + cap = tso->cap; + maybePerformBlockedException(cap, tso); + } +}
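A note on the recent_activity handshake added in run_thread above: the atomic xchg() guarantees that, of all the threads racing out of an idle-period GC (#1623), exactly one observes ACTIVITY_DONE_GC and restarts the timer. A standalone C11 sketch of that pattern, substituting stdatomic.h for the RTS's own xchg() and a hypothetical start_timer() for startTimer() -- an illustration, not RTS code:

#include <stdatomic.h>
#include <stdio.h>

enum { ACTIVITY_YES, ACTIVITY_DONE_GC };

static _Atomic int recent_activity = ACTIVITY_DONE_GC;
static _Atomic int timer_starts;           /* counts timer restarts */

static void start_timer(void) { atomic_fetch_add(&timer_starts, 1); }

/* What each thread does on entering run_thread: */
static void note_activity(void) {
    if (atomic_load(&recent_activity) == ACTIVITY_DONE_GC) {
        /* Atomically flip to ACTIVITY_YES; only the caller that saw
         * the old value DONE_GC restarts the timer, so two racing
         * threads cannot start it twice. */
        int prev = atomic_exchange(&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
            start_timer();
        }
    } else {
        atomic_store(&recent_activity, ACTIVITY_YES);
    }
}

int main(void) {
    note_activity();   /* first runner after the GC restarts the timer */
    note_activity();   /* later runners leave it alone */
    printf("timer restarted %d time(s)\n", atomic_load(&timer_starts));
    return 0;
}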