From 6d16c4760494abbe54b449f6a9b7d51336a1e3c3 Mon Sep 17 00:00:00 2001 From: simonmar Date: Mon, 23 May 2005 15:44:10 +0000 Subject: [PATCH] [project @ 2005-05-23 15:44:10 by simonmar] Simplify and improve the Capability-passing machinery for bound threads. The old story was quite complicated: if you find a thread on the run queue which the current task can't run, you had to call passCapability(), which set a flag saying where the next Capability was to go, and then release the Capability. When multiple Capabilities are flying around, it's not clear how this story should extend. The new story is much simpler: each time around the scheduler loop, the task looks to see whether it can make any progress, and if not, it releases its Capability and wakes up a task which *can* make some progress. The predicate for whether we can make any progress is encapsulated in the (inline) function ANY_WORK_FOR_ME(Condition). Waking up an appropriate task is encapsulated in the function threadRunnable() (previously it was in two places). The logic in Capability.c is simpler, but unfortunately it is now more closely connected with the Scheduler, because it inspects the run queue. However, performance when communicating between bound and unbound threads might be better. The concurrency tests still work, so hopefully this hasn't broken anything. --- ghc/rts/Capability.c | 118 +++++++++++++++++++++++--------------------------- ghc/rts/Capability.h | 2 +- ghc/rts/Schedule.c | 9 ++-- 3 files changed, 58 insertions(+), 71 deletions(-) diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 0d58e75..9e28a16 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -74,9 +74,6 @@ Condition thread_ready_cond = INIT_COND_VAR; * Task.startTask() uses its current value. */ nat rts_n_waiting_tasks = 0; - -static Condition *passTarget = NULL; -static rtsBool passingCapability = rtsFalse; #endif #if defined(SMP) @@ -97,12 +94,43 @@ HashTable *capability_hash; #define UNUSED_IF_NOT_SMP STG_UNUSED #endif + +#if defined(RTS_SUPPORTS_THREADS) +INLINE_HEADER rtsBool +ANY_WORK_FOR_ME( Condition *cond ) +{ + // If the run queue is not empty, then we only wake up the guy who + // can run the thread at the head, even if there is some other + // reason for this task to run (eg. interrupted=rtsTrue). + if (!EMPTY_RUN_QUEUE()) { + if (run_queue_hd->main == NULL) { + return (cond == NULL); + } else { + return (&run_queue_hd->main->bound_thread_cond == cond); + } + } + + return blackholes_need_checking + || interrupted #if defined(RTS_USER_SIGNALS) -#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking || signals_pending()) -#else -#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking) + || signals_pending() +#endif + ; +} #endif +INLINE_HEADER rtsBool +ANY_WORK_TO_DO(void) +{ + return (!EMPTY_RUN_QUEUE() + || interrupted + || blackholes_need_checking +#if defined(RTS_USER_SIGNALS) + || signals_pending() +#endif + ); +} + /* ---------------------------------------------------------------------------- Initialisation ------------------------------------------------------------------------- */ @@ -232,28 +260,14 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) // Decrement the counter here to avoid livelock where the // thread that is yielding its capability will repeatedly // signal returning_worker_cond. - rts_n_waiting_workers--; signalCondition(&returning_worker_cond); - IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker")); - } else if (passingCapability) { - if (passTarget == NULL) { - signalCondition(&thread_ready_cond); - startSchedulerTaskIfNecessary(); - } else { - signalCondition(passTarget); - } - rts_n_free_capabilities++; - IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it")); - + IF_DEBUG(scheduler, + sched_belch("worker: released capability to returning worker")); } else { rts_n_free_capabilities++; - // Signal that a capability is available - if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) { - signalCondition(&thread_ready_cond); - } - startSchedulerTaskIfNecessary(); IF_DEBUG(scheduler, sched_belch("worker: released capability")); + threadRunnable(); } #endif return; @@ -299,7 +313,7 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap ) sched_belch("worker: returning; workers waiting: %d", rts_n_waiting_workers)); - if ( noCapabilities() || passingCapability ) { + if ( noCapabilities() ) { rts_n_waiting_workers++; context_switch = 1; // make sure it's our turn soon waitCondition(&returning_worker_cond, pMutex); @@ -327,16 +341,16 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap ) * ------------------------------------------------------------------------- */ void -yieldCapability( Capability** pCap ) +yieldCapability( Capability** pCap, Condition *cond ) { // Pre-condition: pMutex is assumed held, the current thread // holds the capability pointed to by pCap. - if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) { + if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) { IF_DEBUG(scheduler, if (rts_n_waiting_workers > 0) { sched_belch("worker: giving up capability (returning wkr)"); - } else if (passingCapability) { + } else if (!EMPTY_RUN_QUEUE()) { sched_belch("worker: giving up capability (passing capability)"); } else { sched_belch("worker: giving up capability (no threads to run)"); @@ -367,7 +381,7 @@ yieldCapability( Capability** pCap ) * not to create a new worker thread when an external * call is made. * If pThreadCond is not NULL, a capability can be specifically - * passed to this thread using passCapability. + * passed to this thread. * ------------------------------------------------------------------------- */ void @@ -375,9 +389,7 @@ waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) { // Pre-condition: pMutex is held. - while ( noCapabilities() || - (passingCapability && passTarget != pThreadCond) || - !ANY_WORK_TO_DO()) { + while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) { IF_DEBUG(scheduler, sched_belch("worker: wait for capability (cond: %p)", pThreadCond)); @@ -392,43 +404,12 @@ waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); } } - passingCapability = rtsFalse; grabCapability(pCap); // Post-condition: pMutex is held and *pCap is held by the current thread return; } -/* ---------------------------------------------------------------------------- - passCapability, passCapabilityToWorker - ------------------------------------------------------------------------- */ - -void -passCapability( Condition *pTargetThreadCond ) -{ - // Pre-condition: pMutex is held and cap is held by the current thread - - passTarget = pTargetThreadCond; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, sched_belch("worker: passCapability")); - - // Post-condition: pMutex is held; cap is still held, but will be - // passed to the target thread when next released. -} - -void -passCapabilityToWorker( void ) -{ - // Pre-condition: pMutex is held and cap is held by the current thread - - passTarget = NULL; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker")); - - // Post-condition: pMutex is held; cap is still held, but will be - // passed to a worker thread when next released. -} - #endif /* RTS_SUPPORTS_THREADS */ /* ---------------------------------------------------------------------------- @@ -444,10 +425,17 @@ void threadRunnable ( void ) { #if defined(RTS_SUPPORTS_THREADS) - if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) { - signalCondition(&thread_ready_cond); + if ( !noCapabilities() && ANY_WORK_TO_DO() ) { + if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) { + signalCondition(&run_queue_hd->main->bound_thread_cond); + return; + } + if (rts_n_waiting_tasks > 0) { + signalCondition(&thread_ready_cond); + } else { + startSchedulerTaskIfNecessary(); + } } - startSchedulerTaskIfNecessary(); #endif } diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index 7034b6b..5f1649e 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -59,7 +59,7 @@ extern void prodWorker ( void ); // current worker thread should then re-acquire it using // waitForCapability(). // -extern void yieldCapability( Capability **pCap ); +extern void yieldCapability( Capability** pCap, Condition *cond ); // Acquires a capability for doing some work. // diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 7f85690..ea85c83 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -463,7 +463,8 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // Yield the capability to higher-priority tasks if necessary. // if (cap != NULL) { - yieldCapability(&cap); + yieldCapability(&cap, + mainThread ? &mainThread->bound_thread_cond : NULL ); } // If we do not currently hold a capability, we wait for one @@ -475,7 +476,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // We now have a capability... #endif - + #if 0 /* extra sanity checking */ { StgMainThread *m; @@ -627,21 +628,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, sched_belch("### thread %d bound to another OS thread", t->id)); // no, bound to a different Haskell thread: pass to that thread PUSH_ON_RUN_QUEUE(t); - passCapability(&m->bound_thread_cond); continue; } } else { if(mainThread != NULL) - // The thread we want to run is bound. + // The thread we want to run is unbound. { IF_DEBUG(scheduler, sched_belch("### this OS thread cannot run thread %d", t->id)); // no, the current native thread is bound to a different // Haskell thread, so pass it to any worker thread PUSH_ON_RUN_QUEUE(t); - passCapabilityToWorker(); continue; } } -- 1.7.10.4