Do not link ghc stage1 using -threaded, only for stage2 or 3
[ghc-hetmet.git] / rts / Sparks.c
index ac11172..2167de0 100644 (file)
@@ -4,35 +4,6 @@
  *
  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
- * The implementation uses Double-Ended Queues with lock-free access
- * (thereby often called "deque") as described in
- *
- * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
- * SPAA'05, July 2005, Las Vegas, USA.
- * ACM 1-58113-986-1/05/0007
- *
- * Author: Jost Berthold MSRC 07-09/2008
- *
- * The DeQue is held as a circular array with known length. Positions
- * of top (read-end) and bottom (write-end) always increase, and the
- * array is accessed with indices modulo array-size. While this bears
- * the risk of overflow, we assume that (with 64 bit indices), a
- * program must run very long to reach that point.
- * 
- * The write end of the queue (position bottom) can only be used with
- * mutual exclusion, i.e. by exactly one caller at a time.  At this
- * end, new items can be enqueued using pushBottom()/newSpark(), and
- * removed using popBottom()/reclaimSpark() (the latter implying a cas
- * synchronisation with potential concurrent readers for the case of
- * just one element).
- * 
- * Multiple readers can steal()/findSpark() from the read end
- * (position top), and are synchronised without a lock, based on a cas
- * of the top position. One reader wins, the others return NULL for a
- * failure.
- * 
- * Both popBottom and steal also return NULL when the queue is empty.
- * 
  -------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
@@ -44,6 +15,7 @@
 #include "RtsUtils.h"
 #include "ParTicky.h"
 #include "Trace.h"
+#include "Prelude.h"
 
 #include "SMP.h" // for cas
 
 
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
-/* internal helpers ... */
-
-StgWord roundUp2(StgWord val);
-
-StgWord roundUp2(StgWord val) {
-  StgWord rounded = 1;
-
-  /* StgWord is unsigned anyway, only catch 0 */
-  if (val == 0) {
-    barf("DeQue,roundUp2: invalid size 0 requested");
-  }
-  /* at least 1 bit set, shift up to its place */
-  do {
-    rounded = rounded << 1;
-  } while (0 != (val = val>>1));
-  return rounded;
-}
-
-INLINE_HEADER
-rtsBool casTop(StgPtr addr, StgWord old, StgWord new);
-
-#if !defined(THREADED_RTS)
-/* missing def. in non THREADED RTS, and makes no sense anyway... */
-StgWord cas(StgPtr addr,StgWord old,StgWord new);
-StgWord cas(StgPtr addr,StgWord old,StgWord new) { 
-  barf("cas: not implemented without multithreading");
-  old = new = *addr; /* to avoid gcc warnings */
-}
-#endif
-
-INLINE_HEADER
-rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
-  StgWord res = cas((StgPtr) addr, old, new);
-  return ((res == old));
-}
-
-/* or simply like this */
-#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
-
-/* -----------------------------------------------------------------------------
- * 
- * Initialising spark pools.
- *
- * -------------------------------------------------------------------------- */
-
-/* constructor */
-SparkPool* initPool(StgWord size) {
-
-  StgWord realsize; 
-  SparkPool *q;
-
-  realsize = roundUp2(size); /* to compute modulo as a bitwise & */
-
-  q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
-                             "newSparkPool");
-  q->elements = (StgClosurePtr*) 
-                stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
-                              "newSparkPool:data space");
-  q->top=0;
-  q->bottom=0;
-  q->topBound=0; /* read by writer, updated each time top is read */
-
-  q->size = realsize;  /* power of 2 */
-  q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
-
-  ASSERT_SPARK_POOL_INVARIANTS(q); 
-  return q;
-}
-
 void
 initSparkPools( void )
 {
@@ -127,152 +30,18 @@ initSparkPools( void )
     /* walk over the capabilities, allocating a spark pool for each one */
     nat i;
     for (i = 0; i < n_capabilities; i++) {
-      capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
+      capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
     }
 #else
     /* allocate a single spark pool */
-    MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
+    MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
 #endif
 }
 
 void
-freeSparkPool(SparkPool *pool) {
-  /* should not interfere with concurrent findSpark() calls! And
-     nobody should use the pointer any more. We cross our fingers...*/
-  stgFree(pool->elements);
-  stgFree(pool);
-}
-
-/* reclaimSpark(cap): remove a spark from the write end of the queue.
- * Returns the removed spark, and NULL if a race is lost or the pool
- * empty.
- *
- * If only one spark is left in the pool, we synchronise with
- * concurrently stealing threads by using cas to modify the top field.
- * This routine should NEVER be called by a task which does not own
- * the capability. Can this be checked here?
- */
-StgClosure* reclaimSpark(Capability *cap) {
-  SparkPool *deque = cap->sparks;
-  /* also a bit tricky, has to avoid concurrent steal() calls by
-     accessing top with cas, when there is only one element left */
-  StgWord t, b;
-  StgClosurePtr* pos;
-  long  currSize;
-  StgClosurePtr removed;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  b = deque->bottom;
-  /* "decrement b as a test, see what happens" */
-  deque->bottom = --b; 
-  pos = (deque->elements) + (b & (deque->moduloSize));
-  t = deque->top; /* using topBound would give an *upper* bound, we
-                    need a lower bound. We use the real top here, but
-                    can update the topBound value */
-  deque->topBound = t;
-  currSize = b - t;
-  if (currSize < 0) { /* was empty before decrementing b, set b
-                        consistently and abort */
-    deque->bottom = t;
-    return NULL;
-  }
-  removed = *pos;
-  if (currSize > 0) { /* no danger, still elements in buffer after b-- */
-    return removed;
-  } 
-  /* otherwise, has someone meanwhile stolen the same (last) element?
-     Check and increment top value to know  */
-  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
-    removed = NULL; /* no success, but continue adjusting bottom */
-  }
-  deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
-  deque->topBound = t+1; /* ...and cached top value as well */
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  return removed;
-}
-
-/* -----------------------------------------------------------------------------
- * 
- * findSpark: find a spark on the current Capability that we can fork
- * into a thread.
- *
- * May be called by concurrent threads, which synchronise on top
- * variable. Returns a spark, or NULL if pool empty or race lost.
- *
- -------------------------------------------------------------------------- */
-
-StgClosurePtr steal(SparkPool *deque);
-
-/* steal an element from the read end. Synchronises multiple callers
-   by failing with NULL return. Returns NULL when deque is empty. */
-StgClosurePtr steal(SparkPool *deque) {
-  StgClosurePtr* pos;
-  StgClosurePtr* arraybase;
-  StgWord sz;
-  StgClosurePtr stolen;
-  StgWord b,t; 
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  b = deque->bottom;
-  t = deque->top;
-  if (b - t <= 0 ) { 
-    return NULL; /* already looks empty, abort */
-  }
-
-  /* now access array, see pushBottom() */
-  arraybase = deque->elements;
-  sz = deque->moduloSize;
-  pos = arraybase + (t & sz);  
-  stolen = *pos;
-
-  /* now decide whether we have won */
-  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
-    /* lost the race, someon else has changed top in the meantime */
-    stolen = NULL;    
-  }  /* else: OK, top has been incremented by the cas call */
-
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-  /* return NULL or stolen element */
-  return stolen;
-}
-
-StgClosure *
-findSpark (Capability *cap)
+freeSparkPool (SparkPool *pool)
 {
-  SparkPool *deque = (cap->sparks);
-  StgClosure *stolen;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  do { 
-    /* keep trying until good spark found or pool looks empty. 
-       TODO is this a good idea? */
-
-    stolen = steal(deque);
-    
-  } while ( ( !stolen /* nothing stolen*/
-             || !closure_SHOULD_SPARK(stolen)) /* spark not OK */
-           && !looksEmpty(deque)); /* run empty, give up */
-
-  /* return stolen element */
-  return stolen;
-}
-
-
-/* "guesses" whether a deque is empty. Can return false negatives in
-   presence of concurrent steal() calls, and false positives in
-   presence of a concurrent pushBottom().*/
-rtsBool looksEmpty(SparkPool* deque) {
-  StgWord t = deque->top;
-  StgWord b = deque->bottom;
-  /* try to prefer false negatives by reading top first */
-  return (b - t <= 0);
-  /* => array is *never* completely filled, always 1 place free! */
+    freeWSDeque(pool);
 }
 
 /* -----------------------------------------------------------------------------
@@ -282,79 +51,28 @@ rtsBool looksEmpty(SparkPool* deque) {
  * -------------------------------------------------------------------------- */
 
 void
-createSparkThread (Capability *cap, StgClosure *p)
+createSparkThread (Capability *cap)
 {
     StgTSO *tso;
 
-    tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
-    appendToRunQueue(cap,tso);
-}
-
-/* -----------------------------------------------------------------------------
- * 
- * Create a new spark
- *
- * -------------------------------------------------------------------------- */
+    tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
+                          &base_GHCziConc_runSparks_closure);
 
-#define DISCARD_NEW
-void pushBottom(SparkPool* deque, StgClosurePtr elem);
+    postEvent(cap, EVENT_CREATE_SPARK_THREAD, 0, tso->id);
 
-/* enqueue an element. Should always succeed by resizing the array
-   (not implemented yet, silently fails in that case). */
-void pushBottom(SparkPool* deque, StgClosurePtr elem) {
-  StgWord t;
-  StgClosurePtr* pos;
-  StgWord sz = deque->moduloSize; 
-  StgWord b = deque->bottom;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
-  /* we try to avoid reading deque->top (accessed by all) and use
-     deque->topBound (accessed only by writer) instead. 
-     This is why we do not just call empty(deque) here.
-  */
-  t = deque->topBound;
-  if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */
-    /* could be full, check the real top value in this case */
-    t = deque->top;
-    deque->topBound = t;
-    if (b - t >= sz) { /* really no space left :-( */
-      /* reallocate the array, copying the values. Concurrent steal()s
-        will in the meantime use the old one and modify only top.
-        This means: we cannot safely free the old space! Can keep it
-        on a free list internally here...
-
-        Potential bug in combination with steal(): if array is
-        replaced, it is unclear which one concurrent steal operations
-        use. Must read the array base address in advance in steal().
-      */
-#if defined(DISCARD_NEW)
-      ASSERT_SPARK_POOL_INVARIANTS(deque); 
-      return; /* for now, silently fail */
-#else
-      /* could make room by incrementing the top position here.  In
-       * this case, should use CASTOP. If this fails, someone else has
-       * removed something, and new room will be available.
-       */
-      ASSERT_SPARK_POOL_INVARIANTS(deque); 
-#endif
-    }
-  }
-  pos = (deque->elements) + (b & sz);
-  *pos = elem;
-  (deque->bottom)++;
-
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-  return;
+    appendToRunQueue(cap,tso);
 }
 
+/* --------------------------------------------------------------------------
+ * newSpark: create a new spark, as a result of calling "par"
+ * Called directly from STG.
+ * -------------------------------------------------------------------------- */
 
-/* this is called as a direct C-call from Stg => we need to keep the
-   pool in a register (???) */
 StgInt
 newSpark (StgRegTable *reg, StgClosure *p)
 {
-    SparkPool *pool = (reg->rCurrentTSO->cap->sparks);
+    Capability *cap = regTableToCapability(reg);
+    SparkPool *pool = cap->sparks;
 
     /* I am not sure whether this is the right thing to do.
      * Maybe it is better to exploit the tag information
@@ -362,17 +80,42 @@ newSpark (StgRegTable *reg, StgClosure *p)
      */
     p = UNTAG_CLOSURE(p);
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
-
     if (closure_SHOULD_SPARK(p)) {
-      pushBottom(pool,p);
+        pushWSDeque(pool,p);
     }  
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    cap->sparks_created++;
+
+    postEvent(cap, EVENT_CREATE_SPARK, cap->r.rCurrentTSO->id, 0);
+
     return 1;
 }
 
+/* -----------------------------------------------------------------------------
+ * 
+ * tryStealSpark: try to steal a spark from a Capability.
+ *
+ * Returns a valid spark, or NULL if the pool was empty, and can
+ * occasionally return NULL if there was a race with another thread
+ * stealing from the same pool.  In this case, try again later.
+ *
+ -------------------------------------------------------------------------- */
+
+StgClosure *
+tryStealSpark (Capability *cap)
+{
+  SparkPool *pool = cap->sparks;
+  StgClosure *stolen;
+
+  do { 
+      stolen = stealWSDeque_(pool); 
+      // use the no-loopy version, stealWSDeque_(), since if we get a
+      // spurious NULL here the caller may want to try stealing from
+      // other pools before trying again.
+  } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
 
+  return stolen;
+}
 
 /* --------------------------------------------------------------------------
  * Remove all sparks from the spark queues which should not spark any
@@ -381,13 +124,14 @@ newSpark (StgRegTable *reg, StgClosure *p)
  * the spark pool only contains sparkable closures.
  * -------------------------------------------------------------------------- */
 
-static void
-pruneSparkQueue (Capability *cap)
+void
+pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
 { 
     SparkPool *pool;
-    StgClosurePtr spark, evacspark, *elements;
+    StgClosurePtr spark, tmp, *elements;
     nat n, pruned_sparks; // stats only
     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
+    const StgInfoTable *info;
     
     PAR_TICKY_MARK_SPARK_QUEUE_START();
     
@@ -396,12 +140,26 @@ pruneSparkQueue (Capability *cap)
     
     pool = cap->sparks;
     
+    // it is possible that top > bottom, indicating an empty pool.  We
+    // fix that here; this is only necessary because the loop below
+    // assumes it.
+    if (pool->top > pool->bottom)
+        pool->top = pool->bottom;
+
+    // Take this opportunity to reset top/bottom modulo the size of
+    // the array, to avoid overflow.  This is only possible because no
+    // stealing is happening during GC.
+    pool->bottom  -= pool->top & ~pool->moduloSize;
+    pool->top     &= pool->moduloSize;
+    pool->topBound = pool->top;
+
     debugTrace(DEBUG_sched,
-               "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
+               "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
 
-    elements = pool->elements;
+    ASSERT_WSDEQUE_INVARIANTS(pool);
+
+    elements = (StgClosurePtr *)pool->elements;
 
     /* We have exclusive access to the structure here, so we can reset
        bottom and top counters, and prune invalid sparks. Contents are
@@ -450,13 +208,31 @@ pruneSparkQueue (Capability *cap)
         botInd, otherwise move on */
       spark = elements[currInd];
 
-      /* if valuable work: shift inside the pool */
-      if ( closure_SHOULD_SPARK(spark) ) {
-       elements[botInd] = spark; // keep entry (new address)
-       botInd++;
-       n++;
-      } else { 
-       pruned_sparks++; // discard spark
+      // We have to be careful here: in the parallel GC, another
+      // thread might evacuate this closure while we're looking at it,
+      // so grab the info pointer just once.
+      info = spark->header.info;
+      if (IS_FORWARDING_PTR(info)) {
+          tmp = (StgClosure*)UN_FORWARDING_PTR(info);
+          /* if valuable work: shift inside the pool */
+          if (closure_SHOULD_SPARK(tmp)) {
+              elements[botInd] = tmp; // keep entry (new address)
+              botInd++;
+              n++;
+          } else {
+              pruned_sparks++; // discard spark
+              cap->sparks_pruned++;
+          }
+      } else {
+          if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
+              elements[botInd] = spark; // keep entry (new address)
+              evac (user, &elements[botInd]);
+              botInd++;
+              n++;
+          } else {
+              pruned_sparks++; // discard spark
+              cap->sparks_pruned++;
+          }
       }
       currInd++;
 
@@ -480,19 +256,10 @@ pruneSparkQueue (Capability *cap)
     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
     
     debugTrace(DEBUG_sched,
-               "new spark queue len=%d; (hd=%ld; tl=%ld)",
+               "new spark queue len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
-}
-
-void
-pruneSparkQueues (void)
-{
-    nat i;
-    for (i = 0; i < n_capabilities; i++) {
-        pruneSparkQueue(&capabilities[i]);
-    }
+    ASSERT_WSDEQUE_INVARIANTS(pool);
 }
 
 /* GC for the spark pool, called inside Capability.c for all
@@ -506,11 +273,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
     
     pool = cap->sparks;
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    ASSERT_WSDEQUE_INVARIANTS(pool);
 
     top = pool->top;
     bottom = pool->bottom;
-    sparkp = pool->elements;
+    sparkp = (StgClosurePtr*)pool->elements;
     modMask = pool->moduloSize;
 
     while (top < bottom) {
@@ -523,12 +290,11 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
     }
 
     debugTrace(DEBUG_sched,
-               "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
+               "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
                sparkPoolSize(pool), pool->bottom, pool->top);
 }
 
 /* ----------------------------------------------------------------------------
-
  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
  * capabilities) and its size. Accesses all spark pools and equally
  * distributes the sparks among them.
@@ -537,7 +303,8 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
  * -------------------------------------------------------------------------- */
 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
 
-void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) {
+void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
+                           Capability caps[] STG_UNUSED) {
   barf("not implemented");
 }