diff --git a/rts/Schedule.c b/rts/Schedule.c
index d25a321..31a4875 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -31,6 +31,7 @@
 #include "Updates.h"
 #include "Proftimer.h"
 #include "ProfHeap.h"
+#include "GC.h"

 /* PARALLEL_HASKELL includes go here */

@@ -466,7 +467,7 @@ schedule (Capability *initialCapability, Task *task)
     // in a foreign call returns.
     if (sched_state >= SCHED_INTERRUPTING &&
         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
-        deleteThread_(cap,t);
+        deleteThread(cap,t);
     }

     /* context switches are initiated by the timer signal, unless
@@ -1001,12 +1002,11 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
-       cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+       cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+       // when force_major == rtsTrue, scheduleDoGC sets
+       // recent_activity to ACTIVITY_DONE_GC and turns off the timer
+       // signal.

-       recent_activity = ACTIVITY_DONE_GC;
-       // disable timer signals (see #1623)
-       stopTimer();
-
        if ( !emptyRunQueue(cap) ) return;

 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
@@ -1164,7 +1164,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
        // ATOMICALLY_FRAME, aborting the (nested)
        // transaction, and saving the stack of any
        // partially-evaluated thunks on the heap.
-       throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
+       throwToSingleThreaded_(cap, t, NULL, rtsTrue);

        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
     }
@@ -1479,7 +1479,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
 #ifdef THREADED_RTS
     /* extern static volatile StgWord waiting_for_gc;
        lives inside capability.c */
-    rtsBool was_waiting;
+    rtsBool gc_type, prev_pending_gc;
     nat i;
 #endif

@@ -1491,6 +1491,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     }

 #ifdef THREADED_RTS
+    if (sched_state < SCHED_INTERRUPTING
+        && RtsFlags.ParFlags.parGcEnabled
+        && N >= RtsFlags.ParFlags.parGcGen
+        && ! oldest_gen->steps[0].mark)
+    {
+        gc_type = PENDING_GC_PAR;
+    } else {
+        gc_type = PENDING_GC_SEQ;
+    }
+
     // In order to GC, there must be no threads running Haskell code.
     // Therefore, the GC thread needs to hold *all* the capabilities,
     // and release them after the GC has completed.
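The block just added is the policy decision the rest of the patch builds on: each collection is classified up front as parallel (PENDING_GC_PAR) or sequential (PENDING_GC_SEQ). As a review aid, here is that decision pulled out into a standalone sketch; choose_gc_type and its parameters are illustrative stand-ins for sched_state, RtsFlags.ParFlags.parGcEnabled, RtsFlags.ParFlags.parGcGen, N and oldest_gen->steps[0].mark, not code from the patch:

    #include <stdbool.h>

    enum pending_gc { PENDING_GC_SEQ = 1, PENDING_GC_PAR = 2 };

    /* Parallel GC is used only when: the RTS is not shutting down,
     * the user enabled parallel GC, the generation being collected
     * (gen) is at least the configured threshold, and the oldest
     * generation is not being collected by mark-compact (presumably
     * because that path is not parallelised here). */
    static enum pending_gc
    choose_gc_type(bool shutting_down, bool par_gc_enabled,
                   unsigned int gen, unsigned int par_gc_gen,
                   bool oldest_gen_compacting)
    {
        if (!shutting_down
            && par_gc_enabled
            && gen >= par_gc_gen
            && !oldest_gen_compacting)
        {
            return PENDING_GC_PAR;
        }
        return PENDING_GC_SEQ;
    }

The enum values above are assumptions; the real constants live elsewhere in this tree, and the only property relied on is that both are non-zero, since 0 means "no GC pending" to the cas() in the next hunk.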
@@ -1501,39 +1511,55 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     // actually did the GC.  But it's quite hard to arrange for all
     // the other tasks to sleep and stay asleep.
     //
-
+
     /* Other capabilities are prevented from running yet more Haskell
        threads if waiting_for_gc is set.  Tested inside
        yieldCapability() and releaseCapability() in Capability.c */

-    was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) {
+    prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
+    if (prev_pending_gc) {
         do {
-            debugTrace(DEBUG_sched, "someone else is trying to GC...");
-            if (cap) yieldCapability(&cap,task);
+            debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
+                       prev_pending_gc);
+            ASSERT(cap);
+            yieldCapability(&cap,task);
         } while (waiting_for_gc);
         return cap;  // NOTE: task->cap might have changed here
     }

     setContextSwitches();
-    for (i=0; i < n_capabilities; i++) {
-        debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
-        if (cap != &capabilities[i]) {
-            Capability *pcap = &capabilities[i];
-            // we better hope this task doesn't get migrated to
-            // another Capability while we're waiting for this one.
-            // It won't, because load balancing happens while we have
-            // all the Capabilities, but even so it's a slightly
-            // unsavoury invariant.
-            task->cap = pcap;
-            waitForReturnCapability(&pcap, task);
-            if (pcap != &capabilities[i]) {
-                barf("scheduleDoGC: got the wrong capability");
-            }
-        }
+
+    // The final shutdown GC is always single-threaded, because it's
+    // possible that some of the Capabilities have no worker threads.
+
+    if (gc_type == PENDING_GC_SEQ)
+    {
+        // single-threaded GC: grab all the capabilities
+        for (i=0; i < n_capabilities; i++) {
+            debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
+            if (cap != &capabilities[i]) {
+                Capability *pcap = &capabilities[i];
+                // we'd better hope this task doesn't get migrated to
+                // another Capability while we're waiting for this one.
+                // It won't, because load balancing happens while we have
+                // all the Capabilities, but even so it's a slightly
+                // unsavoury invariant.
+                task->cap = pcap;
+                waitForReturnCapability(&pcap, task);
+                if (pcap != &capabilities[i]) {
+                    barf("scheduleDoGC: got the wrong capability");
+                }
+            }
+        }
     }
+    else
+    {
+        // multi-threaded GC: make sure all the Capabilities donate one
+        // GC thread each.
+        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");

-    waiting_for_gc = rtsFalse;
+        waitForGcThreads(cap);
+    }
 #endif

     // so this happens periodically:
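The heart of the hunk above is the cas() election on waiting_for_gc: exactly one task swaps it from 0 to its gc_type and goes on to initiate the GC, while every other task sees the non-zero previous value (which the debugTrace now prints) and yields its Capability until the winner clears the flag. A minimal model of that election, using C11 <stdatomic.h> in place of the RTS's own cas() and a crude POSIX yield loop in place of yieldCapability():

    #include <stdatomic.h>
    #include <sched.h>                  /* sched_yield(), POSIX */

    static atomic_uint waiting_for_gc;  /* 0, PENDING_GC_SEQ, or PENDING_GC_PAR */

    /* Returns 1 if this thread won the election and should initiate
     * the GC; returns 0 after the winning thread's GC has finished. */
    static int try_initiate_gc(unsigned int gc_type)
    {
        unsigned int expected = 0;
        if (atomic_compare_exchange_strong(&waiting_for_gc, &expected, gc_type))
            return 1;   /* we set the flag: we run the GC */
        /* Lost the race: stand aside until the winner resets the
         * flag.  The real code gives up its Capability here rather
         * than spinning. */
        while (atomic_load(&waiting_for_gc) != 0)
            sched_yield();
        return 0;
    }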
@@ -1546,23 +1572,23 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
      * state, then we should take the opportunity to delete all the
      * threads in the system.
      */
-    if (sched_state >= SCHED_INTERRUPTING) {
-        deleteAllThreads(&capabilities[0]);
+    if (sched_state == SCHED_INTERRUPTING) {
+        deleteAllThreads(cap);
         sched_state = SCHED_SHUTTING_DOWN;
     }

     heap_census = scheduleNeedHeapProfile(rtsTrue);

-    /* everybody back, start the GC.
-     * Could do it in this thread, or signal a condition var
-     * to do it in another thread.  Either way, we need to
-     * broadcast on gc_pending_cond afterward.
-     */
 #if defined(THREADED_RTS)
     debugTrace(DEBUG_sched, "doing GC");
+    // reset waiting_for_gc *before* GC, so that when the GC threads
+    // emerge they don't immediately re-enter the GC.
+    waiting_for_gc = 0;
+    GarbageCollect(force_major || heap_census, gc_type, cap);
+#else
+    GarbageCollect(force_major || heap_census, 0, cap);
 #endif
-    GarbageCollect(force_major || heap_census);
-
+
     if (heap_census) {
         debugTrace(DEBUG_sched, "performing heap census");
         heapCensus();
@@ -1577,13 +1603,25 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     balanceSparkPoolsCaps(n_capabilities, capabilities);
 #endif

+    if (force_major)
+    {
+        // We've just done a major GC and we don't need the timer
+        // signal turned on any more (#1623).
+        // NB. do this *before* releasing the Capabilities, to avoid
+        // deadlocks!
+        recent_activity = ACTIVITY_DONE_GC;
+        stopTimer();
+    }
+
 #if defined(THREADED_RTS)
-    // release our stash of capabilities.
-    for (i = 0; i < n_capabilities; i++) {
-        if (cap != &capabilities[i]) {
-            task->cap = &capabilities[i];
-            releaseCapability(&capabilities[i]);
-        }
+    if (gc_type == PENDING_GC_SEQ) {
+        // release our stash of capabilities.
+        for (i = 0; i < n_capabilities; i++) {
+            if (cap != &capabilities[i]) {
+                task->cap = &capabilities[i];
+                releaseCapability(&capabilities[i]);
+            }
+        }
     }
     if (cap) {
         task->cap = cap;
@@ -2016,9 +2054,22 @@ workerStart(Task *task)
     // schedule() runs without a lock.
     cap = schedule(cap,task);

-    // On exit from schedule(), we have a Capability.
-    releaseCapability(cap);
+    // On exit from schedule(), we have a Capability, but possibly not
+    // the same one we started with.
+
+    // During shutdown, the requirement is that after all the
+    // Capabilities are shut down, all workers that are shutting down
+    // have finished workerTaskStop().  This is why we hold on to
+    // cap->lock until we've finished workerTaskStop() below.
+    //
+    // There may be workers still involved in foreign calls; those
+    // will just block in waitForReturnCapability() because the
+    // Capability has been shut down.
+    //
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap,rtsFalse);
     workerTaskStop(task);
+    RELEASE_LOCK(&cap->lock);
 }
 #endif

@@ -2109,7 +2160,13 @@ exitScheduler(
     // If we haven't killed all the threads yet, do it now.
     if (sched_state < SCHED_SHUTTING_DOWN) {
         sched_state = SCHED_INTERRUPTING;
-        scheduleDoGC(NULL,task,rtsFalse);
+#if defined(THREADED_RTS)
+        waitForReturnCapability(&task->cap,task);
+        scheduleDoGC(task->cap,task,rtsFalse);
+        releaseCapability(task->cap);
+#else
+        scheduleDoGC(&MainCapability,task,rtsFalse);
+#endif
     }
     sched_state = SCHED_SHUTTING_DOWN;

@@ -2121,7 +2178,6 @@ exitScheduler(
             shutdownCapability(&capabilities[i], task, wait_foreign);
         }
         boundTaskExiting(task);
-        stopTaskManager();
     }
 #endif
 }
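The workerStart() change a few hunks up is easy to misread: releaseCapability(cap) becomes releaseCapability_(cap,rtsFalse) plus workerTaskStop(task), both inside one cap->lock critical section, so that shutdown can never observe the Capability as released while the exiting worker is still registered. A toy pthreads model of that invariant (the struct and its fields are simplified stand-ins; the real releaseCapability_() and workerTaskStop() do considerably more):

    #include <pthread.h>

    typedef struct {
        pthread_mutex_t lock;
        int released;        /* Capability handed back to the pool */
        int live_workers;    /* workers not yet through their exit */
    } cap_model;

    /* Worker exit path: hand the Capability back and deregister the
     * worker as one atomic step.  A shutdown thread that takes the
     * lock and sees released == 1 can then also trust live_workers,
     * because no worker can be caught between the two updates. */
    static void worker_exit(cap_model *cap)
    {
        pthread_mutex_lock(&cap->lock);
        cap->released = 1;      /* models releaseCapability_(cap,rtsFalse) */
        cap->live_workers--;    /* models workerTaskStop(task) */
        pthread_mutex_unlock(&cap->lock);
    }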
@@ -2129,11 +2185,23 @@ void
 freeScheduler( void )
 {
-    freeCapabilities();
-    freeTaskManager();
-    if (n_capabilities != 1) {
-        stgFree(capabilities);
+    nat still_running;
+
+    ACQUIRE_LOCK(&sched_mutex);
+    still_running = freeTaskManager();
+    // We can only free the Capabilities if there are no Tasks still
+    // running.  We might have a Task about to return from a foreign
+    // call into waitForReturnCapability(), for example (actually,
+    // this should be the *only* thing that a still-running Task can
+    // do at this point, and it will block waiting for the
+    // Capability).
+    if (still_running == 0) {
+        freeCapabilities();
+        if (n_capabilities != 1) {
+            stgFree(capabilities);
+        }
     }
+    RELEASE_LOCK(&sched_mutex);
 #if defined(THREADED_RTS)
     closeMutex(&sched_mutex);
 #endif
@@ -2151,13 +2219,17 @@ static void
 performGC_(rtsBool force_major)
 {
     Task *task;
+
     // We must grab a new Task here, because the existing Task may be
     // associated with a particular Capability, and chained onto the
     // suspended_ccalling_tasks queue.
     ACQUIRE_LOCK(&sched_mutex);
     task = newBoundTask();
     RELEASE_LOCK(&sched_mutex);
-    scheduleDoGC(NULL,task,force_major);
+
+    waitForReturnCapability(&task->cap,task);
+    scheduleDoGC(task->cap,task,force_major);
+    releaseCapability(task->cap);
     boundTaskExiting(task);
 }
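A closing observation: exitScheduler() and performGC_() now follow the same calling convention, because scheduleDoGC() can no longer be handed a NULL Capability. Every caller binds a Task, borrows a Capability, GCs, and hands both back. That convention could be captured in a helper like the following sketch (hypothetical, not part of the patch; it assumes the surrounding Schedule.c declarations are in scope):

    /* Hypothetical wrapper illustrating the new calling convention. */
    static void gc_with_capability (rtsBool force_major)
    {
        Task *task;

        ACQUIRE_LOCK(&sched_mutex);
        task = newBoundTask();
        RELEASE_LOCK(&sched_mutex);

        waitForReturnCapability(&task->cap, task);   // borrow a Capability
        scheduleDoGC(task->cap, task, force_major);  // task->cap may change here
        releaseCapability(task->cap);                // hand it back
        boundTaskExiting(task);
    }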