X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FCapability.c;h=a81d71073a20877d6cc1259cedcc4232706cfb10;hb=9fe7b8ea2136a4a07752b2851840c9366706f832;hp=948922a3b2859ca1832c8372f64f66df5d88fd02;hpb=99df892cc9620fcc92747b79bba75dad8a1d295c;p=ghc-hetmet.git diff --git a/rts/Capability.c b/rts/Capability.c index 948922a..a81d710 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -26,6 +26,7 @@ #include "Schedule.h" #include "Sparks.h" #include "Trace.h" +#include "GC.h" // one global capability, this is the Capability for non-threaded // builds, and for +RTS -N1 @@ -54,25 +55,39 @@ globalWorkToDo (void) #endif #if defined(THREADED_RTS) -rtsBool -stealWork (Capability *cap) +StgClosure * +findSpark (Capability *cap) { - /* use the normal Sparks.h interface (internally modified to enable - concurrent stealing) - and immediately turn the spark into a thread when successful - */ Capability *robbed; StgClosurePtr spark; - rtsBool success = rtsFalse; rtsBool retry; nat i = 0; + if (!emptyRunQueue(cap)) { + // If there are other threads, don't try to run any new + // sparks: sparks might be speculative, we don't want to take + // resources away from the main computation. + return 0; + } + + // first try to get a spark from our own pool. + // We should be using reclaimSpark(), because it works without + // needing any atomic instructions: + // spark = reclaimSpark(cap->sparks); + // However, measurements show that this makes at least one benchmark + // slower (prsa) and doesn't affect the others. + spark = tryStealSpark(cap); + if (spark != NULL) { + cap->sparks_converted++; + return spark; + } + + if (n_capabilities == 1) { return NULL; } // makes no sense... + debugTrace(DEBUG_sched, "cap %d: Trying to steal work from other capabilities", cap->no); - if (n_capabilities == 1) { return rtsFalse; } // makes no sense... - do { retry = rtsFalse; @@ -86,7 +101,7 @@ stealWork (Capability *cap) if (emptySparkPoolCap(robbed)) // nothing to steal here continue; - spark = tryStealSpark(robbed->sparks); + spark = tryStealSpark(robbed); if (spark == NULL && !emptySparkPoolCap(robbed)) { // we conflicted with another thread while trying to steal; // try again later. @@ -97,16 +112,32 @@ stealWork (Capability *cap) debugTrace(DEBUG_sched, "cap %d: Stole a spark from capability %d", cap->no, robbed->no); - - createSparkThread(cap,spark); - return rtsTrue; + cap->sparks_converted++; + return spark; } // otherwise: no success, try next one } } while (retry); debugTrace(DEBUG_sched, "No sparks stolen"); - return rtsFalse; + return NULL; +} + +// Returns True if any spark pool is non-empty at this moment in time +// The result is only valid for an instant, of course, so in a sense +// is immediately invalid, and should not be relied upon for +// correctness. +rtsBool +anySparks (void) +{ + nat i; + + for (i=0; i < n_capabilities; i++) { + if (!emptySparkPoolCap(&capabilities[i])) { + return rtsTrue; + } + } + return rtsFalse; } #endif @@ -160,6 +191,7 @@ initCapability( Capability *cap, nat i ) cap->no = i; cap->in_haskell = rtsFalse; + cap->in_gc = rtsFalse; cap->run_queue_hd = END_TSO_QUEUE; cap->run_queue_tl = END_TSO_QUEUE; @@ -178,12 +210,16 @@ initCapability( Capability *cap, nat i ) cap->sparks_pruned = 0; #endif + cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info; cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; cap->f.stgGCFun = (F_)__stg_gc_fun; cap->mut_lists = stgMallocBytes(sizeof(bdescr *) * RtsFlags.GcFlags.generations, "initCapability"); + cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) * + RtsFlags.GcFlags.generations, + "initCapability"); for (g = 0; g < RtsFlags.GcFlags.generations; g++) { cap->mut_lists[g] = NULL; @@ -327,14 +363,7 @@ releaseCapability_ (Capability* cap, return; } - /* if waiting_for_gc was the reason to release the cap: thread - comes from yieldCap->releaseAndQueueWorker. Unconditionally set - cap. free and return (see default after the if-protected other - special cases). Thread will wait on cond.var and re-acquire the - same cap after GC (GC-triggering cap. calls releaseCap and - enters the spare_workers case) - */ - if (waiting_for_gc) { + if (waiting_for_gc == PENDING_GC_SEQ) { last_free_capability = cap; // needed? trace(TRACE_sched | DEBUG_sched, "GC pending, set capability %d free", cap->no); @@ -526,6 +555,12 @@ yieldCapability (Capability** pCap, Task *task) { Capability *cap = *pCap; + if (waiting_for_gc == PENDING_GC_PAR) { + debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no); + gcWorkerThread(cap); + return; + } + debugTrace(DEBUG_sched, "giving up capability %d", cap->no); // We must now release the capability and wait to be woken up @@ -624,58 +659,21 @@ wakeupThreadOnCapability (Capability *my_cap, } /* ---------------------------------------------------------------------------- - * prodCapabilities + * prodCapability * - * Used to indicate that the interrupted flag is now set, or some - * other global condition that might require waking up a Task on each - * Capability. + * If a Capability is currently idle, wake up a Task on it. Used to + * get every Capability into the GC. * ------------------------------------------------------------------------- */ -static void -prodCapabilities(rtsBool all) -{ - nat i; - Capability *cap; - Task *task; - - for (i=0; i < n_capabilities; i++) { - cap = &capabilities[i]; - ACQUIRE_LOCK(&cap->lock); - if (!cap->running_task) { - if (cap->spare_workers) { - trace(TRACE_sched, "resuming capability %d", cap->no); - task = cap->spare_workers; - ASSERT(!task->stopped); - giveCapabilityToTask(cap,task); - if (!all) { - RELEASE_LOCK(&cap->lock); - return; - } - } - } - RELEASE_LOCK(&cap->lock); - } - return; -} - void -prodAllCapabilities (void) +prodCapability (Capability *cap, Task *task) { - prodCapabilities(rtsTrue); -} - -/* ---------------------------------------------------------------------------- - * prodOneCapability - * - * Like prodAllCapabilities, but we only require a single Task to wake - * up in order to service some global event, such as checking for - * deadlock after some idle time has passed. - * ------------------------------------------------------------------------- */ - -void -prodOneCapability (void) -{ - prodCapabilities(rtsFalse); + ACQUIRE_LOCK(&cap->lock); + if (!cap->running_task) { + cap->running_task = task; + releaseCapability_(cap,rtsTrue); + } + RELEASE_LOCK(&cap->lock); } /* ---------------------------------------------------------------------------- @@ -698,8 +696,6 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe) { nat i; - ASSERT(sched_state == SCHED_SHUTTING_DOWN); - task->cap = cap; // Loop indefinitely until all the workers have exited and there @@ -709,6 +705,8 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe) // isn't safe, for one thing). for (i = 0; /* i < 50 */; i++) { + ASSERT(sched_state == SCHED_SHUTTING_DOWN); + debugTrace(DEBUG_sched, "shutting down capability %d, attempt %d", cap->no, i); ACQUIRE_LOCK(&cap->lock); @@ -768,7 +766,6 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe) } debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no); - freeCapability(cap); RELEASE_LOCK(&cap->lock); break; } @@ -806,14 +803,28 @@ tryGrabCapability (Capability *cap, Task *task) #endif /* THREADED_RTS */ -void -freeCapability (Capability *cap) { +static void +freeCapability (Capability *cap) +{ stgFree(cap->mut_lists); #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) freeSparkPool(cap->sparks); #endif } +void +freeCapabilities (void) +{ +#if defined(THREADED_RTS) + nat i; + for (i=0; i < n_capabilities; i++) { + freeCapability(&capabilities[i]); + } +#else + freeCapability(&MainCapability); +#endif +} + /* --------------------------------------------------------------------------- Mark everything directly reachable from the Capabilities. When using multiple GC threads, each GC thread marks all Capabilities @@ -821,7 +832,8 @@ freeCapability (Capability *cap) { ------------------------------------------------------------------------ */ void -markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta) +markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, + rtsBool prune_sparks USED_IF_THREADS) { nat i; Capability *cap; @@ -848,7 +860,11 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta) } #if defined(THREADED_RTS) - traverseSparkQueue (evac, user, cap); + if (prune_sparks) { + pruneSparkQueue (evac, user, cap); + } else { + traverseSparkQueue (evac, user, cap); + } #endif } @@ -859,22 +875,8 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta) #endif } -// This function is used by the compacting GC to thread all the -// pointers from spark queues. -void -traverseSparkQueues (evac_fn evac USED_IF_THREADS, void *user USED_IF_THREADS) -{ -#if defined(THREADED_RTS) - nat i; - for (i = 0; i < n_capabilities; i++) { - traverseSparkQueue (evac, user, &capabilities[i]); - } -#endif // THREADED_RTS - -} - void markCapabilities (evac_fn evac, void *user) { - markSomeCapabilities(evac, user, 0, 1); + markSomeCapabilities(evac, user, 0, 1, rtsFalse); }