X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=62f205d4d474244da57df51c1abe7484cbf9dbad;hb=bb01a96bea6bd7808332d43a5bed78d1aff4a3fd;hp=a2910ad4ee2b2f8c6afb745799be12e4ca08d40b;hpb=85aa72b9dc6803685595936c61f3cab6faab815a;p=ghc-hetmet.git diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index a2910ad..62f205d 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,6 +1,5 @@ /* --------------------------------------------------------------------------- - * - * (c) The GHC Team, 2002 + * (c) The GHC Team, 2003 * * Capabilities * @@ -15,9 +14,11 @@ * one global capability, namely MainCapability. * * --------------------------------------------------------------------------*/ + #include "PosixSource.h" #include "Rts.h" #include "RtsUtils.h" +#include "RtsFlags.h" #include "OSThreads.h" #include "Capability.h" #include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ @@ -27,9 +28,10 @@ Capability MainCapability; /* for non-SMP, we have one global capability */ #endif +#if defined(RTS_SUPPORTS_THREADS) + nat rts_n_free_capabilities; -#if defined(RTS_SUPPORTS_THREADS) /* returning_worker_cond: when a worker thread returns from executing an * external call, it needs to wait for an RTS Capability before passing * on the result of the call to the Haskell thread that made it. @@ -54,8 +56,8 @@ nat rts_n_waiting_workers = 0; * exclusive access to the RTS and all its data structures (that are not * locked by the Scheduler's mutex). * - * thread_ready_cond is signalled whenever noCapabilities doesn't hold. - * + * thread_ready_cond is signalled whenever + * !noCapabilities && !EMPTY_RUN_QUEUE(). */ Condition thread_ready_cond = INIT_COND_VAR; @@ -69,13 +71,28 @@ Condition thread_ready_cond = INIT_COND_VAR; * Task.startTask() uses its current value. */ nat rts_n_waiting_tasks = 0; + +static Condition *passTarget = NULL; +static rtsBool passingCapability = rtsFalse; #endif -/* ----------------------------------------------------------------------------- +#ifdef SMP +#define UNUSED_IF_NOT_SMP +#else +#define UNUSED_IF_NOT_SMP STG_UNUSED +#endif + +#if defined(RTS_USER_SIGNALS) +#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || signals_pending()) +#else +#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted) +#endif + +/* ---------------------------------------------------------------------------- Initialisation - -------------------------------------------------------------------------- */ -static -void + ------------------------------------------------------------------------- */ + +static void initCapability( Capability *cap ) { cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; @@ -86,27 +103,26 @@ initCapability( Capability *cap ) static void initCapabilities_(nat n); #endif -/* +/* --------------------------------------------------------------------------- * Function: initCapabilities() * * Purpose: set up the Capability handling. For the SMP build, * we keep a table of them, the size of which is * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes * - * Pre-conditions: no locks assumed held. - */ + * ------------------------------------------------------------------------- */ void -initCapabilities() +initCapabilities( void ) { -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&returning_worker_cond); - initCondition(&thread_ready_cond); -#endif - #if defined(SMP) initCapabilities_(RtsFlags.ParFlags.nNodes); #else initCapability(&MainCapability); +#endif + +#if defined(RTS_SUPPORTS_THREADS) + initCondition(&returning_worker_cond); + initCondition(&thread_ready_cond); rts_n_free_capabilities = 1; #endif @@ -120,28 +136,24 @@ static Capability *returning_capabilities; /* Capabilities being passed to returning worker threads */ #endif -/* ----------------------------------------------------------------------------- - Acquiring capabilities - -------------------------------------------------------------------------- */ +/* ---------------------------------------------------------------------------- + grabCapability( Capability** ) -/* - * Function: grabCapability(Capability**) - * - * Purpose: the act of grabbing a capability is easy; just - * remove one from the free capabilities list (which - * may just have one entry). In threaded builds, worker - * threads are prevented from doing so willy-nilly - * via the condition variables thread_ready_cond and - * returning_worker_cond. - * - */ -void grabCapability(Capability** cap) -{ -#ifdef RTS_SUPPORTS_THREADS - ASSERT(rts_n_free_capabilities > 0); + (only externally visible when !RTS_SUPPORTS_THREADS. In the + threaded RTS, clients must use waitFor*Capability()). + ------------------------------------------------------------------------- */ + +#if defined(RTS_SUPPORTS_THREADS) +static #endif +void +grabCapability( Capability** cap ) +{ #if !defined(SMP) +#if defined(RTS_SUPPORTS_THREADS) + ASSERT(rts_n_free_capabilities == 1); rts_n_free_capabilities = 0; +#endif *cap = &MainCapability; handleSignalsInThisThread(); #else @@ -149,60 +161,70 @@ void grabCapability(Capability** cap) free_capabilities = (*cap)->link; rts_n_free_capabilities--; #endif - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): got capability\n", - osThreadId())); +#if defined(RTS_SUPPORTS_THREADS) + IF_DEBUG(scheduler, sched_belch("worker: got capability")); +#endif } -/* +/* ---------------------------------------------------------------------------- * Function: releaseCapability(Capability*) * * Purpose: Letting go of a capability. Causes a * 'returning worker' thread or a 'waiting worker' * to wake up, in that order. - * - */ -void releaseCapability(Capability* cap -#if !defined(SMP) - STG_UNUSED -#endif -) -{ // Precondition: sched_mutex must be held + * ------------------------------------------------------------------------- */ + +void +releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) +{ + // Precondition: sched_mutex is held. #if defined(RTS_SUPPORTS_THREADS) #ifndef SMP - ASSERT(rts_n_free_capabilities == 0); + ASSERT(rts_n_free_capabilities == 0); #endif - /* Check to see whether a worker thread can be given - the go-ahead to return the result of an external call..*/ - if (rts_n_waiting_workers > 0) { - /* Decrement the counter here to avoid livelock where the - * thread that is yielding its capability will repeatedly - * signal returning_worker_cond. - */ + // Check to see whether a worker thread can be given + // the go-ahead to return the result of an external call.. + if (rts_n_waiting_workers > 0) { + // Decrement the counter here to avoid livelock where the + // thread that is yielding its capability will repeatedly + // signal returning_worker_cond. + #if defined(SMP) // SMP variant untested - cap->link = returning_capabilities; - returning_capabilities = cap; -#else + cap->link = returning_capabilities; + returning_capabilities = cap; #endif - rts_n_waiting_workers--; - signalCondition(&returning_worker_cond); - } else /*if ( !EMPTY_RUN_QUEUE() )*/ { + + rts_n_waiting_workers--; + signalCondition(&returning_worker_cond); + IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker")); + } else if (passingCapability) { + if (passTarget == NULL) { + signalCondition(&thread_ready_cond); + startSchedulerTaskIfNecessary(); + } else { + signalCondition(passTarget); + } + rts_n_free_capabilities = 1; + IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it")); + + } else { #if defined(SMP) - cap->link = free_capabilities; - free_capabilities = cap; - rts_n_free_capabilities++; + cap->link = free_capabilities; + free_capabilities = cap; + rts_n_free_capabilities++; #else - rts_n_free_capabilities = 1; + rts_n_free_capabilities = 1; #endif - /* Signal that a capability is available */ - signalCondition(&thread_ready_cond); - } + // Signal that a capability is available + if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) { + signalCondition(&thread_ready_cond); + } + startSchedulerTaskIfNecessary(); + IF_DEBUG(scheduler, sched_belch("worker: released capability")); + } #endif - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): released capability\n", - osThreadId())); - return; + return; } #if defined(RTS_SUPPORTS_THREADS) @@ -211,8 +233,8 @@ void releaseCapability(Capability* cap * call, it needs to communicate the result back. This is done * as follows: * - * - in resumeThread(), the thread calls grabReturnCapability(). - * - If no capabilities are readily available, grabReturnCapability() + * - in resumeThread(), the thread calls waitForReturnCapability(). + * - If no capabilities are readily available, waitForReturnCapability() * increments a counter rts_n_waiting_workers, and blocks * waiting for the condition returning_worker_cond to become * signalled. @@ -225,8 +247,8 @@ void releaseCapability(Capability* cap * to re-grab a capability and re-enter the Scheduler. */ -/* - * Function: grabReturnCapability(Capability**) +/* ---------------------------------------------------------------------------- + * waitForReturnCapability( Mutext *pMutex, Capability** ) * * Purpose: when an OS thread returns from an external call, * it calls grabReturnCapability() (via Schedule.resumeThread()) @@ -234,77 +256,76 @@ void releaseCapability(Capability* cap * result of the external call back to the Haskell thread that * made it. * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is still held and a capability has - * been assigned to the worker thread. - */ + * ------------------------------------------------------------------------- */ + void -grabReturnCapability(Mutex* pMutex, Capability** pCap) +waitForReturnCapability( Mutex* pMutex, Capability** pCap ) { - IF_DEBUG(scheduler, - fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId())); - IF_DEBUG(scheduler, - fprintf(stderr,"worker (%p): returning; workers waiting: %d\n", - osThreadId(), rts_n_waiting_workers)); - if ( noCapabilities() ) { - rts_n_waiting_workers++; - wakeBlockedWorkerThread(); - context_switch = 1; // make sure it's our turn soon - waitCondition(&returning_worker_cond, pMutex); + // Pre-condition: pMutex is held. + + IF_DEBUG(scheduler, + sched_belch("worker: returning; workers waiting: %d", + rts_n_waiting_workers)); + + if ( noCapabilities() || passingCapability ) { + rts_n_waiting_workers++; + context_switch = 1; // make sure it's our turn soon + waitCondition(&returning_worker_cond, pMutex); #if defined(SMP) - *pCap = returning_capabilities; - returning_capabilities = (*pCap)->link; + *pCap = returning_capabilities; + returning_capabilities = (*pCap)->link; #else - *pCap = &MainCapability; - ASSERT(rts_n_free_capabilities == 0); - handleSignalsInThisThread(); + *pCap = &MainCapability; + ASSERT(rts_n_free_capabilities == 0); + handleSignalsInThisThread(); #endif - } else { - grabCapability(pCap); - } - return; + } else { + grabCapability(pCap); + } + + // Post-condition: pMutex is held, pCap points to a capability + // which is now held by the current thread. + return; } -/* ----------------------------------------------------------------------------- - Yielding/waiting for capabilities - -------------------------------------------------------------------------- */ +/* ---------------------------------------------------------------------------- + * yieldCapability( Mutex* pMutex, Capability** pCap ) + * ------------------------------------------------------------------------- */ -/* - * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*) - * - * Purpose: when, upon entry to the Scheduler, an OS worker thread - * spots that one or more threads are blocked waiting for - * permission to return back their result, it gives up - * its Capability. - * Immediately afterwards, it tries to reaquire the Capabilty - * using waitForWorkCapability. - * - * - * Pre-condition: pMutex is assumed held and the thread possesses - * a Capability. - * Post-condition: pMutex is held and the thread possesses - * a Capability. - */ void -yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) +yieldCapability( Capability** pCap ) { - if ( rts_n_waiting_workers > 0 ) { - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId())); - releaseCapability(*pCap); - /* And wait for work */ - waitForWorkCapability(pMutex, pCap, pThreadCond); - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n", - osThreadId())); - } - return; + // Pre-condition: pMutex is assumed held, the current thread + // holds the capability pointed to by pCap. + + if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) { + IF_DEBUG(scheduler, + if (rts_n_waiting_workers > 0) { + sched_belch("worker: giving up capability (returning wkr)"); + } else if (passingCapability) { + sched_belch("worker: giving up capability (passing capability)"); + } else { + sched_belch("worker: giving up capability (no threads to run)"); + } + ); + releaseCapability(*pCap); + *pCap = NULL; + } + + // Post-condition: pMutex is assumed held, and either: + // + // 1. *pCap is NULL, in which case the current thread does not + // hold a capability now, or + // 2. *pCap is not NULL, in which case the current thread still + // holds the capability. + // + return; } -/* - * Function: waitForWorkCapability(Mutex*, Capability**, Condition*) +/* ---------------------------------------------------------------------------- + * waitForCapability( Mutex*, Capability**, Condition* ) * * Purpose: wait for a Capability to become available. In * the process of doing so, updates the number @@ -314,74 +335,92 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) * call is made. * If pThreadCond is not NULL, a capability can be specifically * passed to this thread using passCapability. - * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is held and *pCap is held by the current thread - */ + * ------------------------------------------------------------------------- */ -static Condition *passTarget = NULL; - -void -waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) +void +waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) { -#ifdef SMP - #error SMP version not implemented -#endif - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n", - osThreadId(),pThreadCond)); - while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond) - || (!pThreadCond && passTarget)) { - if(pThreadCond) - { - waitCondition(pThreadCond, pMutex); - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): get passed capability\n", - osThreadId())); - } - else - { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): get normal capability\n", - osThreadId())); + // Pre-condition: pMutex is held. + + while ( noCapabilities() || + (passingCapability && passTarget != pThreadCond) || + !ANY_WORK_TO_DO()) { + IF_DEBUG(scheduler, + sched_belch("worker: wait for capability (cond: %p)", + pThreadCond)); + + if (pThreadCond != NULL) { + waitCondition(pThreadCond, pMutex); + IF_DEBUG(scheduler, sched_belch("worker: get passed capability")); + } else { + rts_n_waiting_tasks++; + waitCondition(&thread_ready_cond, pMutex); + rts_n_waiting_tasks--; + IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); + } } - } - passTarget = NULL; - grabCapability(pCap); - return; + passingCapability = rtsFalse; + grabCapability(pCap); + + // Post-condition: pMutex is held and *pCap is held by the current thread + return; } -/* - * Function: passCapability(Mutex*, Capability*, Condition*) - * - * Purpose: Let go of the capability and make sure the thread associated - * with the Condition pTargetThreadCond gets it next. - * - * Pre-condition: pMutex is held and cap is held by the current thread - * Post-condition: pMutex is held; cap will be grabbed by the "target" - * thread when pMutex is released. - */ +/* ---------------------------------------------------------------------------- + passCapability, passCapabilityToWorker + ------------------------------------------------------------------------- */ void -passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond) +passCapability( Condition *pTargetThreadCond ) { -#ifdef SMP - #error SMP version not implemented -#endif - rts_n_free_capabilities = 1; - signalCondition(pTargetThreadCond); + // Pre-condition: pMutex is held and cap is held by the current thread + passTarget = pTargetThreadCond; - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): passCapability\n", - osThreadId())); + passingCapability = rtsTrue; + IF_DEBUG(scheduler, sched_belch("worker: passCapability")); + + // Post-condition: pMutex is held; cap is still held, but will be + // passed to the target thread when next released. } +void +passCapabilityToWorker( void ) +{ + // Pre-condition: pMutex is held and cap is held by the current thread + + passTarget = NULL; + passingCapability = rtsTrue; + IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker")); + + // Post-condition: pMutex is held; cap is still held, but will be + // passed to a worker thread when next released. +} #endif /* RTS_SUPPORTS_THREADS */ +/* ---------------------------------------------------------------------------- + threadRunnable() + + Signals that a thread has been placed on the run queue, so a worker + might need to be woken up to run it. + + ToDo: should check whether the thread at the front of the queue is + bound, and if so wake up the appropriate worker. + -------------------------------------------------------------------------- */ + +void +threadRunnable ( void ) +{ +#if defined(RTS_SUPPORTS_THREADS) + if ( !noCapabilities && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) { + signalCondition(&thread_ready_cond); + } + startSchedulerTaskIfNecessary(); +#endif +} + +/* ------------------------------------------------------------------------- */ + #if defined(SMP) /* * Function: initCapabilities_(nat) @@ -407,7 +446,8 @@ initCapabilities_(nat n) free_capabilities = cap; rts_n_free_capabilities = n; returning_capabilities = NULL; - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities);); + IF_DEBUG(scheduler, + sched_belch("allocated %d capabilities", n_free_capabilities)); } #endif /* SMP */