X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FSchedule.c;h=302ec1e7a546e82fd3b319cca90760c18ace843c;hp=51a8d2a842466069fbed9adbb82bd426cb71198a;hb=0856ac59cfb455d32a3042317fdba0f5e85cab9c;hpb=dd56e9ab4544e83d27532a8d9058140bfe81825c

diff --git a/rts/Schedule.c b/rts/Schedule.c
index 51a8d2a..302ec1e 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -9,34 +9,24 @@
 #include "PosixSource.h"
 #define KEEP_LOCKCLOSURE
 #include "Rts.h"
-#include "SchedAPI.h"
+
+#include "sm/Storage.h"
 #include "RtsUtils.h"
-#include "RtsFlags.h"
-#include "OSThreads.h"
-#include "Storage.h"
 #include "StgRun.h"
-#include "Hooks.h"
 #include "Schedule.h"
-#include "StgMiscClosures.h"
 #include "Interpreter.h"
 #include "Printer.h"
 #include "RtsSignals.h"
 #include "Sanity.h"
 #include "Stats.h"
 #include "STM.h"
-#include "Timer.h"
 #include "Prelude.h"
 #include "ThreadLabels.h"
-#include "LdvProfile.h"
 #include "Updates.h"
 #include "Proftimer.h"
 #include "ProfHeap.h"
-#include "GC.h"
 #include "Weak.h"
-#include "EventLog.h"
-
-/* PARALLEL_HASKELL includes go here */
-
+#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
 #include "Sparks.h"
 #include "Capability.h"
 #include "Task.h"
@@ -47,7 +37,8 @@
 #include "Trace.h"
 #include "RaiseAsync.h"
 #include "Threads.h"
-#include "ThrIOManager.h"
+#include "Timer.h"
+#include "ThreadPaused.h"
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
 #endif
@@ -64,12 +55,6 @@
 #include <errno.h>
 #endif
 
-// Turn off inlining when debugging - it obfuscates things
-#ifdef DEBUG
-# undef  STATIC_INLINE
-# define STATIC_INLINE static
-#endif
-
 /* -----------------------------------------------------------------------------
  * Global variables
  * -------------------------------------------------------------------------- */
@@ -150,7 +135,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 static void schedulePreLoop (void);
 static void scheduleFindWork (Capability *cap);
 #if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task);
+static void scheduleYield (Capability **pcap, Task *task, rtsBool);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
@@ -186,17 +171,6 @@ static void deleteAllThreads (Capability *cap);
 static void deleteThread_(Capability *cap, StgTSO *tso);
 #endif
 
-#ifdef DEBUG
-static char *whatNext_strs[] = {
-  "(unknown)",
-  "ThreadRunGHC",
-  "ThreadInterpret",
-  "ThreadKilled",
-  "ThreadRelocated",
-  "ThreadComplete"
-};
-#endif
-
 /* -----------------------------------------------------------------------------
  * Putting a thread on the run queue: different scheduling policies
  * -------------------------------------------------------------------------- */
@@ -254,6 +228,7 @@ schedule (Capability *initialCapability, Task *task)
   rtsBool ready_to_gc;
 #if defined(THREADED_RTS)
   rtsBool first = rtsTrue;
+  rtsBool force_yield = rtsFalse;
 #endif
 
   cap = initialCapability;
@@ -262,17 +237,7 @@
   // The sched_mutex is *NOT* held
   // NB. on return, we still hold a capability.
-  debugTrace (DEBUG_sched,
-              "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
-              task, initialCapability);
-
-  if (running_finalizers) {
-      errorBelch("error: a C finalizer called back into Haskell.\n"
-                 "   This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
-                 "   To create finalizers that may call back into Haskell, use\n"
-                 "   Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
-      stg_exit(EXIT_FAILURE);
-  }
+  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
 
   schedulePreLoop();
 
@@ -388,7 +353,9 @@
     }
 
   yield:
-    scheduleYield(&cap,task);
+    scheduleYield(&cap,task,force_yield);
+    force_yield = rtsFalse;
+
     if (emptyRunQueue(cap)) continue; // look for work again
 #endif
@@ -415,12 +382,11 @@
 
         if (bound) {
             if (bound == task) {
-                debugTrace(DEBUG_sched,
-                           "### Running thread %lu in bound thread", (unsigned long)t->id);
                 // yes, the Haskell thread is bound to the current native thread
             } else {
                 debugTrace(DEBUG_sched,
-                           "### thread %lu bound to another OS thread", (unsigned long)t->id);
+                           "thread %lu bound to another OS thread",
+                           (unsigned long)t->id);
                 // no, bound to a different Haskell thread: pass to that thread
                 pushOnRunQueue(cap,t);
                 continue;
@@ -429,7 +395,8 @@
             // The thread we want to run is unbound.
             if (task->tso) {
                 debugTrace(DEBUG_sched,
-                           "### this OS thread cannot run thread %lu", (unsigned long)t->id);
+                           "this OS thread cannot run thread %lu",
+                           (unsigned long)t->id);
                 // no, the current native thread is bound to a different
                 // Haskell thread, so pass it to any worker thread
                 pushOnRunQueue(cap,t);
@@ -464,9 +431,6 @@ run_thread:
     // that.
     cap->r.rCurrentTSO = t;
 
-    debugTrace(DEBUG_sched, "-->> running thread %ld %s ...",
-               (long)t->id, whatNext_strs[t->what_next]);
-
     startHeapProfTimer();
 
     // Check for exceptions blocked on this thread
@@ -504,7 +468,7 @@
     }
 #endif
 
-    postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
+    traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0);
 
     switch (prev_what_next) {
 
@@ -554,7 +518,7 @@
    t->saved_winerror = GetLastError();
 #endif
 
-    postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
+    traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);
 
 #if defined(THREADED_RTS)
    // If ret is ThreadBlocked, and this Task is bound to the TSO that
@@ -564,9 +528,7 @@
    // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
    if (ret == ThreadBlocked) {
-        debugTrace(DEBUG_sched,
-                   "--<< thread %lu (%s) stopped: blocked",
-                   (unsigned long)t->id, whatNext_strs[t->what_next]);
+        force_yield = rtsTrue;
        goto yield;
    }
 #endif
@@ -584,7 +546,9 @@
 
    schedulePostRunThread(cap,t);
 
-    t = threadStackUnderflow(task,t);
+    if (ret != StackOverflow) {
+        t = threadStackUnderflow(task,t);
+    }
 
    ready_to_gc = rtsFalse;
 
@@ -687,12 +651,23 @@ shouldYieldCapability (Capability *cap, Task *task)
 // and also check the benchmarks in nofib/parallel for regressions.
 
 static void
-scheduleYield (Capability **pcap, Task *task)
+scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
 {
     Capability *cap = *pcap;
 
     // if we have work, and we don't need to give up the Capability, continue.
-    if (!shouldYieldCapability(cap,task) &&
+    //
+    // The force_yield flag is used when a bound thread blocks.  This
+    // is a particularly tricky situation: the current Task does not
+    // own the TSO any more, since it is on some queue somewhere, and
+    // might be woken up or manipulated by another thread at any time.
+    // The TSO and Task might be migrated to another Capability.
+    // Certain invariants might be in doubt, such as task->bound->cap
+    // == cap.  We have to yield the current Capability immediately,
+    // no messing around.
+    //
+    if (!force_yield &&
+        !shouldYieldCapability(cap,task) &&
         (!emptyRunQueue(cap) ||
         !emptyWakeupQueue(cap) ||
         blackholes_need_checking ||
@@ -803,7 +778,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
               debugTrace(DEBUG_sched, "pushing thread %lu to capability %d",
                          (unsigned long)t->id, free_caps[i]->no);
               appendToRunQueue(free_caps[i],t);
-               postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
+               traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no);
               if (t->bound) { t->bound->cap = free_caps[i]; }
               t->cap = free_caps[i];
@@ -827,7 +802,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                   if (spark != NULL) {
                       debugTrace(DEBUG_sched, "pushing spark %p to capability %d",
                                  spark, free_caps[i]->no);
-                        postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
+                        traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no);
                       newSpark(&(free_caps[i]->r), spark);
                   }
@@ -1087,7 +1062,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
            // partially-evaluated thunks on the heap.
            throwToSingleThreaded_(cap, t, NULL, rtsTrue);
 
-            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+//            ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
        }
    }
@@ -1111,7 +1086,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
 
       debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
-                   (long)t->id, whatNext_strs[t->what_next], blocks);
+                   (long)t->id, what_next_strs[t->what_next], blocks);
 
       // don't do this if the nursery is (nearly) full, we'll GC first.
       if (cap->r.rCurrentNursery->link != NULL ||
@@ -1129,10 +1104,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
           if (cap->r.rCurrentNursery->u.back != NULL) {
               cap->r.rCurrentNursery->u.back->link = bd;
           } else {
-#if !defined(THREADED_RTS)
-               ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
-                      g0s0 == cap->r.rNursery);
-#endif
               cap->r.rNursery->blocks = bd;
           }
           cap->r.rCurrentNursery->u.back = bd;
@@ -1169,10 +1140,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
       }
    }
 
-    debugTrace(DEBUG_sched,
-              "--<< thread %ld (%s) stopped: HeapOverflow",
-              (long)t->id, whatNext_strs[t->what_next]);
-
    if (cap->r.rHpLim == NULL || cap->context_switch) {
        // Sometimes we miss a context switch, e.g. when calling
        // primitives in a tight loop, MAYBE_GC() doesn't check the
@@ -1194,10 +1161,6 @@
 static void
 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
 {
-    debugTrace (DEBUG_sched,
-                "--<< thread %ld (%s) stopped, StackOverflow",
-                (long)t->id, whatNext_strs[t->what_next]);
-
    /* just adjust the stack for this thread, then pop it back
     * on the run queue.
     */
@@ -1239,11 +1202,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
    if (t->what_next != prev_what_next) {
       debugTrace(DEBUG_sched,
                  "--<< thread %ld (%s) stopped to switch evaluators",
-                  (long)t->id, whatNext_strs[t->what_next]);
-    } else {
-       debugTrace(DEBUG_sched,
-                  "--<< thread %ld (%s) stopped, yielding",
-                  (long)t->id, whatNext_strs[t->what_next]);
+                  (long)t->id, what_next_strs[t->what_next]);
    }
 #endif
 
@@ -1290,12 +1249,7 @@ scheduleHandleThreadBlocked( StgTSO *t
    // exception, see maybePerformBlockedException().
 
 #ifdef DEBUG
-    if (traceClass(DEBUG_sched)) {
-       debugTraceBegin("--<< thread %lu (%s) stopped: ",
-                       (unsigned long)t->id, whatNext_strs[t->what_next]);
-       printThreadBlockage(t);
-       debugTraceEnd();
-    }
+    traceThreadStatus(DEBUG_sched, t);
 #endif
 }
@@ -1312,8 +1266,6 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
     * We also end up here if the thread kills itself with an
     * uncaught exception, see Exception.cmm.
     */
-    debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
-              (unsigned long)t->id, whatNext_strs[t->what_next]);
 
    // blocked exceptions can now complete, even if the thread was in
    // blocked mode (see #2910).  This unconditionally calls
@@ -1465,7 +1417,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
 
    if (gc_type == PENDING_GC_SEQ)
    {
-        postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+        traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
        // single-threaded GC: grab all the capabilities
        for (i=0; i < n_capabilities; i++) {
            debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
@@ -1488,7 +1440,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
    {
        // multi-threaded GC: make sure all the Capabilities donate one
        // GC thread each.
-        postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
+        traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
 
        waitForGcThreads(cap);
@@ -1514,8 +1466,7 @@ delete_threads_and_gc:
    heap_census = scheduleNeedHeapProfile(rtsTrue);
 
 #if defined(THREADED_RTS)
-    postEvent(cap, EVENT_GC_START, 0, 0);
-    debugTrace(DEBUG_sched, "doing GC");
+    traceSchedEvent(cap, EVENT_GC_START, 0, 0);
    // reset waiting_for_gc *before* GC, so that when the GC threads
    // emerge they don't immediately re-enter the GC.
    waiting_for_gc = 0;
@@ -1523,7 +1474,7 @@ delete_threads_and_gc:
 #else
    GarbageCollect(force_major || heap_census, 0, cap);
 #endif
-    postEvent(cap, EVENT_GC_END, 0, 0);
+    traceSchedEvent(cap, EVENT_GC_END, 0, 0);
 
    if (recent_activity == ACTIVITY_INACTIVE && force_major)
    {
@@ -1729,7 +1680,6 @@ forkProcess(HsStablePtr *entry
    }
 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
    barf("forkProcess#: primop not supported on this platform, sorry!\n");
-    return -1;
 #endif
 }
@@ -1839,10 +1789,7 @@ suspendThread (StgRegTable *reg)
  task = cap->running_task;
  tso = cap->r.rCurrentTSO;
 
-  postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
-  debugTrace(DEBUG_sched,
-            "thread %lu did a safe foreign call",
-            (unsigned long)cap->r.rCurrentTSO->id);
+  traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL);
 
  // XXX this might not be necessary --SDM
  tso->what_next = ThreadRunGHC;
@@ -1868,13 +1815,6 @@ suspendThread (StgRegTable *reg)
 
  RELEASE_LOCK(&cap->lock);
 
-#if defined(THREADED_RTS)
-  /* Preparing to leave the RTS, so ensure there's a native thread/task
-     waiting to take over.
- */ - debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id); -#endif - errno = saved_errno; #if mingw32_HOST_OS SetLastError(saved_winerror); @@ -1912,8 +1852,7 @@ resumeThread (void *task_) task->suspended_tso = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd - postEvent(cap, EVENT_RUN_THREAD, tso->id, 0); - debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); + traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next); if (tso->why_blocked == BlockedOnCCall) { // avoid locking the TSO if we don't have to @@ -1969,7 +1908,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no); + traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no); wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else @@ -2381,7 +2320,9 @@ interruptStgRts(void) { sched_state = SCHED_INTERRUPTING; setContextSwitches(); +#if defined(THREADED_RTS) wakeUpRts(); +#endif } /* ----------------------------------------------------------------------------- @@ -2397,16 +2338,15 @@ interruptStgRts(void) will have interrupted any blocking system call in progress anyway. -------------------------------------------------------------------------- */ -void -wakeUpRts(void) -{ #if defined(THREADED_RTS) +void wakeUpRts(void) +{ // This forces the IO Manager thread to wakeup, which will // in turn ensure that some OS thread wakes up and runs the // scheduler loop, which will cause a GC and deadlock check. ioManagerWakeup(); -#endif } +#endif /* ----------------------------------------------------------------------------- * checkBlackHoles() @@ -2624,7 +2564,7 @@ findRetryFrameHelper (StgTSO *tso) case CATCH_STM_FRAME: { StgTRecHeader *trec = tso -> trec; - StgTRecHeader *outer = stmGetEnclosingTRec(trec); + StgTRecHeader *outer = trec -> enclosing_trec; debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p during retry", p); debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer); @@ -2676,10 +2616,9 @@ resurrectThreads (StgTSO *threads) switch (tso->why_blocked) { case BlockedOnMVar: - case BlockedOnException: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, - (StgClosure *)blockedOnDeadMVar_closure); + (StgClosure *)blockedIndefinitelyOnMVar_closure); break; case BlockedOnBlackHole: throwToSingleThreaded(cap, tso, @@ -2687,7 +2626,7 @@ resurrectThreads (StgTSO *threads) break; case BlockedOnSTM: throwToSingleThreaded(cap, tso, - (StgClosure *)blockedIndefinitely_closure); + (StgClosure *)blockedIndefinitelyOnSTM_closure); break; case NotBlocked: /* This might happen if the thread was blocked on a black hole @@ -2695,6 +2634,11 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; + case BlockedOnException: + // throwTo should never block indefinitely: if the target + // thread dies or completes, throwTo returns. 
+           barf("resurrectThreads: thread BlockedOnException");
+           break;
       default:
           barf("resurrectThreads: thread blocked in a strange way");
       }
@@ -2719,8 +2663,12 @@ performPendingThrowTos (StgTSO *threads)
 {
    StgTSO *tso, *next;
    Capability *cap;
+    Task *task, *saved_task;
    step *step;
 
+    task = myTask();
+    cap = task->cap;
+
    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
       next = tso->global_link;
@@ -2730,7 +2678,17 @@ performPendingThrowTos (StgTSO *threads)
           debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu",
                      (unsigned long)tso->id);
 
-           cap = tso->cap;
-           maybePerformBlockedException(cap, tso);
-       }
+           // We must pretend this Capability belongs to the current Task
+           // for the time being, as invariants will be broken otherwise.
+           // In fact the current Task has exclusive access to the system
+           // at this point, so this is just bookkeeping:
+           task->cap = tso->cap;
+           saved_task = tso->cap->running_task;
+           tso->cap->running_task = task;
+           maybePerformBlockedException(tso->cap, tso);
+           tso->cap->running_task = saved_task;
+       }
+
+    // Restore our original Capability:
+    task->cap = cap;
 }
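Two of the changes above are easy to misread, so here are small stand-alone
sketches of the patterns involved. Both are illustrative only: the types,
fields, and helper names below (scheduler_loop, have_local_work,
withBorrowedCapability, and the cut-down Task/Capability structs) are
invented for illustration and are not the real RTS definitions; only the
overall shape comes from the diff.

First, the force_yield flag threaded into scheduleYield(). The point is that
once a bound thread blocks, the scheduler can no longer trust its usual
"do I still have work?" test, so the next pass must yield unconditionally:

    /* sketch.c -- illustrative only, not RTS code */
    #include <stdbool.h>

    static bool have_local_work(void) { return false; } /* stand-in for the run-queue checks */
    static void yield_capability(void) { }              /* stand-in for yielding the Capability */

    static void scheduler_iteration(void)
    {
        static bool force_yield = false;

        /* ... run a thread; the run-thread path sets force_yield = true
           when a bound thread has just blocked ... */

        if (force_yield || !have_local_work()) {
            yield_capability();   /* give the Capability up, no shortcuts */
        }
        force_yield = false;      /* one-shot: cleared on every pass */
    }

Second, the ownership juggling added to performPendingThrowTos(): the current
Task briefly pretends to own each TSO's Capability so the usual ownership
invariants hold while the blocked exception is performed, then restores the
previous owner. This is only safe because the caller has exclusive access to
the whole system at that point (the precondition the diff's comment relies on):

    /* sketch.c -- illustrative only, not RTS code */
    typedef struct Task Task;
    typedef struct Capability {
        Task *running_task;   /* Task currently owning this Capability */
    } Capability;

    struct Task {
        Capability *cap;      /* Capability this Task is operating on */
    };

    /* Temporarily make 'task' the owner of 'target', run 'fn', then put
       the original owner back -- the borrow/restore discipline used above. */
    static void withBorrowedCapability(Task *task, Capability *target,
                                       void (*fn)(Capability *))
    {
        Capability *saved_cap  = task->cap;
        Task       *saved_task = target->running_task;

        task->cap            = target;   /* re-establish task->cap == cap */
        target->running_task = task;     /* re-establish cap->running_task == task */

        fn(target);

        target->running_task = saved_task;
        task->cap            = saved_cap;
    }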