X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSparks.c;h=2167de0dcf7c07de20649243209a6ecee4d8ef28;hb=de75026f5a48d3d052135a973ab4dff76c5b20f5;hp=1a839ab315a2aee1d53b7c44322417e9a0de282c;hpb=dc6008a61acedd3d0785111cf8955c479cb226a4;p=ghc-hetmet.git diff --git a/rts/Sparks.c b/rts/Sparks.c index 1a839ab..2167de0 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -1,10 +1,10 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 2000-2006 + * (c) The GHC Team, 2000-2008 * * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS. * - * -------------------------------------------------------------------------*/ + -------------------------------------------------------------------------*/ #include "PosixSource.h" #include "Rts.h" @@ -14,39 +14,14 @@ #include "RtsFlags.h" #include "RtsUtils.h" #include "ParTicky.h" -# if defined(PARALLEL_HASKELL) -# include "ParallelRts.h" -# include "GranSimRts.h" // for GR_... -# elif defined(GRAN) -# include "GranSimRts.h" -# endif -#include "Sparks.h" #include "Trace.h" +#include "Prelude.h" -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) - -static INLINE_ME void bump_hd (StgSparkPool *p) -{ p->hd++; if (p->hd == p->lim) p->hd = p->base; } - -static INLINE_ME void bump_tl (StgSparkPool *p) -{ p->tl++; if (p->tl == p->lim) p->tl = p->base; } +#include "SMP.h" // for cas -/* ----------------------------------------------------------------------------- - * - * Initialising spark pools. - * - * -------------------------------------------------------------------------- */ +#include "Sparks.h" -static void -initSparkPool(StgSparkPool *pool) -{ - pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks - * sizeof(StgClosure *), - "initSparkPools"); - pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks; - pool->hd = pool->base; - pool->tl = pool->base; -} +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) void initSparkPools( void ) @@ -55,50 +30,18 @@ initSparkPools( void ) /* walk over the capabilities, allocating a spark pool for each one */ nat i; for (i = 0; i < n_capabilities; i++) { - initSparkPool(&capabilities[i].r.rSparks); + capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks); } #else /* allocate a single spark pool */ - initSparkPool(&MainCapability.r.rSparks); + MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks); #endif } void -freeSparkPool(StgSparkPool *pool) { - stgFree(pool->base); -} - -/* ----------------------------------------------------------------------------- - * - * findSpark: find a spark on the current Capability that we can fork - * into a thread. - * - * -------------------------------------------------------------------------- */ - -StgClosure * -findSpark (Capability *cap) +freeSparkPool (SparkPool *pool) { - StgSparkPool *pool; - StgClosure *spark; - - pool = &(cap->r.rSparks); - ASSERT_SPARK_POOL_INVARIANTS(pool); - - while (pool->hd != pool->tl) { - spark = *pool->hd; - bump_hd(pool); - if (closure_SHOULD_SPARK(spark)) { -#ifdef GRAN - if (RtsFlags.ParFlags.ParStats.Sparks) - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_STEALING, ((StgTSO *)NULL), spark, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); -#endif - return spark; - } - } - // spark pool is now empty - return NULL; + freeWSDeque(pool); } /* ----------------------------------------------------------------------------- @@ -108,26 +51,28 @@ findSpark (Capability *cap) * -------------------------------------------------------------------------- */ void -createSparkThread (Capability *cap, StgClosure *p) +createSparkThread (Capability *cap) { StgTSO *tso; - tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p); + tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, + &base_GHCziConc_runSparks_closure); + + postEvent(cap, EVENT_CREATE_SPARK_THREAD, 0, tso->id); + appendToRunQueue(cap,tso); } -/* ----------------------------------------------------------------------------- - * - * Create a new spark - * +/* -------------------------------------------------------------------------- + * newSpark: create a new spark, as a result of calling "par" + * Called directly from STG. * -------------------------------------------------------------------------- */ -#define DISCARD_NEW - StgInt newSpark (StgRegTable *reg, StgClosure *p) { - StgSparkPool *pool = &(reg->rSparks); + Capability *cap = regTableToCapability(reg); + SparkPool *pool = cap->sparks; /* I am not sure whether this is the right thing to do. * Maybe it is better to exploit the tag information @@ -135,105 +80,232 @@ newSpark (StgRegTable *reg, StgClosure *p) */ p = UNTAG_CLOSURE(p); - ASSERT_SPARK_POOL_INVARIANTS(pool); - if (closure_SHOULD_SPARK(p)) { -#ifdef DISCARD_NEW - StgClosure **new_tl; - new_tl = pool->tl + 1; - if (new_tl == pool->lim) { new_tl = pool->base; } - if (new_tl != pool->hd) { - *pool->tl = p; - pool->tl = new_tl; - } else if (!closure_SHOULD_SPARK(*pool->hd)) { - // if the old closure is not sparkable, discard it and - // keep the new one. Otherwise, keep the old one. - *pool->tl = p; - bump_hd(pool); - } -#else /* DISCARD OLD */ - *pool->tl = p; - bump_tl(pool); - if (pool->tl == pool->hd) { bump_hd(pool); } -#endif + pushWSDeque(pool,p); } - ASSERT_SPARK_POOL_INVARIANTS(pool); + cap->sparks_created++; + + postEvent(cap, EVENT_CREATE_SPARK, cap->r.rCurrentTSO->id, 0); + return 1; } /* ----------------------------------------------------------------------------- - * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an - * implicit slide i.e. after marking all sparks are at the beginning of the - * spark pool and the spark pool only contains sparkable closures + * + * tryStealSpark: try to steal a spark from a Capability. + * + * Returns a valid spark, or NULL if the pool was empty, and can + * occasionally return NULL if there was a race with another thread + * stealing from the same pool. In this case, try again later. + * + -------------------------------------------------------------------------- */ + +StgClosure * +tryStealSpark (Capability *cap) +{ + SparkPool *pool = cap->sparks; + StgClosure *stolen; + + do { + stolen = stealWSDeque_(pool); + // use the no-loopy version, stealWSDeque_(), since if we get a + // spurious NULL here the caller may want to try stealing from + // other pools before trying again. + } while (stolen != NULL && !closure_SHOULD_SPARK(stolen)); + + return stolen; +} + +/* -------------------------------------------------------------------------- + * Remove all sparks from the spark queues which should not spark any + * more. Called after GC. We assume exclusive access to the structure + * and replace all sparks in the queue, see explanation below. At exit, + * the spark pool only contains sparkable closures. * -------------------------------------------------------------------------- */ void -markSparkQueue (evac_fn evac, void *user, Capability *cap) +pruneSparkQueue (evac_fn evac, void *user, Capability *cap) { - StgClosure *spark, **sparkp, **to_sparkp; + SparkPool *pool; + StgClosurePtr spark, tmp, *elements; nat n, pruned_sparks; // stats only - StgSparkPool *pool; + StgWord botInd,oldBotInd,currInd; // indices in array (always < size) + const StgInfoTable *info; PAR_TICKY_MARK_SPARK_QUEUE_START(); n = 0; pruned_sparks = 0; - pool = &(cap->r.rSparks); + pool = cap->sparks; - ASSERT_SPARK_POOL_INVARIANTS(pool); - - sparkp = pool->hd; - to_sparkp = pool->hd; - while (sparkp != pool->tl) { - ASSERT(*sparkp!=NULL); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp))); - // ToDo?: statistics gathering here (also for GUM!) - evac(user,sparkp); - spark = *sparkp; - if (!closure_SHOULD_SPARK(spark)) { - pruned_sparks++; - } else{ - *to_sparkp++ = spark; - if (to_sparkp == pool->lim) { - to_sparkp = pool->base; - } - n++; - } - sparkp++; - if (sparkp == pool->lim) { - sparkp = pool->base; - } - } - pool->tl = to_sparkp; - + // it is possible that top > bottom, indicating an empty pool. We + // fix that here; this is only necessary because the loop below + // assumes it. + if (pool->top > pool->bottom) + pool->top = pool->bottom; + + // Take this opportunity to reset top/bottom modulo the size of + // the array, to avoid overflow. This is only possible because no + // stealing is happening during GC. + pool->bottom -= pool->top & ~pool->moduloSize; + pool->top &= pool->moduloSize; + pool->topBound = pool->top; + + debugTrace(DEBUG_sched, + "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)", + sparkPoolSize(pool), pool->bottom, pool->top); + + ASSERT_WSDEQUE_INVARIANTS(pool); + + elements = (StgClosurePtr *)pool->elements; + + /* We have exclusive access to the structure here, so we can reset + bottom and top counters, and prune invalid sparks. Contents are + copied in-place if they are valuable, otherwise discarded. The + routine uses "real" indices t and b, starts by computing them + as the modulus size of top and bottom, + + Copying: + + At the beginning, the pool structure can look like this: + ( bottom % size >= top % size , no wrap-around) + t b + ___________***********_________________ + + or like this ( bottom % size < top % size, wrap-around ) + b t + ***********__________****************** + As we need to remove useless sparks anyway, we make one pass + between t and b, moving valuable content to b and subsequent + cells (wrapping around when the size is reached). + + b t + ***********OOO_______XX_X__X?********** + ^____move?____/ + + After this movement, botInd becomes the new bottom, and old + bottom becomes the new top index, both as indices in the array + size range. + */ + // starting here + currInd = (pool->top) & (pool->moduloSize); // mod + + // copies of evacuated closures go to space from botInd on + // we keep oldBotInd to know when to stop + oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod + + // on entry to loop, we are within the bounds + ASSERT( currInd < pool->size && botInd < pool->size ); + + while (currInd != oldBotInd ) { + /* must use != here, wrap-around at size + subtle: loop not entered if queue empty + */ + + /* check element at currInd. if valuable, evacuate and move to + botInd, otherwise move on */ + spark = elements[currInd]; + + // We have to be careful here: in the parallel GC, another + // thread might evacuate this closure while we're looking at it, + // so grab the info pointer just once. + info = spark->header.info; + if (IS_FORWARDING_PTR(info)) { + tmp = (StgClosure*)UN_FORWARDING_PTR(info); + /* if valuable work: shift inside the pool */ + if (closure_SHOULD_SPARK(tmp)) { + elements[botInd] = tmp; // keep entry (new address) + botInd++; + n++; + } else { + pruned_sparks++; // discard spark + cap->sparks_pruned++; + } + } else { + if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) { + elements[botInd] = spark; // keep entry (new address) + evac (user, &elements[botInd]); + botInd++; + n++; + } else { + pruned_sparks++; // discard spark + cap->sparks_pruned++; + } + } + currInd++; + + // in the loop, we may reach the bounds, and instantly wrap around + ASSERT( currInd <= pool->size && botInd <= pool->size ); + if ( currInd == pool->size ) { currInd = 0; } + if ( botInd == pool->size ) { botInd = 0; } + + } // while-loop over spark pool elements + + ASSERT(currInd == oldBotInd); + + pool->top = oldBotInd; // where we started writing + pool->topBound = pool->top; + + pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); + // first free place we did not use (corrected by wraparound) + PAR_TICKY_MARK_SPARK_QUEUE_END(n); - - debugTrace(DEBUG_sched, - "marked %d sparks, pruned %d sparks", - n, pruned_sparks); + + debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks); debugTrace(DEBUG_sched, - "new spark queue len=%d; (hd=%p; tl=%p)", - sparkPoolSize(pool), pool->hd, pool->tl); + "new spark queue len=%ld; (hd=%ld; tl=%ld)", + sparkPoolSize(pool), pool->bottom, pool->top); + + ASSERT_WSDEQUE_INVARIANTS(pool); } +/* GC for the spark pool, called inside Capability.c for all + capabilities in turn. Blindly "evac"s complete spark pool. */ void traverseSparkQueue (evac_fn evac, void *user, Capability *cap) { StgClosure **sparkp; - StgSparkPool *pool; + SparkPool *pool; + StgWord top,bottom, modMask; - pool = &(cap->r.rSparks); - sparkp = pool->hd; - while (sparkp != pool->tl) { - evac(sparkp, user); - sparkp++; - if (sparkp == pool->lim) { - sparkp = pool->base; - } + pool = cap->sparks; + + ASSERT_WSDEQUE_INVARIANTS(pool); + + top = pool->top; + bottom = pool->bottom; + sparkp = (StgClosurePtr*)pool->elements; + modMask = pool->moduloSize; + + while (top < bottom) { + /* call evac for all closures in range (wrap-around via modulo) + * In GHC-6.10, evac takes an additional 1st argument to hold a + * GC-specific register, see rts/sm/GC.c::mark_root() + */ + evac( user , sparkp + (top & modMask) ); + top++; } + + debugTrace(DEBUG_sched, + "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)", + sparkPoolSize(pool), pool->bottom, pool->top); +} + +/* ---------------------------------------------------------------------------- + * balanceSparkPoolsCaps: takes an array of capabilities (usually: all + * capabilities) and its size. Accesses all spark pools and equally + * distributes the sparks among them. + * + * Could be called after GC, before Cap. release, from scheduler. + * -------------------------------------------------------------------------- */ +void balanceSparkPoolsCaps(nat n_caps, Capability caps[]); + +void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, + Capability caps[] STG_UNUSED) { + barf("not implemented"); } #else @@ -253,6 +325,8 @@ newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED) * * GRAN & PARALLEL_HASKELL stuff beyond here. * + * TODO "nuke" this! + * * -------------------------------------------------------------------------- */ #if defined(PARALLEL_HASKELL) || defined(GRAN)