From 829a7d022e91da80295913e6c70179f211e5b966 Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Thu, 5 Feb 2009 12:46:48 +0000 Subject: [PATCH] Refactor the spark queue implementation into a generic work-stealing deque So we can use this abstraction elsewhere in the RTS --- rts/Sparks.c | 336 ++++++------------------------------------------ rts/Sparks.h | 90 ++++--------- rts/parallel/WSDeque.c | 270 ++++++++++++++++++++++++++++++++++++++ rts/parallel/WSDeque.h | 130 +++++++++++++++++++ 4 files changed, 459 insertions(+), 367 deletions(-) create mode 100644 rts/parallel/WSDeque.c create mode 100644 rts/parallel/WSDeque.h diff --git a/rts/Sparks.c b/rts/Sparks.c index 9e4492a..3fccdb6 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -4,35 +4,6 @@ * * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS. * - * The implementation uses Double-Ended Queues with lock-free access - * (thereby often called "deque") as described in - * - * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque. - * SPAA'05, July 2005, Las Vegas, USA. - * ACM 1-58113-986-1/05/0007 - * - * Author: Jost Berthold MSRC 07-09/2008 - * - * The DeQue is held as a circular array with known length. Positions - * of top (read-end) and bottom (write-end) always increase, and the - * array is accessed with indices modulo array-size. While this bears - * the risk of overflow, we assume that (with 64 bit indices), a - * program must run very long to reach that point. - * - * The write end of the queue (position bottom) can only be used with - * mutual exclusion, i.e. by exactly one caller at a time. At this - * end, new items can be enqueued using pushBottom()/newSpark(), and - * removed using popBottom()/reclaimSpark() (the latter implying a cas - * synchronisation with potential concurrent readers for the case of - * just one element). - * - * Multiple readers can steal()/findSpark() from the read end - * (position top), and are synchronised without a lock, based on a cas - * of the top position. One reader wins, the others return NULL for a - * failure. - * - * Both popBottom and steal also return NULL when the queue is empty. - * -------------------------------------------------------------------------*/ #include "PosixSource.h" @@ -52,57 +23,6 @@ #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) -/* internal helpers ... */ - -static StgWord -roundUp2(StgWord val) -{ - StgWord rounded = 1; - - /* StgWord is unsigned anyway, only catch 0 */ - if (val == 0) { - barf("DeQue,roundUp2: invalid size 0 requested"); - } - /* at least 1 bit set, shift up to its place */ - do { - rounded = rounded << 1; - } while (0 != (val = val>>1)); - return rounded; -} - -#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new))) - -/* ----------------------------------------------------------------------------- - * - * Initialising spark pools. - * - * -------------------------------------------------------------------------- */ - -/* constructor */ -static SparkPool* -initPool(StgWord size) -{ - StgWord realsize; - SparkPool *q; - - realsize = roundUp2(size); /* to compute modulo as a bitwise & */ - - q = (SparkPool*) stgMallocBytes(sizeof(SparkPool), /* admin fields */ - "newSparkPool"); - q->elements = (StgClosurePtr*) - stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */ - "newSparkPool:data space"); - q->top=0; - q->bottom=0; - q->topBound=0; /* read by writer, updated each time top is read */ - - q->size = realsize; /* power of 2 */ - q->moduloSize = realsize - 1; /* n % size == n & moduloSize */ - - ASSERT_SPARK_POOL_INVARIANTS(q); - return q; -} - void initSparkPools( void ) { @@ -110,159 +30,18 @@ initSparkPools( void ) /* walk over the capabilities, allocating a spark pool for each one */ nat i; for (i = 0; i < n_capabilities; i++) { - capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks); + capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks); } #else /* allocate a single spark pool */ - MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks); + MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks); #endif } void freeSparkPool (SparkPool *pool) { - /* should not interfere with concurrent findSpark() calls! And - nobody should use the pointer any more. We cross our fingers...*/ - stgFree(pool->elements); - stgFree(pool); -} - -/* ----------------------------------------------------------------------------- - * - * reclaimSpark: remove a spark from the write end of the queue. - * Returns the removed spark, and NULL if a race is lost or the pool - * empty. - * - * If only one spark is left in the pool, we synchronise with - * concurrently stealing threads by using cas to modify the top field. - * This routine should NEVER be called by a task which does not own - * the capability. Can this be checked here? - * - * -------------------------------------------------------------------------- */ - -StgClosure * -reclaimSpark (SparkPool *deque) -{ - /* also a bit tricky, has to avoid concurrent steal() calls by - accessing top with cas, when there is only one element left */ - StgWord t, b; - StgClosurePtr* pos; - long currSize; - StgClosurePtr removed; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - b = deque->bottom; - /* "decrement b as a test, see what happens" */ - deque->bottom = --b; - pos = (deque->elements) + (b & (deque->moduloSize)); - t = deque->top; /* using topBound would give an *upper* bound, we - need a lower bound. We use the real top here, but - can update the topBound value */ - deque->topBound = t; - currSize = b - t; - if (currSize < 0) { /* was empty before decrementing b, set b - consistently and abort */ - deque->bottom = t; - return NULL; - } - removed = *pos; - if (currSize > 0) { /* no danger, still elements in buffer after b-- */ - return removed; - } - /* otherwise, has someone meanwhile stolen the same (last) element? - Check and increment top value to know */ - if ( !(CASTOP(&(deque->top),t,t+1)) ) { - removed = NULL; /* no success, but continue adjusting bottom */ - } - deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */ - deque->topBound = t+1; /* ...and cached top value as well */ - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - return removed; -} - -/* ----------------------------------------------------------------------------- - * - * tryStealSpark: try to steal a spark from a Capability. - * - * Returns a valid spark, or NULL if the pool was empty, and can - * occasionally return NULL if there was a race with another thread - * stealing from the same pool. In this case, try again later. - * - -------------------------------------------------------------------------- */ - -static StgClosurePtr -steal(SparkPool *deque) -{ - StgClosurePtr* pos; - StgClosurePtr* arraybase; - StgWord sz; - StgClosurePtr stolen; - StgWord b,t; - -// Can't do this on someone else's spark pool: -// ASSERT_SPARK_POOL_INVARIANTS(deque); - - b = deque->bottom; - t = deque->top; - - // NB. b and t are unsigned; we need a signed value for the test - // below. - if ((long)b - (long)t <= 0 ) { - return NULL; /* already looks empty, abort */ - } - - /* now access array, see pushBottom() */ - arraybase = deque->elements; - sz = deque->moduloSize; - pos = arraybase + (t & sz); - stolen = *pos; - - /* now decide whether we have won */ - if ( !(CASTOP(&(deque->top),t,t+1)) ) { - /* lost the race, someon else has changed top in the meantime */ - return NULL; - } /* else: OK, top has been incremented by the cas call */ - -// Can't do this on someone else's spark pool: -// ASSERT_SPARK_POOL_INVARIANTS(deque); - - /* return stolen element */ - return stolen; -} - -StgClosure * -tryStealSpark (Capability *cap) -{ - SparkPool *pool = cap->sparks; - StgClosure *stolen; - - do { - stolen = steal(pool); - } while (stolen != NULL && !closure_SHOULD_SPARK(stolen)); - - return stolen; -} - - -/* ----------------------------------------------------------------------------- - * - * "guesses" whether a deque is empty. Can return false negatives in - * presence of concurrent steal() calls, and false positives in - * presence of a concurrent pushBottom(). - * - * -------------------------------------------------------------------------- */ - -rtsBool -looksEmpty(SparkPool* deque) -{ - StgWord t = deque->top; - StgWord b = deque->bottom; - /* try to prefer false negatives by reading top first */ - return ((long)b - (long)t <= 0); - /* => array is *never* completely filled, always 1 place free! */ + freeWSDeque(pool); } /* ----------------------------------------------------------------------------- @@ -281,69 +60,6 @@ createSparkThread (Capability *cap) appendToRunQueue(cap,tso); } -/* ----------------------------------------------------------------------------- - * - * Create a new spark - * - * -------------------------------------------------------------------------- */ - -#define DISCARD_NEW - -/* enqueue an element. Should always succeed by resizing the array - (not implemented yet, silently fails in that case). */ -static void -pushBottom (SparkPool* deque, StgClosurePtr elem) -{ - StgWord t; - StgClosurePtr* pos; - StgWord sz = deque->moduloSize; - StgWord b = deque->bottom; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - - /* we try to avoid reading deque->top (accessed by all) and use - deque->topBound (accessed only by writer) instead. - This is why we do not just call empty(deque) here. - */ - t = deque->topBound; - if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { - /* NB. 1. sz == deque->size - 1, thus ">=" - 2. signed comparison, it is possible that t > b - */ - /* could be full, check the real top value in this case */ - t = deque->top; - deque->topBound = t; - if (b - t >= sz) { /* really no space left :-( */ - /* reallocate the array, copying the values. Concurrent steal()s - will in the meantime use the old one and modify only top. - This means: we cannot safely free the old space! Can keep it - on a free list internally here... - - Potential bug in combination with steal(): if array is - replaced, it is unclear which one concurrent steal operations - use. Must read the array base address in advance in steal(). - */ -#if defined(DISCARD_NEW) - ASSERT_SPARK_POOL_INVARIANTS(deque); - return; /* for now, silently fail */ -#else - /* could make room by incrementing the top position here. In - * this case, should use CASTOP. If this fails, someone else has - * removed something, and new room will be available. - */ - ASSERT_SPARK_POOL_INVARIANTS(deque); -#endif - } - } - pos = (deque->elements) + (b & sz); - *pos = elem; - (deque->bottom)++; - - ASSERT_SPARK_POOL_INVARIANTS(deque); - return; -} - - /* -------------------------------------------------------------------------- * newSpark: create a new spark, as a result of calling "par" * Called directly from STG. @@ -361,19 +77,40 @@ newSpark (StgRegTable *reg, StgClosure *p) */ p = UNTAG_CLOSURE(p); - ASSERT_SPARK_POOL_INVARIANTS(pool); - if (closure_SHOULD_SPARK(p)) { - pushBottom(pool,p); + pushWSDeque(pool,p); } cap->sparks_created++; - ASSERT_SPARK_POOL_INVARIANTS(pool); return 1; } +/* ----------------------------------------------------------------------------- + * + * tryStealSpark: try to steal a spark from a Capability. + * + * Returns a valid spark, or NULL if the pool was empty, and can + * occasionally return NULL if there was a race with another thread + * stealing from the same pool. In this case, try again later. + * + -------------------------------------------------------------------------- */ +StgClosure * +tryStealSpark (Capability *cap) +{ + SparkPool *pool = cap->sparks; + StgClosure *stolen; + + do { + stolen = stealWSDeque_(pool); + // use the no-loopy version, stealWSDeque_(), since if we get a + // spurious NULL here the caller may want to try stealing from + // other pools before trying again. + } while (stolen != NULL && !closure_SHOULD_SPARK(stolen)); + + return stolen; +} /* -------------------------------------------------------------------------- * Remove all sparks from the spark queues which should not spark any @@ -412,11 +149,12 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap) pool->topBound = pool->top; debugTrace(DEBUG_sched, - "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)", + "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)", sparkPoolSize(pool), pool->bottom, pool->top); - ASSERT_SPARK_POOL_INVARIANTS(pool); - elements = pool->elements; + ASSERT_WSDEQUE_INVARIANTS(pool); + + elements = (StgClosurePtr *)pool->elements; /* We have exclusive access to the structure here, so we can reset bottom and top counters, and prune invalid sparks. Contents are @@ -513,10 +251,10 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap) debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks); debugTrace(DEBUG_sched, - "new spark queue len=%d; (hd=%ld; tl=%ld)", + "new spark queue len=%ld; (hd=%ld; tl=%ld)", sparkPoolSize(pool), pool->bottom, pool->top); - ASSERT_SPARK_POOL_INVARIANTS(pool); + ASSERT_WSDEQUE_INVARIANTS(pool); } /* GC for the spark pool, called inside Capability.c for all @@ -530,11 +268,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap) pool = cap->sparks; - ASSERT_SPARK_POOL_INVARIANTS(pool); + ASSERT_WSDEQUE_INVARIANTS(pool); top = pool->top; bottom = pool->bottom; - sparkp = pool->elements; + sparkp = (StgClosurePtr*)pool->elements; modMask = pool->moduloSize; while (top < bottom) { @@ -547,7 +285,7 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap) } debugTrace(DEBUG_sched, - "traversed spark queue, len=%d; (hd=%ld; tl=%ld)", + "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)", sparkPoolSize(pool), pool->bottom, pool->top); } diff --git a/rts/Sparks.h b/rts/Sparks.h index e2c60e9..105742f 100644 --- a/rts/Sparks.h +++ b/rts/Sparks.h @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 2000-2006 + * (c) The GHC Team, 2000-2009 * * Sparking support for GRAN, PAR and THREADED_RTS versions of the RTS. * @@ -9,6 +9,8 @@ #ifndef SPARKS_H #define SPARKS_H +#include "WSDeque.h" + #if defined(PARALLEL_HASKELL) #error Sparks.c using new internal structure, needs major overhaul! #endif @@ -17,68 +19,18 @@ #if defined(THREADED_RTS) -/* Spark pools: used to store pending sparks - * (THREADED_RTS & PARALLEL_HASKELL only) - * Implementation uses a DeQue to enable concurrent read accesses at - * the top end. - */ -typedef struct SparkPool_ { - /* Size of elements array. Used for modulo calculation: we round up - to powers of 2 and use the dyadic log (modulo == bitwise &) */ - StgWord size; - StgWord moduloSize; /* bitmask for modulo */ - - /* top, index where multiple readers steal() (protected by a cas) */ - volatile StgWord top; - - /* bottom, index of next free place where one writer can push - elements. This happens unsynchronised. */ - volatile StgWord bottom; - /* both position indices are continuously incremented, and used as - an index modulo the current array size. */ - - /* lower bound on the current top value. This is an internal - optimisation to avoid unnecessarily accessing the top field - inside pushBottom */ - volatile StgWord topBound; - - /* The elements array */ - StgClosurePtr* elements; - /* Please note: the dataspace cannot follow the admin fields - immediately, as it should be possible to enlarge it without - disposing the old one automatically (as realloc would)! */ - -} SparkPool; - - -/* INVARIANTS, in this order: reasonable size, - topBound consistent, space pointer, space accessible to us. - - NB. This is safe to use only (a) on a spark pool owned by the - current thread, or (b) when there's only one thread running, or no - stealing going on (e.g. during GC). -*/ -#define ASSERT_SPARK_POOL_INVARIANTS(p) \ - ASSERT((p)->size > 0); \ - ASSERT((p)->topBound <= (p)->top); \ - ASSERT((p)->elements != NULL); \ - ASSERT(*((p)->elements) || 1); \ - ASSERT(*((p)->elements - 1 + ((p)->size)) || 1); - -// No: it is possible that top > bottom when using reclaimSpark() -// ASSERT((p)->bottom >= (p)->top); -// ASSERT((p)->size > (p)->bottom - (p)->top); +typedef WSDeque SparkPool; // Initialisation void initSparkPools (void); // Take a spark from the "write" end of the pool. Can be called // by the pool owner only. -StgClosure* reclaimSpark(SparkPool *pool); +INLINE_HEADER StgClosure* reclaimSpark(SparkPool *pool); // Returns True if the spark pool is empty (can give a false positive // if the pool is almost empty). -rtsBool looksEmpty(SparkPool* deque); +INLINE_HEADER rtsBool looksEmpty(SparkPool* deque); StgClosure * tryStealSpark (Capability *cap); void freeSparkPool (SparkPool *pool); @@ -87,30 +39,32 @@ void traverseSparkQueue(evac_fn evac, void *user, Capability *cap); void pruneSparkQueue (evac_fn evac, void *user, Capability *cap); INLINE_HEADER void discardSparks (SparkPool *pool); -INLINE_HEADER nat sparkPoolSize (SparkPool *pool); -#endif +INLINE_HEADER long sparkPoolSize (SparkPool *pool); /* ----------------------------------------------------------------------------- * PRIVATE below here * -------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +INLINE_HEADER StgClosure* reclaimSpark(SparkPool *pool) +{ + return popWSDeque(pool); +} -INLINE_HEADER rtsBool -emptySparkPool (SparkPool *pool) -{ return looksEmpty(pool); } +INLINE_HEADER rtsBool looksEmpty(SparkPool* deque) +{ + return looksEmptyWSDeque(deque); +} -INLINE_HEADER nat -sparkPoolSize (SparkPool *pool) -{ return (pool->bottom - pool->top); } +INLINE_HEADER long sparkPoolSize (SparkPool *pool) +{ + return dequeElements(pool); +} -INLINE_HEADER void -discardSparks (SparkPool *pool) +INLINE_HEADER void discardSparks (SparkPool *pool) { - pool->top = pool->bottom; -// pool->topBound = pool->top; + discardElements(pool); } -#endif +#endif // THREADED_RTS #endif /* SPARKS_H */ diff --git a/rts/parallel/WSDeque.c b/rts/parallel/WSDeque.c new file mode 100644 index 0000000..8c403c3 --- /dev/null +++ b/rts/parallel/WSDeque.c @@ -0,0 +1,270 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team, 2009 + * + * Work-stealing Deque data structure + * + * The implementation uses Double-Ended Queues with lock-free access + * (thereby often called "deque") as described in + * + * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque. + * SPAA'05, July 2005, Las Vegas, USA. + * ACM 1-58113-986-1/05/0007 + * + * Author: Jost Berthold MSRC 07-09/2008 + * + * The DeQue is held as a circular array with known length. Positions + * of top (read-end) and bottom (write-end) always increase, and the + * array is accessed with indices modulo array-size. While this bears + * the risk of overflow, we assume that (with 64 bit indices), a + * program must run very long to reach that point. + * + * The write end of the queue (position bottom) can only be used with + * mutual exclusion, i.e. by exactly one caller at a time. At this + * end, new items can be enqueued using pushBottom()/newSpark(), and + * removed using popBottom()/reclaimSpark() (the latter implying a cas + * synchronisation with potential concurrent readers for the case of + * just one element). + * + * Multiple readers can steal from the read end (position top), and + * are synchronised without a lock, based on a cas of the top + * position. One reader wins, the others return NULL for a failure. + * + * Both popBottom and steal also return NULL when the queue is empty. + * + * ---------------------------------------------------------------------------*/ + +#include "Rts.h" +#include "RtsUtils.h" +#include "WSDeque.h" +#include "SMP.h" // for cas + +#if defined(THREADED_RTS) + +#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new))) + +/* ----------------------------------------------------------------------------- + * newWSDeque + * -------------------------------------------------------------------------- */ + +/* internal helpers ... */ + +static StgWord +roundUp2(StgWord val) +{ + StgWord rounded = 1; + + /* StgWord is unsigned anyway, only catch 0 */ + if (val == 0) { + barf("DeQue,roundUp2: invalid size 0 requested"); + } + /* at least 1 bit set, shift up to its place */ + do { + rounded = rounded << 1; + } while (0 != (val = val>>1)); + return rounded; +} + +WSDeque * +newWSDeque (nat size) +{ + StgWord realsize; + WSDeque *q; + + realsize = roundUp2(size); /* to compute modulo as a bitwise & */ + + q = (WSDeque*) stgMallocBytes(sizeof(WSDeque), /* admin fields */ + "newWSDeque"); + q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */ + "newWSDeque:data space"); + q->top=0; + q->bottom=0; + q->topBound=0; /* read by writer, updated each time top is read */ + + q->size = realsize; /* power of 2 */ + q->moduloSize = realsize - 1; /* n % size == n & moduloSize */ + + ASSERT_WSDEQUE_INVARIANTS(q); + return q; +} + +/* ----------------------------------------------------------------------------- + * freeWSDeque + * -------------------------------------------------------------------------- */ + +void +freeWSDeque (WSDeque *q) +{ + stgFree(q->elements); + stgFree(q); +} + +/* ----------------------------------------------------------------------------- + * + * popWSDeque: remove an element from the write end of the queue. + * Returns the removed spark, and NULL if a race is lost or the pool + * empty. + * + * If only one spark is left in the pool, we synchronise with + * concurrently stealing threads by using cas to modify the top field. + * This routine should NEVER be called by a task which does not own + * this deque. + * + * -------------------------------------------------------------------------- */ + +void * +popWSDeque (WSDeque *q) +{ + /* also a bit tricky, has to avoid concurrent steal() calls by + accessing top with cas, when there is only one element left */ + StgWord t, b; + void ** pos; + long currSize; + void * removed; + + ASSERT_WSDEQUE_INVARIANTS(q); + + b = q->bottom; + /* "decrement b as a test, see what happens" */ + q->bottom = --b; + pos = (q->elements) + (b & (q->moduloSize)); + t = q->top; /* using topBound would give an *upper* bound, we + need a lower bound. We use the real top here, but + can update the topBound value */ + q->topBound = t; + currSize = b - t; + if (currSize < 0) { /* was empty before decrementing b, set b + consistently and abort */ + q->bottom = t; + return NULL; + } + removed = *pos; + if (currSize > 0) { /* no danger, still elements in buffer after b-- */ + return removed; + } + /* otherwise, has someone meanwhile stolen the same (last) element? + Check and increment top value to know */ + if ( !(CASTOP(&(q->top),t,t+1)) ) { + removed = NULL; /* no success, but continue adjusting bottom */ + } + q->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */ + q->topBound = t+1; /* ...and cached top value as well */ + + ASSERT_WSDEQUE_INVARIANTS(q); + + return removed; +} + +/* ----------------------------------------------------------------------------- + * stealWSDeque + * -------------------------------------------------------------------------- */ + +void * +stealWSDeque_ (WSDeque *q) +{ + void ** pos; + void ** arraybase; + StgWord sz; + void * stolen; + StgWord b,t; + +// Can't do this on someone else's spark pool: +// ASSERT_WSDEQUE_INVARIANTS(q); + + b = q->bottom; + t = q->top; + + // NB. b and t are unsigned; we need a signed value for the test + // below. + if ((long)b - (long)t <= 0 ) { + return NULL; /* already looks empty, abort */ + } + + /* now access array, see pushBottom() */ + arraybase = q->elements; + sz = q->moduloSize; + pos = arraybase + (t & sz); + stolen = *pos; + + /* now decide whether we have won */ + if ( !(CASTOP(&(q->top),t,t+1)) ) { + /* lost the race, someon else has changed top in the meantime */ + return NULL; + } /* else: OK, top has been incremented by the cas call */ + +// Can't do this on someone else's spark pool: +// ASSERT_WSDEQUE_INVARIANTS(q); + + return stolen; +} + +void * +stealWSDeque (WSDeque *q) +{ + void *stolen; + + do { + stolen = stealWSDeque_(q); + } while (stolen == NULL && !looksEmptyWSDeque(q)); + + return stolen; +} + + +#define DISCARD_NEW + +/* enqueue an element. Should always succeed by resizing the array + (not implemented yet, silently fails in that case). */ +rtsBool +pushWSDeque (WSDeque* q, void * elem) +{ + StgWord t; + void ** pos; + StgWord sz = q->moduloSize; + StgWord b = q->bottom; + + ASSERT_WSDEQUE_INVARIANTS(q); + + /* we try to avoid reading q->top (accessed by all) and use + q->topBound (accessed only by writer) instead. + This is why we do not just call empty(q) here. + */ + t = q->topBound; + if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { + /* NB. 1. sz == q->size - 1, thus ">=" + 2. signed comparison, it is possible that t > b + */ + /* could be full, check the real top value in this case */ + t = q->top; + q->topBound = t; + if (b - t >= sz) { /* really no space left :-( */ + /* reallocate the array, copying the values. Concurrent steal()s + will in the meantime use the old one and modify only top. + This means: we cannot safely free the old space! Can keep it + on a free list internally here... + + Potential bug in combination with steal(): if array is + replaced, it is unclear which one concurrent steal operations + use. Must read the array base address in advance in steal(). + */ +#if defined(DISCARD_NEW) + ASSERT_WSDEQUE_INVARIANTS(q); + return rtsFalse; // we didn't push anything +#else + /* could make room by incrementing the top position here. In + * this case, should use CASTOP. If this fails, someone else has + * removed something, and new room will be available. + */ + ASSERT_WSDEQUE_INVARIANTS(q); +#endif + } + } + pos = (q->elements) + (b & sz); + *pos = elem; + (q->bottom)++; + + ASSERT_WSDEQUE_INVARIANTS(q); + return rtsTrue; +} + +#endif diff --git a/rts/parallel/WSDeque.h b/rts/parallel/WSDeque.h new file mode 100644 index 0000000..c254671 --- /dev/null +++ b/rts/parallel/WSDeque.h @@ -0,0 +1,130 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team, 2009 + * + * Work-stealing Deque data structure + * + * ---------------------------------------------------------------------------*/ + +#ifndef WSDEQUE_H +#define WSDEQUE_H + +#if defined(THREADED_RTS) + +typedef struct WSDeque_ { + // Size of elements array. Used for modulo calculation: we round up + // to powers of 2 and use the dyadic log (modulo == bitwise &) + StgWord size; + StgWord moduloSize; /* bitmask for modulo */ + + // top, index where multiple readers steal() (protected by a cas) + volatile StgWord top; + + // bottom, index of next free place where one writer can push + // elements. This happens unsynchronised. + volatile StgWord bottom; + + // both top and bottom are continuously incremented, and used as + // an index modulo the current array size. + + // lower bound on the current top value. This is an internal + // optimisation to avoid unnecessarily accessing the top field + // inside pushBottom + volatile StgWord topBound; + + // The elements array + void ** elements; + + // Please note: the dataspace cannot follow the admin fields + // immediately, as it should be possible to enlarge it without + // disposing the old one automatically (as realloc would)! + +} WSDeque; + +/* INVARIANTS, in this order: reasonable size, + topBound consistent, space pointer, space accessible to us. + + NB. This is safe to use only (a) on a spark pool owned by the + current thread, or (b) when there's only one thread running, or no + stealing going on (e.g. during GC). +*/ +#define ASSERT_WSDEQUE_INVARIANTS(p) \ + ASSERT((p)->size > 0); \ + ASSERT((p)->topBound <= (p)->top); \ + ASSERT((p)->elements != NULL); \ + ASSERT(*((p)->elements) || 1); \ + ASSERT(*((p)->elements - 1 + ((p)->size)) || 1); + +// No: it is possible that top > bottom when using pop() +// ASSERT((p)->bottom >= (p)->top); +// ASSERT((p)->size > (p)->bottom - (p)->top); + +/* ----------------------------------------------------------------------------- + * Operations + * + * A WSDeque has an *owner* thread. The owner can perform any operation; + * other threads are only allowed to call stealWSDeque_(), + * stealWSDeque(), looksEmptyWSDeque(), and dequeElements(). + * + * -------------------------------------------------------------------------- */ + +// Allocation, deallocation +WSDeque * newWSDeque (nat size); +void freeWSDeque (WSDeque *q); + +// Take an element from the "write" end of the pool. Can be called +// by the pool owner only. +void* popWSDeque (WSDeque *q); + +// Push onto the "write" end of the pool. Return true if the push +// succeeded, or false if the deque is full. +rtsBool pushWSDeque (WSDeque *q, void *elem); + +// Removes all elements from the deque +INLINE_HEADER void discardElements (WSDeque *q); + +// Removes an element of the deque from the "read" end, or returns +// NULL if the pool is empty, or if there was a collision with another +// thief. +void * stealWSDeque_ (WSDeque *q); + +// Removes an element of the deque from the "read" end, or returns +// NULL if the pool is empty. +void * stealWSDeque (WSDeque *q); + +// "guesses" whether a deque is empty. Can return false negatives in +// presence of concurrent steal() calls, and false positives in +// presence of a concurrent pushBottom(). +INLINE_HEADER rtsBool looksEmptyWSDeque (WSDeque* q); + +INLINE_HEADER long dequeElements (WSDeque *q); + +/* ----------------------------------------------------------------------------- + * PRIVATE below here + * -------------------------------------------------------------------------- */ + +INLINE_HEADER long +dequeElements (WSDeque *q) +{ + StgWord t = q->top; + StgWord b = q->bottom; + // try to prefer false negatives by reading top first + return ((long)b - (long)t); +} + +INLINE_HEADER rtsBool +looksEmptyWSDeque (WSDeque *q) +{ + return (dequeElements(q) <= 0); +} + +INLINE_HEADER void +discardElements (WSDeque *q) +{ + q->top = q->bottom; +// pool->topBound = pool->top; +} + +#endif // THREADED_RTS + +#endif // WSDEQUE_H -- 1.7.10.4