Work stealing for sparks
authorberthold@mathematik.uni-marburg.de <unknown>
Mon, 15 Sep 2008 13:28:46 +0000 (13:28 +0000)
committerberthold@mathematik.uni-marburg.de <unknown>
Mon, 15 Sep 2008 13:28:46 +0000 (13:28 +0000)
   Spark stealing support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.

  Spark pools are per capability, separately allocated and held in the Capability
  structure. The implementation uses Double-Ended Queues (deque) and cas-protected
  access.

  The write end of the queue (position bottom) can only be used with
  mutual exclusion, i.e. by exactly one caller at a time.
  Multiple readers can steal()/findSpark() from the read end
  (position top), and are synchronised without a lock, based on a cas
  of the top position. One reader wins, the others return NULL for a
  failure.

  Work stealing is called when Capabilities find no other work (inside yieldCapability),
  and tries all capabilities 0..n-1 twice, unless a theft succeeds.

  Inside schedulePushWork, all considered cap.s (those which were idle and could
  be grabbed) are woken up. Future versions should wake up capabilities immediately when
  putting a new spark in the local pool, from newSpark().

Patch has been re-recorded due to conflicting bugfixes in the sparks.c, also fixing a
(strange) conflict in the scheduler.

includes/Regs.h
includes/RtsTypes.h
rts/Capability.c
rts/Capability.h
rts/Schedule.c
rts/Sparks.c
rts/Sparks.h

index 0f974ec..cf083c9 100644 (file)
 #ifndef REGS_H
 #define REGS_H
 
-/*
- * Spark pools: used to store pending sparks 
- *  (THREADED_RTS & PARALLEL_HASKELL only)
- * This is a circular buffer.  Invariants:
- *    - base <= hd < lim
- *    - base <= tl < lim
- *    - if hd==tl, then the pool is empty.
- *    - if hd == tl+1, then the pool is full.
- * Adding to the pool is done by assigning to *tl++ (wrapping round as
- * necessary).  When adding to a full pool, we have the option of
- * throwing away either the oldest (hd++) or the most recent (tl--) entry.
- */
-typedef struct StgSparkPool_ {
-  StgClosure **base;
-  StgClosure **lim;
-  StgClosure **hd;
-  StgClosure **tl;
-} StgSparkPool;
-
-#define ASSERT_SPARK_POOL_INVARIANTS(p)                \
-  ASSERT((p)->base <= (p)->hd);                        \
-  ASSERT((p)->hd < (p)->lim);                  \
-  ASSERT((p)->base <= (p)->tl);                        \
-  ASSERT((p)->tl < (p)->lim);
-
 typedef struct {
   StgFunPtr      stgGCEnter1;
   StgFunPtr      stgGCFun;
@@ -120,7 +95,6 @@ typedef struct StgRegTable_ {
   StgWord         rmp_result1[MP_INT_WORDS];
   StgWord         rmp_result2[MP_INT_WORDS];
   StgWord         rRet;  // holds the return code of the thread
-  StgSparkPool    rSparks;     /* per-task spark pool */
 } StgRegTable;
 
 #if IN_STG_CODE
@@ -163,10 +137,6 @@ typedef struct StgRegTable_ {
 #define SAVE_CurrentTSO     (BaseReg->rCurrentTSO)
 #define SAVE_CurrentNursery (BaseReg->rCurrentNursery)
 #define SAVE_HpAlloc        (BaseReg->rHpAlloc)
-#define SAVE_SparkHd       (BaseReg->rSparks.hd)
-#define SAVE_SparkTl        (BaseReg->rSparks.tl)
-#define SAVE_SparkBase      (BaseReg->rSparks.base)
-#define SAVE_SparkLim      (BaseReg->rSparks.lim)
 
 /* We sometimes need to save registers across a C-call, eg. if they
  * are clobbered in the standard calling convention.  We define the
@@ -401,30 +371,6 @@ GLOBAL_REG_DECL(bdescr *,HpAlloc,REG_HpAlloc)
 #define HpAlloc (BaseReg->rHpAlloc)
 #endif
 
-#if defined(REG_SparkHd) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkHd,REG_SparkHd)
-#else
-#define SparkHd (BaseReg->rSparks.hd)
-#endif
-
-#if defined(REG_SparkTl) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkTl,REG_SparkTl)
-#else
-#define SparkTl (BaseReg->rSparks.tl)
-#endif
-
-#if defined(REG_SparkBase) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkBase,REG_SparkBase)
-#else
-#define SparkBase (BaseReg->rSparks.base)
-#endif
-
-#if defined(REG_SparkLim) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
-#else
-#define SparkLim (BaseReg->rSparks.lim)
-#endif
-
 /* -----------------------------------------------------------------------------
    Get absolute function pointers from the register table, to save
    code space.  On x86, 
@@ -665,38 +611,6 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
 #define CALLER_RESTORE_HpAlloc         /* nothing */
 #endif
 
-#ifdef CALLER_SAVES_SparkHd
-#define CALLER_SAVE_SparkHd            SAVE_SparkHd = SparkHd;
-#define CALLER_RESTORE_SparkHd         SparkHd = SAVE_SparkHd;
-#else
-#define CALLER_SAVE_SparkHd            /* nothing */
-#define CALLER_RESTORE_SparkHd         /* nothing */
-#endif
-
-#ifdef CALLER_SAVES_SparkTl
-#define CALLER_SAVE_SparkTl            SAVE_SparkTl = SparkTl;
-#define CALLER_RESTORE_SparkTl         SparkTl = SAVE_SparkTl;
-#else
-#define CALLER_SAVE_SparkTl            /* nothing */
-#define CALLER_RESTORE_SparkTl         /* nothing */
-#endif
-
-#ifdef CALLER_SAVES_SparkBase
-#define CALLER_SAVE_SparkBase          SAVE_SparkBase = SparkBase;
-#define CALLER_RESTORE_SparkBase       SparkBase = SAVE_SparkBase;
-#else
-#define CALLER_SAVE_SparkBase          /* nothing */
-#define CALLER_RESTORE_SparkBase       /* nothing */
-#endif
-
-#ifdef CALLER_SAVES_SparkLim
-#define CALLER_SAVE_SparkLim                   SAVE_SparkLim = SparkLim;
-#define CALLER_RESTORE_SparkLim                SparkLim = SAVE_SparkLim;
-#else
-#define CALLER_SAVE_SparkLim                   /* nothing */
-#define CALLER_RESTORE_SparkLim        /* nothing */
-#endif
-
 #endif /* IN_STG_CODE */
 
 /* ----------------------------------------------------------------------------
@@ -731,10 +645,6 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
   CALLER_SAVE_HpLim                            \
   CALLER_SAVE_CurrentTSO                       \
   CALLER_SAVE_CurrentNursery                   \
-  CALLER_SAVE_SparkHd                          \
-  CALLER_SAVE_SparkTl                          \
-  CALLER_SAVE_SparkBase                                \
-  CALLER_SAVE_SparkLim                          \
   CALLER_SAVE_Base
 
 #define CALLER_RESTORE_USER                    \
@@ -763,11 +673,7 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
   CALLER_RESTORE_Hp                            \
   CALLER_RESTORE_HpLim                         \
   CALLER_RESTORE_CurrentTSO                    \
-  CALLER_RESTORE_CurrentNursery                        \
-  CALLER_RESTORE_SparkHd                       \
-  CALLER_RESTORE_SparkTl                       \
-  CALLER_RESTORE_SparkBase                     \
-  CALLER_RESTORE_SparkLim
+  CALLER_RESTORE_CurrentNursery
 
 #else /* not IN_STG_CODE */
 
index 9e8c7b8..3510ee7 100644 (file)
@@ -37,6 +37,40 @@ typedef enum {
    Types specific to the parallel runtime system.
 */
 
+
+/* Spark pools: used to store pending sparks 
+ *  (THREADED_RTS & PARALLEL_HASKELL only)
+ * Implementation uses a DeQue to enable concurrent read accesses at
+ * the top end.
+ */
+typedef struct  SparkPool_ {
+  /* Size of elements array. Used for modulo calculation: we round up
+     to powers of 2 and use the dyadic log (modulo == bitwise &) */
+  StgWord size; 
+  StgWord moduloSize; /* bitmask for modulo */
+
+  /* top, index where multiple readers steal() (protected by a cas) */
+  StgWord top;
+
+  /* bottom, index of next free place where one writer can push
+     elements. This happens unsynchronised. */
+  StgWord bottom;
+  /* both position indices are continuously incremented, and used as
+     an index modulo the current array size. */
+  
+  /* lower bound on the current top value. This is an internal
+     optimisation to avoid unnecessarily accessing the top field
+     inside pushBottom */
+  StgWord topBound;
+
+  /* The elements array */
+  StgClosurePtr* elements;
+  /*  Please note: the dataspace cannot follow the admin fields
+      immediately, as it should be possible to enlarge it without
+      disposing the old one automatically (as realloc would)! */
+
+} SparkPool;
+
 typedef ullong        rtsTime;
 
 #if defined(PAR)
index 4d5748c..516aaa5 100644 (file)
@@ -54,6 +54,55 @@ globalWorkToDo (void)
 #endif
 
 #if defined(THREADED_RTS)
+rtsBool stealWork( Capability *cap) {
+  /* use the normal Sparks.h interface (internally modified to enable
+     concurrent stealing) 
+     and immediately turn the spark into a thread when successful
+  */
+  Capability *robbed;
+  SparkPool *pool;
+  StgClosurePtr spark;
+  rtsBool success = rtsFalse;
+  nat i = 0;
+
+  debugTrace(DEBUG_sched,
+            "cap %d: Trying to steal work from other capabilities", 
+            cap->no);
+
+  if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
+
+  /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+     start at a random place instead of 0 as well.  */
+  for ( i=0 ; i < n_capabilities ; i++ ) {
+    robbed = &capabilities[i];
+    if (cap == robbed)  // ourselves...
+      continue;
+
+    if (emptySparkPoolCap(robbed)) // nothing to steal here
+      continue;
+    
+    spark = findSpark(robbed);
+
+    if (spark == NULL && !emptySparkPoolCap(robbed)) {
+      spark = findSpark(robbed); // lost race in concurrent access, try again
+    }
+    if (spark != NULL) {
+      debugTrace(DEBUG_sched,
+                "cap %d: Stole a spark from capability %d",
+                cap->no, robbed->no);
+
+      createSparkThread(cap,spark);
+      success = rtsTrue;
+      break; // got one, leave the loop
+    }
+ // otherwise: no success, try next one
+  }
+  debugTrace(DEBUG_sched,
+            "Leaving work stealing routine (%s)",
+            success?"one spark stolen":"thefts did not succeed");
+  return success;
+}
+
 STATIC_INLINE rtsBool
 anyWorkForMe( Capability *cap, Task *task )
 {
@@ -73,9 +122,11 @@ anyWorkForMe( Capability *cap, Task *task )
        if (emptyRunQueue(cap)) {
            return !emptySparkPoolCap(cap)
                || !emptyWakeupQueue(cap)
-               || globalWorkToDo();
-       } else
+               || globalWorkToDo()
+               || stealWork(cap); /* if all false: try to steal work */
+       } else {
            return cap->run_queue_hd->bound == NULL;
+       }
     }
 }
 #endif
@@ -778,7 +829,7 @@ void
 freeCapability (Capability *cap) {
     stgFree(cap->mut_lists);
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-    freeSparkPool(&cap->r.rSparks);
+    freeSparkPool(cap->sparks);
 #endif
 }
 
index 70d9ee9..5945895 100644 (file)
@@ -23,6 +23,7 @@
 #ifndef CAPABILITY_H
 #define CAPABILITY_H
 
+#include "RtsTypes.h"
 #include "RtsFlags.h"
 #include "Task.h"
 
@@ -98,6 +99,9 @@ struct Capability_ {
     StgTRecChunk *free_trec_chunks;
     StgTRecHeader *free_trec_headers;
     nat transaction_tokens;
+
+    SparkPool *sparks;
+
 }; // typedef Capability, defined in RtsAPI.h
 
 
index 626c097..09150fd 100644 (file)
@@ -137,17 +137,17 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
-#if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
-#endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
 static rtsBool scheduleGetRemoteWork(Capability *cap);
+#if defined(PARALLEL_HASKELL)
 static void scheduleSendPendingMessages(void);
+#endif
 static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -291,13 +291,15 @@ schedule (Capability *initialCapability, Task *task)
       } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
+         /* inside yieldCapability, attempts to steal work from other
+            capabilities, unless the capability has own work. 
+            See (REMARK) below.
+         */
       }
 #endif
-      
-#if defined(THREADED_RTS)
-      schedulePushWork(cap,task);
-#endif
 
+    /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
+      
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -365,21 +367,7 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
-#if defined(THREADED_RTS)
-    // If the run queue is empty, take a spark and turn it into a thread.
-    {
-       if (emptyRunQueue(cap)) {
-           StgClosure *spark;
-           spark = findSpark(cap);
-           if (spark != NULL) {
-               debugTrace(DEBUG_sched,
-                          "turning spark of closure %p into a thread",
-                          (StgClosure *)spark);
-               createSparkThread(cap,spark);     
-           }
-       }
-    }
-#endif // THREADED_RTS
+    /* this was the place to activate a spark, now below... */
 
     scheduleStartSignalHandlers(cap);
 
@@ -393,11 +381,19 @@ schedule (Capability *initialCapability, Task *task)
 
     scheduleCheckBlockedThreads(cap);
 
-#if defined(PARALLEL_HASKELL)
-    /* message processing and work distribution goes here */ 
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    /* work distribution in multithreaded and parallel systems 
+
+       REMARK: IMHO best location for work-stealing as well.
+       tests above might yield some new jobs, so no need to steal a
+       spark in some cases. I believe the yieldCapability.. above
+       should be moved here.
+    */
 
+#if defined(PARALLEL_HASKELL)
     /* if messages have been buffered... a NOOP in THREADED_RTS */
     scheduleSendPendingMessages();
+#endif
 
     /* If the run queue is empty,...*/
     if (emptyRunQueue(cap)) {
@@ -406,6 +402,7 @@ schedule (Capability *initialCapability, Task *task)
 
        /* if this did not work, try to steal a spark from someone else */
       if (emptyRunQueue(cap)) {
+#if defined(PARALLEL_HASKELL)
        receivedFinish = scheduleGetRemoteWork(cap);
        continue; //  a new round, (hopefully) with new work
        /* 
@@ -414,10 +411,20 @@ schedule (Capability *initialCapability, Task *task)
                        b) (blocking) awaits and receives messages
           
           in Eden, this is only the blocking receive, as b) in GUM.
+
+          in Threaded-RTS, this does plain nothing. Stealing routine
+               is inside Capability.c and called from
+               yieldCapability() at the very beginning, see REMARK.
        */
+#endif
       }
-    } 
+    } else { /* i.e. run queue was (initially) not empty */
+      schedulePushWork(cap,task);
+      /* work pushing, currently relevant only for THREADED_RTS:
+        (pushes threads, wakes up idle capabilities for stealing) */
+    }
 
+#if defined(PARALLEL_HASKELL)
     /* since we perform a blocking receive and continue otherwise,
        either we never reach here or we definitely have work! */
     // from here: non-empty run queue
@@ -430,7 +437,9 @@ schedule (Capability *initialCapability, Task *task)
                                above, waits for messages as well! */
       processMessages(cap, &receivedFinish);
     }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
+
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
     scheduleDetectDeadlock(cap,task);
 #if defined(THREADED_RTS)
@@ -679,11 +688,15 @@ schedulePreLoop(void)
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS, 
                 Task *task      USED_IF_THREADS)
 {
+  /* following code not for PARALLEL_HASKELL. I kept the call general,
+     future GUM versions might use pushing in a distributed setup */
+#if defined(THREADED_RTS)
+
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
@@ -726,7 +739,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;
 
-       debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+       debugTrace(DEBUG_sched, 
+                  "cap %d: %s and %d free capabilities, sharing...", 
+                  cap->no, 
+                  (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                  "excess threads on run queue":"sparks to share (>=2)",
+                  n_free_caps);
 
        i = 0;
        pushed_to_all = rtsFalse;
@@ -760,6 +778,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            cap->run_queue_tl = prev;
        }
 
+#ifdef SPARK_PUSHING
+       /* JB I left this code in place, it would work but is not necessary */
+
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
@@ -775,16 +796,23 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                }
            }
        }
+#endif /* SPARK_PUSHING */
 
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
+       // now wake them all up, and they might steal sparks if
+       // the did not get a thread
+       prodAllCapabilities();
     }
     task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
 }
-#endif
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
@@ -965,7 +993,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
 scheduleSendPendingMessages(void)
 {
 
@@ -984,10 +1012,10 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
  * ------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
@@ -1012,14 +1040,14 @@ scheduleActivateSpark(Capability *cap)
       createSparkThread(cap,spark); // defined in Sparks.c
     }
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
     
-#if defined(PARALLEL_HASKELL)
-static rtsBool
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
 scheduleGetRemoteWork(Capability *cap)
 {
 #if defined(PARALLEL_HASKELL)
@@ -1057,7 +1085,7 @@ scheduleGetRemoteWork(Capability *cap)
 
 #endif /* PARALLEL_HASKELL */
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * After running a thread...
@@ -1483,6 +1511,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        performHeapProfile = rtsFalse;
     }
 
+#ifdef SPARKBALANCE
+    /* JB 
+       Once we are all together... this would be the place to balance all
+       spark pools. No concurrent stealing or adding of new sparks can
+       occur. Should be defined in Sparks.c. */
+    balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
 #if defined(THREADED_RTS)
     // release our stash of capabilities.
     for (i = 0; i < n_capabilities; i++) {
index 2e9e61c..ac11172 100644 (file)
@@ -1,10 +1,39 @@
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2000-2006
+ * (c) The GHC Team, 2000-2008
  *
  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
- * -------------------------------------------------------------------------*/
+ * The implementation uses Double-Ended Queues with lock-free access
+ * (thereby often called "deque") as described in
+ *
+ * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
+ * SPAA'05, July 2005, Las Vegas, USA.
+ * ACM 1-58113-986-1/05/0007
+ *
+ * Author: Jost Berthold MSRC 07-09/2008
+ *
+ * The DeQue is held as a circular array with known length. Positions
+ * of top (read-end) and bottom (write-end) always increase, and the
+ * array is accessed with indices modulo array-size. While this bears
+ * the risk of overflow, we assume that (with 64 bit indices), a
+ * program must run very long to reach that point.
+ * 
+ * The write end of the queue (position bottom) can only be used with
+ * mutual exclusion, i.e. by exactly one caller at a time.  At this
+ * end, new items can be enqueued using pushBottom()/newSpark(), and
+ * removed using popBottom()/reclaimSpark() (the latter implying a cas
+ * synchronisation with potential concurrent readers for the case of
+ * just one element).
+ * 
+ * Multiple readers can steal()/findSpark() from the read end
+ * (position top), and are synchronised without a lock, based on a cas
+ * of the top position. One reader wins, the others return NULL for a
+ * failure.
+ * 
+ * Both popBottom and steal also return NULL when the queue is empty.
+ * 
+ -------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
 #include "Rts.h"
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "ParTicky.h"
-# if defined(PARALLEL_HASKELL)
-# include "ParallelRts.h"
-# include "GranSimRts.h"   // for GR_...
-# elif defined(GRAN)
-# include "GranSimRts.h"
-# endif
-#include "Sparks.h"
 #include "Trace.h"
 
+#include "SMP.h" // for cas
+
+#include "Sparks.h"
+
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
-static INLINE_ME void bump_hd (StgSparkPool *p)
-{ p->hd++; if (p->hd == p->lim) p->hd = p->base; }
+/* internal helpers ... */
+
+StgWord roundUp2(StgWord val);
 
-static INLINE_ME void bump_tl (StgSparkPool *p)
-{ p->tl++; if (p->tl == p->lim) p->tl = p->base; }
+StgWord roundUp2(StgWord val) {
+  StgWord rounded = 1;
+
+  /* StgWord is unsigned anyway, only catch 0 */
+  if (val == 0) {
+    barf("DeQue,roundUp2: invalid size 0 requested");
+  }
+  /* at least 1 bit set, shift up to its place */
+  do {
+    rounded = rounded << 1;
+  } while (0 != (val = val>>1));
+  return rounded;
+}
+
+INLINE_HEADER
+rtsBool casTop(StgPtr addr, StgWord old, StgWord new);
+
+#if !defined(THREADED_RTS)
+/* missing def. in non THREADED RTS, and makes no sense anyway... */
+StgWord cas(StgPtr addr,StgWord old,StgWord new);
+StgWord cas(StgPtr addr,StgWord old,StgWord new) { 
+  barf("cas: not implemented without multithreading");
+  old = new = *addr; /* to avoid gcc warnings */
+}
+#endif
+
+INLINE_HEADER
+rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
+  StgWord res = cas((StgPtr) addr, old, new);
+  return ((res == old));
+}
+
+/* or simply like this */
+#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
 
 /* -----------------------------------------------------------------------------
  * 
@@ -37,15 +96,28 @@ static INLINE_ME void bump_tl (StgSparkPool *p)
  *
  * -------------------------------------------------------------------------- */
 
-static void 
-initSparkPool(StgSparkPool *pool)
-{
-    pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
-                               * sizeof(StgClosure *),
-                               "initSparkPools");
-    pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
-    pool->hd  = pool->base;
-    pool->tl  = pool->base;
+/* constructor */
+SparkPool* initPool(StgWord size) {
+
+  StgWord realsize; 
+  SparkPool *q;
+
+  realsize = roundUp2(size); /* to compute modulo as a bitwise & */
+
+  q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
+                             "newSparkPool");
+  q->elements = (StgClosurePtr*) 
+                stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
+                              "newSparkPool:data space");
+  q->top=0;
+  q->bottom=0;
+  q->topBound=0; /* read by writer, updated each time top is read */
+
+  q->size = realsize;  /* power of 2 */
+  q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
+
+  ASSERT_SPARK_POOL_INVARIANTS(q); 
+  return q;
 }
 
 void
@@ -55,17 +127,71 @@ initSparkPools( void )
     /* walk over the capabilities, allocating a spark pool for each one */
     nat i;
     for (i = 0; i < n_capabilities; i++) {
-       initSparkPool(&capabilities[i].r.rSparks);
+      capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
     }
 #else
     /* allocate a single spark pool */
-    initSparkPool(&MainCapability.r.rSparks);
+    MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
 #endif
 }
 
 void
-freeSparkPool(StgSparkPool *pool) {
-    stgFree(pool->base);
+freeSparkPool(SparkPool *pool) {
+  /* should not interfere with concurrent findSpark() calls! And
+     nobody should use the pointer any more. We cross our fingers...*/
+  stgFree(pool->elements);
+  stgFree(pool);
+}
+
+/* reclaimSpark(cap): remove a spark from the write end of the queue.
+ * Returns the removed spark, and NULL if a race is lost or the pool
+ * empty.
+ *
+ * If only one spark is left in the pool, we synchronise with
+ * concurrently stealing threads by using cas to modify the top field.
+ * This routine should NEVER be called by a task which does not own
+ * the capability. Can this be checked here?
+ */
+StgClosure* reclaimSpark(Capability *cap) {
+  SparkPool *deque = cap->sparks;
+  /* also a bit tricky, has to avoid concurrent steal() calls by
+     accessing top with cas, when there is only one element left */
+  StgWord t, b;
+  StgClosurePtr* pos;
+  long  currSize;
+  StgClosurePtr removed;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  b = deque->bottom;
+  /* "decrement b as a test, see what happens" */
+  deque->bottom = --b; 
+  pos = (deque->elements) + (b & (deque->moduloSize));
+  t = deque->top; /* using topBound would give an *upper* bound, we
+                    need a lower bound. We use the real top here, but
+                    can update the topBound value */
+  deque->topBound = t;
+  currSize = b - t;
+  if (currSize < 0) { /* was empty before decrementing b, set b
+                        consistently and abort */
+    deque->bottom = t;
+    return NULL;
+  }
+  removed = *pos;
+  if (currSize > 0) { /* no danger, still elements in buffer after b-- */
+    return removed;
+  } 
+  /* otherwise, has someone meanwhile stolen the same (last) element?
+     Check and increment top value to know  */
+  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
+    removed = NULL; /* no success, but continue adjusting bottom */
+  }
+  deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
+  deque->topBound = t+1; /* ...and cached top value as well */
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  return removed;
 }
 
 /* -----------------------------------------------------------------------------
@@ -73,32 +199,80 @@ freeSparkPool(StgSparkPool *pool) {
  * findSpark: find a spark on the current Capability that we can fork
  * into a thread.
  *
- * -------------------------------------------------------------------------- */
+ * May be called by concurrent threads, which synchronise on top
+ * variable. Returns a spark, or NULL if pool empty or race lost.
+ *
+ -------------------------------------------------------------------------- */
+
+StgClosurePtr steal(SparkPool *deque);
+
+/* steal an element from the read end. Synchronises multiple callers
+   by failing with NULL return. Returns NULL when deque is empty. */
+StgClosurePtr steal(SparkPool *deque) {
+  StgClosurePtr* pos;
+  StgClosurePtr* arraybase;
+  StgWord sz;
+  StgClosurePtr stolen;
+  StgWord b,t; 
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  b = deque->bottom;
+  t = deque->top;
+  if (b - t <= 0 ) { 
+    return NULL; /* already looks empty, abort */
+  }
+
+  /* now access array, see pushBottom() */
+  arraybase = deque->elements;
+  sz = deque->moduloSize;
+  pos = arraybase + (t & sz);  
+  stolen = *pos;
+
+  /* now decide whether we have won */
+  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
+    /* lost the race, someon else has changed top in the meantime */
+    stolen = NULL;    
+  }  /* else: OK, top has been incremented by the cas call */
+
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+  /* return NULL or stolen element */
+  return stolen;
+}
 
 StgClosure *
 findSpark (Capability *cap)
 {
-    StgSparkPool *pool;
-    StgClosure *spark;
+  SparkPool *deque = (cap->sparks);
+  StgClosure *stolen;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  do { 
+    /* keep trying until good spark found or pool looks empty. 
+       TODO is this a good idea? */
+
+    stolen = steal(deque);
     
-    pool = &(cap->r.rSparks);
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+  } while ( ( !stolen /* nothing stolen*/
+             || !closure_SHOULD_SPARK(stolen)) /* spark not OK */
+           && !looksEmpty(deque)); /* run empty, give up */
 
-    while (pool->hd != pool->tl) {
-       spark = *pool->hd;
-       bump_hd(pool);
-       if (closure_SHOULD_SPARK(spark)) {
-#ifdef GRAN
-           if (RtsFlags.ParFlags.ParStats.Sparks) 
-               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                                GR_STEALING, ((StgTSO *)NULL), spark, 
-                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-#endif
-           return spark;
-       }
-    }
-    // spark pool is now empty
-    return NULL;
+  /* return stolen element */
+  return stolen;
+}
+
+
+/* "guesses" whether a deque is empty. Can return false negatives in
+   presence of concurrent steal() calls, and false positives in
+   presence of a concurrent pushBottom().*/
+rtsBool looksEmpty(SparkPool* deque) {
+  StgWord t = deque->top;
+  StgWord b = deque->bottom;
+  /* try to prefer false negatives by reading top first */
+  return (b - t <= 0);
+  /* => array is *never* completely filled, always 1 place free! */
 }
 
 /* -----------------------------------------------------------------------------
@@ -123,11 +297,64 @@ createSparkThread (Capability *cap, StgClosure *p)
  * -------------------------------------------------------------------------- */
 
 #define DISCARD_NEW
+void pushBottom(SparkPool* deque, StgClosurePtr elem);
+
+/* enqueue an element. Should always succeed by resizing the array
+   (not implemented yet, silently fails in that case). */
+void pushBottom(SparkPool* deque, StgClosurePtr elem) {
+  StgWord t;
+  StgClosurePtr* pos;
+  StgWord sz = deque->moduloSize; 
+  StgWord b = deque->bottom;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  /* we try to avoid reading deque->top (accessed by all) and use
+     deque->topBound (accessed only by writer) instead. 
+     This is why we do not just call empty(deque) here.
+  */
+  t = deque->topBound;
+  if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */
+    /* could be full, check the real top value in this case */
+    t = deque->top;
+    deque->topBound = t;
+    if (b - t >= sz) { /* really no space left :-( */
+      /* reallocate the array, copying the values. Concurrent steal()s
+        will in the meantime use the old one and modify only top.
+        This means: we cannot safely free the old space! Can keep it
+        on a free list internally here...
+
+        Potential bug in combination with steal(): if array is
+        replaced, it is unclear which one concurrent steal operations
+        use. Must read the array base address in advance in steal().
+      */
+#if defined(DISCARD_NEW)
+      ASSERT_SPARK_POOL_INVARIANTS(deque); 
+      return; /* for now, silently fail */
+#else
+      /* could make room by incrementing the top position here.  In
+       * this case, should use CASTOP. If this fails, someone else has
+       * removed something, and new room will be available.
+       */
+      ASSERT_SPARK_POOL_INVARIANTS(deque); 
+#endif
+    }
+  }
+  pos = (deque->elements) + (b & sz);
+  *pos = elem;
+  (deque->bottom)++;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+  return;
+}
+
 
+/* this is called as a direct C-call from Stg => we need to keep the
+   pool in a register (???) */
 StgInt
 newSpark (StgRegTable *reg, StgClosure *p)
 {
-    StgSparkPool *pool = &(reg->rSparks);
+    SparkPool *pool = (reg->rCurrentTSO->cap->sparks);
 
     /* I am not sure whether this is the right thing to do.
      * Maybe it is better to exploit the tag information
@@ -138,82 +365,125 @@ newSpark (StgRegTable *reg, StgClosure *p)
     ASSERT_SPARK_POOL_INVARIANTS(pool);
 
     if (closure_SHOULD_SPARK(p)) {
-#ifdef DISCARD_NEW
-       StgClosure **new_tl;
-       new_tl = pool->tl + 1;
-       if (new_tl == pool->lim) { new_tl = pool->base; }
-       if (new_tl != pool->hd) {
-           *pool->tl = p;
-           pool->tl = new_tl;
-       } else if (!closure_SHOULD_SPARK(*pool->hd)) {
-           // if the old closure is not sparkable, discard it and
-           // keep the new one.  Otherwise, keep the old one.
-           *pool->tl = p;
-           bump_hd(pool);
-       }
-#else  /* DISCARD OLD */
-       *pool->tl = p;
-       bump_tl(pool);
-       if (pool->tl == pool->hd) { bump_hd(pool); }
-#endif
+      pushBottom(pool,p);
     }  
 
     ASSERT_SPARK_POOL_INVARIANTS(pool);
     return 1;
 }
 
-/* -----------------------------------------------------------------------------
- * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
- * implicit slide i.e. after marking all sparks are at the beginning of the
- * spark pool and the spark pool only contains sparkable closures 
+
+
+/* --------------------------------------------------------------------------
+ * Remove all sparks from the spark queues which should not spark any
+ * more.  Called after GC. We assume exclusive access to the structure
+ * and replace  all sparks in the queue, see explanation below. At exit,
+ * the spark pool only contains sparkable closures.
  * -------------------------------------------------------------------------- */
 
 static void
 pruneSparkQueue (Capability *cap)
 { 
-    StgClosure *spark, **sparkp, **to_sparkp;
+    SparkPool *pool;
+    StgClosurePtr spark, evacspark, *elements;
     nat n, pruned_sparks; // stats only
-    StgSparkPool *pool;
+    StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
     
     PAR_TICKY_MARK_SPARK_QUEUE_START();
     
     n = 0;
     pruned_sparks = 0;
     
-    pool = &(cap->r.rSparks);
+    pool = cap->sparks;
     
+    debugTrace(DEBUG_sched,
+               "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
     ASSERT_SPARK_POOL_INVARIANTS(pool);
-    
-    sparkp = pool->hd;
-    to_sparkp = pool->hd;
-    while (sparkp != pool->tl) {
-        ASSERT(*sparkp!=NULL);
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
-        // ToDo?: statistics gathering here (also for GUM!)
-        spark = *sparkp;
-        if (!closure_SHOULD_SPARK(spark)) {
-            pruned_sparks++;
-        } else{
-            *to_sparkp++ = spark;
-            if (to_sparkp == pool->lim) {
-                to_sparkp = pool->base;
-            }
-            n++;
-        }
-        sparkp++;
-        if (sparkp == pool->lim) {
-            sparkp = pool->base;
-        }
-    }
-    pool->tl = to_sparkp;
-       
+
+    elements = pool->elements;
+
+    /* We have exclusive access to the structure here, so we can reset
+       bottom and top counters, and prune invalid sparks. Contents are
+       copied in-place if they are valuable, otherwise discarded. The
+       routine uses "real" indices t and b, starts by computing them
+       as the modulus size of top and bottom,
+
+       Copying:
+
+       At the beginning, the pool structure can look like this:
+       ( bottom % size >= top % size , no wrap-around)
+                  t          b
+       ___________***********_________________
+
+       or like this ( bottom % size < top % size, wrap-around )
+                  b         t
+       ***********__________******************
+       As we need to remove useless sparks anyway, we make one pass
+       between t and b, moving valuable content to b and subsequent
+       cells (wrapping around when the size is reached).
+
+                     b      t
+       ***********OOO_______XX_X__X?**********
+                     ^____move?____/
+
+       After this movement, botInd becomes the new bottom, and old
+       bottom becomes the new top index, both as indices in the array
+       size range.
+    */
+    // starting here
+    currInd = (pool->top) & (pool->moduloSize); // mod
+
+    // copies of evacuated closures go to space from botInd on
+    // we keep oldBotInd to know when to stop
+    oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
+
+    // on entry to loop, we are within the bounds
+    ASSERT( currInd < pool->size && botInd  < pool->size );
+
+    while (currInd != oldBotInd ) {
+      /* must use != here, wrap-around at size
+        subtle: loop not entered if queue empty
+       */
+
+      /* check element at currInd. if valuable, evacuate and move to
+        botInd, otherwise move on */
+      spark = elements[currInd];
+
+      /* if valuable work: shift inside the pool */
+      if ( closure_SHOULD_SPARK(spark) ) {
+       elements[botInd] = spark; // keep entry (new address)
+       botInd++;
+       n++;
+      } else { 
+       pruned_sparks++; // discard spark
+      }
+      currInd++;
+
+      // in the loop, we may reach the bounds, and instantly wrap around
+      ASSERT( currInd <= pool->size && botInd <= pool->size );
+      if ( currInd == pool->size ) { currInd = 0; }
+      if ( botInd == pool->size )  { botInd = 0;  }
+
+    } // while-loop over spark pool elements
+
+    ASSERT(currInd == oldBotInd);
+
+    pool->top = oldBotInd; // where we started writing
+    pool->topBound = pool->top;
+
+    pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
+    // first free place we did not use (corrected by wraparound)
+
     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-       
+
     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
     
     debugTrace(DEBUG_sched,
-               "new spark queue len=%d; (hd=%p; tl=%p)",
-               sparkPoolSize(pool), pool->hd, pool->tl);
+               "new spark queue len=%d; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
 }
 
 void
@@ -225,21 +495,50 @@ pruneSparkQueues (void)
     }
 }
 
+/* GC for the spark pool, called inside Capability.c for all
+   capabilities in turn. Blindly "evac"s complete spark pool. */
 void
 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
 {
     StgClosure **sparkp;
-    StgSparkPool *pool;
+    SparkPool *pool;
+    StgWord top,bottom, modMask;
     
-    pool = &(cap->r.rSparks);
-    sparkp = pool->hd;
-    while (sparkp != pool->tl) {
-        evac(user, sparkp);
-        sparkp++;
-        if (sparkp == pool->lim) {
-            sparkp = pool->base;
-        }
+    pool = cap->sparks;
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+    top = pool->top;
+    bottom = pool->bottom;
+    sparkp = pool->elements;
+    modMask = pool->moduloSize;
+
+    while (top < bottom) {
+    /* call evac for all closures in range (wrap-around via modulo)
+     * In GHC-6.10, evac takes an additional 1st argument to hold a
+     * GC-specific register, see rts/sm/GC.c::mark_root()
+     */
+      evac( user , sparkp + (top & modMask) ); 
+      top++;
     }
+
+    debugTrace(DEBUG_sched,
+               "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
+}
+
+/* ----------------------------------------------------------------------------
+
+ * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
+ * capabilities) and its size. Accesses all spark pools and equally
+ * distributes the sparks among them.
+ *
+ * Could be called after GC, before Cap. release, from scheduler. 
+ * -------------------------------------------------------------------------- */
+void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
+
+void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) {
+  barf("not implemented");
 }
 
 #else
@@ -259,6 +558,8 @@ newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
  * 
  * GRAN & PARALLEL_HASKELL stuff beyond here.
  *
+ *  TODO "nuke" this!
+ *
  * -------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL) || defined(GRAN)
index 8e0ba90..dbbf268 100644 (file)
@@ -9,17 +9,45 @@
 #ifndef SPARKS_H
 #define SPARKS_H
 
+#if defined(PARALLEL_HASKELL)
+#error Sparks.c using new internal structure, needs major overhaul!
+#endif
+
+/* typedef for SparkPool in RtsTypes.h */
+
 #if defined(THREADED_RTS)
+
+/* INVARIANTS, in this order: bottom/top consistent, reasonable size,
+   topBound consistent, space pointer, space accessible to us */
+#define ASSERT_SPARK_POOL_INVARIANTS(p)         \
+  ASSERT((p)->bottom >= (p)->top);              \
+  ASSERT((p)->size > 0);                        \
+  ASSERT((p)->size > (p)->bottom - (p)->top);  \
+  ASSERT((p)->topBound <= (p)->top);            \
+  ASSERT((p)->elements != NULL);                \
+  ASSERT(*((p)->elements) || 1);                \
+  ASSERT(*((p)->elements - 1  + ((p)->size)) || 1);
+
+// missing in old interface. Currently called by initSparkPools
+// internally.
+SparkPool* initPool(StgWord size);
+
+// special case: accessing our own pool, at the write end
+// otherwise, we can always steal from our pool as the others do...
+StgClosure* reclaimSpark(Capability *cap);
+
+rtsBool looksEmpty(SparkPool* deque);
+
+// rest: same as old interface
 StgClosure * findSpark         (Capability *cap);
 void         initSparkPools    (void);
-void         freeSparkPool     (StgSparkPool *pool);
+void         freeSparkPool     (SparkPool *pool);
 void         createSparkThread (Capability *cap, StgClosure *p);
 void         pruneSparkQueues  (void);
 void         traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
 
-INLINE_HEADER void     discardSparks  (StgSparkPool *pool);
-INLINE_HEADER nat      sparkPoolSize  (StgSparkPool *pool);
-INLINE_HEADER rtsBool  emptySparkPool (StgSparkPool *pool);
+INLINE_HEADER void     discardSparks  (SparkPool *pool);
+INLINE_HEADER nat      sparkPoolSize  (SparkPool *pool);
 
 INLINE_HEADER void     discardSparksCap  (Capability *cap);
 INLINE_HEADER nat      sparkPoolSizeCap  (Capability *cap);
@@ -32,46 +60,33 @@ INLINE_HEADER rtsBool  emptySparkPoolCap (Capability *cap);
 
 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 
-INLINE_HEADER rtsBool
-emptySparkPool (StgSparkPool *pool)
-{
-    return (pool->hd == pool->tl);
-}
+INLINE_HEADER rtsBool  
+emptySparkPool (SparkPool *pool) 
+{ return looksEmpty(pool); }
 
 INLINE_HEADER rtsBool
 emptySparkPoolCap (Capability *cap) 
-{ return emptySparkPool(&cap->r.rSparks); }
+{ return looksEmpty(cap->sparks); }
 
 INLINE_HEADER nat
-sparkPoolSize (StgSparkPool *pool) 
+sparkPoolSize (SparkPool *pool) 
 {
-    if (pool->hd <= pool->tl) {
-       return (pool->tl - pool->hd);
-    } else {
-       return (pool->lim - pool->hd + pool->tl - pool->base);
-    }
+  return (pool->bottom - pool->top);
 }
 
 INLINE_HEADER nat
 sparkPoolSizeCap (Capability *cap) 
-{ return sparkPoolSize(&cap->r.rSparks); }
+{ return sparkPoolSize(cap->sparks); }
 
 INLINE_HEADER void
-discardSparks (StgSparkPool *pool)
+discardSparks (SparkPool *pool)
 {
-    pool->hd = pool->tl;
+    pool->top = pool->bottom = 0;
 }
 
 INLINE_HEADER void
 discardSparksCap (Capability *cap) 
-{ return discardSparks(&cap->r.rSparks); }
-
-
-#elif defined(THREADED_RTS) 
-
-INLINE_HEADER rtsBool
-emptySparkPoolCap (Capability *cap STG_UNUSED)
-{ return rtsTrue; }
+{ return discardSparks(cap->sparks); }
 
 #endif