Refactor the spark queue implementation into a generic work-stealing deque
authorSimon Marlow <marlowsd@gmail.com>
Thu, 5 Feb 2009 12:46:48 +0000 (12:46 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Thu, 5 Feb 2009 12:46:48 +0000 (12:46 +0000)
So we can use this abstraction elsewhere in the RTS

rts/Sparks.c
rts/Sparks.h
rts/parallel/WSDeque.c [new file with mode: 0644]
rts/parallel/WSDeque.h [new file with mode: 0644]

index 9e4492a..3fccdb6 100644 (file)
@@ -4,35 +4,6 @@
  *
  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
- * The implementation uses Double-Ended Queues with lock-free access
- * (thereby often called "deque") as described in
- *
- * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
- * SPAA'05, July 2005, Las Vegas, USA.
- * ACM 1-58113-986-1/05/0007
- *
- * Author: Jost Berthold MSRC 07-09/2008
- *
- * The DeQue is held as a circular array with known length. Positions
- * of top (read-end) and bottom (write-end) always increase, and the
- * array is accessed with indices modulo array-size. While this bears
- * the risk of overflow, we assume that (with 64 bit indices), a
- * program must run very long to reach that point.
- * 
- * The write end of the queue (position bottom) can only be used with
- * mutual exclusion, i.e. by exactly one caller at a time.  At this
- * end, new items can be enqueued using pushBottom()/newSpark(), and
- * removed using popBottom()/reclaimSpark() (the latter implying a cas
- * synchronisation with potential concurrent readers for the case of
- * just one element).
- * 
- * Multiple readers can steal()/findSpark() from the read end
- * (position top), and are synchronised without a lock, based on a cas
- * of the top position. One reader wins, the others return NULL for a
- * failure.
- * 
- * Both popBottom and steal also return NULL when the queue is empty.
- * 
  -------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
 
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
-/* internal helpers ... */
-
-static StgWord
-roundUp2(StgWord val)
-{
-  StgWord rounded = 1;
-
-  /* StgWord is unsigned anyway, only catch 0 */
-  if (val == 0) {
-    barf("DeQue,roundUp2: invalid size 0 requested");
-  }
-  /* at least 1 bit set, shift up to its place */
-  do {
-    rounded = rounded << 1;
-  } while (0 != (val = val>>1));
-  return rounded;
-}
-
-#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
-
-/* -----------------------------------------------------------------------------
- * 
- * Initialising spark pools.
- *
- * -------------------------------------------------------------------------- */
-
-/* constructor */
-static SparkPool*
-initPool(StgWord size)
-{
-  StgWord realsize; 
-  SparkPool *q;
-
-  realsize = roundUp2(size); /* to compute modulo as a bitwise & */
-
-  q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
-                             "newSparkPool");
-  q->elements = (StgClosurePtr*) 
-                stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
-                              "newSparkPool:data space");
-  q->top=0;
-  q->bottom=0;
-  q->topBound=0; /* read by writer, updated each time top is read */
-
-  q->size = realsize;  /* power of 2 */
-  q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
-
-  ASSERT_SPARK_POOL_INVARIANTS(q); 
-  return q;
-}
-
 void
 initSparkPools( void )
 {
@@ -110,159 +30,18 @@ initSparkPools( void )
     /* walk over the capabilities, allocating a spark pool for each one */
     nat i;
     for (i = 0; i < n_capabilities; i++) {
-      capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
+      capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
     }
 #else
     /* allocate a single spark pool */
-    MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
+    MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
 #endif
 }
 
 void
 freeSparkPool (SparkPool *pool)
 {
-  /* should not interfere with concurrent findSpark() calls! And
-     nobody should use the pointer any more. We cross our fingers...*/
-  stgFree(pool->elements);
-  stgFree(pool);
-}
-
-/* -----------------------------------------------------------------------------
- * 
- * reclaimSpark: remove a spark from the write end of the queue.
- * Returns the removed spark, and NULL if a race is lost or the pool
- * empty.
- *
- * If only one spark is left in the pool, we synchronise with
- * concurrently stealing threads by using cas to modify the top field.
- * This routine should NEVER be called by a task which does not own
- * the capability. Can this be checked here?
- *
- * -------------------------------------------------------------------------- */
-
-StgClosure *
-reclaimSpark (SparkPool *deque)
-{
-  /* also a bit tricky, has to avoid concurrent steal() calls by
-     accessing top with cas, when there is only one element left */
-  StgWord t, b;
-  StgClosurePtr* pos;
-  long  currSize;
-  StgClosurePtr removed;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  b = deque->bottom;
-  /* "decrement b as a test, see what happens" */
-  deque->bottom = --b; 
-  pos = (deque->elements) + (b & (deque->moduloSize));
-  t = deque->top; /* using topBound would give an *upper* bound, we
-                    need a lower bound. We use the real top here, but
-                    can update the topBound value */
-  deque->topBound = t;
-  currSize = b - t;
-  if (currSize < 0) { /* was empty before decrementing b, set b
-                        consistently and abort */
-    deque->bottom = t;
-    return NULL;
-  }
-  removed = *pos;
-  if (currSize > 0) { /* no danger, still elements in buffer after b-- */
-    return removed;
-  } 
-  /* otherwise, has someone meanwhile stolen the same (last) element?
-     Check and increment top value to know  */
-  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
-    removed = NULL; /* no success, but continue adjusting bottom */
-  }
-  deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
-  deque->topBound = t+1; /* ...and cached top value as well */
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  return removed;
-}
-
-/* -----------------------------------------------------------------------------
- * 
- * tryStealSpark: try to steal a spark from a Capability.
- *
- * Returns a valid spark, or NULL if the pool was empty, and can
- * occasionally return NULL if there was a race with another thread
- * stealing from the same pool.  In this case, try again later.
- *
- -------------------------------------------------------------------------- */
-
-static StgClosurePtr
-steal(SparkPool *deque)
-{
-  StgClosurePtr* pos;
-  StgClosurePtr* arraybase;
-  StgWord sz;
-  StgClosurePtr stolen;
-  StgWord b,t; 
-
-// Can't do this on someone else's spark pool:
-// ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  b = deque->bottom;
-  t = deque->top;
-
-  // NB. b and t are unsigned; we need a signed value for the test
-  // below.
-  if ((long)b - (long)t <= 0 ) { 
-    return NULL; /* already looks empty, abort */
-  }
-
-  /* now access array, see pushBottom() */
-  arraybase = deque->elements;
-  sz = deque->moduloSize;
-  pos = arraybase + (t & sz);  
-  stolen = *pos;
-
-  /* now decide whether we have won */
-  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
-      /* lost the race, someon else has changed top in the meantime */
-      return NULL;
-  }  /* else: OK, top has been incremented by the cas call */
-
-// Can't do this on someone else's spark pool:
-// ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  /* return stolen element */
-  return stolen;
-}
-
-StgClosure *
-tryStealSpark (Capability *cap)
-{
-  SparkPool *pool = cap->sparks;
-  StgClosure *stolen;
-
-  do { 
-      stolen = steal(pool);
-  } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
-
-  return stolen;
-}
-
-
-/* -----------------------------------------------------------------------------
- * 
- * "guesses" whether a deque is empty. Can return false negatives in
- *  presence of concurrent steal() calls, and false positives in
- *  presence of a concurrent pushBottom().
- *
- * -------------------------------------------------------------------------- */
-
-rtsBool
-looksEmpty(SparkPool* deque)
-{
-  StgWord t = deque->top;
-  StgWord b = deque->bottom;
-  /* try to prefer false negatives by reading top first */
-  return ((long)b - (long)t <= 0);
-  /* => array is *never* completely filled, always 1 place free! */
+    freeWSDeque(pool);
 }
 
 /* -----------------------------------------------------------------------------
@@ -281,69 +60,6 @@ createSparkThread (Capability *cap)
     appendToRunQueue(cap,tso);
 }
 
-/* -----------------------------------------------------------------------------
- * 
- * Create a new spark
- *
- * -------------------------------------------------------------------------- */
-
-#define DISCARD_NEW
-
-/* enqueue an element. Should always succeed by resizing the array
-   (not implemented yet, silently fails in that case). */
-static void
-pushBottom (SparkPool* deque, StgClosurePtr elem)
-{
-  StgWord t;
-  StgClosurePtr* pos;
-  StgWord sz = deque->moduloSize; 
-  StgWord b = deque->bottom;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  /* we try to avoid reading deque->top (accessed by all) and use
-     deque->topBound (accessed only by writer) instead. 
-     This is why we do not just call empty(deque) here.
-  */
-  t = deque->topBound;
-  if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { 
-    /* NB. 1. sz == deque->size - 1, thus ">="
-           2. signed comparison, it is possible that t > b
-    */
-    /* could be full, check the real top value in this case */
-    t = deque->top;
-    deque->topBound = t;
-    if (b - t >= sz) { /* really no space left :-( */
-      /* reallocate the array, copying the values. Concurrent steal()s
-        will in the meantime use the old one and modify only top.
-        This means: we cannot safely free the old space! Can keep it
-        on a free list internally here...
-
-        Potential bug in combination with steal(): if array is
-        replaced, it is unclear which one concurrent steal operations
-        use. Must read the array base address in advance in steal().
-      */
-#if defined(DISCARD_NEW)
-      ASSERT_SPARK_POOL_INVARIANTS(deque); 
-      return; /* for now, silently fail */
-#else
-      /* could make room by incrementing the top position here.  In
-       * this case, should use CASTOP. If this fails, someone else has
-       * removed something, and new room will be available.
-       */
-      ASSERT_SPARK_POOL_INVARIANTS(deque); 
-#endif
-    }
-  }
-  pos = (deque->elements) + (b & sz);
-  *pos = elem;
-  (deque->bottom)++;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-  return;
-}
-
-
 /* --------------------------------------------------------------------------
  * newSpark: create a new spark, as a result of calling "par"
  * Called directly from STG.
@@ -361,19 +77,40 @@ newSpark (StgRegTable *reg, StgClosure *p)
      */
     p = UNTAG_CLOSURE(p);
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
-
     if (closure_SHOULD_SPARK(p)) {
-      pushBottom(pool,p);
+        pushWSDeque(pool,p);
     }  
 
     cap->sparks_created++;
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
     return 1;
 }
 
+/* -----------------------------------------------------------------------------
+ * 
+ * tryStealSpark: try to steal a spark from a Capability.
+ *
+ * Returns a valid spark, or NULL if the pool was empty, and can
+ * occasionally return NULL if there was a race with another thread
+ * stealing from the same pool.  In this case, try again later.
+ *
+ -------------------------------------------------------------------------- */
 
+StgClosure *
+tryStealSpark (Capability *cap)
+{
+  SparkPool *pool = cap->sparks;
+  StgClosure *stolen;
+
+  do { 
+      stolen = stealWSDeque_(pool); 
+      // use the no-loopy version, stealWSDeque_(), since if we get a
+      // spurious NULL here the caller may want to try stealing from
+      // other pools before trying again.
+  } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
+
+  return stolen;
+}
 
 /* --------------------------------------------------------------------------
  * Remove all sparks from the spark queues which should not spark any
@@ -412,11 +149,12 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
     pool->topBound = pool->top;
 
     debugTrace(DEBUG_sched,
-               "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
+               "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
 
-    elements = pool->elements;
+    ASSERT_WSDEQUE_INVARIANTS(pool);
+
+    elements = (StgClosurePtr *)pool->elements;
 
     /* We have exclusive access to the structure here, so we can reset
        bottom and top counters, and prune invalid sparks. Contents are
@@ -513,10 +251,10 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
     
     debugTrace(DEBUG_sched,
-               "new spark queue len=%d; (hd=%ld; tl=%ld)",
+               "new spark queue len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    ASSERT_WSDEQUE_INVARIANTS(pool);
 }
 
 /* GC for the spark pool, called inside Capability.c for all
@@ -530,11 +268,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
     
     pool = cap->sparks;
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    ASSERT_WSDEQUE_INVARIANTS(pool);
 
     top = pool->top;
     bottom = pool->bottom;
-    sparkp = pool->elements;
+    sparkp = (StgClosurePtr*)pool->elements;
     modMask = pool->moduloSize;
 
     while (top < bottom) {
@@ -547,7 +285,7 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
     }
 
     debugTrace(DEBUG_sched,
-               "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
+               "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
 }
 
index e2c60e9..105742f 100644 (file)
@@ -1,6 +1,6 @@
 /* -----------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2000-2006
+ * (c) The GHC Team, 2000-2009
  *
  * Sparking support for GRAN, PAR and THREADED_RTS versions of the RTS.
  * 
@@ -9,6 +9,8 @@
 #ifndef SPARKS_H
 #define SPARKS_H
 
+#include "WSDeque.h"
+
 #if defined(PARALLEL_HASKELL)
 #error Sparks.c using new internal structure, needs major overhaul!
 #endif
 
 #if defined(THREADED_RTS)
 
-/* Spark pools: used to store pending sparks 
- *  (THREADED_RTS & PARALLEL_HASKELL only)
- * Implementation uses a DeQue to enable concurrent read accesses at
- * the top end.
- */
-typedef struct  SparkPool_ {
-  /* Size of elements array. Used for modulo calculation: we round up
-     to powers of 2 and use the dyadic log (modulo == bitwise &) */
-  StgWord size; 
-  StgWord moduloSize; /* bitmask for modulo */
-
-  /* top, index where multiple readers steal() (protected by a cas) */
-  volatile StgWord top;
-
-  /* bottom, index of next free place where one writer can push
-     elements. This happens unsynchronised. */
-  volatile StgWord bottom;
-  /* both position indices are continuously incremented, and used as
-     an index modulo the current array size. */
-  
-  /* lower bound on the current top value. This is an internal
-     optimisation to avoid unnecessarily accessing the top field
-     inside pushBottom */
-  volatile StgWord topBound;
-
-  /* The elements array */
-  StgClosurePtr* elements;
-  /*  Please note: the dataspace cannot follow the admin fields
-      immediately, as it should be possible to enlarge it without
-      disposing the old one automatically (as realloc would)! */
-
-} SparkPool;
-
-
-/* INVARIANTS, in this order: reasonable size,
-   topBound consistent, space pointer, space accessible to us.
-   
-   NB. This is safe to use only (a) on a spark pool owned by the
-   current thread, or (b) when there's only one thread running, or no
-   stealing going on (e.g. during GC).
-*/
-#define ASSERT_SPARK_POOL_INVARIANTS(p)         \
-  ASSERT((p)->size > 0);                        \
-  ASSERT((p)->topBound <= (p)->top);            \
-  ASSERT((p)->elements != NULL);                \
-  ASSERT(*((p)->elements) || 1);                \
-  ASSERT(*((p)->elements - 1  + ((p)->size)) || 1);
-
-// No: it is possible that top > bottom when using reclaimSpark()
-//  ASSERT((p)->bottom >= (p)->top);           
-//  ASSERT((p)->size > (p)->bottom - (p)->top);
+typedef WSDeque SparkPool;
 
 // Initialisation
 void initSparkPools (void);
 
 // Take a spark from the "write" end of the pool.  Can be called
 // by the pool owner only.
-StgClosure* reclaimSpark(SparkPool *pool);
+INLINE_HEADER StgClosure* reclaimSpark(SparkPool *pool);
 
 // Returns True if the spark pool is empty (can give a false positive
 // if the pool is almost empty).
-rtsBool looksEmpty(SparkPool* deque);
+INLINE_HEADER rtsBool looksEmpty(SparkPool* deque);
 
 StgClosure * tryStealSpark     (Capability *cap);
 void         freeSparkPool     (SparkPool *pool);
@@ -87,30 +39,32 @@ void         traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
 void         pruneSparkQueue   (evac_fn evac, void *user, Capability *cap);
 
 INLINE_HEADER void discardSparks  (SparkPool *pool);
-INLINE_HEADER nat  sparkPoolSize  (SparkPool *pool);
-#endif
+INLINE_HEADER long sparkPoolSize  (SparkPool *pool);
 
 /* -----------------------------------------------------------------------------
  * PRIVATE below here
  * -------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+INLINE_HEADER StgClosure* reclaimSpark(SparkPool *pool)
+{
+    return popWSDeque(pool);
+}
 
-INLINE_HEADER rtsBool  
-emptySparkPool (SparkPool *pool) 
-{ return looksEmpty(pool); }
+INLINE_HEADER rtsBool looksEmpty(SparkPool* deque)
+{
+    return looksEmptyWSDeque(deque);
+}
 
-INLINE_HEADER nat
-sparkPoolSize (SparkPool *pool) 
-{ return (pool->bottom - pool->top); }
+INLINE_HEADER long sparkPoolSize (SparkPool *pool) 
+{ 
+    return dequeElements(pool);
+}
 
-INLINE_HEADER void
-discardSparks (SparkPool *pool)
+INLINE_HEADER void discardSparks (SparkPool *pool)
 {
-    pool->top = pool->bottom;
-//    pool->topBound = pool->top;
+    discardElements(pool);
 }
 
-#endif
+#endif // THREADED_RTS
 
 #endif /* SPARKS_H */
diff --git a/rts/parallel/WSDeque.c b/rts/parallel/WSDeque.c
new file mode 100644 (file)
index 0000000..8c403c3
--- /dev/null
@@ -0,0 +1,270 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2009
+ *
+ * Work-stealing Deque data structure
+ * 
+ * The implementation uses Double-Ended Queues with lock-free access
+ * (thereby often called "deque") as described in
+ *
+ * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
+ * SPAA'05, July 2005, Las Vegas, USA.
+ * ACM 1-58113-986-1/05/0007
+ *
+ * Author: Jost Berthold MSRC 07-09/2008
+ *
+ * The DeQue is held as a circular array with known length. Positions
+ * of top (read-end) and bottom (write-end) always increase, and the
+ * array is accessed with indices modulo array-size. While this bears
+ * the risk of overflow, we assume that (with 64 bit indices), a
+ * program must run very long to reach that point.
+ * 
+ * The write end of the queue (position bottom) can only be used with
+ * mutual exclusion, i.e. by exactly one caller at a time.  At this
+ * end, new items can be enqueued using pushBottom()/newSpark(), and
+ * removed using popBottom()/reclaimSpark() (the latter implying a cas
+ * synchronisation with potential concurrent readers for the case of
+ * just one element).
+ * 
+ * Multiple readers can steal from the read end (position top), and
+ * are synchronised without a lock, based on a cas of the top
+ * position. One reader wins, the others return NULL for a failure.
+ * 
+ * Both popBottom and steal also return NULL when the queue is empty.
+ * 
+ * ---------------------------------------------------------------------------*/
+
+#include "Rts.h"
+#include "RtsUtils.h"
+#include "WSDeque.h"
+#include "SMP.h" // for cas
+
+#if defined(THREADED_RTS)
+
+#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
+
+/* -----------------------------------------------------------------------------
+ * newWSDeque
+ * -------------------------------------------------------------------------- */
+
+/* internal helpers ... */
+
+static StgWord
+roundUp2(StgWord val)
+{
+    StgWord rounded = 1;
+    
+    /* StgWord is unsigned anyway, only catch 0 */
+    if (val == 0) {
+        barf("DeQue,roundUp2: invalid size 0 requested");
+    }
+    /* at least 1 bit set, shift up to its place */
+    do {
+        rounded = rounded << 1;
+    } while (0 != (val = val>>1));
+    return rounded;
+}
+
+WSDeque *
+newWSDeque (nat size)
+{
+    StgWord realsize; 
+    WSDeque *q;
+    
+    realsize = roundUp2(size); /* to compute modulo as a bitwise & */
+    
+    q = (WSDeque*) stgMallocBytes(sizeof(WSDeque),   /* admin fields */
+                                  "newWSDeque");
+    q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
+                                 "newWSDeque:data space");
+    q->top=0;
+    q->bottom=0;
+    q->topBound=0; /* read by writer, updated each time top is read */
+    
+    q->size = realsize;  /* power of 2 */
+    q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
+    
+    ASSERT_WSDEQUE_INVARIANTS(q); 
+    return q;
+}
+
+/* -----------------------------------------------------------------------------
+ * freeWSDeque
+ * -------------------------------------------------------------------------- */
+
+void
+freeWSDeque (WSDeque *q)
+{
+    stgFree(q->elements);
+    stgFree(q);
+}
+
+/* -----------------------------------------------------------------------------
+ * 
+ * popWSDeque: remove an element from the write end of the queue.
+ * Returns the removed spark, and NULL if a race is lost or the pool
+ * empty.
+ *
+ * If only one spark is left in the pool, we synchronise with
+ * concurrently stealing threads by using cas to modify the top field.
+ * This routine should NEVER be called by a task which does not own
+ * this deque.
+ *
+ * -------------------------------------------------------------------------- */
+
+void *
+popWSDeque (WSDeque *q)
+{
+    /* also a bit tricky, has to avoid concurrent steal() calls by
+       accessing top with cas, when there is only one element left */
+    StgWord t, b;
+    void ** pos;
+    long  currSize;
+    void * removed;
+    
+    ASSERT_WSDEQUE_INVARIANTS(q); 
+    
+    b = q->bottom;
+    /* "decrement b as a test, see what happens" */
+    q->bottom = --b; 
+    pos = (q->elements) + (b & (q->moduloSize));
+    t = q->top; /* using topBound would give an *upper* bound, we
+                   need a lower bound. We use the real top here, but
+                   can update the topBound value */
+    q->topBound = t;
+    currSize = b - t;
+    if (currSize < 0) { /* was empty before decrementing b, set b
+                           consistently and abort */
+        q->bottom = t;
+        return NULL;
+    }
+    removed = *pos;
+    if (currSize > 0) { /* no danger, still elements in buffer after b-- */
+        return removed;
+    } 
+    /* otherwise, has someone meanwhile stolen the same (last) element?
+       Check and increment top value to know  */
+    if ( !(CASTOP(&(q->top),t,t+1)) ) {
+        removed = NULL; /* no success, but continue adjusting bottom */
+    }
+    q->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
+    q->topBound = t+1; /* ...and cached top value as well */
+    
+    ASSERT_WSDEQUE_INVARIANTS(q); 
+    
+    return removed;
+}
+
+/* -----------------------------------------------------------------------------
+ * stealWSDeque
+ * -------------------------------------------------------------------------- */
+
+void *
+stealWSDeque_ (WSDeque *q)
+{
+    void ** pos;
+    void ** arraybase;
+    StgWord sz;
+    void * stolen;
+    StgWord b,t; 
+    
+// Can't do this on someone else's spark pool:
+// ASSERT_WSDEQUE_INVARIANTS(q); 
+    
+    b = q->bottom;
+    t = q->top;
+    
+    // NB. b and t are unsigned; we need a signed value for the test
+    // below.
+    if ((long)b - (long)t <= 0 ) { 
+        return NULL; /* already looks empty, abort */
+  }
+    
+    /* now access array, see pushBottom() */
+    arraybase = q->elements;
+    sz = q->moduloSize;
+    pos = arraybase + (t & sz);  
+    stolen = *pos;
+    
+    /* now decide whether we have won */
+    if ( !(CASTOP(&(q->top),t,t+1)) ) {
+        /* lost the race, someon else has changed top in the meantime */
+        return NULL;
+    }  /* else: OK, top has been incremented by the cas call */
+
+// Can't do this on someone else's spark pool:
+// ASSERT_WSDEQUE_INVARIANTS(q); 
+    
+    return stolen;
+}
+
+void *
+stealWSDeque (WSDeque *q)
+{
+    void *stolen;
+    
+    do { 
+        stolen = stealWSDeque_(q);
+    } while (stolen == NULL && !looksEmptyWSDeque(q));
+    
+    return stolen;
+}
+
+
+#define DISCARD_NEW
+
+/* enqueue an element. Should always succeed by resizing the array
+   (not implemented yet, silently fails in that case). */
+rtsBool
+pushWSDeque (WSDeque* q, void * elem)
+{
+    StgWord t;
+    void ** pos;
+    StgWord sz = q->moduloSize; 
+    StgWord b = q->bottom;
+    
+    ASSERT_WSDEQUE_INVARIANTS(q); 
+    
+    /* we try to avoid reading q->top (accessed by all) and use
+       q->topBound (accessed only by writer) instead. 
+       This is why we do not just call empty(q) here.
+    */
+    t = q->topBound;
+    if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { 
+        /* NB. 1. sz == q->size - 1, thus ">="
+           2. signed comparison, it is possible that t > b
+        */
+        /* could be full, check the real top value in this case */
+        t = q->top;
+        q->topBound = t;
+        if (b - t >= sz) { /* really no space left :-( */
+            /* reallocate the array, copying the values. Concurrent steal()s
+               will in the meantime use the old one and modify only top.
+               This means: we cannot safely free the old space! Can keep it
+               on a free list internally here...
+               
+               Potential bug in combination with steal(): if array is
+               replaced, it is unclear which one concurrent steal operations
+               use. Must read the array base address in advance in steal().
+            */
+#if defined(DISCARD_NEW)
+            ASSERT_WSDEQUE_INVARIANTS(q); 
+            return rtsFalse; // we didn't push anything
+#else
+            /* could make room by incrementing the top position here.  In
+             * this case, should use CASTOP. If this fails, someone else has
+             * removed something, and new room will be available.
+             */
+            ASSERT_WSDEQUE_INVARIANTS(q); 
+#endif
+        }
+    }
+    pos = (q->elements) + (b & sz);
+    *pos = elem;
+    (q->bottom)++;
+    
+    ASSERT_WSDEQUE_INVARIANTS(q); 
+    return rtsTrue;
+}
+
+#endif
diff --git a/rts/parallel/WSDeque.h b/rts/parallel/WSDeque.h
new file mode 100644 (file)
index 0000000..c254671
--- /dev/null
@@ -0,0 +1,130 @@
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2009
+ *
+ * Work-stealing Deque data structure
+ * 
+ * ---------------------------------------------------------------------------*/
+
+#ifndef WSDEQUE_H
+#define WSDEQUE_H
+
+#if defined(THREADED_RTS)
+
+typedef struct WSDeque_ {
+    // Size of elements array. Used for modulo calculation: we round up
+    // to powers of 2 and use the dyadic log (modulo == bitwise &) 
+    StgWord size; 
+    StgWord moduloSize; /* bitmask for modulo */
+
+    // top, index where multiple readers steal() (protected by a cas)
+    volatile StgWord top;
+
+    // bottom, index of next free place where one writer can push
+    // elements. This happens unsynchronised.
+    volatile StgWord bottom;
+
+    // both top and bottom are continuously incremented, and used as
+    // an index modulo the current array size.
+  
+    // lower bound on the current top value. This is an internal
+    // optimisation to avoid unnecessarily accessing the top field
+    // inside pushBottom
+    volatile StgWord topBound;
+
+    // The elements array
+    void ** elements;
+
+    //  Please note: the dataspace cannot follow the admin fields
+    //  immediately, as it should be possible to enlarge it without
+    //  disposing the old one automatically (as realloc would)!
+
+} WSDeque;
+
+/* INVARIANTS, in this order: reasonable size,
+   topBound consistent, space pointer, space accessible to us.
+   
+   NB. This is safe to use only (a) on a spark pool owned by the
+   current thread, or (b) when there's only one thread running, or no
+   stealing going on (e.g. during GC).
+*/
+#define ASSERT_WSDEQUE_INVARIANTS(p)         \
+  ASSERT((p)->size > 0);                        \
+  ASSERT((p)->topBound <= (p)->top);            \
+  ASSERT((p)->elements != NULL);                \
+  ASSERT(*((p)->elements) || 1);                \
+  ASSERT(*((p)->elements - 1  + ((p)->size)) || 1);
+
+// No: it is possible that top > bottom when using pop()
+//  ASSERT((p)->bottom >= (p)->top);           
+//  ASSERT((p)->size > (p)->bottom - (p)->top);
+
+/* -----------------------------------------------------------------------------
+ * Operations
+ *
+ * A WSDeque has an *owner* thread.  The owner can perform any operation;
+ * other threads are only allowed to call stealWSDeque_(),
+ * stealWSDeque(), looksEmptyWSDeque(), and dequeElements().
+ *
+ * -------------------------------------------------------------------------- */
+
+// Allocation, deallocation
+WSDeque * newWSDeque  (nat size);
+void      freeWSDeque (WSDeque *q);
+
+// Take an element from the "write" end of the pool.  Can be called
+// by the pool owner only.
+void* popWSDeque (WSDeque *q);
+
+// Push onto the "write" end of the pool.  Return true if the push
+// succeeded, or false if the deque is full.
+rtsBool pushWSDeque (WSDeque *q, void *elem);
+
+// Removes all elements from the deque
+INLINE_HEADER void discardElements (WSDeque *q);
+
+// Removes an element of the deque from the "read" end, or returns
+// NULL if the pool is empty, or if there was a collision with another
+// thief.
+void * stealWSDeque_ (WSDeque *q);
+
+// Removes an element of the deque from the "read" end, or returns
+// NULL if the pool is empty.
+void * stealWSDeque (WSDeque *q);
+
+// "guesses" whether a deque is empty. Can return false negatives in
+//  presence of concurrent steal() calls, and false positives in
+//  presence of a concurrent pushBottom().
+INLINE_HEADER rtsBool looksEmptyWSDeque (WSDeque* q);
+
+INLINE_HEADER long dequeElements   (WSDeque *q);
+
+/* -----------------------------------------------------------------------------
+ * PRIVATE below here
+ * -------------------------------------------------------------------------- */
+
+INLINE_HEADER long
+dequeElements (WSDeque *q)
+{
+    StgWord t = q->top;
+    StgWord b = q->bottom;
+    // try to prefer false negatives by reading top first
+    return ((long)b - (long)t);
+}
+
+INLINE_HEADER rtsBool
+looksEmptyWSDeque (WSDeque *q)
+{
+    return (dequeElements(q) <= 0);
+}
+
+INLINE_HEADER void
+discardElements (WSDeque *q)
+{
+    q->top = q->bottom;
+//    pool->topBound = pool->top;
+}
+
+#endif // THREADED_RTS
+
+#endif // WSDEQUE_H