X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=9e28a16e49ccfff6a310095446b6872d695cc502;hb=ab3d1f285cef784138d99e70f7359ea6e67f6c25;hp=1d789821c2230ca682bd9dbf5f63b846d6a9ed88;hpb=410a99e4d59c22940db2adf59fb912435b71d24b;p=ghc-hetmet.git diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 1d78982..9e28a16 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,6 +1,5 @@ /* --------------------------------------------------------------------------- - * - * (c) The GHC Team, 2002 + * (c) The GHC Team, 2003 * * Capabilities * @@ -15,21 +14,27 @@ * one global capability, namely MainCapability. * * --------------------------------------------------------------------------*/ + #include "PosixSource.h" #include "Rts.h" #include "RtsUtils.h" +#include "RtsFlags.h" #include "OSThreads.h" #include "Capability.h" #include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ -#include "Signals.h" /* to get at handleSignalsInThisThread() */ +#if defined(SMP) +#include "Hash.h" +#endif #if !defined(SMP) Capability MainCapability; /* for non-SMP, we have one global capability */ #endif +Capability *capabilities = NULL; nat rts_n_free_capabilities; #if defined(RTS_SUPPORTS_THREADS) + /* returning_worker_cond: when a worker thread returns from executing an * external call, it needs to wait for an RTS Capability before passing * on the result of the call to the Haskell thread that made it. @@ -54,8 +59,8 @@ nat rts_n_waiting_workers = 0; * exclusive access to the RTS and all its data structures (that are not * locked by the Scheduler's mutex). * - * thread_ready_cond is signalled whenever noCapabilities doesn't hold. - * + * thread_ready_cond is signalled whenever + * !noCapabilities && !EMPTY_RUN_QUEUE(). */ Condition thread_ready_cond = INIT_COND_VAR; @@ -69,142 +74,203 @@ Condition thread_ready_cond = INIT_COND_VAR; * Task.startTask() uses its current value. */ nat rts_n_waiting_tasks = 0; +#endif + +#if defined(SMP) +/* + * Free capability list. + */ +Capability *free_capabilities; + +/* + * Maps OSThreadId to Capability * + */ +HashTable *capability_hash; +#endif + +#ifdef SMP +#define UNUSED_IF_NOT_SMP +#else +#define UNUSED_IF_NOT_SMP STG_UNUSED +#endif + -static Condition *passTarget = NULL; -static rtsBool passingCapability = rtsFalse; +#if defined(RTS_SUPPORTS_THREADS) +INLINE_HEADER rtsBool +ANY_WORK_FOR_ME( Condition *cond ) +{ + // If the run queue is not empty, then we only wake up the guy who + // can run the thread at the head, even if there is some other + // reason for this task to run (eg. interrupted=rtsTrue). + if (!EMPTY_RUN_QUEUE()) { + if (run_queue_hd->main == NULL) { + return (cond == NULL); + } else { + return (&run_queue_hd->main->bound_thread_cond == cond); + } + } + + return blackholes_need_checking + || interrupted +#if defined(RTS_USER_SIGNALS) + || signals_pending() +#endif + ; +} #endif -/* ----------------------------------------------------------------------------- +INLINE_HEADER rtsBool +ANY_WORK_TO_DO(void) +{ + return (!EMPTY_RUN_QUEUE() + || interrupted + || blackholes_need_checking +#if defined(RTS_USER_SIGNALS) + || signals_pending() +#endif + ); +} + +/* ---------------------------------------------------------------------------- Initialisation - -------------------------------------------------------------------------- */ -static -void + ------------------------------------------------------------------------- */ + +static void initCapability( Capability *cap ) { + cap->r.rInHaskell = rtsFalse; cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; cap->f.stgGCFun = (F_)__stg_gc_fun; } -#if defined(SMP) -static void initCapabilities_(nat n); -#endif - -/* +/* --------------------------------------------------------------------------- * Function: initCapabilities() * * Purpose: set up the Capability handling. For the SMP build, * we keep a table of them, the size of which is * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes * - * Pre-conditions: no locks assumed held. - */ + * ------------------------------------------------------------------------- */ void -initCapabilities() +initCapabilities( void ) { -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&returning_worker_cond); - initCondition(&thread_ready_cond); -#endif - #if defined(SMP) - initCapabilities_(RtsFlags.ParFlags.nNodes); + nat i,n; + + n = RtsFlags.ParFlags.nNodes; + capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities"); + + for (i = 0; i < n; i++) { + initCapability(&capabilities[i]); + capabilities[i].link = &capabilities[i+1]; + } + capabilities[n-1].link = NULL; + + free_capabilities = &capabilities[0]; + rts_n_free_capabilities = n; + + capability_hash = allocHashTable(); + + IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n)); #else - initCapability(&MainCapability); - rts_n_free_capabilities = 1; + capabilities = &MainCapability; + initCapability(&MainCapability); + rts_n_free_capabilities = 1; #endif - return; +#if defined(RTS_SUPPORTS_THREADS) + initCondition(&returning_worker_cond); + initCondition(&thread_ready_cond); +#endif } -#if defined(SMP) -/* Free capability list. */ -static Capability *free_capabilities; /* Available capabilities for running threads */ -static Capability *returning_capabilities; - /* Capabilities being passed to returning worker threads */ -#endif +/* ---------------------------------------------------------------------------- + grabCapability( Capability** ) -/* ----------------------------------------------------------------------------- - Acquiring capabilities - -------------------------------------------------------------------------- */ + (only externally visible when !RTS_SUPPORTS_THREADS. In the + threaded RTS, clients must use waitFor*Capability()). + ------------------------------------------------------------------------- */ -/* - * Function: grabCapability(Capability**) - * - * Purpose: the act of grabbing a capability is easy; just - * remove one from the free capabilities list (which - * may just have one entry). In threaded builds, worker - * threads are prevented from doing so willy-nilly - * via the condition variables thread_ready_cond and - * returning_worker_cond. - * - */ -void grabCapability(Capability** cap) +#if defined(RTS_SUPPORTS_THREADS) +static +#endif +void +grabCapability( Capability** cap ) { -#if !defined(SMP) - ASSERT(rts_n_free_capabilities == 1); - rts_n_free_capabilities = 0; - *cap = &MainCapability; - handleSignalsInThisThread(); -#else +#if defined(SMP) + ASSERT(rts_n_free_capabilities > 0); *cap = free_capabilities; free_capabilities = (*cap)->link; rts_n_free_capabilities--; + insertHashTable(capability_hash, osThreadId(), *cap); +#else +# if defined(RTS_SUPPORTS_THREADS) + ASSERT(rts_n_free_capabilities == 1); + rts_n_free_capabilities = 0; +# endif + *cap = &MainCapability; #endif -#ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler, sched_belch("worker: got capability")); +#if defined(RTS_SUPPORTS_THREADS) + IF_DEBUG(scheduler, sched_belch("worker: got capability")); #endif } -/* +/* ---------------------------------------------------------------------------- + * Function: myCapability(void) + * + * Purpose: Return the capability owned by the current thread. + * Should not be used if the current thread does not + * hold a Capability. + * ------------------------------------------------------------------------- */ +Capability * +myCapability (void) +{ +#if defined(SMP) + return lookupHashTable(capability_hash, osThreadId()); +#else + return &MainCapability; +#endif +} + +/* ---------------------------------------------------------------------------- * Function: releaseCapability(Capability*) * * Purpose: Letting go of a capability. Causes a * 'returning worker' thread or a 'waiting worker' * to wake up, in that order. - * - */ -void releaseCapability(Capability* cap -#if !defined(SMP) - STG_UNUSED -#endif -) -{ // Precondition: sched_mutex must be held + * ------------------------------------------------------------------------- */ + +void +releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) +{ + // Precondition: sched_mutex is held. #if defined(RTS_SUPPORTS_THREADS) -#ifndef SMP - ASSERT(rts_n_free_capabilities == 0); -#endif - /* Check to see whether a worker thread can be given - the go-ahead to return the result of an external call..*/ - if (rts_n_waiting_workers > 0) { - /* Decrement the counter here to avoid livelock where the - * thread that is yielding its capability will repeatedly - * signal returning_worker_cond. - */ -#if defined(SMP) - // SMP variant untested - cap->link = returning_capabilities; - returning_capabilities = cap; -#else +#if !defined(SMP) + ASSERT(rts_n_free_capabilities == 0); #endif - rts_n_waiting_workers--; - signalCondition(&returning_worker_cond); - IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker")); - } else /*if ( !EMPTY_RUN_QUEUE() )*/ { #if defined(SMP) cap->link = free_capabilities; free_capabilities = cap; - rts_n_free_capabilities++; -#else - rts_n_free_capabilities = 1; + ASSERT(myCapability() == cap); + removeHashTable(capability_hash, osThreadId(), NULL); #endif - /* Signal that a capability is available */ - signalCondition(&thread_ready_cond); - startSchedulerTaskIfNecessary(); // if there is more work to be done, - // we'll need a new thread - IF_DEBUG(scheduler, sched_belch("worker: released capability")); - } + // Check to see whether a worker thread can be given + // the go-ahead to return the result of an external call.. + if (rts_n_waiting_workers > 0) { + // Decrement the counter here to avoid livelock where the + // thread that is yielding its capability will repeatedly + // signal returning_worker_cond. + rts_n_waiting_workers--; + signalCondition(&returning_worker_cond); + IF_DEBUG(scheduler, + sched_belch("worker: released capability to returning worker")); + } else { + rts_n_free_capabilities++; + IF_DEBUG(scheduler, sched_belch("worker: released capability")); + threadRunnable(); + } #endif - return; + return; } #if defined(RTS_SUPPORTS_THREADS) @@ -213,8 +279,8 @@ void releaseCapability(Capability* cap * call, it needs to communicate the result back. This is done * as follows: * - * - in resumeThread(), the thread calls grabReturnCapability(). - * - If no capabilities are readily available, grabReturnCapability() + * - in resumeThread(), the thread calls waitForReturnCapability(). + * - If no capabilities are readily available, waitForReturnCapability() * increments a counter rts_n_waiting_workers, and blocks * waiting for the condition returning_worker_cond to become * signalled. @@ -227,8 +293,8 @@ void releaseCapability(Capability* cap * to re-grab a capability and re-enter the Scheduler. */ -/* - * Function: grabReturnCapability(Capability**) +/* ---------------------------------------------------------------------------- + * waitForReturnCapability( Mutext *pMutex, Capability** ) * * Purpose: when an OS thread returns from an external call, * it calls grabReturnCapability() (via Schedule.resumeThread()) @@ -236,73 +302,77 @@ void releaseCapability(Capability* cap * result of the external call back to the Haskell thread that * made it. * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is still held and a capability has - * been assigned to the worker thread. - */ + * ------------------------------------------------------------------------- */ + void -grabReturnCapability(Mutex* pMutex, Capability** pCap) +waitForReturnCapability( Mutex* pMutex, Capability** pCap ) { - IF_DEBUG(scheduler, - sched_belch("worker: returning; workers waiting: %d", - rts_n_waiting_workers)); - if ( noCapabilities() || passingCapability ) { - rts_n_waiting_workers++; - wakeBlockedWorkerThread(); - context_switch = 1; // make sure it's our turn soon - waitCondition(&returning_worker_cond, pMutex); + // Pre-condition: pMutex is held. + + IF_DEBUG(scheduler, + sched_belch("worker: returning; workers waiting: %d", + rts_n_waiting_workers)); + + if ( noCapabilities() ) { + rts_n_waiting_workers++; + context_switch = 1; // make sure it's our turn soon + waitCondition(&returning_worker_cond, pMutex); #if defined(SMP) - *pCap = returning_capabilities; - returning_capabilities = (*pCap)->link; + *pCap = free_capabilities; + free_capabilities = (*pCap)->link; + ASSERT(pCap != NULL); + insertHashTable(capability_hash, osThreadId(), *pCap); #else - *pCap = &MainCapability; - ASSERT(rts_n_free_capabilities == 0); - handleSignalsInThisThread(); + *pCap = &MainCapability; + ASSERT(rts_n_free_capabilities == 0); #endif - } else { - grabCapability(pCap); - } - return; + } else { + grabCapability(pCap); + } + + // Post-condition: pMutex is held, pCap points to a capability + // which is now held by the current thread. + return; } -/* ----------------------------------------------------------------------------- - Yielding/waiting for capabilities - -------------------------------------------------------------------------- */ +/* ---------------------------------------------------------------------------- + * yieldCapability( Mutex* pMutex, Capability** pCap ) + * ------------------------------------------------------------------------- */ -/* - * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*) - * - * Purpose: when, upon entry to the Scheduler, an OS worker thread - * spots that one or more threads are blocked waiting for - * permission to return back their result, it gives up - * its Capability. - * Immediately afterwards, it tries to reaquire the Capabilty - * using waitForWorkCapability. - * - * - * Pre-condition: pMutex is assumed held and the thread possesses - * a Capability. - * Post-condition: pMutex is held and the thread possesses - * a Capability. - */ void -yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) +yieldCapability( Capability** pCap, Condition *cond ) { - if ( rts_n_waiting_workers > 0 ) { - IF_DEBUG(scheduler, sched_belch("worker: giving up capability")); - releaseCapability(*pCap); - /* And wait for work */ - waitForWorkCapability(pMutex, pCap, pThreadCond); - IF_DEBUG(scheduler, - sched_belch("worker: got back capability (after yieldToReturningWorker)")); - } - return; + // Pre-condition: pMutex is assumed held, the current thread + // holds the capability pointed to by pCap. + + if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) { + IF_DEBUG(scheduler, + if (rts_n_waiting_workers > 0) { + sched_belch("worker: giving up capability (returning wkr)"); + } else if (!EMPTY_RUN_QUEUE()) { + sched_belch("worker: giving up capability (passing capability)"); + } else { + sched_belch("worker: giving up capability (no threads to run)"); + } + ); + releaseCapability(*pCap); + *pCap = NULL; + } + + // Post-condition: either: + // + // 1. *pCap is NULL, in which case the current thread does not + // hold a capability now, or + // 2. *pCap is not NULL, in which case the current thread still + // holds the capability. + // + return; } -/* - * Function: waitForWorkCapability(Mutex*, Capability**, Condition*) +/* ---------------------------------------------------------------------------- + * waitForCapability( Mutex*, Capability**, Condition* ) * * Purpose: wait for a Capability to become available. In * the process of doing so, updates the number @@ -311,120 +381,74 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) * not to create a new worker thread when an external * call is made. * If pThreadCond is not NULL, a capability can be specifically - * passed to this thread using passCapability. - * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is held and *pCap is held by the current thread - */ + * passed to this thread. + * ------------------------------------------------------------------------- */ -void -waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) +void +waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) { -#ifdef SMP - #error SMP version not implemented -#endif - while ( noCapabilities() || (passingCapability && passTarget != pThreadCond)) { - IF_DEBUG(scheduler, - sched_belch("worker: wait for capability (cond: %p)", - pThreadCond)); - if(pThreadCond) - { - waitCondition(pThreadCond, pMutex); - IF_DEBUG(scheduler, sched_belch("worker: get passed capability")); - } - else - { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); + // Pre-condition: pMutex is held. + + while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) { + IF_DEBUG(scheduler, + sched_belch("worker: wait for capability (cond: %p)", + pThreadCond)); + + if (pThreadCond != NULL) { + waitCondition(pThreadCond, pMutex); + IF_DEBUG(scheduler, sched_belch("worker: get passed capability")); + } else { + rts_n_waiting_tasks++; + waitCondition(&thread_ready_cond, pMutex); + rts_n_waiting_tasks--; + IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); + } } - } - passingCapability = rtsFalse; - grabCapability(pCap); - return; + grabCapability(pCap); + + // Post-condition: pMutex is held and *pCap is held by the current thread + return; } -/* - * Function: passCapability(Mutex*, Capability*, Condition*) - * - * Purpose: Let go of the capability and make sure the thread associated - * with the Condition pTargetThreadCond gets it next. - * - * Pre-condition: pMutex is held and cap is held by the current thread - * Post-condition: pMutex is held; cap will be grabbed by the "target" - * thread when pMutex is released. - */ +#endif /* RTS_SUPPORTS_THREADS */ -void -passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond) -{ -#ifdef SMP - #error SMP version not implemented -#endif - rts_n_free_capabilities = 1; - signalCondition(pTargetThreadCond); - passTarget = pTargetThreadCond; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, sched_belch("worker: passCapability")); -} +/* ---------------------------------------------------------------------------- + threadRunnable() -/* - * Function: passCapabilityToWorker(Mutex*, Capability*) - * - * Purpose: Let go of the capability and make sure that a - * "plain" worker thread (not a bound thread) gets it next. - * - * Pre-condition: pMutex is held and cap is held by the current thread - * Post-condition: pMutex is held; cap will be grabbed by the "target" - * thread when pMutex is released. - */ + Signals that a thread has been placed on the run queue, so a worker + might need to be woken up to run it. + ToDo: should check whether the thread at the front of the queue is + bound, and if so wake up the appropriate worker. + -------------------------------------------------------------------------- */ void -passCapabilityToWorker(Mutex* pMutex, Capability* cap) +threadRunnable ( void ) { -#ifdef SMP - #error SMP version not implemented +#if defined(RTS_SUPPORTS_THREADS) + if ( !noCapabilities() && ANY_WORK_TO_DO() ) { + if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) { + signalCondition(&run_queue_hd->main->bound_thread_cond); + return; + } + if (rts_n_waiting_tasks > 0) { + signalCondition(&thread_ready_cond); + } else { + startSchedulerTaskIfNecessary(); + } + } #endif - rts_n_free_capabilities = 1; - signalCondition(&thread_ready_cond); - startSchedulerTaskIfNecessary(); - passTarget = NULL; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker")); } +/* ---------------------------------------------------------------------------- + prodWorker() -#endif /* RTS_SUPPORTS_THREADS */ - -#if defined(SMP) -/* - * Function: initCapabilities_(nat) - * - * Purpose: upon startup, allocate and fill in table - * holding 'n' Capabilities. Only for SMP, since - * it is the only build that supports multiple - * capabilities within the RTS. - */ -static void -initCapabilities_(nat n) + Wake up... time to die. + -------------------------------------------------------------------------- */ +void +prodWorker ( void ) { - nat i; - Capability *cap, *prev; - cap = NULL; - prev = NULL; - for (i = 0; i < n; i++) { - cap = stgMallocBytes(sizeof(Capability), "initCapabilities"); - initCapability(cap); - cap->link = prev; - prev = cap; - } - free_capabilities = cap; - rts_n_free_capabilities = n; - returning_capabilities = NULL; - IF_DEBUG(scheduler, - sched_belch("allocated %d capabilities", n_free_capabilities)); +#if defined(RTS_SUPPORTS_THREADS) + signalCondition(&thread_ready_cond); +#endif } -#endif /* SMP */ -