X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=1e2d3d657d8b3875ed0d712e2a21c54dc3ec43bc;hb=c2d428ba7d0f92d4e97eec7d5d877958689dbc55;hp=ee25f279b97f62d6791a4d3bd4a10044dc5e7fdd;hpb=af13609607da81e7837a7c7c598de82452363ab5;p=ghc-hetmet.git diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index ee25f27..1e2d3d6 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,6 +1,5 @@ /* --------------------------------------------------------------------------- - * - * (c) The GHC Team, 2002 + * (c) The GHC Team, 2003 * * Capabilities * @@ -15,21 +14,24 @@ * one global capability, namely MainCapability. * * --------------------------------------------------------------------------*/ + #include "PosixSource.h" #include "Rts.h" #include "RtsUtils.h" +#include "RtsFlags.h" #include "OSThreads.h" #include "Capability.h" #include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ -#include "Signals.h" /* to get at handleSignalsInThisThread() */ #if !defined(SMP) Capability MainCapability; /* for non-SMP, we have one global capability */ #endif +Capability *capabilities = NULL; nat rts_n_free_capabilities; #if defined(RTS_SUPPORTS_THREADS) + /* returning_worker_cond: when a worker thread returns from executing an * external call, it needs to wait for an RTS Capability before passing * on the result of the call to the Haskell thread that made it. @@ -54,8 +56,8 @@ nat rts_n_waiting_workers = 0; * exclusive access to the RTS and all its data structures (that are not * locked by the Scheduler's mutex). * - * thread_ready_cond is signalled whenever noCapabilities doesn't hold. - * + * thread_ready_cond is signalled whenever + * !noCapabilities && !EMPTY_RUN_QUEUE(). */ Condition thread_ready_cond = INIT_COND_VAR; @@ -69,132 +71,162 @@ Condition thread_ready_cond = INIT_COND_VAR; * Task.startTask() uses its current value. */ nat rts_n_waiting_tasks = 0; + +static Condition *passTarget = NULL; +static rtsBool passingCapability = rtsFalse; #endif -/* ----------------------------------------------------------------------------- +#if defined(SMP) +/* + * Free capability list. + */ +Capability *free_capabilities; +#endif + +#ifdef SMP +#define UNUSED_IF_NOT_SMP +#else +#define UNUSED_IF_NOT_SMP STG_UNUSED +#endif + +#if defined(RTS_USER_SIGNALS) +#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking || signals_pending()) +#else +#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking) +#endif + +/* ---------------------------------------------------------------------------- Initialisation - -------------------------------------------------------------------------- */ -static -void + ------------------------------------------------------------------------- */ + +static void initCapability( Capability *cap ) { + cap->r.rInHaskell = rtsFalse; cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; cap->f.stgGCFun = (F_)__stg_gc_fun; } -#if defined(SMP) -static void initCapabilities_(nat n); -#endif - -/* +/* --------------------------------------------------------------------------- * Function: initCapabilities() * * Purpose: set up the Capability handling. For the SMP build, * we keep a table of them, the size of which is * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes * - * Pre-conditions: no locks assumed held. - */ + * ------------------------------------------------------------------------- */ void -initCapabilities() +initCapabilities( void ) { -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&returning_worker_cond); - initCondition(&thread_ready_cond); -#endif - #if defined(SMP) - initCapabilities_(RtsFlags.ParFlags.nNodes); + nat i,n; + + n = RtsFlags.ParFlags.nNodes; + capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities"); + + for (i = 0; i < n; i++) { + initCapability(&capabilities[i]); + capabilities[i].link = &capabilities[i+1]; + } + capabilities[n-1].link = NULL; + + free_capabilities = &capabilities[0]; + rts_n_free_capabilities = n; + + IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n)); #else - initCapability(&MainCapability); - rts_n_free_capabilities = 1; + capabilities = &MainCapability; + initCapability(&MainCapability); + rts_n_free_capabilities = 1; #endif - return; +#if defined(RTS_SUPPORTS_THREADS) + initCondition(&returning_worker_cond); + initCondition(&thread_ready_cond); +#endif } -#if defined(SMP) -/* Free capability list. */ -static Capability *free_capabilities; /* Available capabilities for running threads */ -static Capability *returning_capabilities; - /* Capabilities being passed to returning worker threads */ -#endif +/* ---------------------------------------------------------------------------- + grabCapability( Capability** ) -/* ----------------------------------------------------------------------------- - Acquiring capabilities - -------------------------------------------------------------------------- */ + (only externally visible when !RTS_SUPPORTS_THREADS. In the + threaded RTS, clients must use waitFor*Capability()). + ------------------------------------------------------------------------- */ -/* - * Function: grabCapability(Capability**) - * - * Purpose: the act of grabbing a capability is easy; just - * remove one from the free capabilities list (which - * may just have one entry). In threaded builds, worker - * threads are prevented from doing so willy-nilly - * via the condition variables thread_ready_cond and - * returning_worker_cond. - * - */ -void grabCapability(Capability** cap) +#if defined(RTS_SUPPORTS_THREADS) +static +#endif +void +grabCapability( Capability** cap ) { +#if defined(SMP) ASSERT(rts_n_free_capabilities > 0); -#if !defined(SMP) - rts_n_free_capabilities = 0; - *cap = &MainCapability; - handleSignalsInThisThread(); -#else *cap = free_capabilities; free_capabilities = (*cap)->link; rts_n_free_capabilities--; +#else +# if defined(RTS_SUPPORTS_THREADS) + ASSERT(rts_n_free_capabilities == 1); + rts_n_free_capabilities = 0; +# endif + *cap = &MainCapability; +#endif +#if defined(RTS_SUPPORTS_THREADS) + IF_DEBUG(scheduler, sched_belch("worker: got capability")); #endif } -/* +/* ---------------------------------------------------------------------------- * Function: releaseCapability(Capability*) * * Purpose: Letting go of a capability. Causes a * 'returning worker' thread or a 'waiting worker' * to wake up, in that order. - * - */ -void releaseCapability(Capability* cap -#if !defined(SMP) - STG_UNUSED -#endif -) -{ // Precondition: sched_mutex must be held + * ------------------------------------------------------------------------- */ + +void +releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) +{ + // Precondition: sched_mutex is held. #if defined(RTS_SUPPORTS_THREADS) -#ifndef SMP - ASSERT(rts_n_free_capabilities == 0); -#endif - /* Check to see whether a worker thread can be given - the go-ahead to return the result of an external call..*/ - if (rts_n_waiting_workers > 0) { - /* Decrement the counter here to avoid livelock where the - * thread that is yielding its capability will repeatedly - * signal returning_worker_cond. - */ -#if defined(SMP) - // SMP variant untested - cap->link = returning_capabilities; - returning_capabilities = cap; -#else +#if !defined(SMP) + ASSERT(rts_n_free_capabilities == 0); #endif - rts_n_waiting_workers--; - signalCondition(&returning_worker_cond); - } else /*if ( !EMPTY_RUN_QUEUE() )*/ { #if defined(SMP) cap->link = free_capabilities; free_capabilities = cap; - rts_n_free_capabilities++; -#else - rts_n_free_capabilities = 1; #endif - /* Signal that a capability is available */ - signalCondition(&thread_ready_cond); - } + // Check to see whether a worker thread can be given + // the go-ahead to return the result of an external call.. + if (rts_n_waiting_workers > 0) { + // Decrement the counter here to avoid livelock where the + // thread that is yielding its capability will repeatedly + // signal returning_worker_cond. + + rts_n_waiting_workers--; + signalCondition(&returning_worker_cond); + IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker")); + } else if (passingCapability) { + if (passTarget == NULL) { + signalCondition(&thread_ready_cond); + startSchedulerTaskIfNecessary(); + } else { + signalCondition(passTarget); + } + rts_n_free_capabilities++; + IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it")); + + } else { + rts_n_free_capabilities++; + // Signal that a capability is available + if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) { + signalCondition(&thread_ready_cond); + } + startSchedulerTaskIfNecessary(); + IF_DEBUG(scheduler, sched_belch("worker: released capability")); + } #endif - return; + return; } #if defined(RTS_SUPPORTS_THREADS) @@ -203,8 +235,8 @@ void releaseCapability(Capability* cap * call, it needs to communicate the result back. This is done * as follows: * - * - in resumeThread(), the thread calls grabReturnCapability(). - * - If no capabilities are readily available, grabReturnCapability() + * - in resumeThread(), the thread calls waitForReturnCapability(). + * - If no capabilities are readily available, waitForReturnCapability() * increments a counter rts_n_waiting_workers, and blocks * waiting for the condition returning_worker_cond to become * signalled. @@ -217,8 +249,8 @@ void releaseCapability(Capability* cap * to re-grab a capability and re-enter the Scheduler. */ -/* - * Function: grabReturnCapability(Capability**) +/* ---------------------------------------------------------------------------- + * waitForReturnCapability( Mutext *pMutex, Capability** ) * * Purpose: when an OS thread returns from an external call, * it calls grabReturnCapability() (via Schedule.resumeThread()) @@ -226,74 +258,76 @@ void releaseCapability(Capability* cap * result of the external call back to the Haskell thread that * made it. * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is still held and a capability has - * been assigned to the worker thread. - */ + * ------------------------------------------------------------------------- */ + void -grabReturnCapability(Mutex* pMutex, Capability** pCap) +waitForReturnCapability( Mutex* pMutex, Capability** pCap ) { - IF_DEBUG(scheduler, - fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId())); - IF_DEBUG(scheduler, - fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n", - osThreadId(), rts_n_waiting_workers)); - if ( noCapabilities() ) { - rts_n_waiting_workers++; - wakeBlockedWorkerThread(); - context_switch = 1; // make sure it's our turn soon - waitCondition(&returning_worker_cond, pMutex); + // Pre-condition: pMutex is held. + + IF_DEBUG(scheduler, + sched_belch("worker: returning; workers waiting: %d", + rts_n_waiting_workers)); + + if ( noCapabilities() || passingCapability ) { + rts_n_waiting_workers++; + context_switch = 1; // make sure it's our turn soon + waitCondition(&returning_worker_cond, pMutex); #if defined(SMP) - *pCap = returning_capabilities; - returning_capabilities = (*pCap)->link; + *pCap = free_capabilities; + free_capabilities = (*pCap)->link; + ASSERT(pCap != NULL); #else - *pCap = &MainCapability; - ASSERT(rts_n_free_capabilities == 0); - handleSignalsInThisThread(); + *pCap = &MainCapability; + ASSERT(rts_n_free_capabilities == 0); #endif - } else { - grabCapability(pCap); - } - return; + } else { + grabCapability(pCap); + } + + // Post-condition: pMutex is held, pCap points to a capability + // which is now held by the current thread. + return; } -/* ----------------------------------------------------------------------------- - Yielding/waiting for capabilities - -------------------------------------------------------------------------- */ +/* ---------------------------------------------------------------------------- + * yieldCapability( Mutex* pMutex, Capability** pCap ) + * ------------------------------------------------------------------------- */ -/* - * Function: yieldToReturningWorker(Mutex*,Capability*) - * - * Purpose: when, upon entry to the Scheduler, an OS worker thread - * spots that one or more threads are blocked waiting for - * permission to return back their result, it gives up - * its Capability. - * - * Pre-condition: pMutex is assumed held and the thread possesses - * a Capability. - * Post-condition: pMutex is held and the Capability has - * been given back. - */ void -yieldToReturningWorker(Mutex* pMutex, Capability** pCap) +yieldCapability( Capability** pCap ) { - if ( rts_n_waiting_workers > 0 ) { - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId())); - releaseCapability(*pCap); - /* And wait for work */ - waitForWorkCapability(pMutex, pCap, rtsFalse); - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n", - osThreadId())); - } - return; + // Pre-condition: pMutex is assumed held, the current thread + // holds the capability pointed to by pCap. + + if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) { + IF_DEBUG(scheduler, + if (rts_n_waiting_workers > 0) { + sched_belch("worker: giving up capability (returning wkr)"); + } else if (passingCapability) { + sched_belch("worker: giving up capability (passing capability)"); + } else { + sched_belch("worker: giving up capability (no threads to run)"); + } + ); + releaseCapability(*pCap); + *pCap = NULL; + } + + // Post-condition: either: + // + // 1. *pCap is NULL, in which case the current thread does not + // hold a capability now, or + // 2. *pCap is not NULL, in which case the current thread still + // holds the capability. + // + return; } -/* - * Function: waitForWorkCapability(Mutex*, Capability**, rtsBool) +/* ---------------------------------------------------------------------------- + * waitForCapability( Mutex*, Capability**, Condition* ) * * Purpose: wait for a Capability to become available. In * the process of doing so, updates the number @@ -301,50 +335,101 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap) * work. That counter is used when deciding whether or * not to create a new worker thread when an external * call is made. - * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is held and *pCap is held by the current thread - */ -void -waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable) + * If pThreadCond is not NULL, a capability can be specifically + * passed to this thread using passCapability. + * ------------------------------------------------------------------------- */ + +void +waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) +{ + // Pre-condition: pMutex is held. + + while ( noCapabilities() || + (passingCapability && passTarget != pThreadCond) || + !ANY_WORK_TO_DO()) { + IF_DEBUG(scheduler, + sched_belch("worker: wait for capability (cond: %p)", + pThreadCond)); + + if (pThreadCond != NULL) { + waitCondition(pThreadCond, pMutex); + IF_DEBUG(scheduler, sched_belch("worker: get passed capability")); + } else { + rts_n_waiting_tasks++; + waitCondition(&thread_ready_cond, pMutex); + rts_n_waiting_tasks--; + IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); + } + } + passingCapability = rtsFalse; + grabCapability(pCap); + + // Post-condition: pMutex is held and *pCap is held by the current thread + return; +} + +/* ---------------------------------------------------------------------------- + passCapability, passCapabilityToWorker + ------------------------------------------------------------------------- */ + +void +passCapability( Condition *pTargetThreadCond ) +{ + // Pre-condition: pMutex is held and cap is held by the current thread + + passTarget = pTargetThreadCond; + passingCapability = rtsTrue; + IF_DEBUG(scheduler, sched_belch("worker: passCapability")); + + // Post-condition: pMutex is held; cap is still held, but will be + // passed to the target thread when next released. +} + +void +passCapabilityToWorker( void ) { - while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; - } - grabCapability(pCap); - return; + // Pre-condition: pMutex is held and cap is held by the current thread + + passTarget = NULL; + passingCapability = rtsTrue; + IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker")); + + // Post-condition: pMutex is held; cap is still held, but will be + // passed to a worker thread when next released. } #endif /* RTS_SUPPORTS_THREADS */ -#if defined(SMP) -/* - * Function: initCapabilities_(nat) - * - * Purpose: upon startup, allocate and fill in table - * holding 'n' Capabilities. Only for SMP, since - * it is the only build that supports multiple - * capabilities within the RTS. - */ -static void -initCapabilities_(nat n) +/* ---------------------------------------------------------------------------- + threadRunnable() + + Signals that a thread has been placed on the run queue, so a worker + might need to be woken up to run it. + + ToDo: should check whether the thread at the front of the queue is + bound, and if so wake up the appropriate worker. + -------------------------------------------------------------------------- */ +void +threadRunnable ( void ) { - nat i; - Capability *cap, *prev; - cap = NULL; - prev = NULL; - for (i = 0; i < n; i++) { - cap = stgMallocBytes(sizeof(Capability), "initCapabilities"); - initCapability(cap); - cap->link = prev; - prev = cap; - } - free_capabilities = cap; - rts_n_free_capabilities = n; - returning_capabilities = NULL; - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities);); +#if defined(RTS_SUPPORTS_THREADS) + if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) { + signalCondition(&thread_ready_cond); + } + startSchedulerTaskIfNecessary(); +#endif } -#endif /* SMP */ + +/* ---------------------------------------------------------------------------- + prodWorker() + + Wake up... time to die. + -------------------------------------------------------------------------- */ +void +prodWorker ( void ) +{ +#if defined(RTS_SUPPORTS_THREADS) + signalCondition(&thread_ready_cond); +#endif +}