X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=203ca9f301a519a7a404fb583a397a3d7a601a7f;hb=0dbbf1932d550293986af6244202cb735b2cd966;hp=d08bf233393157a64d2a459c617a21133d005884;hpb=74df0d16d17647ab46344802ed57533d38727479;p=ghc-hetmet.git diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index d08bf23..203ca9f 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,6 +1,6 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 2003-2005 + * (c) The GHC Team, 2003-2006 * * Capabilities * @@ -10,9 +10,9 @@ * STG execution, a pointer to the capabilitity is kept in a * register (BaseReg; actually it is a pointer to cap->r). * - * Only in an SMP build will there be multiple capabilities, for - * the threaded RTS and other non-threaded builds, there is only - * one global capability, namely MainCapability. + * Only in an THREADED_RTS build will there be multiple capabilities, + * for non-threaded builds there is only one global capability, namely + * MainCapability. * * --------------------------------------------------------------------------*/ @@ -20,13 +20,15 @@ #include "Rts.h" #include "RtsUtils.h" #include "RtsFlags.h" +#include "STM.h" #include "OSThreads.h" #include "Capability.h" #include "Schedule.h" +#include "Sparks.h" -#if !defined(SMP) -Capability MainCapability; // for non-SMP, we have one global capability -#endif +// one global capability, this is the Capability for non-threaded +// builds, and for +RTS -N1 +Capability MainCapability; nat n_capabilities; Capability *capabilities = NULL; @@ -37,45 +39,37 @@ Capability *capabilities = NULL; // locking, so we don't do that. Capability *last_free_capability; -#ifdef SMP -#define UNUSED_IF_NOT_SMP -#else -#define UNUSED_IF_NOT_SMP STG_UNUSED -#endif - -#ifdef RTS_USER_SIGNALS -#define UNUSED_IF_NOT_THREADS -#else -#define UNUSED_IF_NOT_THREADS STG_UNUSED -#endif - - +#if defined(THREADED_RTS) STATIC_INLINE rtsBool globalWorkToDo (void) { return blackholes_need_checking || interrupted -#if defined(RTS_USER_SIGNALS) - || signals_pending() -#endif ; } +#endif #if defined(THREADED_RTS) STATIC_INLINE rtsBool anyWorkForMe( Capability *cap, Task *task ) { - // If the run queue is not empty, then we only wake up the guy who - // can run the thread at the head, even if there is some other - // reason for this task to run (eg. interrupted=rtsTrue). - if (!emptyRunQueue(cap)) { - if (cap->run_queue_hd->bound == NULL) { - return (task->tso == NULL); - } else { - return (cap->run_queue_hd->bound == task); - } + if (task->tso != NULL) { + // A bound task only runs if its thread is on the run queue of + // the capability on which it was woken up. Otherwise, we + // can't be sure that we have the right capability: the thread + // might be woken up on some other capability, and task->cap + // could change under our feet. + return (!emptyRunQueue(cap) && cap->run_queue_hd->bound == task); + } else { + // A vanilla worker task runs if either (a) there is a + // lightweight thread at the head of the run queue, or (b) + // there are sparks to execute, or (c) there is some other + // global condition to check, such as threads blocked on + // blackholes. + return ((!emptyRunQueue(cap) && cap->run_queue_hd->bound == NULL) + || !emptySparkPoolCap(cap) + || globalWorkToDo()); } - return globalWorkToDo(); } #endif @@ -152,12 +146,17 @@ initCapability( Capability *cap, nat i ) for (g = 0; g < RtsFlags.GcFlags.generations; g++) { cap->mut_lists[g] = NULL; } + + cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_trec_chunks = END_STM_CHUNK_LIST; + cap->free_trec_headers = NO_TREC; + cap->transaction_tokens = 0; } /* --------------------------------------------------------------------------- * Function: initCapabilities() * - * Purpose: set up the Capability handling. For the SMP build, + * Purpose: set up the Capability handling. For the THREADED_RTS build, * we keep a table of them, the size of which is * controlled by the user via the RTS flag -N. * @@ -165,21 +164,42 @@ initCapability( Capability *cap, nat i ) void initCapabilities( void ) { -#if defined(SMP) - nat i,n; +#if defined(THREADED_RTS) + nat i; - n_capabilities = n = RtsFlags.ParFlags.nNodes; - capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities"); +#ifndef REG_Base + // We can't support multiple CPUs if BaseReg is not a register + if (RtsFlags.ParFlags.nNodes > 1) { + errorBelch("warning: multiple CPUs not supported in this build, reverting to 1"); + RtsFlags.ParFlags.nNodes = 1; + } +#endif - for (i = 0; i < n; i++) { + n_capabilities = RtsFlags.ParFlags.nNodes; + + if (n_capabilities == 1) { + capabilities = &MainCapability; + // THREADED_RTS must work on builds that don't have a mutable + // BaseReg (eg. unregisterised), so in this case + // capabilities[0] must coincide with &MainCapability. + } else { + capabilities = stgMallocBytes(n_capabilities * sizeof(Capability), + "initCapabilities"); + } + + for (i = 0; i < n_capabilities; i++) { initCapability(&capabilities[i], i); } - IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n)); -#else + IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", + n_capabilities)); + +#else /* !THREADED_RTS */ + n_capabilities = 1; capabilities = &MainCapability; initCapability(&MainCapability, 0); + #endif // There are no free capabilities to begin with. We will start @@ -204,11 +224,10 @@ initCapabilities( void ) #if defined(THREADED_RTS) STATIC_INLINE void -giveCapabilityToTask (Capability *cap, Task *task) +giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) { ASSERT_LOCK_HELD(&cap->lock); ASSERT(task->cap == cap); - // We are not modifying task->cap, so we do not need to take task->lock. IF_DEBUG(scheduler, sched_belch("passing capability %d to %s %p", cap->no, task->tso ? "bound task" : "worker", @@ -239,7 +258,7 @@ releaseCapability_ (Capability* cap) task = cap->running_task; - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task); cap->running_task = NULL; @@ -261,15 +280,7 @@ releaseCapability_ (Capability* cap) return; } - // If we have an unbound thread on the run queue, or if there's - // anything else to do, give the Capability to a worker thread. - if (!emptyRunQueue(cap) || globalWorkToDo()) { - if (cap->spare_workers) { - giveCapabilityToTask(cap,cap->spare_workers); - // The worker Task pops itself from the queue; - return; - } - + if (!cap->spare_workers) { // Create a worker thread if we don't have one. If the system // is interrupted, we only create a worker task if there // are threads that need to be completed. If the system is @@ -282,12 +293,22 @@ releaseCapability_ (Capability* cap) } } + // If we have an unbound thread on the run queue, or if there's + // anything else to do, give the Capability to a worker thread. + if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { + if (cap->spare_workers) { + giveCapabilityToTask(cap,cap->spare_workers); + // The worker Task pops itself from the queue; + return; + } + } + last_free_capability = cap; IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); } void -releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS) +releaseCapability (Capability* cap USED_IF_THREADS) { ACQUIRE_LOCK(&cap->lock); releaseCapability_(cap); @@ -295,7 +316,7 @@ releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS) } static void -releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS) +releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) { Task *task; @@ -333,8 +354,7 @@ releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS) * * ------------------------------------------------------------------------- */ void -waitForReturnCapability (Capability **pCap, - Task *task UNUSED_IF_NOT_THREADS) +waitForReturnCapability (Capability **pCap, Task *task) { #if !defined(THREADED_RTS) @@ -407,7 +427,7 @@ waitForReturnCapability (Capability **pCap, } - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); IF_DEBUG(scheduler, sched_belch("returning; got capability %d", cap->no)); @@ -426,15 +446,14 @@ yieldCapability (Capability** pCap, Task *task) { Capability *cap = *pCap; - // The fast path; no locking - if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) ) - return; + // The fast path has no locking, if we don't enter this while loop while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); // We must now release the capability and wait to be woken up // again. + task->wakeup = rtsFalse; releaseCapabilityAndQueueWorker(cap); for (;;) { @@ -448,6 +467,7 @@ yieldCapability (Capability** pCap, Task *task) IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); ACQUIRE_LOCK(&cap->lock); if (cap->running_task != NULL) { + IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no)); RELEASE_LOCK(&cap->lock); continue; } @@ -475,7 +495,7 @@ yieldCapability (Capability** pCap, Task *task) *pCap = cap; - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); return; } @@ -511,6 +531,7 @@ prodCapabilities(rtsBool all) } RELEASE_LOCK(&cap->lock); } + return; } void