Refactor the spark queue implementation into a generic work-stealing deque
[ghc-hetmet.git] / rts / Sparks.c
index 9e4492a..3fccdb6 100644 (file)
@@ -4,35 +4,6 @@
  *
  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
- * The implementation uses Double-Ended Queues with lock-free access
- * (thereby often called "deque") as described in
- *
- * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
- * SPAA'05, July 2005, Las Vegas, USA.
- * ACM 1-58113-986-1/05/0007
- *
- * Author: Jost Berthold MSRC 07-09/2008
- *
- * The DeQue is held as a circular array with known length. Positions
- * of top (read-end) and bottom (write-end) always increase, and the
- * array is accessed with indices modulo array-size. While this bears
- * the risk of overflow, we assume that (with 64 bit indices), a
- * program must run very long to reach that point.
- * 
- * The write end of the queue (position bottom) can only be used with
- * mutual exclusion, i.e. by exactly one caller at a time.  At this
- * end, new items can be enqueued using pushBottom()/newSpark(), and
- * removed using popBottom()/reclaimSpark() (the latter implying a cas
- * synchronisation with potential concurrent readers for the case of
- * just one element).
- * 
- * Multiple readers can steal()/findSpark() from the read end
- * (position top), and are synchronised without a lock, based on a cas
- * of the top position. One reader wins, the others return NULL for a
- * failure.
- * 
- * Both popBottom and steal also return NULL when the queue is empty.
- * 
  -------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
 
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
-/* internal helpers ... */
-
-static StgWord
-roundUp2(StgWord val)
-{
-  StgWord rounded = 1;
-
-  /* StgWord is unsigned anyway, only catch 0 */
-  if (val == 0) {
-    barf("DeQue,roundUp2: invalid size 0 requested");
-  }
-  /* at least 1 bit set, shift up to its place */
-  do {
-    rounded = rounded << 1;
-  } while (0 != (val = val>>1));
-  return rounded;
-}
-
-#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
-
-/* -----------------------------------------------------------------------------
- * 
- * Initialising spark pools.
- *
- * -------------------------------------------------------------------------- */
-
-/* constructor */
-static SparkPool*
-initPool(StgWord size)
-{
-  StgWord realsize; 
-  SparkPool *q;
-
-  realsize = roundUp2(size); /* to compute modulo as a bitwise & */
-
-  q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
-                             "newSparkPool");
-  q->elements = (StgClosurePtr*) 
-                stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
-                              "newSparkPool:data space");
-  q->top=0;
-  q->bottom=0;
-  q->topBound=0; /* read by writer, updated each time top is read */
-
-  q->size = realsize;  /* power of 2 */
-  q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
-
-  ASSERT_SPARK_POOL_INVARIANTS(q); 
-  return q;
-}
-
 void
 initSparkPools( void )
 {
@@ -110,159 +30,18 @@ initSparkPools( void )
     /* walk over the capabilities, allocating a spark pool for each one */
     nat i;
     for (i = 0; i < n_capabilities; i++) {
-      capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
+      capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
     }
 #else
     /* allocate a single spark pool */
-    MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
+    MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
 #endif
 }
 
 void
 freeSparkPool (SparkPool *pool)
 {
-  /* should not interfere with concurrent findSpark() calls! And
-     nobody should use the pointer any more. We cross our fingers...*/
-  stgFree(pool->elements);
-  stgFree(pool);
-}
-
-/* -----------------------------------------------------------------------------
- * 
- * reclaimSpark: remove a spark from the write end of the queue.
- * Returns the removed spark, and NULL if a race is lost or the pool
- * empty.
- *
- * If only one spark is left in the pool, we synchronise with
- * concurrently stealing threads by using cas to modify the top field.
- * This routine should NEVER be called by a task which does not own
- * the capability. Can this be checked here?
- *
- * -------------------------------------------------------------------------- */
-
-StgClosure *
-reclaimSpark (SparkPool *deque)
-{
-  /* also a bit tricky, has to avoid concurrent steal() calls by
-     accessing top with cas, when there is only one element left */
-  StgWord t, b;
-  StgClosurePtr* pos;
-  long  currSize;
-  StgClosurePtr removed;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  b = deque->bottom;
-  /* "decrement b as a test, see what happens" */
-  deque->bottom = --b; 
-  pos = (deque->elements) + (b & (deque->moduloSize));
-  t = deque->top; /* using topBound would give an *upper* bound, we
-                    need a lower bound. We use the real top here, but
-                    can update the topBound value */
-  deque->topBound = t;
-  currSize = b - t;
-  if (currSize < 0) { /* was empty before decrementing b, set b
-                        consistently and abort */
-    deque->bottom = t;
-    return NULL;
-  }
-  removed = *pos;
-  if (currSize > 0) { /* no danger, still elements in buffer after b-- */
-    return removed;
-  } 
-  /* otherwise, has someone meanwhile stolen the same (last) element?
-     Check and increment top value to know  */
-  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
-    removed = NULL; /* no success, but continue adjusting bottom */
-  }
-  deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
-  deque->topBound = t+1; /* ...and cached top value as well */
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  return removed;
-}
-
-/* -----------------------------------------------------------------------------
- * 
- * tryStealSpark: try to steal a spark from a Capability.
- *
- * Returns a valid spark, or NULL if the pool was empty, and can
- * occasionally return NULL if there was a race with another thread
- * stealing from the same pool.  In this case, try again later.
- *
- -------------------------------------------------------------------------- */
-
-static StgClosurePtr
-steal(SparkPool *deque)
-{
-  StgClosurePtr* pos;
-  StgClosurePtr* arraybase;
-  StgWord sz;
-  StgClosurePtr stolen;
-  StgWord b,t; 
-
-// Can't do this on someone else's spark pool:
-// ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  b = deque->bottom;
-  t = deque->top;
-
-  // NB. b and t are unsigned; we need a signed value for the test
-  // below.
-  if ((long)b - (long)t <= 0 ) { 
-    return NULL; /* already looks empty, abort */
-  }
-
-  /* now access array, see pushBottom() */
-  arraybase = deque->elements;
-  sz = deque->moduloSize;
-  pos = arraybase + (t & sz);  
-  stolen = *pos;
-
-  /* now decide whether we have won */
-  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
-      /* lost the race, someon else has changed top in the meantime */
-      return NULL;
-  }  /* else: OK, top has been incremented by the cas call */
-
-// Can't do this on someone else's spark pool:
-// ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  /* return stolen element */
-  return stolen;
-}
-
-StgClosure *
-tryStealSpark (Capability *cap)
-{
-  SparkPool *pool = cap->sparks;
-  StgClosure *stolen;
-
-  do { 
-      stolen = steal(pool);
-  } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
-
-  return stolen;
-}
-
-
-/* -----------------------------------------------------------------------------
- * 
- * "guesses" whether a deque is empty. Can return false negatives in
- *  presence of concurrent steal() calls, and false positives in
- *  presence of a concurrent pushBottom().
- *
- * -------------------------------------------------------------------------- */
-
-rtsBool
-looksEmpty(SparkPool* deque)
-{
-  StgWord t = deque->top;
-  StgWord b = deque->bottom;
-  /* try to prefer false negatives by reading top first */
-  return ((long)b - (long)t <= 0);
-  /* => array is *never* completely filled, always 1 place free! */
+    freeWSDeque(pool);
 }
 
 /* -----------------------------------------------------------------------------
@@ -281,69 +60,6 @@ createSparkThread (Capability *cap)
     appendToRunQueue(cap,tso);
 }
 
-/* -----------------------------------------------------------------------------
- * 
- * Create a new spark
- *
- * -------------------------------------------------------------------------- */
-
-#define DISCARD_NEW
-
-/* enqueue an element. Should always succeed by resizing the array
-   (not implemented yet, silently fails in that case). */
-static void
-pushBottom (SparkPool* deque, StgClosurePtr elem)
-{
-  StgWord t;
-  StgClosurePtr* pos;
-  StgWord sz = deque->moduloSize; 
-  StgWord b = deque->bottom;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  /* we try to avoid reading deque->top (accessed by all) and use
-     deque->topBound (accessed only by writer) instead. 
-     This is why we do not just call empty(deque) here.
-  */
-  t = deque->topBound;
-  if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { 
-    /* NB. 1. sz == deque->size - 1, thus ">="
-           2. signed comparison, it is possible that t > b
-    */
-    /* could be full, check the real top value in this case */
-    t = deque->top;
-    deque->topBound = t;
-    if (b - t >= sz) { /* really no space left :-( */
-      /* reallocate the array, copying the values. Concurrent steal()s
-        will in the meantime use the old one and modify only top.
-        This means: we cannot safely free the old space! Can keep it
-        on a free list internally here...
-
-        Potential bug in combination with steal(): if array is
-        replaced, it is unclear which one concurrent steal operations
-        use. Must read the array base address in advance in steal().
-      */
-#if defined(DISCARD_NEW)
-      ASSERT_SPARK_POOL_INVARIANTS(deque); 
-      return; /* for now, silently fail */
-#else
-      /* could make room by incrementing the top position here.  In
-       * this case, should use CASTOP. If this fails, someone else has
-       * removed something, and new room will be available.
-       */
-      ASSERT_SPARK_POOL_INVARIANTS(deque); 
-#endif
-    }
-  }
-  pos = (deque->elements) + (b & sz);
-  *pos = elem;
-  (deque->bottom)++;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-  return;
-}
-
-
 /* --------------------------------------------------------------------------
  * newSpark: create a new spark, as a result of calling "par"
  * Called directly from STG.
@@ -361,19 +77,40 @@ newSpark (StgRegTable *reg, StgClosure *p)
      */
     p = UNTAG_CLOSURE(p);
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
-
     if (closure_SHOULD_SPARK(p)) {
-      pushBottom(pool,p);
+        pushWSDeque(pool,p);
     }  
 
     cap->sparks_created++;
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
     return 1;
 }
 
+/* -----------------------------------------------------------------------------
+ * 
+ * tryStealSpark: try to steal a spark from a Capability.
+ *
+ * Returns a valid spark, or NULL if the pool was empty, and can
+ * occasionally return NULL if there was a race with another thread
+ * stealing from the same pool.  In this case, try again later.
+ *
+ -------------------------------------------------------------------------- */
 
+StgClosure *
+tryStealSpark (Capability *cap)
+{
+  SparkPool *pool = cap->sparks;
+  StgClosure *stolen;
+
+  do { 
+      stolen = stealWSDeque_(pool); 
+      // use the no-loopy version, stealWSDeque_(), since if we get a
+      // spurious NULL here the caller may want to try stealing from
+      // other pools before trying again.
+  } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
+
+  return stolen;
+}
 
 /* --------------------------------------------------------------------------
  * Remove all sparks from the spark queues which should not spark any
@@ -412,11 +149,12 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
     pool->topBound = pool->top;
 
     debugTrace(DEBUG_sched,
-               "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
+               "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
 
-    elements = pool->elements;
+    ASSERT_WSDEQUE_INVARIANTS(pool);
+
+    elements = (StgClosurePtr *)pool->elements;
 
     /* We have exclusive access to the structure here, so we can reset
        bottom and top counters, and prune invalid sparks. Contents are
@@ -513,10 +251,10 @@ pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
     
     debugTrace(DEBUG_sched,
-               "new spark queue len=%d; (hd=%ld; tl=%ld)",
+               "new spark queue len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    ASSERT_WSDEQUE_INVARIANTS(pool);
 }
 
 /* GC for the spark pool, called inside Capability.c for all
@@ -530,11 +268,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
     
     pool = cap->sparks;
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    ASSERT_WSDEQUE_INVARIANTS(pool);
 
     top = pool->top;
     bottom = pool->bottom;
-    sparkp = pool->elements;
+    sparkp = (StgClosurePtr*)pool->elements;
     modMask = pool->moduloSize;
 
     while (top < bottom) {
@@ -547,7 +285,7 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
     }
 
     debugTrace(DEBUG_sched,
-               "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
+               "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
 }