X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FSchedule.c;h=3ae1fe0a1bba2f27b7fc12c1ebfef06380131b84;hp=e17c6534d667cc0475297ee5a02cce9265664c85;hb=5270423a6afe69f1dc57e5e5a474812182718d40;hpb=99df892cc9620fcc92747b79bba75dad8a1d295c diff --git a/rts/Schedule.c b/rts/Schedule.c index e17c653..3ae1fe0 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -9,31 +9,24 @@ #include "PosixSource.h" #define KEEP_LOCKCLOSURE #include "Rts.h" -#include "SchedAPI.h" + +#include "sm/Storage.h" #include "RtsUtils.h" -#include "RtsFlags.h" -#include "OSThreads.h" -#include "Storage.h" #include "StgRun.h" -#include "Hooks.h" #include "Schedule.h" -#include "StgMiscClosures.h" #include "Interpreter.h" #include "Printer.h" #include "RtsSignals.h" #include "Sanity.h" #include "Stats.h" #include "STM.h" -#include "Timer.h" #include "Prelude.h" #include "ThreadLabels.h" -#include "LdvProfile.h" #include "Updates.h" #include "Proftimer.h" #include "ProfHeap.h" - -/* PARALLEL_HASKELL includes go here */ - +#include "Weak.h" +#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N #include "Sparks.h" #include "Capability.h" #include "Task.h" @@ -44,7 +37,8 @@ #include "Trace.h" #include "RaiseAsync.h" #include "Threads.h" -#include "ThrIOManager.h" +#include "Timer.h" +#include "ThreadPaused.h" #ifdef HAVE_SYS_TYPES_H #include @@ -61,12 +55,6 @@ #include #endif -// Turn off inlining when debugging - it obfuscates things -#ifdef DEBUG -# undef STATIC_INLINE -# define STATIC_INLINE static -#endif - /* ----------------------------------------------------------------------------- * Global variables * -------------------------------------------------------------------------- */ @@ -89,16 +77,24 @@ StgTSO *blackhole_queue = NULL; */ rtsBool blackholes_need_checking = rtsFalse; +/* Set to true when the latest garbage collection failed to reclaim + * enough space, and the runtime should proceed to shut itself down in + * an orderly fashion (emitting profiling info etc.) + */ +rtsBool heap_overflow = rtsFalse; + /* flag that tracks whether we have done any execution in this time slice. * LOCK: currently none, perhaps we should lock (but needs to be * updated in the fast path of the scheduler). + * + * NB. must be StgWord, we do xchg() on it. */ -nat recent_activity = ACTIVITY_YES; +volatile StgWord recent_activity = ACTIVITY_YES; /* if this flag is set as well, give up execution - * LOCK: none (changes once, from false->true) + * LOCK: none (changes monotonically) */ -rtsBool sched_state = SCHED_RUNNING; +volatile StgWord sched_state = SCHED_RUNNING; /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually * exists - earlier gccs apparently didn't. @@ -139,7 +135,7 @@ static Capability *schedule (Capability *initialCapability, Task *task); static void schedulePreLoop (void); static void scheduleFindWork (Capability *cap); #if defined(THREADED_RTS) -static void scheduleYield (Capability **pcap, Task *task); +static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); @@ -147,11 +143,7 @@ static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); -#if defined(PARALLEL_HASKELL) -static rtsBool scheduleGetRemoteWork(Capability *cap); -static void scheduleSendPendingMessages(void); -#endif -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); @@ -179,17 +171,6 @@ static void deleteAllThreads (Capability *cap); static void deleteThread_(Capability *cap, StgTSO *tso); #endif -#ifdef DEBUG -static char *whatNext_strs[] = { - "(unknown)", - "ThreadRunGHC", - "ThreadInterpret", - "ThreadKilled", - "ThreadRelocated", - "ThreadComplete" -}; -#endif - /* ----------------------------------------------------------------------------- * Putting a thread on the run queue: different scheduling policies * -------------------------------------------------------------------------- */ @@ -197,18 +178,8 @@ static char *whatNext_strs[] = { STATIC_INLINE void addToRunQueue( Capability *cap, StgTSO *t ) { -#if defined(PARALLEL_HASKELL) - if (RtsFlags.ParFlags.doFairScheduling) { - // this does round-robin scheduling; good for concurrency - appendToRunQueue(cap,t); - } else { - // this does unfair scheduling; good for parallelism - pushOnRunQueue(cap,t); - } -#else // this does round-robin scheduling; good for concurrency appendToRunQueue(cap,t); -#endif } /* --------------------------------------------------------------------------- @@ -253,13 +224,11 @@ schedule (Capability *initialCapability, Task *task) StgTSO *t; Capability *cap; StgThreadReturnCode ret; -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; -#endif nat prev_what_next; rtsBool ready_to_gc; #if defined(THREADED_RTS) rtsBool first = rtsTrue; + rtsBool force_yield = rtsFalse; #endif cap = initialCapability; @@ -268,22 +237,14 @@ schedule (Capability *initialCapability, Task *task) // The sched_mutex is *NOT* held // NB. on return, we still hold a capability. - debugTrace (DEBUG_sched, - "### NEW SCHEDULER LOOP (task: %p, cap: %p)", - task, initialCapability); + debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no); schedulePreLoop(); // ----------------------------------------------------------- // Scheduler loop starts here: -#if defined(PARALLEL_HASKELL) -#define TERMINATION_CONDITION (!receivedFinish) -#else -#define TERMINATION_CONDITION rtsTrue -#endif - - while (TERMINATION_CONDITION) { + while (1) { // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign @@ -338,7 +299,14 @@ schedule (Capability *initialCapability, Task *task) #endif /* scheduleDoGC() deletes all the threads */ cap = scheduleDoGC(cap,task,rtsFalse); - break; + + // after scheduleDoGC(), we must be shutting down. Either some + // other Capability did the final GC, or we did it above, + // either way we can fall through to the SCHED_SHUTTING_DOWN + // case now. + ASSERT(sched_state == SCHED_SHUTTING_DOWN); + // fall through + case SCHED_SHUTTING_DOWN: debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN"); // If we are a worker, just exit. If we're a bound thread @@ -358,21 +326,6 @@ schedule (Capability *initialCapability, Task *task) (pushes threads, wakes up idle capabilities for stealing) */ schedulePushWork(cap,task); -#if defined(PARALLEL_HASKELL) - /* since we perform a blocking receive and continue otherwise, - either we never reach here or we definitely have work! */ - // from here: non-empty run queue - ASSERT(!emptyRunQueue(cap)); - - if (PacketsWaiting()) { /* now process incoming messages, if any - pending... - - CAUTION: scheduleGetRemoteWork called - above, waits for messages as well! */ - processMessages(cap, &receivedFinish); - } -#endif // PARALLEL_HASKELL: non-empty run queue! - scheduleDetectDeadlock(cap,task); #if defined(THREADED_RTS) @@ -399,7 +352,10 @@ schedule (Capability *initialCapability, Task *task) // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); } - scheduleYield(&cap,task); + yield: + scheduleYield(&cap,task,force_yield); + force_yield = rtsFalse; + if (emptyRunQueue(cap)) continue; // look for work again #endif @@ -426,12 +382,11 @@ schedule (Capability *initialCapability, Task *task) if (bound) { if (bound == task) { - debugTrace(DEBUG_sched, - "### Running thread %lu in bound thread", (unsigned long)t->id); // yes, the Haskell thread is bound to the current native thread } else { debugTrace(DEBUG_sched, - "### thread %lu bound to another OS thread", (unsigned long)t->id); + "thread %lu bound to another OS thread", + (unsigned long)t->id); // no, bound to a different Haskell thread: pass to that thread pushOnRunQueue(cap,t); continue; @@ -440,7 +395,8 @@ schedule (Capability *initialCapability, Task *task) // The thread we want to run is unbound. if (task->tso) { debugTrace(DEBUG_sched, - "### this OS thread cannot run thread %lu", (unsigned long)t->id); + "this OS thread cannot run thread %lu", + (unsigned long)t->id); // no, the current native thread is bound to a different // Haskell thread, so pass it to any worker thread pushOnRunQueue(cap,t); @@ -450,6 +406,15 @@ schedule (Capability *initialCapability, Task *task) } #endif + // If we're shutting down, and this thread has not yet been + // killed, kill it now. This sometimes happens when a finalizer + // thread is created by the final GC, or a thread previously + // in a foreign call returns. + if (sched_state >= SCHED_INTERRUPTING && + !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) { + deleteThread(cap,t); + } + /* context switches are initiated by the timer signal, unless * the user specified "context switch as often as possible", with * +RTS -C0 @@ -466,9 +431,6 @@ run_thread: // that. cap->r.rCurrentTSO = t; - debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", - (long)t->id, whatNext_strs[t->what_next]); - startHeapProfTimer(); // Check for exceptions blocked on this thread @@ -479,6 +441,7 @@ run_thread: ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); + ASSERT(t->bound ? t->bound->cap == cap : 1); prev_what_next = t->what_next; @@ -505,6 +468,8 @@ run_thread: } #endif + traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0); + switch (prev_what_next) { case ThreadKilled: @@ -553,6 +518,8 @@ run_thread: t->saved_winerror = GetLastError(); #endif + traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret); + #if defined(THREADED_RTS) // If ret is ThreadBlocked, and this Task is bound to the TSO that // blocked, we are in limbo - the TSO is now owned by whatever it @@ -561,10 +528,8 @@ run_thread: // that task->cap != cap. We better yield this Capability // immediately and return to normaility. if (ret == ThreadBlocked) { - debugTrace(DEBUG_sched, - "--<< thread %lu (%s) stopped: blocked", - (unsigned long)t->id, whatNext_strs[t->what_next]); - continue; + force_yield = rtsTrue; + goto yield; } #endif @@ -581,7 +546,9 @@ run_thread: schedulePostRunThread(cap,t); - t = threadStackUnderflow(task,t); + if (ret != StackOverflow) { + t = threadStackUnderflow(task,t); + } ready_to_gc = rtsFalse; @@ -651,33 +618,8 @@ scheduleFindWork (Capability *cap) scheduleCheckBlockedThreads(cap); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) - // Try to activate one of our own sparks - if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } -#endif - #if defined(THREADED_RTS) - // Try to steak work if we don't have any - if (emptyRunQueue(cap)) { stealWork(cap); } -#endif - -#if defined(PARALLEL_HASKELL) - // if messages have been buffered... - scheduleSendPendingMessages(); -#endif - -#if defined(PARALLEL_HASKELL) - if (emptyRunQueue(cap)) { - receivedFinish = scheduleGetRemoteWork(cap); - continue; // a new round, (hopefully) with new work - /* - in GUM, this a) sends out a FISH and returns IF no fish is - out already - b) (blocking) awaits and receives messages - - in Eden, this is only the blocking receive, as b) in GUM. - */ - } + if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } #endif } @@ -704,15 +646,32 @@ shouldYieldCapability (Capability *cap, Task *task) // - we need to yield this Capability to someone else // (see shouldYieldCapability()) // -// The return value indicates whether +// Careful: the scheduler loop is quite delicate. Make sure you run +// the tests in testsuite/concurrent (all ways) after modifying this, +// and also check the benchmarks in nofib/parallel for regressions. static void -scheduleYield (Capability **pcap, Task *task) +scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) { Capability *cap = *pcap; // if we have work, and we don't need to give up the Capability, continue. - if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task)) + // + // The force_yield flag is used when a bound thread blocks. This + // is a particularly tricky situation: the current Task does not + // own the TSO any more, since it is on some queue somewhere, and + // might be woken up or manipulated by another thread at any time. + // The TSO and Task might be migrated to another Capability. + // Certain invariants might be in doubt, such as task->bound->cap + // == cap. We have to yield the current Capability immediately, + // no messing around. + // + if (!force_yield && + !shouldYieldCapability(cap,task) && + (!emptyRunQueue(cap) || + !emptyWakeupQueue(cap) || + blackholes_need_checking || + sched_state >= SCHED_INTERRUPTING)) return; // otherwise yield (sleep), and keep yielding if necessary. @@ -751,9 +710,11 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // Check whether we have more threads on our run queue, or sparks // in our pool, that we could hand to another Capability. - if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE) - && sparkPoolSizeCap(cap) < 2) { - return; + if (cap->run_queue_hd == END_TSO_QUEUE) { + if (sparkPoolSizeCap(cap) < 2) return; + } else { + if (cap->run_queue_hd->_link == END_TSO_QUEUE && + sparkPoolSizeCap(cap) < 1) return; } // First grab as many free Capabilities as we can. @@ -816,6 +777,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } else { debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); appendToRunQueue(free_caps[i],t); + + traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no); + if (t->bound) { t->bound->cap = free_caps[i]; } t->cap = free_caps[i]; i++; @@ -837,6 +801,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, spark = tryStealSpark(cap->sparks); if (spark != NULL) { debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no); + + traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no); + newSpark(&(free_caps[i]->r), spark); } } @@ -932,8 +899,13 @@ scheduleCheckBlackHoles (Capability *cap) { ACQUIRE_LOCK(&sched_mutex); if ( blackholes_need_checking ) { - checkBlackHoles(cap); blackholes_need_checking = rtsFalse; + // important that we reset the flag *before* checking the + // blackhole queue, otherwise we could get deadlock. This + // happens as follows: we wake up a thread that + // immediately runs on another Capability, blocks on a + // blackhole, and then we reset the blackholes_need_checking flag. + checkBlackHoles(cap); } RELEASE_LOCK(&sched_mutex); } @@ -946,12 +918,6 @@ scheduleCheckBlackHoles (Capability *cap) static void scheduleDetectDeadlock (Capability *cap, Task *task) { - -#if defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL - return; -#endif - /* * Detect deadlock: when we have no threads to run, there are no * threads blocked, waiting for I/O, or sleeping, and all the @@ -977,12 +943,11 @@ scheduleDetectDeadlock (Capability *cap, Task *task) // they are unreachable and will therefore be sent an // exception. Any threads thus released will be immediately // runnable. - cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/); + cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/); + // when force_major == rtsTrue. scheduleDoGC sets + // recent_activity to ACTIVITY_DONE_GC and turns off the timer + // signal. - recent_activity = ACTIVITY_DONE_GC; - // disable timer signals (see #1623) - stopTimer(); - if ( !emptyRunQueue(cap) ) return; #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS) @@ -1057,84 +1022,19 @@ scheduleSendPendingMessages(void) * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap) { - StgClosure *spark; - -/* We only want to stay here if the run queue is empty and we want some - work. We try to turn a spark into a thread, and add it to the run - queue, from where it will be picked up in the next iteration of the - scheduler loop. -*/ - if (!emptyRunQueue(cap)) - /* In the threaded RTS, another task might have pushed a thread - on our run queue in the meantime ? But would need a lock.. */ - return; - - - // Really we should be using reclaimSpark() here, but - // experimentally it doesn't seem to perform as well as just - // stealing from our own spark pool: - // spark = reclaimSpark(cap->sparks); - spark = tryStealSpark(cap->sparks); // defined in Sparks.c - - if (spark != NULL) { - debugTrace(DEBUG_sched, - "turning spark of closure %p into a thread", - (StgClosure *)spark); - createSparkThread(cap,spark); // defined in Sparks.c + if (anySparks()) + { + createSparkThread(cap); + debugTrace(DEBUG_sched, "creating a spark thread"); } } #endif // PARALLEL_HASKELL || THREADED_RTS /* ---------------------------------------------------------------------------- - * Get work from a remote node (PARALLEL_HASKELL only) - * ------------------------------------------------------------------------- */ - -#if defined(PARALLEL_HASKELL) -static rtsBool /* return value used in PARALLEL_HASKELL only */ -scheduleGetRemoteWork (Capability *cap STG_UNUSED) -{ -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; - - // idle() , i.e. send all buffers, wait for work - if (RtsFlags.ParFlags.BufferTime) { - IF_PAR_DEBUG(verbose, - debugBelch("...send all pending data,")); - { - nat i; - for (i=1; i<=nPEs; i++) - sendImmediately(i); // send all messages away immediately - } - } - - /* this would be the place for fishing in GUM... - - if (no-earlier-fish-around) - sendFish(choosePe()); - */ - - // Eden:just look for incoming messages (blocking receive) - IF_PAR_DEBUG(verbose, - debugBelch("...wait for incoming messages...\n")); - processMessages(cap, &receivedFinish); // blocking receive... - - - return receivedFinish; - // reenter scheduling look after having received something - -#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */ - - return rtsFalse; /* return value unused in THREADED_RTS */ - -#endif /* PARALLEL_HASKELL */ -} -#endif // PARALLEL_HASKELL || THREADED_RTS - -/* ---------------------------------------------------------------------------- * After running a thread... * ------------------------------------------------------------------------- */ @@ -1160,9 +1060,9 @@ schedulePostRunThread (Capability *cap, StgTSO *t) // ATOMICALLY_FRAME, aborting the (nested) // transaction, and saving the stack of any // partially-evaluated thunks on the heap. - throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL); + throwToSingleThreaded_(cap, t, NULL, rtsTrue); - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); +// ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); } } @@ -1186,7 +1086,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) debugTrace(DEBUG_sched, "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", - (long)t->id, whatNext_strs[t->what_next], blocks); + (long)t->id, what_next_strs[t->what_next], blocks); // don't do this if the nursery is (nearly) full, we'll GC first. if (cap->r.rCurrentNursery->link != NULL || @@ -1204,10 +1104,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { -#if !defined(THREADED_RTS) - ASSERT(g0s0->blocks == cap->r.rCurrentNursery && - g0s0 == cap->r.rNursery); -#endif cap->r.rNursery->blocks = bd; } cap->r.rCurrentNursery->u.back = bd; @@ -1222,8 +1118,8 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) { bdescr *x; for (x = bd; x < bd + blocks; x++) { - x->step = cap->r.rNursery; - x->gen_no = 0; + initBdescr(x,cap->r.rNursery); + x->free = x->start; x->flags = 0; } } @@ -1244,11 +1140,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } } - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped: HeapOverflow", - (long)t->id, whatNext_strs[t->what_next]); - - if (cap->context_switch) { + if (cap->r.rHpLim == NULL || cap->context_switch) { // Sometimes we miss a context switch, e.g. when calling // primitives in a tight loop, MAYBE_GC() doesn't check the // context switch flag, and we end up waiting for a GC. @@ -1269,10 +1161,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) static void scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) { - debugTrace (DEBUG_sched, - "--<< thread %ld (%s) stopped, StackOverflow", - (long)t->id, whatNext_strs[t->what_next]); - /* just adjust the stack for this thread, then pop it back * on the run queue. */ @@ -1314,11 +1202,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) if (t->what_next != prev_what_next) { debugTrace(DEBUG_sched, "--<< thread %ld (%s) stopped to switch evaluators", - (long)t->id, whatNext_strs[t->what_next]); - } else { - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped, yielding", - (long)t->id, whatNext_strs[t->what_next]); + (long)t->id, what_next_strs[t->what_next]); } #endif @@ -1345,7 +1229,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) static void scheduleHandleThreadBlocked( StgTSO *t -#if !defined(GRAN) && !defined(DEBUG) +#if !defined(DEBUG) STG_UNUSED #endif ) @@ -1365,12 +1249,7 @@ scheduleHandleThreadBlocked( StgTSO *t // exception, see maybePerformBlockedException(). #ifdef DEBUG - if (traceClass(DEBUG_sched)) { - debugTraceBegin("--<< thread %lu (%s) stopped: ", - (unsigned long)t->id, whatNext_strs[t->what_next]); - printThreadBlockage(t); - debugTraceEnd(); - } + traceThreadStatus(DEBUG_sched, t); #endif } @@ -1387,8 +1266,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) * We also end up here if the thread kills itself with an * uncaught exception, see Exception.cmm. */ - debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", - (unsigned long)t->id, whatNext_strs[t->what_next]); + + // blocked exceptions can now complete, even if the thread was in + // blocked mode (see #2910). This unconditionally calls + // lockTSO(), which ensures that we don't miss any threads that + // are engaged in throwTo() with this thread as a target. + awakenBlockedExceptionQueue (cap, t); // // Check whether the thread that just completed was a bound @@ -1432,7 +1315,11 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) *(task->ret) = NULL; } if (sched_state >= SCHED_INTERRUPTING) { - task->stat = Interrupted; + if (heap_overflow) { + task->stat = HeapExhausted; + } else { + task->stat = Interrupted; + } } else { task->stat = Killed; } @@ -1475,11 +1362,28 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) #ifdef THREADED_RTS /* extern static volatile StgWord waiting_for_gc; lives inside capability.c */ - rtsBool was_waiting; + rtsBool gc_type, prev_pending_gc; nat i; #endif + if (sched_state == SCHED_SHUTTING_DOWN) { + // The final GC has already been done, and the system is + // shutting down. We'll probably deadlock if we try to GC + // now. + return cap; + } + #ifdef THREADED_RTS + if (sched_state < SCHED_INTERRUPTING + && RtsFlags.ParFlags.parGcEnabled + && N >= RtsFlags.ParFlags.parGcGen + && ! oldest_gen->steps[0].mark) + { + gc_type = PENDING_GC_PAR; + } else { + gc_type = PENDING_GC_SEQ; + } + // In order to GC, there must be no threads running Haskell code. // Therefore, the GC thread needs to hold *all* the capabilities, // and release them after the GC has completed. @@ -1490,74 +1394,147 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) // actually did the GC. But it's quite hard to arrange for all // the other tasks to sleep and stay asleep. // - + /* Other capabilities are prevented from running yet more Haskell threads if waiting_for_gc is set. Tested inside yieldCapability() and releaseCapability() in Capability.c */ - was_waiting = cas(&waiting_for_gc, 0, 1); - if (was_waiting) { + prev_pending_gc = cas(&waiting_for_gc, 0, gc_type); + if (prev_pending_gc) { do { - debugTrace(DEBUG_sched, "someone else is trying to GC..."); - if (cap) yieldCapability(&cap,task); + debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", + prev_pending_gc); + ASSERT(cap); + yieldCapability(&cap,task); } while (waiting_for_gc); return cap; // NOTE: task->cap might have changed here } setContextSwitches(); - for (i=0; i < n_capabilities; i++) { - debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); - if (cap != &capabilities[i]) { - Capability *pcap = &capabilities[i]; - // we better hope this task doesn't get migrated to - // another Capability while we're waiting for this one. - // It won't, because load balancing happens while we have - // all the Capabilities, but even so it's a slightly - // unsavoury invariant. - task->cap = pcap; - waitForReturnCapability(&pcap, task); - if (pcap != &capabilities[i]) { - barf("scheduleDoGC: got the wrong capability"); - } - } + + // The final shutdown GC is always single-threaded, because it's + // possible that some of the Capabilities have no worker threads. + + if (gc_type == PENDING_GC_SEQ) + { + traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0); + } + else + { + traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0); + debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); } - waiting_for_gc = rtsFalse; -#endif + // do this while the other Capabilities stop: + if (cap) scheduleCheckBlackHoles(cap); - // so this happens periodically: + if (gc_type == PENDING_GC_SEQ) + { + // single-threaded GC: grab all the capabilities + for (i=0; i < n_capabilities; i++) { + debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); + if (cap != &capabilities[i]) { + Capability *pcap = &capabilities[i]; + // we better hope this task doesn't get migrated to + // another Capability while we're waiting for this one. + // It won't, because load balancing happens while we have + // all the Capabilities, but even so it's a slightly + // unsavoury invariant. + task->cap = pcap; + waitForReturnCapability(&pcap, task); + if (pcap != &capabilities[i]) { + barf("scheduleDoGC: got the wrong capability"); + } + } + } + } + else + { + // multi-threaded GC: make sure all the Capabilities donate one + // GC thread each. + waitForGcThreads(cap); + } + +#else /* !THREADED_RTS */ + + // do this while the other Capabilities stop: if (cap) scheduleCheckBlackHoles(cap); - + +#endif + IF_DEBUG(scheduler, printAllThreads()); +delete_threads_and_gc: /* * We now have all the capabilities; if we're in an interrupting * state, then we should take the opportunity to delete all the * threads in the system. */ - if (sched_state >= SCHED_INTERRUPTING) { - deleteAllThreads(&capabilities[0]); + if (sched_state == SCHED_INTERRUPTING) { + deleteAllThreads(cap); sched_state = SCHED_SHUTTING_DOWN; } heap_census = scheduleNeedHeapProfile(rtsTrue); - /* everybody back, start the GC. - * Could do it in this thread, or signal a condition var - * to do it in another thread. Either way, we need to - * broadcast on gc_pending_cond afterward. - */ #if defined(THREADED_RTS) - debugTrace(DEBUG_sched, "doing GC"); + traceSchedEvent(cap, EVENT_GC_START, 0, 0); + // reset waiting_for_gc *before* GC, so that when the GC threads + // emerge they don't immediately re-enter the GC. + waiting_for_gc = 0; + GarbageCollect(force_major || heap_census, gc_type, cap); +#else + GarbageCollect(force_major || heap_census, 0, cap); #endif - GarbageCollect(force_major || heap_census); - + traceSchedEvent(cap, EVENT_GC_END, 0, 0); + + if (recent_activity == ACTIVITY_INACTIVE && force_major) + { + // We are doing a GC because the system has been idle for a + // timeslice and we need to check for deadlock. Record the + // fact that we've done a GC and turn off the timer signal; + // it will get re-enabled if we run any threads after the GC. + recent_activity = ACTIVITY_DONE_GC; + stopTimer(); + } + else + { + // the GC might have taken long enough for the timer to set + // recent_activity = ACTIVITY_INACTIVE, but we aren't + // necessarily deadlocked: + recent_activity = ACTIVITY_YES; + } + +#if defined(THREADED_RTS) + if (gc_type == PENDING_GC_PAR) + { + releaseGCThreads(cap); + } +#endif + if (heap_census) { debugTrace(DEBUG_sched, "performing heap census"); heapCensus(); performHeapProfile = rtsFalse; } + if (heap_overflow && sched_state < SCHED_INTERRUPTING) { + // GC set the heap_overflow flag, so we should proceed with + // an orderly shutdown now. Ultimately we want the main + // thread to return to its caller with HeapExhausted, at which + // point the caller should call hs_exit(). The first step is + // to delete all the threads. + // + // Another way to do this would be to raise an exception in + // the main thread, which we really should do because it gives + // the program a chance to clean up. But how do we find the + // main thread? It should presumably be the same one that + // gets ^C exceptions, but that's all done on the Haskell side + // (GHC.TopHandler). + sched_state = SCHED_INTERRUPTING; + goto delete_threads_and_gc; + } + #ifdef SPARKBALANCE /* JB Once we are all together... this would be the place to balance all @@ -1567,12 +1544,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) #endif #if defined(THREADED_RTS) - // release our stash of capabilities. - for (i = 0; i < n_capabilities; i++) { - if (cap != &capabilities[i]) { - task->cap = &capabilities[i]; - releaseCapability(&capabilities[i]); - } + if (gc_type == PENDING_GC_SEQ) { + // release our stash of capabilities. + for (i = 0; i < n_capabilities; i++) { + if (cap != &capabilities[i]) { + task->cap = &capabilities[i]; + releaseCapability(&capabilities[i]); + } + } } if (cap) { task->cap = cap; @@ -1704,6 +1683,10 @@ forkProcess(HsStablePtr *entry initTimer(); startTimer(); +#if defined(THREADED_RTS) + cap = ioManagerStartCap(cap); +#endif + cap = rts_evalStableIO(cap, entry, NULL); // run the action rts_checkSchedStatus("forkProcess",cap); @@ -1713,7 +1696,6 @@ forkProcess(HsStablePtr *entry } #else /* !FORKPROCESS_PRIMOP_SUPPORTED */ barf("forkProcess#: primop not supported on this platform, sorry!\n"); - return -1; #endif } @@ -1823,9 +1805,7 @@ suspendThread (StgRegTable *reg) task = cap->running_task; tso = cap->r.rCurrentTSO; - debugTrace(DEBUG_sched, - "thread %lu did a safe foreign call", - (unsigned long)cap->r.rCurrentTSO->id); + traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL); // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; @@ -1851,13 +1831,6 @@ suspendThread (StgRegTable *reg) RELEASE_LOCK(&cap->lock); -#if defined(THREADED_RTS) - /* Preparing to leave the RTS, so ensure there's a native thread/task - waiting to take over. - */ - debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id); -#endif - errno = saved_errno; #if mingw32_HOST_OS SetLastError(saved_winerror); @@ -1894,10 +1867,14 @@ resumeThread (void *task_) tso = task->suspended_tso; task->suspended_tso = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd - debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); + + traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next); if (tso->why_blocked == BlockedOnCCall) { - awakenBlockedExceptionQueue(cap,tso); + // avoid locking the TSO if we don't have to + if (tso->blocked_exceptions != END_TSO_QUEUE) { + awakenBlockedExceptionQueue(cap,tso); + } tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); } @@ -1947,6 +1924,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { + traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no); wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else @@ -1999,15 +1977,32 @@ workerStart(Task *task) cap = task->cap; RELEASE_LOCK(&task->lock); + if (RtsFlags.ParFlags.setAffinity) { + setThreadAffinity(cap->no, n_capabilities); + } + // set the thread-local pointer to the Task: taskEnter(task); // schedule() runs without a lock. cap = schedule(cap,task); - // On exit from schedule(), we have a Capability. - releaseCapability(cap); + // On exit from schedule(), we have a Capability, but possibly not + // the same one we started with. + + // During shutdown, the requirement is that after all the + // Capabilities are shut down, all workers that are shutting down + // have finished workerTaskStop(). This is why we hold on to + // cap->lock until we've finished workerTaskStop() below. + // + // There may be workers still involved in foreign calls; those + // will just block in waitForReturnCapability() because the + // Capability has been shut down. + // + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap,rtsFalse); workerTaskStop(task); + RELEASE_LOCK(&cap->lock); } #endif @@ -2050,7 +2045,7 @@ initScheduler(void) initTaskManager(); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) initSparkPools(); #endif @@ -2073,8 +2068,6 @@ initScheduler(void) } #endif - trace(TRACE_sched, "start: %d capabilities", n_capabilities); - RELEASE_LOCK(&sched_mutex); } @@ -2089,16 +2082,14 @@ exitScheduler( { Task *task = NULL; -#if defined(THREADED_RTS) - ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); -#endif // If we haven't killed all the threads yet, do it now. if (sched_state < SCHED_SHUTTING_DOWN) { sched_state = SCHED_INTERRUPTING; - scheduleDoGC(NULL,task,rtsFalse); + waitForReturnCapability(&task->cap,task); + scheduleDoGC(task->cap,task,rtsFalse); + releaseCapability(task->cap); } sched_state = SCHED_SHUTTING_DOWN; @@ -2109,21 +2100,32 @@ exitScheduler( for (i = 0; i < n_capabilities; i++) { shutdownCapability(&capabilities[i], task, wait_foreign); } - boundTaskExiting(task); - stopTaskManager(); } -#else - freeCapability(&MainCapability); #endif + + boundTaskExiting(task); } void freeScheduler( void ) { - freeTaskManager(); - if (n_capabilities != 1) { - stgFree(capabilities); + nat still_running; + + ACQUIRE_LOCK(&sched_mutex); + still_running = freeTaskManager(); + // We can only free the Capabilities if there are no Tasks still + // running. We might have a Task about to return from a foreign + // call into waitForReturnCapability(), for example (actually, + // this should be the *only* thing that a still-running Task can + // do at this point, and it will block waiting for the + // Capability). + if (still_running == 0) { + freeCapabilities(); + if (n_capabilities != 1) { + stgFree(capabilities); + } } + RELEASE_LOCK(&sched_mutex); #if defined(THREADED_RTS) closeMutex(&sched_mutex); #endif @@ -2141,13 +2143,15 @@ static void performGC_(rtsBool force_major) { Task *task; + // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the // suspended_ccalling_tasks queue. - ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); - scheduleDoGC(NULL,task,force_major); + + waitForReturnCapability(&task->cap,task); + scheduleDoGC(task->cap,task,force_major); + releaseCapability(task->cap); boundTaskExiting(task); } @@ -2186,12 +2190,27 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // while we are moving the TSO: lockClosure((StgClosure *)tso); - if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) { + if (tso->stack_size >= tso->max_stack_size + && !(tso->flags & TSO_BLOCKEX)) { // NB. never raise a StackOverflow exception if the thread is // inside Control.Exceptino.block. It is impractical to protect // against stack overflow exceptions, since virtually anything // can raise one (even 'catch'), so this is the only sensible // thing to do here. See bug #767. + // + + if (tso->flags & TSO_SQUEEZED) { + return tso; + } + // #3677: In a stack overflow situation, stack squeezing may + // reduce the stack size, but we don't know whether it has been + // reduced enough for the stack check to succeed if we try + // again. Fortunately stack squeezing is idempotent, so all we + // need to do is record whether *any* squeezing happened. If we + // are at the stack's absolute -K limit, and stack squeezing + // happened, then we try running the thread again. The + // TSO_SQUEEZED flag is set by threadPaused() to tell us whether + // squeezing happened or not. debugTrace(DEBUG_gc, "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)", @@ -2207,6 +2226,21 @@ threadStackOverflow(Capability *cap, StgTSO *tso) return tso; } + + // We also want to avoid enlarging the stack if squeezing has + // already released some of it. However, we don't want to get into + // a pathalogical situation where a thread has a nearly full stack + // (near its current limit, but not near the absolute -K limit), + // keeps allocating a little bit, squeezing removes a little bit, + // and then it runs again. So to avoid this, if we squeezed *and* + // there is still less than BLOCK_SIZE_W words free, then we enlarge + // the stack anyway. + if ((tso->flags & TSO_SQUEEZED) && + ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) { + unlockTSO(tso); + return tso; + } + /* Try to double the current stack size. If that takes us over the * maximum stack size for this thread, then use the maximum instead * (that is, unless we're already at or over the max size and we @@ -2228,7 +2262,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) "increasing stack size from %ld words to %d.", (long)tso->stack_size, new_stack_size); - dest = (StgTSO *)allocateLocal(cap,new_tso_size); + dest = (StgTSO *)allocate(cap,new_tso_size); TICK_ALLOC_TSO(new_stack_size,0); /* copy the TSO block and the old stack into the new area */ @@ -2253,13 +2287,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; - IF_PAR_DEBUG(verbose, - debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n", - tso->id, tso, tso->stack_size); - /* If we're debugging, just print out the top of the stack */ - printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, - tso->sp+64))); - unlockTSO(dest); unlockTSO(tso); @@ -2280,9 +2307,18 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) tso_size_w = tso_sizeW(tso); - if (tso_size_w < MBLOCK_SIZE_W || + if (tso_size_w < MBLOCK_SIZE_W || + // TSO is less than 2 mblocks (since the first mblock is + // shorter than MBLOCK_SIZE_W) + (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 || + // or TSO is not a whole number of megablocks (ensuring + // precondition of splitLargeBlock() below) + (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) || + // or TSO is smaller than the minimum stack size (rounded up) (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) + // or stack is using more than 1/4 of the available space { + // then do nothing return tso; } @@ -2331,7 +2367,9 @@ interruptStgRts(void) { sched_state = SCHED_INTERRUPTING; setContextSwitches(); +#if defined(THREADED_RTS) wakeUpRts(); +#endif } /* ----------------------------------------------------------------------------- @@ -2347,16 +2385,15 @@ interruptStgRts(void) will have interrupted any blocking system call in progress anyway. -------------------------------------------------------------------------- */ -void -wakeUpRts(void) -{ #if defined(THREADED_RTS) +void wakeUpRts(void) +{ // This forces the IO Manager thread to wakeup, which will // in turn ensure that some OS thread wakes up and runs the // scheduler loop, which will cause a GC and deadlock check. ioManagerWakeup(); -#endif } +#endif /* ----------------------------------------------------------------------------- * checkBlackHoles() @@ -2389,6 +2426,10 @@ checkBlackHoles (Capability *cap) prev = &blackhole_queue; t = blackhole_queue; while (t != END_TSO_QUEUE) { + if (t->what_next == ThreadRelocated) { + t = t->_link; + continue; + } ASSERT(t->why_blocked == BlockedOnBlackHole); type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) { @@ -2492,7 +2533,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); + (StgThunk *)allocate(cap,sizeofW(StgThunk)+1); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } @@ -2570,7 +2611,7 @@ findRetryFrameHelper (StgTSO *tso) case CATCH_STM_FRAME: { StgTRecHeader *trec = tso -> trec; - StgTRecHeader *outer = stmGetEnclosingTRec(trec); + StgTRecHeader *outer = trec -> enclosing_trec; debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p during retry", p); debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer); @@ -2622,10 +2663,9 @@ resurrectThreads (StgTSO *threads) switch (tso->why_blocked) { case BlockedOnMVar: - case BlockedOnException: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, - (StgClosure *)blockedOnDeadMVar_closure); + (StgClosure *)blockedIndefinitelyOnMVar_closure); break; case BlockedOnBlackHole: throwToSingleThreaded(cap, tso, @@ -2633,7 +2673,7 @@ resurrectThreads (StgTSO *threads) break; case BlockedOnSTM: throwToSingleThreaded(cap, tso, - (StgClosure *)blockedIndefinitely_closure); + (StgClosure *)blockedIndefinitelyOnSTM_closure); break; case NotBlocked: /* This might happen if the thread was blocked on a black hole @@ -2641,6 +2681,11 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; + case BlockedOnException: + // throwTo should never block indefinitely: if the target + // thread dies or completes, throwTo returns. + barf("resurrectThreads: thread BlockedOnException"); + break; default: barf("resurrectThreads: thread blocked in a strange way"); } @@ -2665,8 +2710,12 @@ performPendingThrowTos (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; + Task *task, *saved_task;; step *step; + task = myTask(); + cap = task->cap; + for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; @@ -2676,7 +2725,17 @@ performPendingThrowTos (StgTSO *threads) debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - cap = tso->cap; - maybePerformBlockedException(cap, tso); + // We must pretend this Capability belongs to the current Task + // for the time being, as invariants will be broken otherwise. + // In fact the current Task has exclusive access to the systme + // at this point, so this is just bookkeeping: + task->cap = tso->cap; + saved_task = tso->cap->running_task; + tso->cap->running_task = task; + maybePerformBlockedException(tso->cap, tso); + tso->cap->running_task = saved_task; } + + // Restore our original Capability: + task->cap = cap; }