Run sparks in batches, instead of creating a new thread for each one
author Simon Marlow <marlowsd@gmail.com>
Thu, 6 Nov 2008 11:36:39 +0000 (11:36 +0000)
committer Simon Marlow <marlowsd@gmail.com>
Thu, 6 Nov 2008 11:36:39 +0000 (11:36 +0000)
Significantly reduces the overhead for par, which means that we can
make use of parallelism at a much finer granularity.

compiler/prelude/primops.txt.pp
includes/StgMiscClosures.h
rts/Capability.c
rts/Capability.h
rts/Linker.c
rts/Prelude.h
rts/PrimOps.cmm
rts/Schedule.c
rts/Sparks.c
rts/Sparks.h
rts/package.conf.in

index 417d42e..77ef9de 100644 (file)
@@ -1633,6 +1633,12 @@ primop  ParOp "par#" GenPrimOp
       -- gets evaluted strictly, which it should *not* be
    has_side_effects = True
 
+primop GetSparkOp "getSpark#" GenPrimOp
+   State# s -> (# State# s, Int#, a #)
+   with
+   has_side_effects = True
+   out_of_line = True
+
 -- HWL: The first 4 Int# in all par... annotations denote:
 --   name, granularity info, size of result, degree of parallelism
 --      Same  structure as _seq_ i.e. returns Int#
index f69a4ae..9158682 100644 (file)
@@ -606,6 +606,7 @@ RTS_FUN(checkzh_fast);
 
 RTS_FUN(unpackClosurezh_fast);
 RTS_FUN(getApStackValzh_fast);
+RTS_FUN(getSparkzh_fast);
 
 RTS_FUN(noDuplicatezh_fast);
 
index c810311..ddb47b4 100644 (file)
@@ -54,7 +54,7 @@ globalWorkToDo (void)
 #endif
 
 #if defined(THREADED_RTS)
-rtsBool
+StgClosure *
 stealWork (Capability *cap)
 {
   /* use the normal Sparks.h interface (internally modified to enable
@@ -70,7 +70,7 @@ stealWork (Capability *cap)
             "cap %d: Trying to steal work from other capabilities", 
             cap->no);
 
-  if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
+  if (n_capabilities == 1) { return NULL; } // makes no sense...
 
   do {
       retry = rtsFalse;
@@ -85,7 +85,7 @@ stealWork (Capability *cap)
           if (emptySparkPoolCap(robbed)) // nothing to steal here
               continue;
 
-          spark = tryStealSpark(robbed->sparks);
+          spark = tryStealSpark(robbed);
           if (spark == NULL && !emptySparkPoolCap(robbed)) {
               // we conflicted with another thread while trying to steal;
               // try again later.
@@ -96,16 +96,31 @@ stealWork (Capability *cap)
               debugTrace(DEBUG_sched,
                 "cap %d: Stole a spark from capability %d",
                          cap->no, robbed->no);
-
-              createSparkThread(cap,spark);
-              return rtsTrue;
+              return spark;
           }
           // otherwise: no success, try next one
       }
   } while (retry);
 
   debugTrace(DEBUG_sched, "No sparks stolen");
-  return rtsFalse;
+  return NULL;
+}
+
+// Returns True if any spark pool is non-empty at this moment in time
+// The result is only valid for an instant, of course, so in a sense
+// is immediately invalid, and should not be relied upon for
+// correctness.
+rtsBool
+anySparks (void)
+{
+    nat i;
+
+    for (i=0; i < n_capabilities; i++) {
+        if (!emptySparkPoolCap(&capabilities[i])) {
+            return rtsTrue;
+        }
+    }
+    return rtsFalse;
 }
 #endif
 
index 9446a7e..869fdc3 100644 (file)
@@ -244,7 +244,11 @@ rtsBool tryGrabCapability (Capability *cap, Task *task);
 
 // Try to steal a spark from other Capabilities
 //
-rtsBool stealWork (Capability *cap);
+StgClosure *stealWork (Capability *cap);
+
+// True if any capabilities have sparks
+//
+rtsBool anySparks (void);
 
 INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
 INLINE_HEADER nat     sparkPoolSizeCap  (Capability *cap);
index 6efca38..c73fbec 100644 (file)
@@ -608,6 +608,7 @@ typedef struct _RtsSymbolVal {
       SymI_HasProto(initLinker)                                \
       SymI_HasProto(unpackClosurezh_fast)               \
       SymI_HasProto(getApStackValzh_fast)               \
+      SymI_HasProto(getSparkzh_fast)                    \
       SymI_HasProto(int2Integerzh_fast)                        \
       SymI_HasProto(integer2Intzh_fast)                        \
       SymI_HasProto(integer2Wordzh_fast)               \
index 6eb1311..d89119a 100644 (file)
@@ -42,6 +42,7 @@ PRELUDE_CLOSURE(base_GHCziIOBase_blockedIndefinitely_closure);
 PRELUDE_CLOSURE(base_ControlziExceptionziBase_nonTermination_closure);
 PRELUDE_CLOSURE(base_ControlziExceptionziBase_nestedAtomically_closure);
 
+PRELUDE_CLOSURE(base_GHCziConc_runSparks_closure);
 PRELUDE_CLOSURE(base_GHCziConc_ensureIOManagerIsRunning_closure);
 
 PRELUDE_INFO(ghczmprim_GHCziTypes_Czh_static_info);
index e65cbc4..55ada8c 100644 (file)
@@ -2287,3 +2287,25 @@ getApStackValzh_fast
    }
    RET_NP(ok,val);
 }
+
+getSparkzh_fast
+{
+   W_ spark;
+
+#ifndef THREADED_RTS
+   RET_NP(0,ghczmprim_GHCziBool_False_closure);
+#else
+   (spark) = foreign "C" tryStealSpark(MyCapability());
+   if (spark != 0) {
+      RET_NP(1,spark);
+   } else {
+      (spark) = foreign "C" stealWork (MyCapability());
+      if (spark != 0) {
+         RET_NP(1,spark);
+      } else {
+         RET_NP(0,ghczmprim_GHCziBool_False_closure);
+
+      }
+   }
+#endif
+}
index ca6e426..8c2c3de 100644 (file)
@@ -654,15 +654,9 @@ scheduleFindWork (Capability *cap)
     scheduleCheckBlockedThreads(cap);
 
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-    // Try to activate one of our own sparks
     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
 #endif
 
-#if defined(THREADED_RTS)
-    // Try to steak work if we don't have any
-    if (emptyRunQueue(cap)) { stealWork(cap); }
-#endif
-    
 #if defined(PARALLEL_HASKELL)
     // if messages have been buffered...
     scheduleSendPendingMessages();
@@ -1069,30 +1063,10 @@ scheduleSendPendingMessages(void)
 static void
 scheduleActivateSpark(Capability *cap)
 {
-    StgClosure *spark;
-
-/* We only want to stay here if the run queue is empty and we want some
-   work. We try to turn a spark into a thread, and add it to the run
-   queue, from where it will be picked up in the next iteration of the
-   scheduler loop.  
-*/
-    if (!emptyRunQueue(cap)) 
-      /* In the threaded RTS, another task might have pushed a thread
-        on our run queue in the meantime ? But would need a lock.. */
-      return;
-
-    // Really we should be using reclaimSpark() here, but
-    // experimentally it doesn't seem to perform as well as just
-    // stealing from our own spark pool:
-    // spark = reclaimSpark(cap->sparks);
-    spark = tryStealSpark(cap->sparks); // defined in Sparks.c
-
-    if (spark != NULL) {
-      debugTrace(DEBUG_sched,
-                "turning spark of closure %p into a thread",
-                (StgClosure *)spark);
-      createSparkThread(cap,spark); // defined in Sparks.c
+    if (anySparks())
+    {
+        createSparkThread(cap);
+        debugTrace(DEBUG_sched, "creating a spark thread");
     }
 }
 #endif // PARALLEL_HASKELL || THREADED_RTS
index 38a3090..e7273f3 100644 (file)
@@ -44,6 +44,7 @@
 #include "RtsUtils.h"
 #include "ParTicky.h"
 #include "Trace.h"
+#include "Prelude.h"
 
 #include "SMP.h" // for cas
 
@@ -227,8 +228,9 @@ steal(SparkPool *deque)
 }
 
 StgClosure *
-tryStealSpark (SparkPool *pool)
+tryStealSpark (Capability *cap)
 {
+  SparkPool *pool = cap->sparks;
   StgClosure *stolen;
 
   do { 
@@ -264,13 +266,13 @@ looksEmpty(SparkPool* deque)
  * -------------------------------------------------------------------------- */
 
 void
-createSparkThread (Capability *cap, StgClosure *p)
+createSparkThread (Capability *cap)
 {
     StgTSO *tso;
 
-    tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
+    tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
+                          &base_GHCziConc_runSparks_closure);
     appendToRunQueue(cap,tso);
-    cap->sparks_converted++;
 }
 
 /* -----------------------------------------------------------------------------
index 0d116bd..9696889 100644 (file)
@@ -73,9 +73,9 @@ StgClosure* reclaimSpark(SparkPool *pool);
 // if the pool is almost empty).
 rtsBool looksEmpty(SparkPool* deque);
 
-StgClosure * tryStealSpark     (SparkPool *pool);
+StgClosure * tryStealSpark     (Capability *cap);
 void         freeSparkPool     (SparkPool *pool);
-void         createSparkThread (Capability *cap, StgClosure *p);
+void         createSparkThread (Capability *cap);
 void         traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
 void         pruneSparkQueue   (evac_fn evac, void *user, Capability *cap);
 
index e869d9c..318f4ed 100644 (file)
@@ -107,6 +107,8 @@ ld-options:
          , "-u", "_base_GHCziWeak_runFinalizzerBatch_closure"
          , "-u", "_base_GHCziTopHandler_runIO_closure"
          , "-u", "_base_GHCziTopHandler_runNonIO_closure"
+        , "-u", "_base_GHCziConc_ensureIOManagerIsRunning_closure"
+        , "-u", "_base_GHCziConc_runSparks_closure"
 #else
            "-u", "ghczmprim_GHCziTypes_Izh_static_info"
          , "-u", "ghczmprim_GHCziTypes_Czh_static_info"
@@ -142,12 +144,8 @@ ld-options:
          , "-u", "base_GHCziWeak_runFinalizzerBatch_closure"
          , "-u", "base_GHCziTopHandler_runIO_closure"
          , "-u", "base_GHCziTopHandler_runNonIO_closure"
-#endif
-
-#ifdef LEADING_UNDERSCORE
-        , "-u", "_base_GHCziConc_ensureIOManagerIsRunning_closure"
-#else
         , "-u", "base_GHCziConc_ensureIOManagerIsRunning_closure"
+        , "-u", "base_GHCziConc_runSparks_closure"
 #endif
 
 /*  Pick up static libraries in preference over dynamic if in earlier search