X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSparks.c;h=2167de0dcf7c07de20649243209a6ecee4d8ef28;hb=de75026f5a48d3d052135a973ab4dff76c5b20f5;hp=ac11172a9d958299166be02b01e45c351a2f40ac;hpb=cf9650f2a1690c04051c716124bb0350adc74ae7;p=ghc-hetmet.git diff --git a/rts/Sparks.c b/rts/Sparks.c index ac11172..2167de0 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -4,35 +4,6 @@ * * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS. * - * The implementation uses Double-Ended Queues with lock-free access - * (thereby often called "deque") as described in - * - * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque. - * SPAA'05, July 2005, Las Vegas, USA. - * ACM 1-58113-986-1/05/0007 - * - * Author: Jost Berthold MSRC 07-09/2008 - * - * The DeQue is held as a circular array with known length. Positions - * of top (read-end) and bottom (write-end) always increase, and the - * array is accessed with indices modulo array-size. While this bears - * the risk of overflow, we assume that (with 64 bit indices), a - * program must run very long to reach that point. - * - * The write end of the queue (position bottom) can only be used with - * mutual exclusion, i.e. by exactly one caller at a time. At this - * end, new items can be enqueued using pushBottom()/newSpark(), and - * removed using popBottom()/reclaimSpark() (the latter implying a cas - * synchronisation with potential concurrent readers for the case of - * just one element). - * - * Multiple readers can steal()/findSpark() from the read end - * (position top), and are synchronised without a lock, based on a cas - * of the top position. One reader wins, the others return NULL for a - * failure. - * - * Both popBottom and steal also return NULL when the queue is empty. - * -------------------------------------------------------------------------*/ #include "PosixSource.h" @@ -44,6 +15,7 @@ #include "RtsUtils.h" #include "ParTicky.h" #include "Trace.h" +#include "Prelude.h" #include "SMP.h" // for cas @@ -51,75 +23,6 @@ #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) -/* internal helpers ... */ - -StgWord roundUp2(StgWord val); - -StgWord roundUp2(StgWord val) { - StgWord rounded = 1; - - /* StgWord is unsigned anyway, only catch 0 */ - if (val == 0) { - barf("DeQue,roundUp2: invalid size 0 requested"); - } - /* at least 1 bit set, shift up to its place */ - do { - rounded = rounded << 1; - } while (0 != (val = val>>1)); - return rounded; -} - -INLINE_HEADER -rtsBool casTop(StgPtr addr, StgWord old, StgWord new); - -#if !defined(THREADED_RTS) -/* missing def. in non THREADED RTS, and makes no sense anyway... */ -StgWord cas(StgPtr addr,StgWord old,StgWord new); -StgWord cas(StgPtr addr,StgWord old,StgWord new) { - barf("cas: not implemented without multithreading"); - old = new = *addr; /* to avoid gcc warnings */ -} -#endif - -INLINE_HEADER -rtsBool casTop(StgWord* addr, StgWord old, StgWord new) { - StgWord res = cas((StgPtr) addr, old, new); - return ((res == old)); -} - -/* or simply like this */ -#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new))) - -/* ----------------------------------------------------------------------------- - * - * Initialising spark pools. 
- * - * -------------------------------------------------------------------------- */ - -/* constructor */ -SparkPool* initPool(StgWord size) { - - StgWord realsize; - SparkPool *q; - - realsize = roundUp2(size); /* to compute modulo as a bitwise & */ - - q = (SparkPool*) stgMallocBytes(sizeof(SparkPool), /* admin fields */ - "newSparkPool"); - q->elements = (StgClosurePtr*) - stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */ - "newSparkPool:data space"); - q->top=0; - q->bottom=0; - q->topBound=0; /* read by writer, updated each time top is read */ - - q->size = realsize; /* power of 2 */ - q->moduloSize = realsize - 1; /* n % size == n & moduloSize */ - - ASSERT_SPARK_POOL_INVARIANTS(q); - return q; -} - void initSparkPools( void ) { @@ -127,152 +30,18 @@ initSparkPools( void ) /* walk over the capabilities, allocating a spark pool for each one */ nat i; for (i = 0; i < n_capabilities; i++) { - capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks); + capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks); } #else /* allocate a single spark pool */ - MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks); + MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks); #endif } void -freeSparkPool(SparkPool *pool) { - /* should not interfere with concurrent findSpark() calls! And - nobody should use the pointer any more. We cross our fingers...*/ - stgFree(pool->elements); - stgFree(pool); -} - -/* reclaimSpark(cap): remove a spark from the write end of the queue. - * Returns the removed spark, and NULL if a race is lost or the pool - * empty. - * - * If only one spark is left in the pool, we synchronise with - * concurrently stealing threads by using cas to modify the top field. - * This routine should NEVER be called by a task which does not own - * the capability. Can this be checked here? - */ -StgClosure* reclaimSpark(Capability *cap) { - SparkPool *deque = cap->sparks; - /* also a bit tricky, has to avoid concurrent steal() calls by - accessing top with cas, when there is only one element left */ - StgWord t, b; - StgClosurePtr* pos; - long currSize; - StgClosurePtr removed; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - b = deque->bottom; - /* "decrement b as a test, see what happens" */ - deque->bottom = --b; - pos = (deque->elements) + (b & (deque->moduloSize)); - t = deque->top; /* using topBound would give an *upper* bound, we - need a lower bound. We use the real top here, but - can update the topBound value */ - deque->topBound = t; - currSize = b - t; - if (currSize < 0) { /* was empty before decrementing b, set b - consistently and abort */ - deque->bottom = t; - return NULL; - } - removed = *pos; - if (currSize > 0) { /* no danger, still elements in buffer after b-- */ - return removed; - } - /* otherwise, has someone meanwhile stolen the same (last) element? - Check and increment top value to know */ - if ( !(CASTOP(&(deque->top),t,t+1)) ) { - removed = NULL; /* no success, but continue adjusting bottom */ - } - deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */ - deque->topBound = t+1; /* ...and cached top value as well */ - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - return removed; -} - -/* ----------------------------------------------------------------------------- - * - * findSpark: find a spark on the current Capability that we can fork - * into a thread. - * - * May be called by concurrent threads, which synchronise on top - * variable. 
Returns a spark, or NULL if pool empty or race lost. - * - -------------------------------------------------------------------------- */ - -StgClosurePtr steal(SparkPool *deque); - -/* steal an element from the read end. Synchronises multiple callers - by failing with NULL return. Returns NULL when deque is empty. */ -StgClosurePtr steal(SparkPool *deque) { - StgClosurePtr* pos; - StgClosurePtr* arraybase; - StgWord sz; - StgClosurePtr stolen; - StgWord b,t; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - b = deque->bottom; - t = deque->top; - if (b - t <= 0 ) { - return NULL; /* already looks empty, abort */ - } - - /* now access array, see pushBottom() */ - arraybase = deque->elements; - sz = deque->moduloSize; - pos = arraybase + (t & sz); - stolen = *pos; - - /* now decide whether we have won */ - if ( !(CASTOP(&(deque->top),t,t+1)) ) { - /* lost the race, someon else has changed top in the meantime */ - stolen = NULL; - } /* else: OK, top has been incremented by the cas call */ - - - ASSERT_SPARK_POOL_INVARIANTS(deque); - /* return NULL or stolen element */ - return stolen; -} - -StgClosure * -findSpark (Capability *cap) +freeSparkPool (SparkPool *pool) { - SparkPool *deque = (cap->sparks); - StgClosure *stolen; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - do { - /* keep trying until good spark found or pool looks empty. - TODO is this a good idea? */ - - stolen = steal(deque); - - } while ( ( !stolen /* nothing stolen*/ - || !closure_SHOULD_SPARK(stolen)) /* spark not OK */ - && !looksEmpty(deque)); /* run empty, give up */ - - /* return stolen element */ - return stolen; -} - - -/* "guesses" whether a deque is empty. Can return false negatives in - presence of concurrent steal() calls, and false positives in - presence of a concurrent pushBottom().*/ -rtsBool looksEmpty(SparkPool* deque) { - StgWord t = deque->top; - StgWord b = deque->bottom; - /* try to prefer false negatives by reading top first */ - return (b - t <= 0); - /* => array is *never* completely filled, always 1 place free! */ + freeWSDeque(pool); } /* ----------------------------------------------------------------------------- @@ -282,79 +51,28 @@ rtsBool looksEmpty(SparkPool* deque) { * -------------------------------------------------------------------------- */ void -createSparkThread (Capability *cap, StgClosure *p) +createSparkThread (Capability *cap) { StgTSO *tso; - tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p); - appendToRunQueue(cap,tso); -} - -/* ----------------------------------------------------------------------------- - * - * Create a new spark - * - * -------------------------------------------------------------------------- */ + tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, + &base_GHCziConc_runSparks_closure); -#define DISCARD_NEW -void pushBottom(SparkPool* deque, StgClosurePtr elem); + postEvent(cap, EVENT_CREATE_SPARK_THREAD, 0, tso->id); -/* enqueue an element. Should always succeed by resizing the array - (not implemented yet, silently fails in that case). */ -void pushBottom(SparkPool* deque, StgClosurePtr elem) { - StgWord t; - StgClosurePtr* pos; - StgWord sz = deque->moduloSize; - StgWord b = deque->bottom; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - /* we try to avoid reading deque->top (accessed by all) and use - deque->topBound (accessed only by writer) instead. - This is why we do not just call empty(deque) here. 
- */ - t = deque->topBound; - if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */ - /* could be full, check the real top value in this case */ - t = deque->top; - deque->topBound = t; - if (b - t >= sz) { /* really no space left :-( */ - /* reallocate the array, copying the values. Concurrent steal()s - will in the meantime use the old one and modify only top. - This means: we cannot safely free the old space! Can keep it - on a free list internally here... - - Potential bug in combination with steal(): if array is - replaced, it is unclear which one concurrent steal operations - use. Must read the array base address in advance in steal(). - */ -#if defined(DISCARD_NEW) - ASSERT_SPARK_POOL_INVARIANTS(deque); - return; /* for now, silently fail */ -#else - /* could make room by incrementing the top position here. In - * this case, should use CASTOP. If this fails, someone else has - * removed something, and new room will be available. - */ - ASSERT_SPARK_POOL_INVARIANTS(deque); -#endif - } - } - pos = (deque->elements) + (b & sz); - *pos = elem; - (deque->bottom)++; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - return; + appendToRunQueue(cap,tso); } +/* -------------------------------------------------------------------------- + * newSpark: create a new spark, as a result of calling "par" + * Called directly from STG. + * -------------------------------------------------------------------------- */ -/* this is called as a direct C-call from Stg => we need to keep the - pool in a register (???) */ StgInt newSpark (StgRegTable *reg, StgClosure *p) { - SparkPool *pool = (reg->rCurrentTSO->cap->sparks); + Capability *cap = regTableToCapability(reg); + SparkPool *pool = cap->sparks; /* I am not sure whether this is the right thing to do. * Maybe it is better to exploit the tag information @@ -362,17 +80,42 @@ newSpark (StgRegTable *reg, StgClosure *p) */ p = UNTAG_CLOSURE(p); - ASSERT_SPARK_POOL_INVARIANTS(pool); - if (closure_SHOULD_SPARK(p)) { - pushBottom(pool,p); + pushWSDeque(pool,p); } - ASSERT_SPARK_POOL_INVARIANTS(pool); + cap->sparks_created++; + + postEvent(cap, EVENT_CREATE_SPARK, cap->r.rCurrentTSO->id, 0); + return 1; } +/* ----------------------------------------------------------------------------- + * + * tryStealSpark: try to steal a spark from a Capability. + * + * Returns a valid spark, or NULL if the pool was empty, and can + * occasionally return NULL if there was a race with another thread + * stealing from the same pool. In this case, try again later. + * + -------------------------------------------------------------------------- */ + +StgClosure * +tryStealSpark (Capability *cap) +{ + SparkPool *pool = cap->sparks; + StgClosure *stolen; + + do { + stolen = stealWSDeque_(pool); + // use the no-loopy version, stealWSDeque_(), since if we get a + // spurious NULL here the caller may want to try stealing from + // other pools before trying again. + } while (stolen != NULL && !closure_SHOULD_SPARK(stolen)); + return stolen; +} /* -------------------------------------------------------------------------- * Remove all sparks from the spark queues which should not spark any @@ -381,13 +124,14 @@ newSpark (StgRegTable *reg, StgClosure *p) * the spark pool only contains sparkable closures. 
* -------------------------------------------------------------------------- */ -static void -pruneSparkQueue (Capability *cap) +void +pruneSparkQueue (evac_fn evac, void *user, Capability *cap) { SparkPool *pool; - StgClosurePtr spark, evacspark, *elements; + StgClosurePtr spark, tmp, *elements; nat n, pruned_sparks; // stats only StgWord botInd,oldBotInd,currInd; // indices in array (always < size) + const StgInfoTable *info; PAR_TICKY_MARK_SPARK_QUEUE_START(); @@ -396,12 +140,26 @@ pruneSparkQueue (Capability *cap) pool = cap->sparks; + // it is possible that top > bottom, indicating an empty pool. We + // fix that here; this is only necessary because the loop below + // assumes it. + if (pool->top > pool->bottom) + pool->top = pool->bottom; + + // Take this opportunity to reset top/bottom modulo the size of + // the array, to avoid overflow. This is only possible because no + // stealing is happening during GC. + pool->bottom -= pool->top & ~pool->moduloSize; + pool->top &= pool->moduloSize; + pool->topBound = pool->top; + debugTrace(DEBUG_sched, - "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)", + "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)", sparkPoolSize(pool), pool->bottom, pool->top); - ASSERT_SPARK_POOL_INVARIANTS(pool); - elements = pool->elements; + ASSERT_WSDEQUE_INVARIANTS(pool); + + elements = (StgClosurePtr *)pool->elements; /* We have exclusive access to the structure here, so we can reset bottom and top counters, and prune invalid sparks. Contents are @@ -450,13 +208,31 @@ pruneSparkQueue (Capability *cap) botInd, otherwise move on */ spark = elements[currInd]; - /* if valuable work: shift inside the pool */ - if ( closure_SHOULD_SPARK(spark) ) { - elements[botInd] = spark; // keep entry (new address) - botInd++; - n++; - } else { - pruned_sparks++; // discard spark + // We have to be careful here: in the parallel GC, another + // thread might evacuate this closure while we're looking at it, + // so grab the info pointer just once. 
+ info = spark->header.info; + if (IS_FORWARDING_PTR(info)) { + tmp = (StgClosure*)UN_FORWARDING_PTR(info); + /* if valuable work: shift inside the pool */ + if (closure_SHOULD_SPARK(tmp)) { + elements[botInd] = tmp; // keep entry (new address) + botInd++; + n++; + } else { + pruned_sparks++; // discard spark + cap->sparks_pruned++; + } + } else { + if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) { + elements[botInd] = spark; // keep entry (new address) + evac (user, &elements[botInd]); + botInd++; + n++; + } else { + pruned_sparks++; // discard spark + cap->sparks_pruned++; + } } currInd++; @@ -480,19 +256,10 @@ pruneSparkQueue (Capability *cap) debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks); debugTrace(DEBUG_sched, - "new spark queue len=%d; (hd=%ld; tl=%ld)", + "new spark queue len=%ld; (hd=%ld; tl=%ld)", sparkPoolSize(pool), pool->bottom, pool->top); - ASSERT_SPARK_POOL_INVARIANTS(pool); -} - -void -pruneSparkQueues (void) -{ - nat i; - for (i = 0; i < n_capabilities; i++) { - pruneSparkQueue(&capabilities[i]); - } + ASSERT_WSDEQUE_INVARIANTS(pool); } /* GC for the spark pool, called inside Capability.c for all @@ -506,11 +273,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap) pool = cap->sparks; - ASSERT_SPARK_POOL_INVARIANTS(pool); + ASSERT_WSDEQUE_INVARIANTS(pool); top = pool->top; bottom = pool->bottom; - sparkp = pool->elements; + sparkp = (StgClosurePtr*)pool->elements; modMask = pool->moduloSize; while (top < bottom) { @@ -523,12 +290,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap) } debugTrace(DEBUG_sched, - "traversed spark queue, len=%d; (hd=%ld; tl=%ld)", + "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)", sparkPoolSize(pool), pool->bottom, pool->top); } /* ---------------------------------------------------------------------------- - * balanceSparkPoolsCaps: takes an array of capabilities (usually: all * capabilities) and its size. Accesses all spark pools and equally * distributes the sparks among them. @@ -537,7 +303,8 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap) * -------------------------------------------------------------------------- */ void balanceSparkPoolsCaps(nat n_caps, Capability caps[]); -void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) { +void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, + Capability caps[] STG_UNUSED) { barf("not implemented"); }
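
-----------------------------------------------------------------------------

The spark pool deleted above was a lock-free work-stealing deque in the style
of Chase and Lev (SPAA'05), as its removed header comment explains: the owning
capability pushes and pops at the bottom (write end), thieves race on the top
(read end) with a cas, and the buffer is a circular array whose size is a
power of two so indices can be reduced with a bitwise and. The patch swaps
the open-coded version for the generic newWSDeque/pushWSDeque/stealWSDeque_/
freeWSDeque operations. For reference, a minimal sketch of the cas-based
steal path, modelled on the deleted steal() above; the struct and function
names are illustrative (not the actual WSDeque.h API) and assume the same
headers as Sparks.c (Rts.h for the Stg types, SMP.h for cas):

    /* Sketch only: a stripped-down Chase-Lev steal.  Types and names are
     * illustrative; the real implementation also needs volatile fields
     * and memory barriers, omitted here. */
    typedef struct {
        StgWord top;                /* read end, advanced by thieves      */
        StgWord bottom;             /* write end, owned by one capability */
        StgWord moduloSize;         /* size - 1, size is a power of two   */
        StgClosurePtr *elements;    /* circular buffer                    */
    } DequeSketch;

    static StgClosurePtr
    stealSketch (DequeSketch *q)
    {
        StgWord t, b;
        StgClosurePtr stolen;

        t = q->top;
        b = q->bottom;
        if ((long)(b - t) <= 0) {
            return NULL;                /* looks empty, give up           */
        }
        stolen = q->elements[t & q->moduloSize];

        /* exactly one thief wins the cas on top; the losers get NULL,
         * which callers treat as "retry or try another pool"             */
        if (cas((StgPtr)&q->top, t, t+1) != t) {
            return NULL;
        }
        return stolen;
    }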
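
tryStealSpark() above deliberately uses stealWSDeque_(), the non-looping
variant, which can return a spurious NULL when it loses a race; as its
comment says, the caller may want to try stealing from other pools before
trying again. A minimal illustration of such a caller (a hypothetical
helper, not the actual scheduler code) might look like:

    /* Illustrative only: visit our own pool, then every other capability's
     * pool once, treating NULL as "try the next pool". */
    static StgClosure *
    findSparkAnywhere (Capability *cap)
    {
        StgClosure *spark;
        nat i;

        spark = tryStealSpark(cap);              /* our own pool first */
        if (spark != NULL) return spark;

        for (i = 0; i < n_capabilities; i++) {
            if (&capabilities[i] == cap) continue;
            spark = tryStealSpark(&capabilities[i]);
            if (spark != NULL) return spark;
        }
        return NULL;
    }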
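
The index reset added at the top of pruneSparkQueue(),

    pool->bottom -= pool->top & ~pool->moduloSize;
    pool->top    &= pool->moduloSize;

subtracts the same multiple of the array size from both counters (top rounded
down to a multiple of size). The queue length (bottom - top) and every index
modulo size are therefore unchanged, while top drops back below size, so the
monotonically increasing counters regain headroom against overflow. A small
self-check with made-up values (illustrative, not part of the RTS):

    #include <assert.h>

    static void
    checkIndexReset (void)
    {
        unsigned long size = 8, moduloSize = size - 1;   /* power of two */
        unsigned long top = 21, bottom = 25;             /* length 4     */
        unsigned long newBottom = bottom - (top & ~moduloSize);
        unsigned long newTop    = top & moduloSize;

        assert(newBottom - newTop == bottom - top);      /* same length  */
        assert((newTop    & moduloSize) == (top    & moduloSize));
        assert((newBottom & moduloSize) == (bottom & moduloSize));
        assert(newTop < size);                           /* headroom     */
    }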