Do not link ghc stage1 using -threaded, only for stage2 or 3
[ghc-hetmet.git] / rts / Sparks.c
index 68ad19d..2167de0 100644 (file)
@@ -1,52 +1,27 @@
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2000-2006
+ * (c) The GHC Team, 2000-2008
  *
  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
- * -------------------------------------------------------------------------*/
+ -------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
 #include "Rts.h"
+#include "Storage.h"
 #include "Schedule.h"
 #include "SchedAPI.h"
-#include "Storage.h"
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "ParTicky.h"
-# if defined(PARALLEL_HASKELL)
-# include "ParallelRts.h"
-# include "GranSimRts.h"   // for GR_...
-# elif defined(GRAN)
-# include "GranSimRts.h"
-# endif
-#include "Sparks.h"
 #include "Trace.h"
+#include "Prelude.h"
 
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-
-static INLINE_ME void bump_hd (StgSparkPool *p)
-{ p->hd++; if (p->hd == p->lim) p->hd = p->base; }
-
-static INLINE_ME void bump_tl (StgSparkPool *p)
-{ p->tl++; if (p->tl == p->lim) p->tl = p->base; }
+#include "SMP.h" // for cas
 
-/* -----------------------------------------------------------------------------
- * 
- * Initialising spark pools.
- *
- * -------------------------------------------------------------------------- */
+#include "Sparks.h"
 
-static void 
-initSparkPool(StgSparkPool *pool)
-{
-    pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
-                               * sizeof(StgClosure *),
-                               "initSparkPools");
-    pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
-    pool->hd  = pool->base;
-    pool->tl  = pool->base;
-}
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
 void
 initSparkPools( void )
@@ -55,169 +30,282 @@ initSparkPools( void )
     /* walk over the capabilities, allocating a spark pool for each one */
     nat i;
     for (i = 0; i < n_capabilities; i++) {
-       initSparkPool(&capabilities[i].r.rSparks);
+      capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
     }
 #else
     /* allocate a single spark pool */
-    initSparkPool(&MainCapability.r.rSparks);
+    MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
 #endif
 }
 
+void
+freeSparkPool (SparkPool *pool)
+{
+    freeWSDeque(pool);
+}
+
 /* -----------------------------------------------------------------------------
  * 
- * findSpark: find a spark on the current Capability that we can fork
- * into a thread.
+ * Turn a spark into a real thread
  *
  * -------------------------------------------------------------------------- */
 
-StgClosure *
-findSpark (Capability *cap)
+void
+createSparkThread (Capability *cap)
 {
-    StgSparkPool *pool;
-    StgClosure *spark;
-    
-    pool = &(cap->r.rSparks);
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
-
-    while (pool->hd != pool->tl) {
-       spark = *pool->hd;
-       bump_hd(pool);
-       if (closure_SHOULD_SPARK(spark)) {
-#ifdef GRAN
-           if (RtsFlags.ParFlags.ParStats.Sparks) 
-               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                                GR_STEALING, ((StgTSO *)NULL), spark, 
-                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-#endif
-           return spark;
-       }
-    }
-    // spark pool is now empty
-    return NULL;
+    StgTSO *tso;
+
+    tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
+                          &base_GHCziConc_runSparks_closure);
+
+    postEvent(cap, EVENT_CREATE_SPARK_THREAD, 0, tso->id);
+
+    appendToRunQueue(cap,tso);
+}
+
+/* --------------------------------------------------------------------------
+ * newSpark: create a new spark, as a result of calling "par"
+ * Called directly from STG.
+ * -------------------------------------------------------------------------- */
+
+StgInt
+newSpark (StgRegTable *reg, StgClosure *p)
+{
+    Capability *cap = regTableToCapability(reg);
+    SparkPool *pool = cap->sparks;
+
+    /* I am not sure whether this is the right thing to do.
+     * Maybe it is better to exploit the tag information
+     * instead of throwing it away?
+     */
+    p = UNTAG_CLOSURE(p);
+
+    if (closure_SHOULD_SPARK(p)) {
+        pushWSDeque(pool,p);
+    }  
+
+    cap->sparks_created++;
+
+    postEvent(cap, EVENT_CREATE_SPARK, cap->r.rCurrentTSO->id, 0);
+
+    return 1;
 }
 
 /* -----------------------------------------------------------------------------
- * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
- * implicit slide i.e. after marking all sparks are at the beginning of the
- * spark pool and the spark pool only contains sparkable closures 
+ * 
+ * tryStealSpark: try to steal a spark from a Capability.
+ *
+ * Returns a valid spark, or NULL if the pool was empty, and can
+ * occasionally return NULL if there was a race with another thread
+ * stealing from the same pool.  In this case, try again later.
+ *
+ -------------------------------------------------------------------------- */
+
+StgClosure *
+tryStealSpark (Capability *cap)
+{
+  SparkPool *pool = cap->sparks;
+  StgClosure *stolen;
+
+  do { 
+      stolen = stealWSDeque_(pool); 
+      // use the no-loopy version, stealWSDeque_(), since if we get a
+      // spurious NULL here the caller may want to try stealing from
+      // other pools before trying again.
+  } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
+
+  return stolen;
+}
+
+/* --------------------------------------------------------------------------
+ * Remove all sparks from the spark queues which should not spark any
+ * more.  Called after GC. We assume exclusive access to the structure
+ * and replace  all sparks in the queue, see explanation below. At exit,
+ * the spark pool only contains sparkable closures.
  * -------------------------------------------------------------------------- */
 
 void
-markSparkQueue (evac_fn evac)
+pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
 { 
-    StgClosure **sparkp, **to_sparkp;
-    nat i, n, pruned_sparks; // stats only
-    StgSparkPool *pool;
-    Capability *cap;
+    SparkPool *pool;
+    StgClosurePtr spark, tmp, *elements;
+    nat n, pruned_sparks; // stats only
+    StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
+    const StgInfoTable *info;
     
     PAR_TICKY_MARK_SPARK_QUEUE_START();
     
     n = 0;
     pruned_sparks = 0;
-    for (i = 0; i < n_capabilities; i++) {
-       cap = &capabilities[i];
-       pool = &(cap->r.rSparks);
-       
-       ASSERT_SPARK_POOL_INVARIANTS(pool);
+    
+    pool = cap->sparks;
+    
+    // it is possible that top > bottom, indicating an empty pool.  We
+    // fix that here; this is only necessary because the loop below
+    // assumes it.
+    if (pool->top > pool->bottom)
+        pool->top = pool->bottom;
+
+    // Take this opportunity to reset top/bottom modulo the size of
+    // the array, to avoid overflow.  This is only possible because no
+    // stealing is happening during GC.
+    pool->bottom  -= pool->top & ~pool->moduloSize;
+    pool->top     &= pool->moduloSize;
+    pool->topBound = pool->top;
+
+    debugTrace(DEBUG_sched,
+               "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
+
+    ASSERT_WSDEQUE_INVARIANTS(pool);
+
+    elements = (StgClosurePtr *)pool->elements;
+
+    /* We have exclusive access to the structure here, so we can reset
+       bottom and top counters, and prune invalid sparks. Contents are
+       copied in-place if they are valuable, otherwise discarded. The
+       routine uses "real" indices t and b, starts by computing them
+       as the modulus size of top and bottom,
+
+       Copying:
+
+       At the beginning, the pool structure can look like this:
+       ( bottom % size >= top % size , no wrap-around)
+                  t          b
+       ___________***********_________________
+
+       or like this ( bottom % size < top % size, wrap-around )
+                  b         t
+       ***********__________******************
+       As we need to remove useless sparks anyway, we make one pass
+       between t and b, moving valuable content to b and subsequent
+       cells (wrapping around when the size is reached).
+
+                     b      t
+       ***********OOO_______XX_X__X?**********
+                     ^____move?____/
+
+       After this movement, botInd becomes the new bottom, and old
+       bottom becomes the new top index, both as indices in the array
+       size range.
+    */
+    // starting here
+    currInd = (pool->top) & (pool->moduloSize); // mod
+
+    // copies of evacuated closures go to space from botInd on
+    // we keep oldBotInd to know when to stop
+    oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
+
+    // on entry to loop, we are within the bounds
+    ASSERT( currInd < pool->size && botInd  < pool->size );
+
+    while (currInd != oldBotInd ) {
+      /* must use != here, wrap-around at size
+        subtle: loop not entered if queue empty
+       */
+
+      /* check element at currInd. if valuable, evacuate and move to
+        botInd, otherwise move on */
+      spark = elements[currInd];
+
+      // We have to be careful here: in the parallel GC, another
+      // thread might evacuate this closure while we're looking at it,
+      // so grab the info pointer just once.
+      info = spark->header.info;
+      if (IS_FORWARDING_PTR(info)) {
+          tmp = (StgClosure*)UN_FORWARDING_PTR(info);
+          /* if valuable work: shift inside the pool */
+          if (closure_SHOULD_SPARK(tmp)) {
+              elements[botInd] = tmp; // keep entry (new address)
+              botInd++;
+              n++;
+          } else {
+              pruned_sparks++; // discard spark
+              cap->sparks_pruned++;
+          }
+      } else {
+          if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
+              elements[botInd] = spark; // keep entry (new address)
+              evac (user, &elements[botInd]);
+              botInd++;
+              n++;
+          } else {
+              pruned_sparks++; // discard spark
+              cap->sparks_pruned++;
+          }
+      }
+      currInd++;
 
-#if defined(PARALLEL_HASKELL)
-       // stats only
-       n = 0;
-       pruned_sparks = 0;
-#endif
-       
-       sparkp = pool->hd;
-       to_sparkp = pool->hd;
-       while (sparkp != pool->tl) {
-           ASSERT(to_sparkp<=sparkp);
-           ASSERT(*sparkp!=NULL);
-           ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
-           // ToDo?: statistics gathering here (also for GUM!)
-           if (closure_SHOULD_SPARK(*sparkp)) {
-               evac(sparkp);
-               *to_sparkp++ = *sparkp;
-               n++;
-           } else {
-               pruned_sparks++;
-           }
-           sparkp++;
-           if (sparkp == pool->lim) {
-               sparkp = pool->base;
-           }
-       }
-       pool->tl = to_sparkp;
-       
-       PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-       
-#if defined(PARALLEL_HASKELL)
-       debugTrace(DEBUG_sched, 
-                  "marked %d sparks and pruned %d sparks on [%x]",
-                  n, pruned_sparks, mytid);
-#else
-       debugTrace(DEBUG_sched, 
-                  "marked %d sparks and pruned %d sparks",
-                  n, pruned_sparks);
-#endif
-       
-       debugTrace(DEBUG_sched,
-                  "new spark queue len=%d; (hd=%p; tl=%p)\n",
-                  sparkPoolSize(pool), pool->hd, pool->tl);
-    }
-}
+      // in the loop, we may reach the bounds, and instantly wrap around
+      ASSERT( currInd <= pool->size && botInd <= pool->size );
+      if ( currInd == pool->size ) { currInd = 0; }
+      if ( botInd == pool->size )  { botInd = 0;  }
 
-/* -----------------------------------------------------------------------------
- * 
- * Turn a spark into a real thread
- *
- * -------------------------------------------------------------------------- */
+    } // while-loop over spark pool elements
 
-void
-createSparkThread (Capability *cap, StgClosure *p)
-{
-    StgTSO *tso;
+    ASSERT(currInd == oldBotInd);
 
-    tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
-    appendToRunQueue(cap,tso);
-}
+    pool->top = oldBotInd; // where we started writing
+    pool->topBound = pool->top;
 
-/* -----------------------------------------------------------------------------
- * 
- * Create a new spark
- *
- * -------------------------------------------------------------------------- */
+    pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
+    // first free place we did not use (corrected by wraparound)
 
-#define DISCARD_NEW
+    PAR_TICKY_MARK_SPARK_QUEUE_END(n);
 
-StgInt
-newSpark (StgRegTable *reg, StgClosure *p)
+    debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
+    
+    debugTrace(DEBUG_sched,
+               "new spark queue len=%ld; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
+
+    ASSERT_WSDEQUE_INVARIANTS(pool);
+}
+
+/* GC for the spark pool, called inside Capability.c for all
+   capabilities in turn. Blindly "evac"s complete spark pool. */
+void
+traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
 {
-    StgSparkPool *pool = &(reg->rSparks);
+    StgClosure **sparkp;
+    SparkPool *pool;
+    StgWord top,bottom, modMask;
+    
+    pool = cap->sparks;
+
+    ASSERT_WSDEQUE_INVARIANTS(pool);
+
+    top = pool->top;
+    bottom = pool->bottom;
+    sparkp = (StgClosurePtr*)pool->elements;
+    modMask = pool->moduloSize;
+
+    while (top < bottom) {
+    /* call evac for all closures in range (wrap-around via modulo)
+     * In GHC-6.10, evac takes an additional 1st argument to hold a
+     * GC-specific register, see rts/sm/GC.c::mark_root()
+     */
+      evac( user , sparkp + (top & modMask) ); 
+      top++;
+    }
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    debugTrace(DEBUG_sched,
+               "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
+}
 
-    if (closure_SHOULD_SPARK(p)) {
-#ifdef DISCARD_NEW
-       StgClosure **new_tl;
-       new_tl = pool->tl + 1;
-       if (new_tl == pool->lim) { new_tl = pool->base; }
-       if (new_tl != pool->hd) {
-           *pool->tl = p;
-           pool->tl = new_tl;
-       } else if (!closure_SHOULD_SPARK(*pool->hd)) {
-           // if the old closure is not sparkable, discard it and
-           // keep the new one.  Otherwise, keep the old one.
-           *pool->tl = p;
-           bump_hd(pool);
-       }
-#else  /* DISCARD OLD */
-       *pool->tl = p;
-       bump_tl(pool);
-       if (pool->tl == pool->hd) { bump_hd(pool); }
-#endif
-    }  
+/* ----------------------------------------------------------------------------
+ * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
+ * capabilities) and its size. Accesses all spark pools and equally
+ * distributes the sparks among them.
+ *
+ * Could be called after GC, before Cap. release, from scheduler. 
+ * -------------------------------------------------------------------------- */
+void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
 
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
-    return 1;
+void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
+                           Capability caps[] STG_UNUSED) {
+  barf("not implemented");
 }
 
 #else
@@ -229,6 +317,7 @@ newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
     return 1;
 }
 
+
 #endif /* PARALLEL_HASKELL || THREADED_RTS */
 
 
@@ -236,6 +325,8 @@ newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
  * 
  * GRAN & PARALLEL_HASKELL stuff beyond here.
  *
+ *  TODO "nuke" this!
+ *
  * -------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL) || defined(GRAN)