From 2b16fa4791b08b02df8461f3b79d0e44d72d0960 Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Thu, 6 Nov 2008 11:36:39 +0000 Subject: [PATCH] Run sparks in batches, instead of creating a new thread for each one Significantly reduces the overhead for par, which means that we can make use of parallelism at a much finer granularity. --- compiler/prelude/primops.txt.pp | 6 ++++++ includes/StgMiscClosures.h | 1 + rts/Capability.c | 29 ++++++++++++++++++++++------- rts/Capability.h | 6 +++++- rts/Linker.c | 1 + rts/Prelude.h | 1 + rts/PrimOps.cmm | 22 ++++++++++++++++++++++ rts/Schedule.c | 34 ++++------------------------------ rts/Sparks.c | 10 ++++++---- rts/Sparks.h | 4 ++-- rts/package.conf.in | 8 +++----- 11 files changed, 73 insertions(+), 49 deletions(-) diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp index 417d42e..77ef9de 100644 --- a/compiler/prelude/primops.txt.pp +++ b/compiler/prelude/primops.txt.pp @@ -1633,6 +1633,12 @@ primop ParOp "par#" GenPrimOp -- gets evaluted strictly, which it should *not* be has_side_effects = True +primop GetSparkOp "getSpark#" GenPrimOp + State# s -> (# State# s, Int#, a #) + with + has_side_effects = True + out_of_line = True + -- HWL: The first 4 Int# in all par... annotations denote: -- name, granularity info, size of result, degree of parallelism -- Same structure as _seq_ i.e. 
returns Int# diff --git a/includes/StgMiscClosures.h b/includes/StgMiscClosures.h index f69a4ae..9158682 100644 --- a/includes/StgMiscClosures.h +++ b/includes/StgMiscClosures.h @@ -606,6 +606,7 @@ RTS_FUN(checkzh_fast); RTS_FUN(unpackClosurezh_fast); RTS_FUN(getApStackValzh_fast); +RTS_FUN(getSparkzh_fast); RTS_FUN(noDuplicatezh_fast); diff --git a/rts/Capability.c b/rts/Capability.c index c810311..ddb47b4 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -54,7 +54,7 @@ globalWorkToDo (void) #endif #if defined(THREADED_RTS) -rtsBool +StgClosure * stealWork (Capability *cap) { /* use the normal Sparks.h interface (internally modified to enable @@ -70,7 +70,7 @@ stealWork (Capability *cap) "cap %d: Trying to steal work from other capabilities", cap->no); - if (n_capabilities == 1) { return rtsFalse; } // makes no sense... + if (n_capabilities == 1) { return NULL; } // makes no sense... do { retry = rtsFalse; @@ -85,7 +85,7 @@ stealWork (Capability *cap) if (emptySparkPoolCap(robbed)) // nothing to steal here continue; - spark = tryStealSpark(robbed->sparks); + spark = tryStealSpark(robbed); if (spark == NULL && !emptySparkPoolCap(robbed)) { // we conflicted with another thread while trying to steal; // try again later. @@ -96,16 +96,31 @@ stealWork (Capability *cap) debugTrace(DEBUG_sched, "cap %d: Stole a spark from capability %d", cap->no, robbed->no); - - createSparkThread(cap,spark); - return rtsTrue; + return spark; } // otherwise: no success, try next one } } while (retry); debugTrace(DEBUG_sched, "No sparks stolen"); - return rtsFalse; + return NULL; +} + +// Returns True if any spark pool is non-empty at this moment in time +// The result is only valid for an instant, of course, so in a sense +// is immediately invalid, and should not be relied upon for +// correctness. 
+rtsBool +anySparks (void) +{ + nat i; + + for (i=0; i < n_capabilities; i++) { + if (!emptySparkPoolCap(&capabilities[i])) { + return rtsTrue; + } + } + return rtsFalse; } #endif diff --git a/rts/Capability.h b/rts/Capability.h index 9446a7e..869fdc3 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -244,7 +244,11 @@ rtsBool tryGrabCapability (Capability *cap, Task *task); // Try to steal a spark from other Capabilities // -rtsBool stealWork (Capability *cap); +StgClosure *stealWork (Capability *cap); + +// True if any capabilities have sparks +// +rtsBool anySparks (void); INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap); INLINE_HEADER nat sparkPoolSizeCap (Capability *cap); diff --git a/rts/Linker.c b/rts/Linker.c index 6efca38..c73fbec 100644 --- a/rts/Linker.c +++ b/rts/Linker.c @@ -608,6 +608,7 @@ typedef struct _RtsSymbolVal { SymI_HasProto(initLinker) \ SymI_HasProto(unpackClosurezh_fast) \ SymI_HasProto(getApStackValzh_fast) \ + SymI_HasProto(getSparkzh_fast) \ SymI_HasProto(int2Integerzh_fast) \ SymI_HasProto(integer2Intzh_fast) \ SymI_HasProto(integer2Wordzh_fast) \ diff --git a/rts/Prelude.h b/rts/Prelude.h index 6eb1311..d89119a 100644 --- a/rts/Prelude.h +++ b/rts/Prelude.h @@ -42,6 +42,7 @@ PRELUDE_CLOSURE(base_GHCziIOBase_blockedIndefinitely_closure); PRELUDE_CLOSURE(base_ControlziExceptionziBase_nonTermination_closure); PRELUDE_CLOSURE(base_ControlziExceptionziBase_nestedAtomically_closure); +PRELUDE_CLOSURE(base_GHCziConc_runSparks_closure); PRELUDE_CLOSURE(base_GHCziConc_ensureIOManagerIsRunning_closure); PRELUDE_INFO(ghczmprim_GHCziTypes_Czh_static_info); diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index e65cbc4..55ada8c 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -2287,3 +2287,25 @@ getApStackValzh_fast } RET_NP(ok,val); } + +getSparkzh_fast +{ + W_ spark; + +#ifndef THREADED_RTS + RET_NP(0,ghczmprim_GHCziBool_False_closure); +#else + (spark) = foreign "C" tryStealSpark(MyCapability()); + if (spark != 0) { + 
RET_NP(1,spark); + } else { + (spark) = foreign "C" stealWork (MyCapability()); + if (spark != 0) { + RET_NP(1,spark); + } else { + RET_NP(0,ghczmprim_GHCziBool_False_closure); + + } + } +#endif +} diff --git a/rts/Schedule.c b/rts/Schedule.c index ca6e426..8c2c3de 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -654,15 +654,9 @@ scheduleFindWork (Capability *cap) scheduleCheckBlockedThreads(cap); #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) - // Try to activate one of our own sparks if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } #endif -#if defined(THREADED_RTS) - // Try to steak work if we don't have any - if (emptyRunQueue(cap)) { stealWork(cap); } -#endif - #if defined(PARALLEL_HASKELL) // if messages have been buffered... scheduleSendPendingMessages(); @@ -1069,30 +1063,10 @@ scheduleSendPendingMessages(void) static void scheduleActivateSpark(Capability *cap) { - StgClosure *spark; - -/* We only want to stay here if the run queue is empty and we want some - work. We try to turn a spark into a thread, and add it to the run - queue, from where it will be picked up in the next iteration of the - scheduler loop. -*/ - if (!emptyRunQueue(cap)) - /* In the threaded RTS, another task might have pushed a thread - on our run queue in the meantime ? But would need a lock.. 
*/ - return; - - - // Really we should be using reclaimSpark() here, but - // experimentally it doesn't seem to perform as well as just - // stealing from our own spark pool: - // spark = reclaimSpark(cap->sparks); - spark = tryStealSpark(cap->sparks); // defined in Sparks.c - - if (spark != NULL) { - debugTrace(DEBUG_sched, - "turning spark of closure %p into a thread", - (StgClosure *)spark); - createSparkThread(cap,spark); // defined in Sparks.c + if (anySparks()) + { + createSparkThread(cap); + debugTrace(DEBUG_sched, "creating a spark thread"); } } #endif // PARALLEL_HASKELL || THREADED_RTS diff --git a/rts/Sparks.c b/rts/Sparks.c index 38a3090..e7273f3 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -44,6 +44,7 @@ #include "RtsUtils.h" #include "ParTicky.h" #include "Trace.h" +#include "Prelude.h" #include "SMP.h" // for cas @@ -227,8 +228,9 @@ steal(SparkPool *deque) } StgClosure * -tryStealSpark (SparkPool *pool) +tryStealSpark (Capability *cap) { + SparkPool *pool = cap->sparks; StgClosure *stolen; do { @@ -264,13 +266,13 @@ looksEmpty(SparkPool* deque) * -------------------------------------------------------------------------- */ void -createSparkThread (Capability *cap, StgClosure *p) +createSparkThread (Capability *cap) { StgTSO *tso; - tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p); + tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, + &base_GHCziConc_runSparks_closure); appendToRunQueue(cap,tso); - cap->sparks_converted++; } /* ----------------------------------------------------------------------------- diff --git a/rts/Sparks.h b/rts/Sparks.h index 0d116bd..9696889 100644 --- a/rts/Sparks.h +++ b/rts/Sparks.h @@ -73,9 +73,9 @@ StgClosure* reclaimSpark(SparkPool *pool); // if the pool is almost empty). 
rtsBool looksEmpty(SparkPool* deque); -StgClosure * tryStealSpark (SparkPool *pool); +StgClosure * tryStealSpark (Capability *cap); void freeSparkPool (SparkPool *pool); -void createSparkThread (Capability *cap, StgClosure *p); +void createSparkThread (Capability *cap); void traverseSparkQueue(evac_fn evac, void *user, Capability *cap); void pruneSparkQueue (evac_fn evac, void *user, Capability *cap); diff --git a/rts/package.conf.in b/rts/package.conf.in index e869d9c..318f4ed 100644 --- a/rts/package.conf.in +++ b/rts/package.conf.in @@ -107,6 +107,8 @@ ld-options: , "-u", "_base_GHCziWeak_runFinalizzerBatch_closure" , "-u", "_base_GHCziTopHandler_runIO_closure" , "-u", "_base_GHCziTopHandler_runNonIO_closure" + , "-u", "_base_GHCziConc_ensureIOManagerIsRunning_closure" + , "-u", "_base_GHCziConc_runSparks_closure" #else "-u", "ghczmprim_GHCziTypes_Izh_static_info" , "-u", "ghczmprim_GHCziTypes_Czh_static_info" @@ -142,12 +144,8 @@ ld-options: , "-u", "base_GHCziWeak_runFinalizzerBatch_closure" , "-u", "base_GHCziTopHandler_runIO_closure" , "-u", "base_GHCziTopHandler_runNonIO_closure" -#endif - -#ifdef LEADING_UNDERSCORE - , "-u", "_base_GHCziConc_ensureIOManagerIsRunning_closure" -#else , "-u", "base_GHCziConc_ensureIOManagerIsRunning_closure" + , "-u", "base_GHCziConc_runSparks_closure" #endif /* Pick up static libraries in preference over dynamic if in earlier search -- 1.7.10.4