Work stealing for sparks
authorberthold@mathematik.uni-marburg.de <unknown>
Mon, 15 Sep 2008 13:28:46 +0000 (13:28 +0000)
committerberthold@mathematik.uni-marburg.de <unknown>
Mon, 15 Sep 2008 13:28:46 +0000 (13:28 +0000)
   Spark stealing support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.

  Spark pools are per capability, separately allocated and held in the Capability
  structure. The implementation uses Double-Ended Queues (deque) and cas-protected
  access.

  The write end of the queue (position bottom) can only be used with
  mutual exclusion, i.e. by exactly one caller at a time.
  Multiple readers can steal()/findSpark() from the read end
  (position top), and are synchronised without a lock, based on a cas
  of the top position. One reader wins, the others return NULL for a
  failure.

  Work stealing is called when Capabilities find no other work (inside yieldCapability),
  and tries all capabilities 0..n-1 twice, unless a theft succeeds.

  Inside schedulePushWork, all considered cap.s (those which were idle and could
  be grabbed) are woken up. Future versions should wake up capabilities immediately when
  putting a new spark in the local pool, from newSpark().

Patch has been re-recorded due to conflicting bugfixes in the sparks.c, also fixing a
(strange) conflict in the scheduler.

includes/Regs.h
includes/RtsTypes.h
rts/Capability.c
rts/Capability.h
rts/Schedule.c
rts/Sparks.c
rts/Sparks.h

index 0f974ec..cf083c9 100644 (file)
 #ifndef REGS_H
 #define REGS_H
 
 #ifndef REGS_H
 #define REGS_H
 
-/*
- * Spark pools: used to store pending sparks 
- *  (THREADED_RTS & PARALLEL_HASKELL only)
- * This is a circular buffer.  Invariants:
- *    - base <= hd < lim
- *    - base <= tl < lim
- *    - if hd==tl, then the pool is empty.
- *    - if hd == tl+1, then the pool is full.
- * Adding to the pool is done by assigning to *tl++ (wrapping round as
- * necessary).  When adding to a full pool, we have the option of
- * throwing away either the oldest (hd++) or the most recent (tl--) entry.
- */
-typedef struct StgSparkPool_ {
-  StgClosure **base;
-  StgClosure **lim;
-  StgClosure **hd;
-  StgClosure **tl;
-} StgSparkPool;
-
-#define ASSERT_SPARK_POOL_INVARIANTS(p)                \
-  ASSERT((p)->base <= (p)->hd);                        \
-  ASSERT((p)->hd < (p)->lim);                  \
-  ASSERT((p)->base <= (p)->tl);                        \
-  ASSERT((p)->tl < (p)->lim);
-
 typedef struct {
   StgFunPtr      stgGCEnter1;
   StgFunPtr      stgGCFun;
 typedef struct {
   StgFunPtr      stgGCEnter1;
   StgFunPtr      stgGCFun;
@@ -120,7 +95,6 @@ typedef struct StgRegTable_ {
   StgWord         rmp_result1[MP_INT_WORDS];
   StgWord         rmp_result2[MP_INT_WORDS];
   StgWord         rRet;  // holds the return code of the thread
   StgWord         rmp_result1[MP_INT_WORDS];
   StgWord         rmp_result2[MP_INT_WORDS];
   StgWord         rRet;  // holds the return code of the thread
-  StgSparkPool    rSparks;     /* per-task spark pool */
 } StgRegTable;
 
 #if IN_STG_CODE
 } StgRegTable;
 
 #if IN_STG_CODE
@@ -163,10 +137,6 @@ typedef struct StgRegTable_ {
 #define SAVE_CurrentTSO     (BaseReg->rCurrentTSO)
 #define SAVE_CurrentNursery (BaseReg->rCurrentNursery)
 #define SAVE_HpAlloc        (BaseReg->rHpAlloc)
 #define SAVE_CurrentTSO     (BaseReg->rCurrentTSO)
 #define SAVE_CurrentNursery (BaseReg->rCurrentNursery)
 #define SAVE_HpAlloc        (BaseReg->rHpAlloc)
-#define SAVE_SparkHd       (BaseReg->rSparks.hd)
-#define SAVE_SparkTl        (BaseReg->rSparks.tl)
-#define SAVE_SparkBase      (BaseReg->rSparks.base)
-#define SAVE_SparkLim      (BaseReg->rSparks.lim)
 
 /* We sometimes need to save registers across a C-call, eg. if they
  * are clobbered in the standard calling convention.  We define the
 
 /* We sometimes need to save registers across a C-call, eg. if they
  * are clobbered in the standard calling convention.  We define the
@@ -401,30 +371,6 @@ GLOBAL_REG_DECL(bdescr *,HpAlloc,REG_HpAlloc)
 #define HpAlloc (BaseReg->rHpAlloc)
 #endif
 
 #define HpAlloc (BaseReg->rHpAlloc)
 #endif
 
-#if defined(REG_SparkHd) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkHd,REG_SparkHd)
-#else
-#define SparkHd (BaseReg->rSparks.hd)
-#endif
-
-#if defined(REG_SparkTl) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkTl,REG_SparkTl)
-#else
-#define SparkTl (BaseReg->rSparks.tl)
-#endif
-
-#if defined(REG_SparkBase) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkBase,REG_SparkBase)
-#else
-#define SparkBase (BaseReg->rSparks.base)
-#endif
-
-#if defined(REG_SparkLim) && !defined(NO_GLOBAL_REG_DECLS)
-GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
-#else
-#define SparkLim (BaseReg->rSparks.lim)
-#endif
-
 /* -----------------------------------------------------------------------------
    Get absolute function pointers from the register table, to save
    code space.  On x86, 
 /* -----------------------------------------------------------------------------
    Get absolute function pointers from the register table, to save
    code space.  On x86, 
@@ -665,38 +611,6 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
 #define CALLER_RESTORE_HpAlloc         /* nothing */
 #endif
 
 #define CALLER_RESTORE_HpAlloc         /* nothing */
 #endif
 
-#ifdef CALLER_SAVES_SparkHd
-#define CALLER_SAVE_SparkHd            SAVE_SparkHd = SparkHd;
-#define CALLER_RESTORE_SparkHd         SparkHd = SAVE_SparkHd;
-#else
-#define CALLER_SAVE_SparkHd            /* nothing */
-#define CALLER_RESTORE_SparkHd         /* nothing */
-#endif
-
-#ifdef CALLER_SAVES_SparkTl
-#define CALLER_SAVE_SparkTl            SAVE_SparkTl = SparkTl;
-#define CALLER_RESTORE_SparkTl         SparkTl = SAVE_SparkTl;
-#else
-#define CALLER_SAVE_SparkTl            /* nothing */
-#define CALLER_RESTORE_SparkTl         /* nothing */
-#endif
-
-#ifdef CALLER_SAVES_SparkBase
-#define CALLER_SAVE_SparkBase          SAVE_SparkBase = SparkBase;
-#define CALLER_RESTORE_SparkBase       SparkBase = SAVE_SparkBase;
-#else
-#define CALLER_SAVE_SparkBase          /* nothing */
-#define CALLER_RESTORE_SparkBase       /* nothing */
-#endif
-
-#ifdef CALLER_SAVES_SparkLim
-#define CALLER_SAVE_SparkLim                   SAVE_SparkLim = SparkLim;
-#define CALLER_RESTORE_SparkLim                SparkLim = SAVE_SparkLim;
-#else
-#define CALLER_SAVE_SparkLim                   /* nothing */
-#define CALLER_RESTORE_SparkLim        /* nothing */
-#endif
-
 #endif /* IN_STG_CODE */
 
 /* ----------------------------------------------------------------------------
 #endif /* IN_STG_CODE */
 
 /* ----------------------------------------------------------------------------
@@ -731,10 +645,6 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
   CALLER_SAVE_HpLim                            \
   CALLER_SAVE_CurrentTSO                       \
   CALLER_SAVE_CurrentNursery                   \
   CALLER_SAVE_HpLim                            \
   CALLER_SAVE_CurrentTSO                       \
   CALLER_SAVE_CurrentNursery                   \
-  CALLER_SAVE_SparkHd                          \
-  CALLER_SAVE_SparkTl                          \
-  CALLER_SAVE_SparkBase                                \
-  CALLER_SAVE_SparkLim                          \
   CALLER_SAVE_Base
 
 #define CALLER_RESTORE_USER                    \
   CALLER_SAVE_Base
 
 #define CALLER_RESTORE_USER                    \
@@ -763,11 +673,7 @@ GLOBAL_REG_DECL(bdescr *,SparkLim,REG_SparkLim)
   CALLER_RESTORE_Hp                            \
   CALLER_RESTORE_HpLim                         \
   CALLER_RESTORE_CurrentTSO                    \
   CALLER_RESTORE_Hp                            \
   CALLER_RESTORE_HpLim                         \
   CALLER_RESTORE_CurrentTSO                    \
-  CALLER_RESTORE_CurrentNursery                        \
-  CALLER_RESTORE_SparkHd                       \
-  CALLER_RESTORE_SparkTl                       \
-  CALLER_RESTORE_SparkBase                     \
-  CALLER_RESTORE_SparkLim
+  CALLER_RESTORE_CurrentNursery
 
 #else /* not IN_STG_CODE */
 
 
 #else /* not IN_STG_CODE */
 
index 9e8c7b8..3510ee7 100644 (file)
@@ -37,6 +37,40 @@ typedef enum {
    Types specific to the parallel runtime system.
 */
 
    Types specific to the parallel runtime system.
 */
 
+
+/* Spark pools: used to store pending sparks 
+ *  (THREADED_RTS & PARALLEL_HASKELL only)
+ * Implementation uses a DeQue to enable concurrent read accesses at
+ * the top end.
+ */
+typedef struct  SparkPool_ {
+  /* Size of elements array. Used for modulo calculation: we round up
+     to powers of 2 and use the dyadic log (modulo == bitwise &) */
+  StgWord size; 
+  StgWord moduloSize; /* bitmask for modulo */
+
+  /* top, index where multiple readers steal() (protected by a cas) */
+  StgWord top;
+
+  /* bottom, index of next free place where one writer can push
+     elements. This happens unsynchronised. */
+  StgWord bottom;
+  /* both position indices are continuously incremented, and used as
+     an index modulo the current array size. */
+  
+  /* lower bound on the current top value. This is an internal
+     optimisation to avoid unnecessarily accessing the top field
+     inside pushBottom */
+  StgWord topBound;
+
+  /* The elements array */
+  StgClosurePtr* elements;
+  /*  Please note: the dataspace cannot follow the admin fields
+      immediately, as it should be possible to enlarge it without
+      disposing the old one automatically (as realloc would)! */
+
+} SparkPool;
+
 typedef ullong        rtsTime;
 
 #if defined(PAR)
 typedef ullong        rtsTime;
 
 #if defined(PAR)
index 4d5748c..516aaa5 100644 (file)
@@ -54,6 +54,55 @@ globalWorkToDo (void)
 #endif
 
 #if defined(THREADED_RTS)
 #endif
 
 #if defined(THREADED_RTS)
+rtsBool stealWork( Capability *cap) {
+  /* use the normal Sparks.h interface (internally modified to enable
+     concurrent stealing) 
+     and immediately turn the spark into a thread when successful
+  */
+  Capability *robbed;
+  SparkPool *pool;
+  StgClosurePtr spark;
+  rtsBool success = rtsFalse;
+  nat i = 0;
+
+  debugTrace(DEBUG_sched,
+            "cap %d: Trying to steal work from other capabilities", 
+            cap->no);
+
+  if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
+
+  /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+     start at a random place instead of 0 as well.  */
+  for ( i=0 ; i < n_capabilities ; i++ ) {
+    robbed = &capabilities[i];
+    if (cap == robbed)  // ourselves...
+      continue;
+
+    if (emptySparkPoolCap(robbed)) // nothing to steal here
+      continue;
+    
+    spark = findSpark(robbed);
+
+    if (spark == NULL && !emptySparkPoolCap(robbed)) {
+      spark = findSpark(robbed); // lost race in concurrent access, try again
+    }
+    if (spark != NULL) {
+      debugTrace(DEBUG_sched,
+                "cap %d: Stole a spark from capability %d",
+                cap->no, robbed->no);
+
+      createSparkThread(cap,spark);
+      success = rtsTrue;
+      break; // got one, leave the loop
+    }
+ // otherwise: no success, try next one
+  }
+  debugTrace(DEBUG_sched,
+            "Leaving work stealing routine (%s)",
+            success?"one spark stolen":"thefts did not succeed");
+  return success;
+}
+
 STATIC_INLINE rtsBool
 anyWorkForMe( Capability *cap, Task *task )
 {
 STATIC_INLINE rtsBool
 anyWorkForMe( Capability *cap, Task *task )
 {
@@ -73,9 +122,11 @@ anyWorkForMe( Capability *cap, Task *task )
        if (emptyRunQueue(cap)) {
            return !emptySparkPoolCap(cap)
                || !emptyWakeupQueue(cap)
        if (emptyRunQueue(cap)) {
            return !emptySparkPoolCap(cap)
                || !emptyWakeupQueue(cap)
-               || globalWorkToDo();
-       } else
+               || globalWorkToDo()
+               || stealWork(cap); /* if all false: try to steal work */
+       } else {
            return cap->run_queue_hd->bound == NULL;
            return cap->run_queue_hd->bound == NULL;
+       }
     }
 }
 #endif
     }
 }
 #endif
@@ -778,7 +829,7 @@ void
 freeCapability (Capability *cap) {
     stgFree(cap->mut_lists);
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 freeCapability (Capability *cap) {
     stgFree(cap->mut_lists);
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-    freeSparkPool(&cap->r.rSparks);
+    freeSparkPool(cap->sparks);
 #endif
 }
 
 #endif
 }
 
index 70d9ee9..5945895 100644 (file)
@@ -23,6 +23,7 @@
 #ifndef CAPABILITY_H
 #define CAPABILITY_H
 
 #ifndef CAPABILITY_H
 #define CAPABILITY_H
 
+#include "RtsTypes.h"
 #include "RtsFlags.h"
 #include "Task.h"
 
 #include "RtsFlags.h"
 #include "Task.h"
 
@@ -98,6 +99,9 @@ struct Capability_ {
     StgTRecChunk *free_trec_chunks;
     StgTRecHeader *free_trec_headers;
     nat transaction_tokens;
     StgTRecChunk *free_trec_chunks;
     StgTRecHeader *free_trec_headers;
     nat transaction_tokens;
+
+    SparkPool *sparks;
+
 }; // typedef Capability, defined in RtsAPI.h
 
 
 }; // typedef Capability, defined in RtsAPI.h
 
 
index 626c097..09150fd 100644 (file)
@@ -137,17 +137,17 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
-#if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
-#endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
 static rtsBool scheduleGetRemoteWork(Capability *cap);
 static rtsBool scheduleGetRemoteWork(Capability *cap);
+#if defined(PARALLEL_HASKELL)
 static void scheduleSendPendingMessages(void);
 static void scheduleSendPendingMessages(void);
+#endif
 static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(Capability *cap, StgTSO *t);
 static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -291,13 +291,15 @@ schedule (Capability *initialCapability, Task *task)
       } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
       } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
+         /* inside yieldCapability, attempts to steal work from other
+            capabilities, unless the capability has own work. 
+            See (REMARK) below.
+         */
       }
 #endif
       }
 #endif
-      
-#if defined(THREADED_RTS)
-      schedulePushWork(cap,task);
-#endif
 
 
+    /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
+      
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -365,21 +367,7 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
        barf("sched_state: %d", sched_state);
     }
 
-#if defined(THREADED_RTS)
-    // If the run queue is empty, take a spark and turn it into a thread.
-    {
-       if (emptyRunQueue(cap)) {
-           StgClosure *spark;
-           spark = findSpark(cap);
-           if (spark != NULL) {
-               debugTrace(DEBUG_sched,
-                          "turning spark of closure %p into a thread",
-                          (StgClosure *)spark);
-               createSparkThread(cap,spark);     
-           }
-       }
-    }
-#endif // THREADED_RTS
+    /* this was the place to activate a spark, now below... */
 
     scheduleStartSignalHandlers(cap);
 
 
     scheduleStartSignalHandlers(cap);
 
@@ -393,11 +381,19 @@ schedule (Capability *initialCapability, Task *task)
 
     scheduleCheckBlockedThreads(cap);
 
 
     scheduleCheckBlockedThreads(cap);
 
-#if defined(PARALLEL_HASKELL)
-    /* message processing and work distribution goes here */ 
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    /* work distribution in multithreaded and parallel systems 
+
+       REMARK: IMHO best location for work-stealing as well.
+       tests above might yield some new jobs, so no need to steal a
+       spark in some cases. I believe the yieldCapability.. above
+       should be moved here.
+    */
 
 
+#if defined(PARALLEL_HASKELL)
     /* if messages have been buffered... a NOOP in THREADED_RTS */
     scheduleSendPendingMessages();
     /* if messages have been buffered... a NOOP in THREADED_RTS */
     scheduleSendPendingMessages();
+#endif
 
     /* If the run queue is empty,...*/
     if (emptyRunQueue(cap)) {
 
     /* If the run queue is empty,...*/
     if (emptyRunQueue(cap)) {
@@ -406,6 +402,7 @@ schedule (Capability *initialCapability, Task *task)
 
        /* if this did not work, try to steal a spark from someone else */
       if (emptyRunQueue(cap)) {
 
        /* if this did not work, try to steal a spark from someone else */
       if (emptyRunQueue(cap)) {
+#if defined(PARALLEL_HASKELL)
        receivedFinish = scheduleGetRemoteWork(cap);
        continue; //  a new round, (hopefully) with new work
        /* 
        receivedFinish = scheduleGetRemoteWork(cap);
        continue; //  a new round, (hopefully) with new work
        /* 
@@ -414,10 +411,20 @@ schedule (Capability *initialCapability, Task *task)
                        b) (blocking) awaits and receives messages
           
           in Eden, this is only the blocking receive, as b) in GUM.
                        b) (blocking) awaits and receives messages
           
           in Eden, this is only the blocking receive, as b) in GUM.
+
+          in Threaded-RTS, this does plain nothing. Stealing routine
+               is inside Capability.c and called from
+               yieldCapability() at the very beginning, see REMARK.
        */
        */
+#endif
       }
       }
-    } 
+    } else { /* i.e. run queue was (initially) not empty */
+      schedulePushWork(cap,task);
+      /* work pushing, currently relevant only for THREADED_RTS:
+        (pushes threads, wakes up idle capabilities for stealing) */
+    }
 
 
+#if defined(PARALLEL_HASKELL)
     /* since we perform a blocking receive and continue otherwise,
        either we never reach here or we definitely have work! */
     // from here: non-empty run queue
     /* since we perform a blocking receive and continue otherwise,
        either we never reach here or we definitely have work! */
     // from here: non-empty run queue
@@ -430,7 +437,9 @@ schedule (Capability *initialCapability, Task *task)
                                above, waits for messages as well! */
       processMessages(cap, &receivedFinish);
     }
                                above, waits for messages as well! */
       processMessages(cap, &receivedFinish);
     }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
+
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
     scheduleDetectDeadlock(cap,task);
 #if defined(THREADED_RTS)
 
     scheduleDetectDeadlock(cap,task);
 #if defined(THREADED_RTS)
@@ -679,11 +688,15 @@ schedulePreLoop(void)
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS, 
                 Task *task      USED_IF_THREADS)
 {
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS, 
                 Task *task      USED_IF_THREADS)
 {
+  /* following code not for PARALLEL_HASKELL. I kept the call general,
+     future GUM versions might use pushing in a distributed setup */
+#if defined(THREADED_RTS)
+
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
@@ -726,7 +739,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;
 
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;
 
-       debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+       debugTrace(DEBUG_sched, 
+                  "cap %d: %s and %d free capabilities, sharing...", 
+                  cap->no, 
+                  (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                  "excess threads on run queue":"sparks to share (>=2)",
+                  n_free_caps);
 
        i = 0;
        pushed_to_all = rtsFalse;
 
        i = 0;
        pushed_to_all = rtsFalse;
@@ -760,6 +778,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            cap->run_queue_tl = prev;
        }
 
            cap->run_queue_tl = prev;
        }
 
+#ifdef SPARK_PUSHING
+       /* JB I left this code in place, it would work but is not necessary */
+
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
@@ -775,16 +796,23 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                }
            }
        }
                }
            }
        }
+#endif /* SPARK_PUSHING */
 
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
 
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
            releaseCapability(free_caps[i]);
        }
+       // now wake them all up, and they might steal sparks if
+       // the did not get a thread
+       prodAllCapabilities();
     }
     task->cap = cap; // reset to point to our Capability.
     }
     task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
 }
 }
-#endif
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
@@ -965,7 +993,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
 scheduleSendPendingMessages(void)
 {
 
 scheduleSendPendingMessages(void)
 {
 
@@ -984,10 +1012,10 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
 #endif
 
 /* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
  * ------------------------------------------------------------------------- */
 
  * ------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
 static void
 scheduleActivateSpark(Capability *cap)
 {
@@ -1012,14 +1040,14 @@ scheduleActivateSpark(Capability *cap)
       createSparkThread(cap,spark); // defined in Sparks.c
     }
 }
       createSparkThread(cap,spark); // defined in Sparks.c
     }
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
     
 
 /* ----------------------------------------------------------------------------
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
     
-#if defined(PARALLEL_HASKELL)
-static rtsBool
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
 scheduleGetRemoteWork(Capability *cap)
 {
 #if defined(PARALLEL_HASKELL)
 scheduleGetRemoteWork(Capability *cap)
 {
 #if defined(PARALLEL_HASKELL)
@@ -1057,7 +1085,7 @@ scheduleGetRemoteWork(Capability *cap)
 
 #endif /* PARALLEL_HASKELL */
 }
 
 #endif /* PARALLEL_HASKELL */
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * After running a thread...
 
 /* ----------------------------------------------------------------------------
  * After running a thread...
@@ -1483,6 +1511,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        performHeapProfile = rtsFalse;
     }
 
        performHeapProfile = rtsFalse;
     }
 
+#ifdef SPARKBALANCE
+    /* JB 
+       Once we are all together... this would be the place to balance all
+       spark pools. No concurrent stealing or adding of new sparks can
+       occur. Should be defined in Sparks.c. */
+    balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
 #if defined(THREADED_RTS)
     // release our stash of capabilities.
     for (i = 0; i < n_capabilities; i++) {
 #if defined(THREADED_RTS)
     // release our stash of capabilities.
     for (i = 0; i < n_capabilities; i++) {
index 2e9e61c..ac11172 100644 (file)
@@ -1,10 +1,39 @@
 /* ---------------------------------------------------------------------------
  *
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2000-2006
+ * (c) The GHC Team, 2000-2008
  *
  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
  *
  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
- * -------------------------------------------------------------------------*/
+ * The implementation uses Double-Ended Queues with lock-free access
+ * (thereby often called "deque") as described in
+ *
+ * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
+ * SPAA'05, July 2005, Las Vegas, USA.
+ * ACM 1-58113-986-1/05/0007
+ *
+ * Author: Jost Berthold MSRC 07-09/2008
+ *
+ * The DeQue is held as a circular array with known length. Positions
+ * of top (read-end) and bottom (write-end) always increase, and the
+ * array is accessed with indices modulo array-size. While this bears
+ * the risk of overflow, we assume that (with 64 bit indices), a
+ * program must run very long to reach that point.
+ * 
+ * The write end of the queue (position bottom) can only be used with
+ * mutual exclusion, i.e. by exactly one caller at a time.  At this
+ * end, new items can be enqueued using pushBottom()/newSpark(), and
+ * removed using popBottom()/reclaimSpark() (the latter implying a cas
+ * synchronisation with potential concurrent readers for the case of
+ * just one element).
+ * 
+ * Multiple readers can steal()/findSpark() from the read end
+ * (position top), and are synchronised without a lock, based on a cas
+ * of the top position. One reader wins, the others return NULL for a
+ * failure.
+ * 
+ * Both popBottom and steal also return NULL when the queue is empty.
+ * 
+ -------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
 #include "Rts.h"
 
 #include "PosixSource.h"
 #include "Rts.h"
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "ParTicky.h"
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "ParTicky.h"
-# if defined(PARALLEL_HASKELL)
-# include "ParallelRts.h"
-# include "GranSimRts.h"   // for GR_...
-# elif defined(GRAN)
-# include "GranSimRts.h"
-# endif
-#include "Sparks.h"
 #include "Trace.h"
 
 #include "Trace.h"
 
+#include "SMP.h" // for cas
+
+#include "Sparks.h"
+
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
-static INLINE_ME void bump_hd (StgSparkPool *p)
-{ p->hd++; if (p->hd == p->lim) p->hd = p->base; }
+/* internal helpers ... */
+
+StgWord roundUp2(StgWord val);
 
 
-static INLINE_ME void bump_tl (StgSparkPool *p)
-{ p->tl++; if (p->tl == p->lim) p->tl = p->base; }
+StgWord roundUp2(StgWord val) {
+  StgWord rounded = 1;
+
+  /* StgWord is unsigned anyway, only catch 0 */
+  if (val == 0) {
+    barf("DeQue,roundUp2: invalid size 0 requested");
+  }
+  /* at least 1 bit set, shift up to its place */
+  do {
+    rounded = rounded << 1;
+  } while (0 != (val = val>>1));
+  return rounded;
+}
+
+INLINE_HEADER
+rtsBool casTop(StgPtr addr, StgWord old, StgWord new);
+
+#if !defined(THREADED_RTS)
+/* missing def. in non THREADED RTS, and makes no sense anyway... */
+StgWord cas(StgPtr addr,StgWord old,StgWord new);
+StgWord cas(StgPtr addr,StgWord old,StgWord new) { 
+  barf("cas: not implemented without multithreading");
+  old = new = *addr; /* to avoid gcc warnings */
+}
+#endif
+
+INLINE_HEADER
+rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
+  StgWord res = cas((StgPtr) addr, old, new);
+  return ((res == old));
+}
+
+/* or simply like this */
+#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
 
 /* -----------------------------------------------------------------------------
  * 
 
 /* -----------------------------------------------------------------------------
  * 
@@ -37,15 +96,28 @@ static INLINE_ME void bump_tl (StgSparkPool *p)
  *
  * -------------------------------------------------------------------------- */
 
  *
  * -------------------------------------------------------------------------- */
 
-static void 
-initSparkPool(StgSparkPool *pool)
-{
-    pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
-                               * sizeof(StgClosure *),
-                               "initSparkPools");
-    pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
-    pool->hd  = pool->base;
-    pool->tl  = pool->base;
+/* constructor */
+SparkPool* initPool(StgWord size) {
+
+  StgWord realsize; 
+  SparkPool *q;
+
+  realsize = roundUp2(size); /* to compute modulo as a bitwise & */
+
+  q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
+                             "newSparkPool");
+  q->elements = (StgClosurePtr*) 
+                stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
+                              "newSparkPool:data space");
+  q->top=0;
+  q->bottom=0;
+  q->topBound=0; /* read by writer, updated each time top is read */
+
+  q->size = realsize;  /* power of 2 */
+  q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
+
+  ASSERT_SPARK_POOL_INVARIANTS(q); 
+  return q;
 }
 
 void
 }
 
 void
@@ -55,17 +127,71 @@ initSparkPools( void )
     /* walk over the capabilities, allocating a spark pool for each one */
     nat i;
     for (i = 0; i < n_capabilities; i++) {
     /* walk over the capabilities, allocating a spark pool for each one */
     nat i;
     for (i = 0; i < n_capabilities; i++) {
-       initSparkPool(&capabilities[i].r.rSparks);
+      capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
     }
 #else
     /* allocate a single spark pool */
     }
 #else
     /* allocate a single spark pool */
-    initSparkPool(&MainCapability.r.rSparks);
+    MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
 #endif
 }
 
 void
 #endif
 }
 
 void
-freeSparkPool(StgSparkPool *pool) {
-    stgFree(pool->base);
+freeSparkPool(SparkPool *pool) {
+  /* should not interfere with concurrent findSpark() calls! And
+     nobody should use the pointer any more. We cross our fingers...*/
+  stgFree(pool->elements);
+  stgFree(pool);
+}
+
+/* reclaimSpark(cap): remove a spark from the write end of the queue.
+ * Returns the removed spark, and NULL if a race is lost or the pool
+ * empty.
+ *
+ * If only one spark is left in the pool, we synchronise with
+ * concurrently stealing threads by using cas to modify the top field.
+ * This routine should NEVER be called by a task which does not own
+ * the capability. Can this be checked here?
+ */
+StgClosure* reclaimSpark(Capability *cap) {
+  SparkPool *deque = cap->sparks;
+  /* also a bit tricky, has to avoid concurrent steal() calls by
+     accessing top with cas, when there is only one element left */
+  StgWord t, b;
+  StgClosurePtr* pos;
+  long  currSize;
+  StgClosurePtr removed;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  b = deque->bottom;
+  /* "decrement b as a test, see what happens" */
+  deque->bottom = --b; 
+  pos = (deque->elements) + (b & (deque->moduloSize));
+  t = deque->top; /* using topBound would give an *upper* bound, we
+                    need a lower bound. We use the real top here, but
+                    can update the topBound value */
+  deque->topBound = t;
+  currSize = b - t;
+  if (currSize < 0) { /* was empty before decrementing b, set b
+                        consistently and abort */
+    deque->bottom = t;
+    return NULL;
+  }
+  removed = *pos;
+  if (currSize > 0) { /* no danger, still elements in buffer after b-- */
+    return removed;
+  } 
+  /* otherwise, has someone meanwhile stolen the same (last) element?
+     Check and increment top value to know  */
+  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
+    removed = NULL; /* no success, but continue adjusting bottom */
+  }
+  deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
+  deque->topBound = t+1; /* ...and cached top value as well */
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  return removed;
 }
 
 /* -----------------------------------------------------------------------------
 }
 
 /* -----------------------------------------------------------------------------
@@ -73,32 +199,80 @@ freeSparkPool(StgSparkPool *pool) {
  * findSpark: find a spark on the current Capability that we can fork
  * into a thread.
  *
  * findSpark: find a spark on the current Capability that we can fork
  * into a thread.
  *
- * -------------------------------------------------------------------------- */
+ * May be called by concurrent threads, which synchronise on top
+ * variable. Returns a spark, or NULL if pool empty or race lost.
+ *
+ -------------------------------------------------------------------------- */
+
+StgClosurePtr steal(SparkPool *deque);
+
+/* steal an element from the read end. Synchronises multiple callers
+   by failing with NULL return. Returns NULL when deque is empty. */
+StgClosurePtr steal(SparkPool *deque) {
+  StgClosurePtr* pos;
+  StgClosurePtr* arraybase;
+  StgWord sz;
+  StgClosurePtr stolen;
+  StgWord b,t; 
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  b = deque->bottom;
+  t = deque->top;
+  if (b - t <= 0 ) { 
+    return NULL; /* already looks empty, abort */
+  }
+
+  /* now access array, see pushBottom() */
+  arraybase = deque->elements;
+  sz = deque->moduloSize;
+  pos = arraybase + (t & sz);  
+  stolen = *pos;
+
+  /* now decide whether we have won */
+  if ( !(CASTOP(&(deque->top),t,t+1)) ) {
+    /* lost the race, someon else has changed top in the meantime */
+    stolen = NULL;    
+  }  /* else: OK, top has been incremented by the cas call */
+
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+  /* return NULL or stolen element */
+  return stolen;
+}
 
 StgClosure *
 findSpark (Capability *cap)
 {
 
 StgClosure *
 findSpark (Capability *cap)
 {
-    StgSparkPool *pool;
-    StgClosure *spark;
+  SparkPool *deque = (cap->sparks);
+  StgClosure *stolen;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  do { 
+    /* keep trying until good spark found or pool looks empty. 
+       TODO is this a good idea? */
+
+    stolen = steal(deque);
     
     
-    pool = &(cap->r.rSparks);
-    ASSERT_SPARK_POOL_INVARIANTS(pool);
+  } while ( ( !stolen /* nothing stolen*/
+             || !closure_SHOULD_SPARK(stolen)) /* spark not OK */
+           && !looksEmpty(deque)); /* run empty, give up */
 
 
-    while (pool->hd != pool->tl) {
-       spark = *pool->hd;
-       bump_hd(pool);
-       if (closure_SHOULD_SPARK(spark)) {
-#ifdef GRAN
-           if (RtsFlags.ParFlags.ParStats.Sparks) 
-               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                                GR_STEALING, ((StgTSO *)NULL), spark, 
-                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-#endif
-           return spark;
-       }
-    }
-    // spark pool is now empty
-    return NULL;
+  /* return stolen element */
+  return stolen;
+}
+
+
+/* "guesses" whether a deque is empty. Can return false negatives in
+   presence of concurrent steal() calls, and false positives in
+   presence of a concurrent pushBottom().*/
+rtsBool looksEmpty(SparkPool* deque) {
+  StgWord t = deque->top;
+  StgWord b = deque->bottom;
+  /* try to prefer false negatives by reading top first */
+  return (b - t <= 0);
+  /* => array is *never* completely filled, always 1 place free! */
 }
 
 /* -----------------------------------------------------------------------------
 }
 
 /* -----------------------------------------------------------------------------
@@ -123,11 +297,64 @@ createSparkThread (Capability *cap, StgClosure *p)
  * -------------------------------------------------------------------------- */
 
 #define DISCARD_NEW
  * -------------------------------------------------------------------------- */
 
 #define DISCARD_NEW
+void pushBottom(SparkPool* deque, StgClosurePtr elem);
+
+/* enqueue an element. Should always succeed by resizing the array
+   (not implemented yet, silently fails in that case). */
+void pushBottom(SparkPool* deque, StgClosurePtr elem) {
+  StgWord t;
+  StgClosurePtr* pos;
+  StgWord sz = deque->moduloSize; 
+  StgWord b = deque->bottom;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+
+  /* we try to avoid reading deque->top (accessed by all) and use
+     deque->topBound (accessed only by writer) instead. 
+     This is why we do not just call empty(deque) here.
+  */
+  t = deque->topBound;
+  if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */
+    /* could be full, check the real top value in this case */
+    t = deque->top;
+    deque->topBound = t;
+    if (b - t >= sz) { /* really no space left :-( */
+      /* reallocate the array, copying the values. Concurrent steal()s
+        will in the meantime use the old one and modify only top.
+        This means: we cannot safely free the old space! Can keep it
+        on a free list internally here...
+
+        Potential bug in combination with steal(): if array is
+        replaced, it is unclear which one concurrent steal operations
+        use. Must read the array base address in advance in steal().
+      */
+#if defined(DISCARD_NEW)
+      ASSERT_SPARK_POOL_INVARIANTS(deque); 
+      return; /* for now, silently fail */
+#else
+      /* could make room by incrementing the top position here.  In
+       * this case, should use CASTOP. If this fails, someone else has
+       * removed something, and new room will be available.
+       */
+      ASSERT_SPARK_POOL_INVARIANTS(deque); 
+#endif
+    }
+  }
+  pos = (deque->elements) + (b & sz);
+  *pos = elem;
+  (deque->bottom)++;
+
+  ASSERT_SPARK_POOL_INVARIANTS(deque); 
+  return;
+}
+
 
 
+/* this is called as a direct C-call from Stg => we need to keep the
+   pool in a register (???) */
 StgInt
 newSpark (StgRegTable *reg, StgClosure *p)
 {
 StgInt
 newSpark (StgRegTable *reg, StgClosure *p)
 {
-    StgSparkPool *pool = &(reg->rSparks);
+    SparkPool *pool = (reg->rCurrentTSO->cap->sparks);
 
     /* I am not sure whether this is the right thing to do.
      * Maybe it is better to exploit the tag information
 
     /* I am not sure whether this is the right thing to do.
      * Maybe it is better to exploit the tag information
@@ -138,82 +365,125 @@ newSpark (StgRegTable *reg, StgClosure *p)
     ASSERT_SPARK_POOL_INVARIANTS(pool);
 
     if (closure_SHOULD_SPARK(p)) {
     ASSERT_SPARK_POOL_INVARIANTS(pool);
 
     if (closure_SHOULD_SPARK(p)) {
-#ifdef DISCARD_NEW
-       StgClosure **new_tl;
-       new_tl = pool->tl + 1;
-       if (new_tl == pool->lim) { new_tl = pool->base; }
-       if (new_tl != pool->hd) {
-           *pool->tl = p;
-           pool->tl = new_tl;
-       } else if (!closure_SHOULD_SPARK(*pool->hd)) {
-           // if the old closure is not sparkable, discard it and
-           // keep the new one.  Otherwise, keep the old one.
-           *pool->tl = p;
-           bump_hd(pool);
-       }
-#else  /* DISCARD OLD */
-       *pool->tl = p;
-       bump_tl(pool);
-       if (pool->tl == pool->hd) { bump_hd(pool); }
-#endif
+      pushBottom(pool,p);
     }  
 
     ASSERT_SPARK_POOL_INVARIANTS(pool);
     return 1;
 }
 
     }  
 
     ASSERT_SPARK_POOL_INVARIANTS(pool);
     return 1;
 }
 
-/* -----------------------------------------------------------------------------
- * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
- * implicit slide i.e. after marking all sparks are at the beginning of the
- * spark pool and the spark pool only contains sparkable closures 
+
+
+/* --------------------------------------------------------------------------
+ * Remove all sparks from the spark queues which should not spark any
+ * more.  Called after GC. We assume exclusive access to the structure
+ * and replace  all sparks in the queue, see explanation below. At exit,
+ * the spark pool only contains sparkable closures.
  * -------------------------------------------------------------------------- */
 
 static void
 pruneSparkQueue (Capability *cap)
 { 
  * -------------------------------------------------------------------------- */
 
 static void
 pruneSparkQueue (Capability *cap)
 { 
-    StgClosure *spark, **sparkp, **to_sparkp;
+    SparkPool *pool;
+    StgClosurePtr spark, evacspark, *elements;
     nat n, pruned_sparks; // stats only
     nat n, pruned_sparks; // stats only
-    StgSparkPool *pool;
+    StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
     
     PAR_TICKY_MARK_SPARK_QUEUE_START();
     
     n = 0;
     pruned_sparks = 0;
     
     
     PAR_TICKY_MARK_SPARK_QUEUE_START();
     
     n = 0;
     pruned_sparks = 0;
     
-    pool = &(cap->r.rSparks);
+    pool = cap->sparks;
     
     
+    debugTrace(DEBUG_sched,
+               "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
     ASSERT_SPARK_POOL_INVARIANTS(pool);
     ASSERT_SPARK_POOL_INVARIANTS(pool);
-    
-    sparkp = pool->hd;
-    to_sparkp = pool->hd;
-    while (sparkp != pool->tl) {
-        ASSERT(*sparkp!=NULL);
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
-        // ToDo?: statistics gathering here (also for GUM!)
-        spark = *sparkp;
-        if (!closure_SHOULD_SPARK(spark)) {
-            pruned_sparks++;
-        } else{
-            *to_sparkp++ = spark;
-            if (to_sparkp == pool->lim) {
-                to_sparkp = pool->base;
-            }
-            n++;
-        }
-        sparkp++;
-        if (sparkp == pool->lim) {
-            sparkp = pool->base;
-        }
-    }
-    pool->tl = to_sparkp;
-       
+
+    elements = pool->elements;
+
+    /* We have exclusive access to the structure here, so we can reset
+       bottom and top counters, and prune invalid sparks. Contents are
+       copied in-place if they are valuable, otherwise discarded. The
+       routine uses "real" indices t and b, starts by computing them
+       as the modulus size of top and bottom,
+
+       Copying:
+
+       At the beginning, the pool structure can look like this:
+       ( bottom % size >= top % size , no wrap-around)
+                  t          b
+       ___________***********_________________
+
+       or like this ( bottom % size < top % size, wrap-around )
+                  b         t
+       ***********__________******************
+       As we need to remove useless sparks anyway, we make one pass
+       between t and b, moving valuable content to b and subsequent
+       cells (wrapping around when the size is reached).
+
+                     b      t
+       ***********OOO_______XX_X__X?**********
+                     ^____move?____/
+
+       After this movement, botInd becomes the new bottom, and old
+       bottom becomes the new top index, both as indices in the array
+       size range.
+    */
+    // starting here
+    currInd = (pool->top) & (pool->moduloSize); // mod
+
+    // copies of evacuated closures go to space from botInd on
+    // we keep oldBotInd to know when to stop
+    oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
+
+    // on entry to loop, we are within the bounds
+    ASSERT( currInd < pool->size && botInd  < pool->size );
+
+    while (currInd != oldBotInd ) {
+      /* must use != here, wrap-around at size
+        subtle: loop not entered if queue empty
+       */
+
+      /* check element at currInd. if valuable, evacuate and move to
+        botInd, otherwise move on */
+      spark = elements[currInd];
+
+      /* if valuable work: shift inside the pool */
+      if ( closure_SHOULD_SPARK(spark) ) {
+       elements[botInd] = spark; // keep entry (new address)
+       botInd++;
+       n++;
+      } else { 
+       pruned_sparks++; // discard spark
+      }
+      currInd++;
+
+      // in the loop, we may reach the bounds, and instantly wrap around
+      ASSERT( currInd <= pool->size && botInd <= pool->size );
+      if ( currInd == pool->size ) { currInd = 0; }
+      if ( botInd == pool->size )  { botInd = 0;  }
+
+    } // while-loop over spark pool elements
+
+    ASSERT(currInd == oldBotInd);
+
+    pool->top = oldBotInd; // where we started writing
+    pool->topBound = pool->top;
+
+    pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
+    // first free place we did not use (corrected by wraparound)
+
     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-       
+
     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
     
     debugTrace(DEBUG_sched,
     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
     
     debugTrace(DEBUG_sched,
-               "new spark queue len=%d; (hd=%p; tl=%p)",
-               sparkPoolSize(pool), pool->hd, pool->tl);
+               "new spark queue len=%d; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
 }
 
 void
 }
 
 void
@@ -225,21 +495,50 @@ pruneSparkQueues (void)
     }
 }
 
     }
 }
 
+/* GC for the spark pool, called inside Capability.c for all
+   capabilities in turn. Blindly "evac"s complete spark pool. */
 void
 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
 {
     StgClosure **sparkp;
 void
 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
 {
     StgClosure **sparkp;
-    StgSparkPool *pool;
+    SparkPool *pool;
+    StgWord top,bottom, modMask;
     
     
-    pool = &(cap->r.rSparks);
-    sparkp = pool->hd;
-    while (sparkp != pool->tl) {
-        evac(user, sparkp);
-        sparkp++;
-        if (sparkp == pool->lim) {
-            sparkp = pool->base;
-        }
+    pool = cap->sparks;
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+    top = pool->top;
+    bottom = pool->bottom;
+    sparkp = pool->elements;
+    modMask = pool->moduloSize;
+
+    while (top < bottom) {
+    /* call evac for all closures in range (wrap-around via modulo)
+     * In GHC-6.10, evac takes an additional 1st argument to hold a
+     * GC-specific register, see rts/sm/GC.c::mark_root()
+     */
+      evac( user , sparkp + (top & modMask) ); 
+      top++;
     }
     }
+
+    debugTrace(DEBUG_sched,
+               "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
+               sparkPoolSize(pool), pool->bottom, pool->top);
+}
+
+/* ----------------------------------------------------------------------------
+
+ * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
+ * capabilities) and its size. Accesses all spark pools and equally
+ * distributes the sparks among them.
+ *
+ * Could be called after GC, before Cap. release, from scheduler. 
+ * -------------------------------------------------------------------------- */
+void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
+
+void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) {
+  barf("not implemented");
 }
 
 #else
 }
 
 #else
@@ -259,6 +558,8 @@ newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
  * 
  * GRAN & PARALLEL_HASKELL stuff beyond here.
  *
  * 
  * GRAN & PARALLEL_HASKELL stuff beyond here.
  *
+ *  TODO "nuke" this!
+ *
  * -------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL) || defined(GRAN)
  * -------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL) || defined(GRAN)
index 8e0ba90..dbbf268 100644 (file)
@@ -9,17 +9,45 @@
 #ifndef SPARKS_H
 #define SPARKS_H
 
 #ifndef SPARKS_H
 #define SPARKS_H
 
+#if defined(PARALLEL_HASKELL)
+#error Sparks.c using new internal structure, needs major overhaul!
+#endif
+
+/* typedef for SparkPool in RtsTypes.h */
+
 #if defined(THREADED_RTS)
 #if defined(THREADED_RTS)
+
+/* INVARIANTS, in this order: bottom/top consistent, reasonable size,
+   topBound consistent, space pointer, space accessible to us */
+#define ASSERT_SPARK_POOL_INVARIANTS(p)         \
+  ASSERT((p)->bottom >= (p)->top);              \
+  ASSERT((p)->size > 0);                        \
+  ASSERT((p)->size > (p)->bottom - (p)->top);  \
+  ASSERT((p)->topBound <= (p)->top);            \
+  ASSERT((p)->elements != NULL);                \
+  ASSERT(*((p)->elements) || 1);                \
+  ASSERT(*((p)->elements - 1  + ((p)->size)) || 1);
+
+// missing in old interface. Currently called by initSparkPools
+// internally.
+SparkPool* initPool(StgWord size);
+
+// special case: accessing our own pool, at the write end
+// otherwise, we can always steal from our pool as the others do...
+StgClosure* reclaimSpark(Capability *cap);
+
+rtsBool looksEmpty(SparkPool* deque);
+
+// rest: same as old interface
 StgClosure * findSpark         (Capability *cap);
 void         initSparkPools    (void);
 StgClosure * findSpark         (Capability *cap);
 void         initSparkPools    (void);
-void         freeSparkPool     (StgSparkPool *pool);
+void         freeSparkPool     (SparkPool *pool);
 void         createSparkThread (Capability *cap, StgClosure *p);
 void         pruneSparkQueues  (void);
 void         traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
 
 void         createSparkThread (Capability *cap, StgClosure *p);
 void         pruneSparkQueues  (void);
 void         traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
 
-INLINE_HEADER void     discardSparks  (StgSparkPool *pool);
-INLINE_HEADER nat      sparkPoolSize  (StgSparkPool *pool);
-INLINE_HEADER rtsBool  emptySparkPool (StgSparkPool *pool);
+INLINE_HEADER void     discardSparks  (SparkPool *pool);
+INLINE_HEADER nat      sparkPoolSize  (SparkPool *pool);
 
 INLINE_HEADER void     discardSparksCap  (Capability *cap);
 INLINE_HEADER nat      sparkPoolSizeCap  (Capability *cap);
 
 INLINE_HEADER void     discardSparksCap  (Capability *cap);
 INLINE_HEADER nat      sparkPoolSizeCap  (Capability *cap);
@@ -32,46 +60,33 @@ INLINE_HEADER rtsBool  emptySparkPoolCap (Capability *cap);
 
 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 
 
 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 
-INLINE_HEADER rtsBool
-emptySparkPool (StgSparkPool *pool)
-{
-    return (pool->hd == pool->tl);
-}
+INLINE_HEADER rtsBool  
+emptySparkPool (SparkPool *pool) 
+{ return looksEmpty(pool); }
 
 INLINE_HEADER rtsBool
 emptySparkPoolCap (Capability *cap) 
 
 INLINE_HEADER rtsBool
 emptySparkPoolCap (Capability *cap) 
-{ return emptySparkPool(&cap->r.rSparks); }
+{ return looksEmpty(cap->sparks); }
 
 INLINE_HEADER nat
 
 INLINE_HEADER nat
-sparkPoolSize (StgSparkPool *pool) 
+sparkPoolSize (SparkPool *pool) 
 {
 {
-    if (pool->hd <= pool->tl) {
-       return (pool->tl - pool->hd);
-    } else {
-       return (pool->lim - pool->hd + pool->tl - pool->base);
-    }
+  return (pool->bottom - pool->top);
 }
 
 INLINE_HEADER nat
 sparkPoolSizeCap (Capability *cap) 
 }
 
 INLINE_HEADER nat
 sparkPoolSizeCap (Capability *cap) 
-{ return sparkPoolSize(&cap->r.rSparks); }
+{ return sparkPoolSize(cap->sparks); }
 
 INLINE_HEADER void
 
 INLINE_HEADER void
-discardSparks (StgSparkPool *pool)
+discardSparks (SparkPool *pool)
 {
 {
-    pool->hd = pool->tl;
+    pool->top = pool->bottom = 0;
 }
 
 INLINE_HEADER void
 discardSparksCap (Capability *cap) 
 }
 
 INLINE_HEADER void
 discardSparksCap (Capability *cap) 
-{ return discardSparks(&cap->r.rSparks); }
-
-
-#elif defined(THREADED_RTS) 
-
-INLINE_HEADER rtsBool
-emptySparkPoolCap (Capability *cap STG_UNUSED)
-{ return rtsTrue; }
+{ return discardSparks(cap->sparks); }
 
 #endif
 
 
 #endif