From 99df892cc9620fcc92747b79bba75dad8a1d295c Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Wed, 22 Oct 2008 09:27:44 +0000 Subject: [PATCH] Refactoring and reorganisation of the scheduler Change the way we look for work in the scheduler. Previously, checking to see whether there was anything to do was a non-side-effecting operation, but this has changed now that we do work-stealing. This led to a refactoring of the inner loop of the scheduler. Also, lots of cleanup in the new work-stealing code, but no functional changes. One new statistic is added to the +RTS -s output: SPARKS: 1430 (2 converted, 1427 pruned) lets you know something about the use of `par` in the program. --- includes/RtsTypes.h | 46 ++-------- rts/Capability.c | 125 ++++++++++++---------------- rts/Capability.h | 43 ++++++++-- rts/Schedule.c | 231 +++++++++++++++++++++++++++++++-------------------- rts/Sparks.c | 125 +++++++++++++--------------- rts/Sparks.h | 77 ++++++++++------- rts/Stable.c | 3 - rts/Stats.c | 15 ++++ 8 files changed, 355 insertions(+), 310 deletions(-) diff --git a/includes/RtsTypes.h b/includes/RtsTypes.h index 3510ee7..d497005 100644 --- a/includes/RtsTypes.h +++ b/includes/RtsTypes.h @@ -1,8 +1,10 @@ -/* - Time-stamp: <2005-03-30 12:02:33 simonmar> - - RTS specific types. -*/ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team, 1998-2008 + * + * RTS-specific types. + * + * ---------------------------------------------------------------------------*/ /* ------------------------------------------------------------------------- Generally useful typedefs @@ -37,40 +39,6 @@ typedef enum { Types specific to the parallel runtime system. */ - -/* Spark pools: used to store pending sparks - * (THREADED_RTS & PARALLEL_HASKELL only) - * Implementation uses a DeQue to enable concurrent read accesses at - * the top end. - */ -typedef struct SparkPool_ { - /* Size of elements array.
Used for modulo calculation: we round up - to powers of 2 and use the dyadic log (modulo == bitwise &) */ - StgWord size; - StgWord moduloSize; /* bitmask for modulo */ - - /* top, index where multiple readers steal() (protected by a cas) */ - StgWord top; - - /* bottom, index of next free place where one writer can push - elements. This happens unsynchronised. */ - StgWord bottom; - /* both position indices are continuously incremented, and used as - an index modulo the current array size. */ - - /* lower bound on the current top value. This is an internal - optimisation to avoid unnecessarily accessing the top field - inside pushBottom */ - StgWord topBound; - - /* The elements array */ - StgClosurePtr* elements; - /* Please note: the dataspace cannot follow the admin fields - immediately, as it should be possible to enlarge it without - disposing the old one automatically (as realloc would)! */ - -} SparkPool; - typedef ullong rtsTime; #if defined(PAR) diff --git a/rts/Capability.c b/rts/Capability.c index 516aaa5..948922a 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -54,15 +54,17 @@ globalWorkToDo (void) #endif #if defined(THREADED_RTS) -rtsBool stealWork( Capability *cap) { +rtsBool +stealWork (Capability *cap) +{ /* use the normal Sparks.h interface (internally modified to enable concurrent stealing) and immediately turn the spark into a thread when successful */ Capability *robbed; - SparkPool *pool; StgClosurePtr spark; rtsBool success = rtsFalse; + rtsBool retry; nat i = 0; debugTrace(DEBUG_sched, @@ -71,63 +73,40 @@ rtsBool stealWork( Capability *cap) { if (n_capabilities == 1) { return rtsFalse; } // makes no sense... - /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could - start at a random place instead of 0 as well. */ - for ( i=0 ; i < n_capabilities ; i++ ) { - robbed = &capabilities[i]; - if (cap == robbed) // ourselves... 
- continue; + do { + retry = rtsFalse; - if (emptySparkPoolCap(robbed)) // nothing to steal here - continue; - - spark = findSpark(robbed); + /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could + start at a random place instead of 0 as well. */ + for ( i=0 ; i < n_capabilities ; i++ ) { + robbed = &capabilities[i]; + if (cap == robbed) // ourselves... + continue; - if (spark == NULL && !emptySparkPoolCap(robbed)) { - spark = findSpark(robbed); // lost race in concurrent access, try again - } - if (spark != NULL) { - debugTrace(DEBUG_sched, + if (emptySparkPoolCap(robbed)) // nothing to steal here + continue; + + spark = tryStealSpark(robbed->sparks); + if (spark == NULL && !emptySparkPoolCap(robbed)) { + // we conflicted with another thread while trying to steal; + // try again later. + retry = rtsTrue; + } + + if (spark != NULL) { + debugTrace(DEBUG_sched, "cap %d: Stole a spark from capability %d", - cap->no, robbed->no); + cap->no, robbed->no); - createSparkThread(cap,spark); - success = rtsTrue; - break; // got one, leave the loop - } - // otherwise: no success, try next one - } - debugTrace(DEBUG_sched, - "Leaving work stealing routine (%s)", - success?"one spark stolen":"thefts did not succeed"); - return success; -} + createSparkThread(cap,spark); + return rtsTrue; + } + // otherwise: no success, try next one + } + } while (retry); -STATIC_INLINE rtsBool -anyWorkForMe( Capability *cap, Task *task ) -{ - if (task->tso != NULL) { - // A bound task only runs if its thread is on the run queue of - // the capability on which it was woken up. Otherwise, we - // can't be sure that we have the right capability: the thread - // might be woken up on some other capability, and task->cap - // could change under our feet. 
- return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task; - } else { - // A vanilla worker task runs if either there is a lightweight - // thread at the head of the run queue, or the run queue is - // empty and (there are sparks to execute, or there is some - // other global condition to check, such as threads blocked on - // blackholes). - if (emptyRunQueue(cap)) { - return !emptySparkPoolCap(cap) - || !emptyWakeupQueue(cap) - || globalWorkToDo() - || stealWork(cap); /* if all false: try to steal work */ - } else { - return cap->run_queue_hd->bound == NULL; - } - } + debugTrace(DEBUG_sched, "No sparks stolen"); + return rtsFalse; } #endif @@ -194,6 +173,9 @@ initCapability( Capability *cap, nat i ) cap->returning_tasks_tl = NULL; cap->wakeup_queue_hd = END_TSO_QUEUE; cap->wakeup_queue_tl = END_TSO_QUEUE; + cap->sparks_created = 0; + cap->sparks_converted = 0; + cap->sparks_pruned = 0; #endif cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; @@ -326,7 +308,8 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) #if defined(THREADED_RTS) void -releaseCapability_ (Capability* cap) +releaseCapability_ (Capability* cap, + rtsBool always_wakeup) { Task *task; @@ -384,8 +367,9 @@ releaseCapability_ (Capability* cap) // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. 
- if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap) - || !emptySparkPoolCap(cap) || globalWorkToDo()) { + if (always_wakeup || + !emptyRunQueue(cap) || !emptyWakeupQueue(cap) || + !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); // The worker Task pops itself from the queue; @@ -401,7 +385,15 @@ void releaseCapability (Capability* cap USED_IF_THREADS) { ACQUIRE_LOCK(&cap->lock); - releaseCapability_(cap); + releaseCapability_(cap, rtsFalse); + RELEASE_LOCK(&cap->lock); +} + +void +releaseAndWakeupCapability (Capability* cap USED_IF_THREADS) +{ + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap, rtsTrue); RELEASE_LOCK(&cap->lock); } @@ -427,7 +419,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) } // Bound tasks just float around attached to their TSOs. - releaseCapability_(cap); + releaseCapability_(cap,rtsFalse); RELEASE_LOCK(&cap->lock); } @@ -534,16 +526,6 @@ yieldCapability (Capability** pCap, Task *task) { Capability *cap = *pCap; - // The fast path has no locking, if we don't enter this while loop - - while ( waiting_for_gc - /* i.e. 
another capability triggered HeapOverflow, is busy - getting capabilities (stopping their owning tasks) */ - || cap->returning_tasks_hd != NULL - /* cap reserved for another task */ - || !anyWorkForMe(cap,task) - /* cap/task have no work */ - ) { debugTrace(DEBUG_sched, "giving up capability %d", cap->no); // We must now release the capability and wait to be woken up @@ -588,7 +570,6 @@ yieldCapability (Capability** pCap, Task *task) trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no); ASSERT(cap->running_task == task); - } *pCap = cap; @@ -630,7 +611,7 @@ wakeupThreadOnCapability (Capability *my_cap, appendToRunQueue(other_cap,tso); trace(TRACE_sched, "resuming capability %d", other_cap->no); - releaseCapability_(other_cap); + releaseCapability_(other_cap,rtsFalse); } else { appendToWakeupQueue(my_cap,other_cap,tso); other_cap->context_switch = 1; @@ -765,7 +746,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe) if (!emptyRunQueue(cap) || cap->spare_workers) { debugTrace(DEBUG_sched, "runnable threads or workers still alive, yielding"); - releaseCapability_(cap); // this will wake up a worker + releaseCapability_(cap,rtsFalse); // this will wake up a worker RELEASE_LOCK(&cap->lock); yieldThread(); continue; diff --git a/rts/Capability.h b/rts/Capability.h index 5945895..779a194 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -23,9 +23,9 @@ #ifndef CAPABILITY_H #define CAPABILITY_H -#include "RtsTypes.h" #include "RtsFlags.h" #include "Task.h" +#include "Sparks.h" struct Capability_ { // State required by the STG virtual machine when running Haskell @@ -91,6 +91,13 @@ struct Capability_ { // woken up by another Capability. 
StgTSO *wakeup_queue_hd; StgTSO *wakeup_queue_tl; + + SparkPool *sparks; + + // Stats on spark creation/conversion + nat sparks_created; + nat sparks_converted; + nat sparks_pruned; #endif // Per-capability STM-related data @@ -100,8 +107,6 @@ struct Capability_ { StgTRecHeader *free_trec_headers; nat transaction_tokens; - SparkPool *sparks; - }; // typedef Capability, defined in RtsAPI.h @@ -147,12 +152,16 @@ void initCapabilities (void); // ASSUMES: cap->running_task is the current Task. // #if defined(THREADED_RTS) -void releaseCapability (Capability* cap); -void releaseCapability_ (Capability* cap); // assumes cap->lock is held +void releaseCapability (Capability* cap); +void releaseAndWakeupCapability (Capability* cap); +void releaseCapability_ (Capability* cap, rtsBool always_wakeup); +// assumes cap->lock is held #else // releaseCapability() is empty in non-threaded RTS INLINE_HEADER void releaseCapability (Capability* cap STG_UNUSED) {}; -INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED) {}; +INLINE_HEADER void releaseAndWakeupCapability (Capability* cap STG_UNUSED) {}; +INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED, + rtsBool always_wakeup STG_UNUSED) {}; #endif #if !IN_STG_CODE @@ -231,6 +240,14 @@ void shutdownCapability (Capability *cap, Task *task, rtsBool wait_foreign); // rtsBool tryGrabCapability (Capability *cap, Task *task); +// Try to steal a spark from other Capabilities +// +rtsBool stealWork (Capability *cap); + +INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap); +INLINE_HEADER nat sparkPoolSizeCap (Capability *cap); +INLINE_HEADER void discardSparksCap (Capability *cap); + #else // !THREADED_RTS // Grab a capability. 
(Only in the non-threaded RTS; in the threaded @@ -273,4 +290,18 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen) *bd->free++ = (StgWord)p; } +#if defined(THREADED_RTS) +INLINE_HEADER rtsBool +emptySparkPoolCap (Capability *cap) +{ return looksEmpty(cap->sparks); } + +INLINE_HEADER nat +sparkPoolSizeCap (Capability *cap) +{ return sparkPoolSize(cap->sparks); } + +INLINE_HEADER void +discardSparksCap (Capability *cap) +{ return discardSparks(cap->sparks); } +#endif + #endif /* CAPABILITY_H */ diff --git a/rts/Schedule.c b/rts/Schedule.c index 09150fd..e17c653 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -137,17 +137,21 @@ static Capability *schedule (Capability *initialCapability, Task *task); // scheduler clearer. // static void schedulePreLoop (void); +static void scheduleFindWork (Capability *cap); +#if defined(THREADED_RTS) +static void scheduleYield (Capability **pcap, Task *task); +#endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) static void schedulePushWork(Capability *cap, Task *task); -static rtsBool scheduleGetRemoteWork(Capability *cap); #if defined(PARALLEL_HASKELL) +static rtsBool scheduleGetRemoteWork(Capability *cap); static void scheduleSendPendingMessages(void); #endif +#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); @@ -281,25 +285,6 @@ schedule (Capability *initialCapability, Task *task) while (TERMINATION_CONDITION) { -#if defined(THREADED_RTS) - if (first) { - // don't yield the first time, we want a chance to run this - // thread for a bit, even if there are others banging 
at the - // door. - first = rtsFalse; - ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - } else { - // Yield the capability to higher-priority tasks if necessary. - yieldCapability(&cap, task); - /* inside yieldCapability, attempts to steal work from other - capabilities, unless the capability has own work. - See (REMARK) below. - */ - } -#endif - - /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */ - // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). @@ -367,62 +352,11 @@ schedule (Capability *initialCapability, Task *task) barf("sched_state: %d", sched_state); } - /* this was the place to activate a spark, now below... */ - - scheduleStartSignalHandlers(cap); + scheduleFindWork(cap); - // Only check the black holes here if we've nothing else to do. - // During normal execution, the black hole list only gets checked - // at GC time, to avoid repeatedly traversing this possibly long - // list each time around the scheduler. - if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - - scheduleCheckWakeupThreads(cap); - - scheduleCheckBlockedThreads(cap); - -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) - /* work distribution in multithreaded and parallel systems - - REMARK: IMHO best location for work-stealing as well. - tests above might yield some new jobs, so no need to steal a - spark in some cases. I believe the yieldCapability.. above - should be moved here. - */ - -#if defined(PARALLEL_HASKELL) - /* if messages have been buffered... 
a NOOP in THREADED_RTS */ - scheduleSendPendingMessages(); -#endif - - /* If the run queue is empty,...*/ - if (emptyRunQueue(cap)) { - /* ...take one of our own sparks and turn it into a thread */ - scheduleActivateSpark(cap); - - /* if this did not work, try to steal a spark from someone else */ - if (emptyRunQueue(cap)) { -#if defined(PARALLEL_HASKELL) - receivedFinish = scheduleGetRemoteWork(cap); - continue; // a new round, (hopefully) with new work - /* - in GUM, this a) sends out a FISH and returns IF no fish is - out already - b) (blocking) awaits and receives messages - - in Eden, this is only the blocking receive, as b) in GUM. - - in Threaded-RTS, this does plain nothing. Stealing routine - is inside Capability.c and called from - yieldCapability() at the very beginning, see REMARK. - */ -#endif - } - } else { /* i.e. run queue was (initially) not empty */ - schedulePushWork(cap,task); - /* work pushing, currently relevant only for THREADED_RTS: - (pushes threads, wakes up idle capabilities for stealing) */ - } + /* work pushing, currently relevant only for THREADED_RTS: + (pushes threads, wakes up idle capabilities for stealing) */ + schedulePushWork(cap,task); #if defined(PARALLEL_HASKELL) /* since we perform a blocking receive and continue otherwise, @@ -439,9 +373,8 @@ schedule (Capability *initialCapability, Task *task) } #endif // PARALLEL_HASKELL: non-empty run queue! -#endif /* THREADED_RTS || PARALLEL_HASKELL */ - scheduleDetectDeadlock(cap,task); + #if defined(THREADED_RTS) cap = task->cap; // reload cap, it might have changed #endif @@ -454,12 +387,27 @@ schedule (Capability *initialCapability, Task *task) // // win32: might be here due to awaitEvent() being abandoned // as a result of a console event having been delivered. 
- if ( emptyRunQueue(cap) ) { + +#if defined(THREADED_RTS) + if (first) + { + // XXX: ToDo + // // don't yield the first time, we want a chance to run this + // // thread for a bit, even if there are others banging at the + // // door. + // first = rtsFalse; + // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + } + + scheduleYield(&cap,task); + if (emptyRunQueue(cap)) continue; // look for work again +#endif + #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) + if ( emptyRunQueue(cap) ) { ASSERT(sched_state >= SCHED_INTERRUPTING); -#endif - continue; // nothing to do } +#endif // // Get a thread to run @@ -683,12 +631,110 @@ schedulePreLoop(void) } /* ----------------------------------------------------------------------------- + * scheduleFindWork() + * + * Search for work to do, and handle messages from elsewhere. + * -------------------------------------------------------------------------- */ + +static void +scheduleFindWork (Capability *cap) +{ + scheduleStartSignalHandlers(cap); + + // Only check the black holes here if we've nothing else to do. + // During normal execution, the black hole list only gets checked + // at GC time, to avoid repeatedly traversing this possibly long + // list each time around the scheduler. + if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } + + scheduleCheckWakeupThreads(cap); + + scheduleCheckBlockedThreads(cap); + +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) + // Try to activate one of our own sparks + if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } +#endif + +#if defined(THREADED_RTS) + // Try to steal work if we don't have any + if (emptyRunQueue(cap)) { stealWork(cap); } +#endif + +#if defined(PARALLEL_HASKELL) + // if messages have been buffered...
+ scheduleSendPendingMessages(); +#endif + +#if defined(PARALLEL_HASKELL) + if (emptyRunQueue(cap)) { + receivedFinish = scheduleGetRemoteWork(cap); + continue; // a new round, (hopefully) with new work + /* + in GUM, this a) sends out a FISH and returns IF no fish is + out already + b) (blocking) awaits and receives messages + + in Eden, this is only the blocking receive, as b) in GUM. + */ + } +#endif +} + +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +shouldYieldCapability (Capability *cap, Task *task) +{ + // we need to yield this capability to someone else if.. + // - another thread is initiating a GC + // - another Task is returning from a foreign call + // - the thread at the head of the run queue cannot be run + // by this Task (it is bound to another Task, or it is unbound + // and this task is bound). + return (waiting_for_gc || + cap->returning_tasks_hd != NULL || + (!emptyRunQueue(cap) && (task->tso == NULL + ? cap->run_queue_hd->bound != NULL + : cap->run_queue_hd->bound != task))); +} + +// This is the single place where a Task goes to sleep. There are +// two reasons it might need to sleep: +// - there are no threads to run +// - we need to yield this Capability to someone else +// (see shouldYieldCapability()) +// +// There is no return value: the Capability held on wakeup is passed back through *pcap. + +static void +scheduleYield (Capability **pcap, Task *task) +{ + Capability *cap = *pcap; + + // if we have work, and we don't need to give up the Capability, continue. + if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task)) + return; + + // otherwise yield (sleep), and keep yielding if necessary. + do { + yieldCapability(&cap,task); + } + while (shouldYieldCapability(cap,task)); + + // note there may still be no threads on the run queue at this + // point, the caller has to check. + + *pcap = cap; + return; +} +#endif + +/* ----------------------------------------------------------------------------- * schedulePushWork() * * Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */ -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) static void schedulePushWork(Capability *cap USED_IF_THREADS, Task *task USED_IF_THREADS) @@ -788,7 +834,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // i is the next free capability to push to for (; i < n_free_caps; i++) { if (emptySparkPoolCap(free_caps[i])) { - spark = findSpark(cap); + spark = tryStealSpark(cap->sparks); if (spark != NULL) { debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no); newSpark(&(free_caps[i]->r), spark); @@ -801,18 +847,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // release the capabilities for (i = 0; i < n_free_caps; i++) { task->cap = free_caps[i]; - releaseCapability(free_caps[i]); + releaseAndWakeupCapability(free_caps[i]); } - // now wake them all up, and they might steal sparks if - // the did not get a thread - prodAllCapabilities(); } task->cap = cap; // reset to point to our Capability. #endif /* THREADED_RTS */ } -#endif /* THREADED_RTS || PARALLEL_HASKELL */ /* ---------------------------------------------------------------------------- * Start any pending signal handlers @@ -1031,7 +1073,12 @@ scheduleActivateSpark(Capability *cap) on our run queue in the meantime ? But would need a lock.. 
*/ return; - spark = findSpark(cap); // defined in Sparks.c + + // Really we should be using reclaimSpark() here, but + // experimentally it doesn't seem to perform as well as just + // stealing from our own spark pool: + // spark = reclaimSpark(cap->sparks); + spark = tryStealSpark(cap->sparks); // defined in Sparks.c if (spark != NULL) { debugTrace(DEBUG_sched, @@ -1046,9 +1093,9 @@ scheduleActivateSpark(Capability *cap) * Get work from a remote node (PARALLEL_HASKELL only) * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(PARALLEL_HASKELL) static rtsBool /* return value used in PARALLEL_HASKELL only */ -scheduleGetRemoteWork(Capability *cap) +scheduleGetRemoteWork (Capability *cap STG_UNUSED) { #if defined(PARALLEL_HASKELL) rtsBool receivedFinish = rtsFalse; @@ -1800,7 +1847,7 @@ suspendThread (StgRegTable *reg) suspendTask(cap,task); cap->in_haskell = rtsFalse; - releaseCapability_(cap); + releaseCapability_(cap,rtsFalse); RELEASE_LOCK(&cap->lock); diff --git a/rts/Sparks.c b/rts/Sparks.c index ac11172..360ea41 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -53,9 +53,9 @@ /* internal helpers ... */ -StgWord roundUp2(StgWord val); - -StgWord roundUp2(StgWord val) { +static StgWord +roundUp2(StgWord val) +{ StgWord rounded = 1; /* StgWord is unsigned anyway, only catch 0 */ @@ -69,25 +69,6 @@ StgWord roundUp2(StgWord val) { return rounded; } -INLINE_HEADER -rtsBool casTop(StgPtr addr, StgWord old, StgWord new); - -#if !defined(THREADED_RTS) -/* missing def. in non THREADED RTS, and makes no sense anyway... 
*/ -StgWord cas(StgPtr addr,StgWord old,StgWord new); -StgWord cas(StgPtr addr,StgWord old,StgWord new) { - barf("cas: not implemented without multithreading"); - old = new = *addr; /* to avoid gcc warnings */ -} -#endif - -INLINE_HEADER -rtsBool casTop(StgWord* addr, StgWord old, StgWord new) { - StgWord res = cas((StgPtr) addr, old, new); - return ((res == old)); -} - -/* or simply like this */ #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new))) /* ----------------------------------------------------------------------------- @@ -97,8 +78,9 @@ rtsBool casTop(StgWord* addr, StgWord old, StgWord new) { * -------------------------------------------------------------------------- */ /* constructor */ -SparkPool* initPool(StgWord size) { - +static SparkPool* +initPool(StgWord size) +{ StgWord realsize; SparkPool *q; @@ -136,14 +118,17 @@ initSparkPools( void ) } void -freeSparkPool(SparkPool *pool) { +freeSparkPool (SparkPool *pool) +{ /* should not interfere with concurrent findSpark() calls! And nobody should use the pointer any more. We cross our fingers...*/ stgFree(pool->elements); stgFree(pool); } -/* reclaimSpark(cap): remove a spark from the write end of the queue. +/* ----------------------------------------------------------------------------- + * + * reclaimSpark: remove a spark from the write end of the queue. * Returns the removed spark, and NULL if a race is lost or the pool * empty. * @@ -151,9 +136,12 @@ freeSparkPool(SparkPool *pool) { * concurrently stealing threads by using cas to modify the top field. * This routine should NEVER be called by a task which does not own * the capability. Can this be checked here? 
- */ -StgClosure* reclaimSpark(Capability *cap) { - SparkPool *deque = cap->sparks; + * + * -------------------------------------------------------------------------- */ + +StgClosure * +reclaimSpark (SparkPool *deque) +{ /* also a bit tricky, has to avoid concurrent steal() calls by accessing top with cas, when there is only one element left */ StgWord t, b; @@ -196,19 +184,17 @@ StgClosure* reclaimSpark(Capability *cap) { /* ----------------------------------------------------------------------------- * - * findSpark: find a spark on the current Capability that we can fork - * into a thread. + * tryStealSpark: try to steal a spark from a Capability. * - * May be called by concurrent threads, which synchronise on top - * variable. Returns a spark, or NULL if pool empty or race lost. + * Returns a valid spark, or NULL if the pool was empty, and can + * occasionally return NULL if there was a race with another thread + * stealing from the same pool. In this case, try again later. * -------------------------------------------------------------------------- */ -StgClosurePtr steal(SparkPool *deque); - -/* steal an element from the read end. Synchronises multiple callers - by failing with NULL return. Returns NULL when deque is empty. 
*/ -StgClosurePtr steal(SparkPool *deque) { +static StgClosurePtr +steal(SparkPool *deque) +{ StgClosurePtr* pos; StgClosurePtr* arraybase; StgWord sz; @@ -231,43 +217,39 @@ StgClosurePtr steal(SparkPool *deque) { /* now decide whether we have won */ if ( !(CASTOP(&(deque->top),t,t+1)) ) { - /* lost the race, someon else has changed top in the meantime */ - stolen = NULL; + /* lost the race, someon else has changed top in the meantime */ + return NULL; } /* else: OK, top has been incremented by the cas call */ - ASSERT_SPARK_POOL_INVARIANTS(deque); - /* return NULL or stolen element */ + /* return stolen element */ return stolen; } StgClosure * -findSpark (Capability *cap) +tryStealSpark (SparkPool *pool) { - SparkPool *deque = (cap->sparks); StgClosure *stolen; - ASSERT_SPARK_POOL_INVARIANTS(deque); - do { - /* keep trying until good spark found or pool looks empty. - TODO is this a good idea? */ - - stolen = steal(deque); - - } while ( ( !stolen /* nothing stolen*/ - || !closure_SHOULD_SPARK(stolen)) /* spark not OK */ - && !looksEmpty(deque)); /* run empty, give up */ + stolen = steal(pool); + } while (stolen != NULL && !closure_SHOULD_SPARK(stolen)); - /* return stolen element */ return stolen; } -/* "guesses" whether a deque is empty. Can return false negatives in - presence of concurrent steal() calls, and false positives in - presence of a concurrent pushBottom().*/ -rtsBool looksEmpty(SparkPool* deque) { +/* ----------------------------------------------------------------------------- + * + * "guesses" whether a deque is empty. Can return false negatives in + * presence of concurrent steal() calls, and false positives in + * presence of a concurrent pushBottom(). 
+ * + * -------------------------------------------------------------------------- */ + +rtsBool +looksEmpty(SparkPool* deque) +{ StgWord t = deque->top; StgWord b = deque->bottom; /* try to prefer false negatives by reading top first */ @@ -288,6 +270,7 @@ createSparkThread (Capability *cap, StgClosure *p) tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p); appendToRunQueue(cap,tso); + cap->sparks_converted++; } /* ----------------------------------------------------------------------------- @@ -297,11 +280,12 @@ createSparkThread (Capability *cap, StgClosure *p) * -------------------------------------------------------------------------- */ #define DISCARD_NEW -void pushBottom(SparkPool* deque, StgClosurePtr elem); /* enqueue an element. Should always succeed by resizing the array (not implemented yet, silently fails in that case). */ -void pushBottom(SparkPool* deque, StgClosurePtr elem) { +static void +pushBottom (SparkPool* deque, StgClosurePtr elem) +{ StgWord t; StgClosurePtr* pos; StgWord sz = deque->moduloSize; @@ -349,12 +333,16 @@ void pushBottom(SparkPool* deque, StgClosurePtr elem) { } -/* this is called as a direct C-call from Stg => we need to keep the - pool in a register (???) */ +/* -------------------------------------------------------------------------- + * newSpark: create a new spark, as a result of calling "par" + * Called directly from STG. + * -------------------------------------------------------------------------- */ + StgInt newSpark (StgRegTable *reg, StgClosure *p) { - SparkPool *pool = (reg->rCurrentTSO->cap->sparks); + Capability *cap = regTableToCapability(reg); + SparkPool *pool = cap->sparks; /* I am not sure whether this is the right thing to do. 
* Maybe it is better to exploit the tag information @@ -368,6 +356,8 @@ newSpark (StgRegTable *reg, StgClosure *p) pushBottom(pool,p); } + cap->sparks_created++; + ASSERT_SPARK_POOL_INVARIANTS(pool); return 1; } @@ -385,7 +375,7 @@ static void pruneSparkQueue (Capability *cap) { SparkPool *pool; - StgClosurePtr spark, evacspark, *elements; + StgClosurePtr spark, *elements; nat n, pruned_sparks; // stats only StgWord botInd,oldBotInd,currInd; // indices in array (always < size) @@ -457,6 +447,7 @@ pruneSparkQueue (Capability *cap) n++; } else { pruned_sparks++; // discard spark + cap->sparks_pruned++; } currInd++; @@ -528,7 +519,6 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap) } /* ---------------------------------------------------------------------------- - * balanceSparkPoolsCaps: takes an array of capabilities (usually: all * capabilities) and its size. Accesses all spark pools and equally * distributes the sparks among them. @@ -537,7 +527,8 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap) * -------------------------------------------------------------------------- */ void balanceSparkPoolsCaps(nat n_caps, Capability caps[]); -void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) { +void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, + Capability caps[] STG_UNUSED) { barf("not implemented"); } diff --git a/rts/Sparks.h b/rts/Sparks.h index dbbf268..4062a0b 100644 --- a/rts/Sparks.h +++ b/rts/Sparks.h @@ -17,6 +17,40 @@ #if defined(THREADED_RTS) +/* Spark pools: used to store pending sparks + * (THREADED_RTS & PARALLEL_HASKELL only) + * Implementation uses a DeQue to enable concurrent read accesses at + * the top end. + */ +typedef struct SparkPool_ { + /* Size of elements array. 
Used for modulo calculation: we round up + to powers of 2 and use the dyadic log (modulo == bitwise &) */ + StgWord size; + StgWord moduloSize; /* bitmask for modulo */ + + /* top, index where multiple readers steal() (protected by a cas) */ + volatile StgWord top; + + /* bottom, index of next free place where one writer can push + elements. This happens unsynchronised. */ + volatile StgWord bottom; + /* both position indices are continuously incremented, and used as + an index modulo the current array size. */ + + /* lower bound on the current top value. This is an internal + optimisation to avoid unnecessarily accessing the top field + inside pushBottom */ + volatile StgWord topBound; + + /* The elements array */ + StgClosurePtr* elements; + /* Please note: the dataspace cannot follow the admin fields + immediately, as it should be possible to enlarge it without + disposing the old one automatically (as realloc would)! */ + +} SparkPool; + + /* INVARIANTS, in this order: bottom/top consistent, reasonable size, topBound consistent, space pointer, space accessible to us */ #define ASSERT_SPARK_POOL_INVARIANTS(p) \ @@ -28,30 +62,25 @@ ASSERT(*((p)->elements) || 1); \ ASSERT(*((p)->elements - 1 + ((p)->size)) || 1); -// missing in old interface. Currently called by initSparkPools -// internally. -SparkPool* initPool(StgWord size); +// Initialisation +void initSparkPools (void); -// special case: accessing our own pool, at the write end -// otherwise, we can always steal from our pool as the others do... -StgClosure* reclaimSpark(Capability *cap); +// Take a spark from the "write" end of the pool. Can be called +// by the pool owner only. +StgClosure* reclaimSpark(SparkPool *pool); +// Returns True if the spark pool is empty (can give a false positive +// if the pool is almost empty). 
rtsBool looksEmpty(SparkPool* deque); -// rest: same as old interface -StgClosure * findSpark (Capability *cap); -void initSparkPools (void); +StgClosure * tryStealSpark (SparkPool *pool); void freeSparkPool (SparkPool *pool); void createSparkThread (Capability *cap, StgClosure *p); void pruneSparkQueues (void); void traverseSparkQueue(evac_fn evac, void *user, Capability *cap); -INLINE_HEADER void discardSparks (SparkPool *pool); -INLINE_HEADER nat sparkPoolSize (SparkPool *pool); - -INLINE_HEADER void discardSparksCap (Capability *cap); -INLINE_HEADER nat sparkPoolSizeCap (Capability *cap); -INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap); +INLINE_HEADER void discardSparks (SparkPool *pool); +INLINE_HEADER nat sparkPoolSize (SparkPool *pool); #endif /* ----------------------------------------------------------------------------- @@ -64,30 +93,16 @@ INLINE_HEADER rtsBool emptySparkPool (SparkPool *pool) { return looksEmpty(pool); } -INLINE_HEADER rtsBool -emptySparkPoolCap (Capability *cap) -{ return looksEmpty(cap->sparks); } - INLINE_HEADER nat sparkPoolSize (SparkPool *pool) -{ - return (pool->bottom - pool->top); -} - -INLINE_HEADER nat -sparkPoolSizeCap (Capability *cap) -{ return sparkPoolSize(cap->sparks); } +{ return (pool->bottom - pool->top); } INLINE_HEADER void discardSparks (SparkPool *pool) { - pool->top = pool->bottom = 0; + pool->top = pool->topBound = pool->bottom = 0; } -INLINE_HEADER void -discardSparksCap (Capability *cap) -{ return discardSparks(cap->sparks); } - #endif #endif /* SPARKS_H */ diff --git a/rts/Stable.c b/rts/Stable.c index a2c47d7..94a756a 100644 --- a/rts/Stable.c +++ b/rts/Stable.c @@ -6,9 +6,6 @@ * * ---------------------------------------------------------------------------*/ -// Make static versions of inline functions in Stable.h: -#define RTS_STABLE_C - #include "PosixSource.h" #include "Rts.h" #include "Hash.h" diff --git a/rts/Stats.c b/rts/Stats.c index 2e15613..228f0c0 100644 --- a/rts/Stats.c +++ 
b/rts/Stats.c @@ -641,6 +641,21 @@ stat_exit(int alloc) TICK_TO_DBL(task->gc_etime)); } } + + { + nat i; + lnat sparks_created = 0; + lnat sparks_converted = 0; + lnat sparks_pruned = 0; + for (i = 0; i < n_capabilities; i++) { + sparks_created += capabilities[i].sparks_created; + sparks_converted += capabilities[i].sparks_converted; + sparks_pruned += capabilities[i].sparks_pruned; + } + + statsPrintf(" SPARKS: %ld (%ld converted, %ld pruned)\n\n", + sparks_created, sparks_converted, sparks_pruned); + } #endif statsPrintf(" INIT time %6.2fs (%6.2fs elapsed)\n", -- 1.7.10.4