X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=8c40b639927b3af1202bdc36d0b130a9ddece7c3;hp=62f205d4d474244da57df51c1abe7484cbf9dbad;hb=2f861d149686a96d7783ce984afa7c263a39c355;hpb=bb01a96bea6bd7808332d43a5bed78d1aff4a3fd diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 62f205d..8c40b63 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,5 +1,6 @@ /* --------------------------------------------------------------------------- - * (c) The GHC Team, 2003 + * + * (c) The GHC Team, 2003-2006 * * Capabilities * @@ -7,164 +8,240 @@ * and all the state an OS thread/task needs to run Haskell code: * its STG registers, a pointer to its TSO, a nursery etc. During * STG execution, a pointer to the capabilitity is kept in a - * register (BaseReg). + * register (BaseReg; actually it is a pointer to cap->r). + * + * Only in an THREADED_RTS build will there be multiple capabilities, + * for non-threaded builds there is only one global capability, namely + * MainCapability. * - * Only in an SMP build will there be multiple capabilities, for - * the threaded RTS and other non-threaded builds, there is only - * one global capability, namely MainCapability. - * * --------------------------------------------------------------------------*/ #include "PosixSource.h" #include "Rts.h" #include "RtsUtils.h" #include "RtsFlags.h" +#include "STM.h" #include "OSThreads.h" #include "Capability.h" -#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ -#include "Signals.h" /* to get at handleSignalsInThisThread() */ +#include "Schedule.h" +#include "Sparks.h" -#if !defined(SMP) -Capability MainCapability; /* for non-SMP, we have one global capability */ -#endif +// one global capability, this is the Capability for non-threaded +// builds, and for +RTS -N1 +Capability MainCapability; -#if defined(RTS_SUPPORTS_THREADS) +nat n_capabilities; +Capability *capabilities = NULL; -nat rts_n_free_capabilities; - -/* returning_worker_cond: when a worker thread returns from executing an - * external call, it needs to wait for an RTS Capability before passing - * on the result of the call to the Haskell thread that made it. - * - * returning_worker_cond is signalled in Capability.releaseCapability(). - * - */ -Condition returning_worker_cond = INIT_COND_VAR; - -/* - * To avoid starvation of threads blocked on worker_thread_cond, - * the task(s) that enter the Scheduler will check to see whether - * there are one or more worker threads blocked waiting on - * returning_worker_cond. - */ -nat rts_n_waiting_workers = 0; - -/* thread_ready_cond: when signalled, a thread has become runnable for a - * task to execute. - * - * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its data structures (that are not - * locked by the Scheduler's mutex). - * - * thread_ready_cond is signalled whenever - * !noCapabilities && !EMPTY_RUN_QUEUE(). - */ -Condition thread_ready_cond = INIT_COND_VAR; - -/* - * To be able to make an informed decision about whether or not - * to create a new task when making an external call, keep track of - * the number of tasks currently blocked waiting on thread_ready_cond. - * (if > 0 => no need for a new task, just unblock an existing one). - * - * waitForWorkCapability() takes care of keeping it up-to-date; - * Task.startTask() uses its current value. - */ -nat rts_n_waiting_tasks = 0; +// Holds the Capability which last became free. This is used so that +// an in-call has a chance of quickly finding a free Capability. +// Maintaining a global free list of Capabilities would require global +// locking, so we don't do that. +Capability *last_free_capability; -static Condition *passTarget = NULL; -static rtsBool passingCapability = rtsFalse; +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +globalWorkToDo (void) +{ + return blackholes_need_checking + || interrupted + ; +} #endif -#ifdef SMP -#define UNUSED_IF_NOT_SMP -#else -#define UNUSED_IF_NOT_SMP STG_UNUSED +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +anyWorkForMe( Capability *cap, Task *task ) +{ + if (task->tso != NULL) { + // A bound task only runs if its thread is on the run queue of + // the capability on which it was woken up. Otherwise, we + // can't be sure that we have the right capability: the thread + // might be woken up on some other capability, and task->cap + // could change under our feet. + return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task; + } else { + // A vanilla worker task runs if either there is a lightweight + // thread at the head of the run queue, or the run queue is + // empty and (there are sparks to execute, or there is some + // other global condition to check, such as threads blocked on + // blackholes). + if (emptyRunQueue(cap)) { + return !emptySparkPoolCap(cap) || globalWorkToDo(); + } else + return cap->run_queue_hd->bound == NULL; + } +} #endif -#if defined(RTS_USER_SIGNALS) -#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || signals_pending()) -#else -#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted) +/* ----------------------------------------------------------------------------- + * Manage the returning_tasks lists. + * + * These functions require cap->lock + * -------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +newReturningTask (Capability *cap, Task *task) +{ + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->return_link == NULL); + if (cap->returning_tasks_hd) { + ASSERT(cap->returning_tasks_tl->return_link == NULL); + cap->returning_tasks_tl->return_link = task; + } else { + cap->returning_tasks_hd = task; + } + cap->returning_tasks_tl = task; +} + +STATIC_INLINE Task * +popReturningTask (Capability *cap) +{ + ASSERT_LOCK_HELD(&cap->lock); + Task *task; + task = cap->returning_tasks_hd; + ASSERT(task); + cap->returning_tasks_hd = task->return_link; + if (!cap->returning_tasks_hd) { + cap->returning_tasks_tl = NULL; + } + task->return_link = NULL; + return task; +} #endif /* ---------------------------------------------------------------------------- - Initialisation - ------------------------------------------------------------------------- */ + * Initialisation + * + * The Capability is initially marked not free. + * ------------------------------------------------------------------------- */ static void -initCapability( Capability *cap ) +initCapability( Capability *cap, nat i ) { + nat g; + + cap->no = i; + cap->in_haskell = rtsFalse; + + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + +#if defined(THREADED_RTS) + initMutex(&cap->lock); + cap->running_task = NULL; // indicates cap is free + cap->spare_workers = NULL; + cap->suspended_ccalling_tasks = NULL; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; +#endif + cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; cap->f.stgGCFun = (F_)__stg_gc_fun; -} -#if defined(SMP) -static void initCapabilities_(nat n); -#endif + cap->mut_lists = stgMallocBytes(sizeof(bdescr *) * + RtsFlags.GcFlags.generations, + "initCapability"); + + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + cap->mut_lists[g] = NULL; + } + + cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_trec_chunks = END_STM_CHUNK_LIST; + cap->free_trec_headers = NO_TREC; + cap->transaction_tokens = 0; +} /* --------------------------------------------------------------------------- * Function: initCapabilities() * - * Purpose: set up the Capability handling. For the SMP build, + * Purpose: set up the Capability handling. For the THREADED_RTS build, * we keep a table of them, the size of which is - * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes + * controlled by the user via the RTS flag -N. * * ------------------------------------------------------------------------- */ void initCapabilities( void ) { -#if defined(SMP) - initCapabilities_(RtsFlags.ParFlags.nNodes); -#else - initCapability(&MainCapability); +#if defined(THREADED_RTS) + nat i; + +#ifndef REG_Base + // We can't support multiple CPUs if BaseReg is not a register + if (RtsFlags.ParFlags.nNodes > 1) { + errorBelch("warning: multiple CPUs not supported in this build, reverting to 1"); + RtsFlags.ParFlags.nNodes = 1; + } #endif -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&returning_worker_cond); - initCondition(&thread_ready_cond); - rts_n_free_capabilities = 1; -#endif + n_capabilities = RtsFlags.ParFlags.nNodes; - return; -} + if (n_capabilities == 1) { + capabilities = &MainCapability; + // THREADED_RTS must work on builds that don't have a mutable + // BaseReg (eg. unregisterised), so in this case + // capabilities[0] must coincide with &MainCapability. + } else { + capabilities = stgMallocBytes(n_capabilities * sizeof(Capability), + "initCapabilities"); + } -#if defined(SMP) -/* Free capability list. */ -static Capability *free_capabilities; /* Available capabilities for running threads */ -static Capability *returning_capabilities; - /* Capabilities being passed to returning worker threads */ -#endif + for (i = 0; i < n_capabilities; i++) { + initCapability(&capabilities[i], i); + } -/* ---------------------------------------------------------------------------- - grabCapability( Capability** ) + IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", + n_capabilities)); - (only externally visible when !RTS_SUPPORTS_THREADS. In the - threaded RTS, clients must use waitFor*Capability()). - ------------------------------------------------------------------------- */ +#else /* !THREADED_RTS */ + + n_capabilities = 1; + capabilities = &MainCapability; + initCapability(&MainCapability, 0); -#if defined(RTS_SUPPORTS_THREADS) -static #endif -void -grabCapability( Capability** cap ) + + // There are no free capabilities to begin with. We will start + // a worker Task to each Capability, which will quickly put the + // Capability on the free list when it finds nothing to do. + last_free_capability = &capabilities[0]; +} + +/* ---------------------------------------------------------------------------- + * Give a Capability to a Task. The task must currently be sleeping + * on its condition variable. + * + * Requires cap->lock (modifies cap->running_task). + * + * When migrating a Task, the migrater must take task->lock before + * modifying task->cap, to synchronise with the waking up Task. + * Additionally, the migrater should own the Capability (when + * migrating the run queue), or cap->lock (when migrating + * returning_workers). + * + * ------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) { -#if !defined(SMP) -#if defined(RTS_SUPPORTS_THREADS) - ASSERT(rts_n_free_capabilities == 1); - rts_n_free_capabilities = 0; -#endif - *cap = &MainCapability; - handleSignalsInThisThread(); -#else - *cap = free_capabilities; - free_capabilities = (*cap)->link; - rts_n_free_capabilities--; -#endif -#if defined(RTS_SUPPORTS_THREADS) - IF_DEBUG(scheduler, sched_belch("worker: got capability")); -#endif + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->cap == cap); + IF_DEBUG(scheduler, + sched_belch("passing capability %d to %s %p", + cap->no, task->tso ? "bound task" : "worker", + (void *)task->id)); + ACQUIRE_LOCK(&task->lock); + task->wakeup = rtsTrue; + // the wakeup flag is needed because signalCondition() doesn't + // flag the condition if the thread is already runniing, but we want + // it to be sticky. + signalCondition(&task->cond); + RELEASE_LOCK(&task->lock); } +#endif /* ---------------------------------------------------------------------------- * Function: releaseCapability(Capability*) @@ -174,280 +251,382 @@ grabCapability( Capability** cap ) * to wake up, in that order. * ------------------------------------------------------------------------- */ +#if defined(THREADED_RTS) void -releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) +releaseCapability_ (Capability* cap) { - // Precondition: sched_mutex is held. -#if defined(RTS_SUPPORTS_THREADS) -#ifndef SMP - ASSERT(rts_n_free_capabilities == 0); -#endif + Task *task; + + task = cap->running_task; + + ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task); + + cap->running_task = NULL; + // Check to see whether a worker thread can be given // the go-ahead to return the result of an external call.. - if (rts_n_waiting_workers > 0) { - // Decrement the counter here to avoid livelock where the - // thread that is yielding its capability will repeatedly - // signal returning_worker_cond. - -#if defined(SMP) - // SMP variant untested - cap->link = returning_capabilities; - returning_capabilities = cap; -#endif + if (cap->returning_tasks_hd != NULL) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + // The Task pops itself from the queue (see waitForReturnCapability()) + return; + } - rts_n_waiting_workers--; - signalCondition(&returning_worker_cond); - IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker")); - } else if (passingCapability) { - if (passTarget == NULL) { - signalCondition(&thread_ready_cond); - startSchedulerTaskIfNecessary(); - } else { - signalCondition(passTarget); + // If the next thread on the run queue is a bound thread, + // give this Capability to the appropriate Task. + if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) { + // Make sure we're not about to try to wake ourselves up + ASSERT(task != cap->run_queue_hd->bound); + task = cap->run_queue_hd->bound; + giveCapabilityToTask(cap,task); + return; + } + + if (!cap->spare_workers) { + // Create a worker thread if we don't have one. If the system + // is interrupted, we only create a worker task if there + // are threads that need to be completed. If the system is + // shutting down, we never create a new worker. + if (!shutting_down_scheduler) { + IF_DEBUG(scheduler, + sched_belch("starting new worker on capability %d", cap->no)); + startWorkerTask(cap, workerStart); + return; } - rts_n_free_capabilities = 1; - IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it")); + } - } else { -#if defined(SMP) - cap->link = free_capabilities; - free_capabilities = cap; - rts_n_free_capabilities++; -#else - rts_n_free_capabilities = 1; -#endif - // Signal that a capability is available - if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) { - signalCondition(&thread_ready_cond); + // If we have an unbound thread on the run queue, or if there's + // anything else to do, give the Capability to a worker thread. + if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { + if (cap->spare_workers) { + giveCapabilityToTask(cap,cap->spare_workers); + // The worker Task pops itself from the queue; + return; } - startSchedulerTaskIfNecessary(); - IF_DEBUG(scheduler, sched_belch("worker: released capability")); } -#endif - return; + + last_free_capability = cap; + IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); } -#if defined(RTS_SUPPORTS_THREADS) -/* - * When a native thread has completed the execution of an external - * call, it needs to communicate the result back. This is done - * as follows: - * - * - in resumeThread(), the thread calls waitForReturnCapability(). - * - If no capabilities are readily available, waitForReturnCapability() - * increments a counter rts_n_waiting_workers, and blocks - * waiting for the condition returning_worker_cond to become - * signalled. - * - upon entry to the Scheduler, a worker thread checks the - * value of rts_n_waiting_workers. If > 0, the worker thread - * will yield its capability to let a returning worker thread - * proceed with returning its result -- this is done via - * yieldToReturningWorker(). - * - the worker thread that yielded its capability then tries - * to re-grab a capability and re-enter the Scheduler. - */ +void +releaseCapability (Capability* cap USED_IF_THREADS) +{ + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap); + RELEASE_LOCK(&cap->lock); +} + +static void +releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) +{ + Task *task; + + ACQUIRE_LOCK(&cap->lock); + + task = cap->running_task; + + // If the current task is a worker, save it on the spare_workers + // list of this Capability. A worker can mark itself as stopped, + // in which case it is not replaced on the spare_worker queue. + // This happens when the system is shutting down (see + // Schedule.c:workerStart()). + // Also, be careful to check that this task hasn't just exited + // Haskell to do a foreign call (task->suspended_tso). + if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) { + task->next = cap->spare_workers; + cap->spare_workers = task; + } + // Bound tasks just float around attached to their TSOs. + + releaseCapability_(cap); + + RELEASE_LOCK(&cap->lock); +} +#endif /* ---------------------------------------------------------------------------- - * waitForReturnCapability( Mutext *pMutex, Capability** ) + * waitForReturnCapability( Task *task ) * * Purpose: when an OS thread returns from an external call, - * it calls grabReturnCapability() (via Schedule.resumeThread()) - * to wait for permissions to enter the RTS & communicate the + * it calls waitForReturnCapability() (via Schedule.resumeThread()) + * to wait for permission to enter the RTS & communicate the * result of the external call back to the Haskell thread that * made it. * * ------------------------------------------------------------------------- */ - void -waitForReturnCapability( Mutex* pMutex, Capability** pCap ) +waitForReturnCapability (Capability **pCap, Task *task) { - // Pre-condition: pMutex is held. - - IF_DEBUG(scheduler, - sched_belch("worker: returning; workers waiting: %d", - rts_n_waiting_workers)); - - if ( noCapabilities() || passingCapability ) { - rts_n_waiting_workers++; - context_switch = 1; // make sure it's our turn soon - waitCondition(&returning_worker_cond, pMutex); -#if defined(SMP) - *pCap = returning_capabilities; - returning_capabilities = (*pCap)->link; +#if !defined(THREADED_RTS) + + MainCapability.running_task = task; + task->cap = &MainCapability; + *pCap = &MainCapability; + #else - *pCap = &MainCapability; - ASSERT(rts_n_free_capabilities == 0); - handleSignalsInThisThread(); -#endif + Capability *cap = *pCap; + + if (cap == NULL) { + // Try last_free_capability first + cap = last_free_capability; + if (!cap->running_task) { + nat i; + // otherwise, search for a free capability + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + if (!cap->running_task) { + break; + } + } + // Can't find a free one, use last_free_capability. + cap = last_free_capability; + } + + // record the Capability as the one this Task is now assocated with. + task->cap = cap; + } else { - grabCapability(pCap); + ASSERT(task->cap == cap); } - // Post-condition: pMutex is held, pCap points to a capability - // which is now held by the current thread. - return; -} + ACQUIRE_LOCK(&cap->lock); + IF_DEBUG(scheduler, + sched_belch("returning; I want capability %d", cap->no)); + if (!cap->running_task) { + // It's free; just grab it + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + } else { + newReturningTask(cap,task); + RELEASE_LOCK(&cap->lock); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + // now check whether we should wake up... + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + if (cap->returning_tasks_hd != task) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->running_task = task; + popReturningTask(cap); + RELEASE_LOCK(&cap->lock); + break; + } + RELEASE_LOCK(&cap->lock); + } + + } + + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + + IF_DEBUG(scheduler, + sched_belch("returning; got capability %d", cap->no)); + + *pCap = cap; +#endif +} + +#if defined(THREADED_RTS) /* ---------------------------------------------------------------------------- - * yieldCapability( Mutex* pMutex, Capability** pCap ) + * yieldCapability * ------------------------------------------------------------------------- */ void -yieldCapability( Capability** pCap ) +yieldCapability (Capability** pCap, Task *task) { - // Pre-condition: pMutex is assumed held, the current thread - // holds the capability pointed to by pCap. - - if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) { - IF_DEBUG(scheduler, - if (rts_n_waiting_workers > 0) { - sched_belch("worker: giving up capability (returning wkr)"); - } else if (passingCapability) { - sched_belch("worker: giving up capability (passing capability)"); - } else { - sched_belch("worker: giving up capability (no threads to run)"); - } - ); - releaseCapability(*pCap); - *pCap = NULL; + Capability *cap = *pCap; + + // The fast path has no locking, if we don't enter this while loop + + while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { + IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); + + // We must now release the capability and wait to be woken up + // again. + task->wakeup = rtsFalse; + releaseCapabilityAndQueueWorker(cap); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no)); + RELEASE_LOCK(&cap->lock); + continue; + } + + if (task->tso == NULL) { + ASSERT(cap->spare_workers != NULL); + // if we're not at the front of the queue, release it + // again. This is unlikely to happen. + if (cap->spare_workers != task) { + giveCapabilityToTask(cap,cap->spare_workers); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->spare_workers = task->next; + task->next = NULL; + } + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + break; + } + + IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no)); + ASSERT(cap->running_task == task); } - // Post-condition: pMutex is assumed held, and either: - // - // 1. *pCap is NULL, in which case the current thread does not - // hold a capability now, or - // 2. *pCap is not NULL, in which case the current thread still - // holds the capability. - // + *pCap = cap; + + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + return; } - /* ---------------------------------------------------------------------------- - * waitForCapability( Mutex*, Capability**, Condition* ) + * prodCapabilities * - * Purpose: wait for a Capability to become available. In - * the process of doing so, updates the number - * of tasks currently blocked waiting for a capability/more - * work. That counter is used when deciding whether or - * not to create a new worker thread when an external - * call is made. - * If pThreadCond is not NULL, a capability can be specifically - * passed to this thread using passCapability. + * Used to indicate that the interrupted flag is now set, or some + * other global condition that might require waking up a Task on each + * Capability. * ------------------------------------------------------------------------- */ - -void -waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) + +static void +prodCapabilities(rtsBool all) { - // Pre-condition: pMutex is held. - - while ( noCapabilities() || - (passingCapability && passTarget != pThreadCond) || - !ANY_WORK_TO_DO()) { - IF_DEBUG(scheduler, - sched_belch("worker: wait for capability (cond: %p)", - pThreadCond)); - - if (pThreadCond != NULL) { - waitCondition(pThreadCond, pMutex); - IF_DEBUG(scheduler, sched_belch("worker: get passed capability")); - } else { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); + nat i; + Capability *cap; + Task *task; + + for (i=0; i < n_capabilities; i++) { + cap = &capabilities[i]; + ACQUIRE_LOCK(&cap->lock); + if (!cap->running_task) { + if (cap->spare_workers) { + task = cap->spare_workers; + ASSERT(!task->stopped); + giveCapabilityToTask(cap,task); + if (!all) { + RELEASE_LOCK(&cap->lock); + return; + } + } } + RELEASE_LOCK(&cap->lock); } - passingCapability = rtsFalse; - grabCapability(pCap); - - // Post-condition: pMutex is held and *pCap is held by the current thread return; } -/* ---------------------------------------------------------------------------- - passCapability, passCapabilityToWorker - ------------------------------------------------------------------------- */ - void -passCapability( Condition *pTargetThreadCond ) +prodAllCapabilities (void) { - // Pre-condition: pMutex is held and cap is held by the current thread + prodCapabilities(rtsTrue); +} - passTarget = pTargetThreadCond; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, sched_belch("worker: passCapability")); +/* ---------------------------------------------------------------------------- + * prodOneCapability + * + * Like prodAllCapabilities, but we only require a single Task to wake + * up in order to service some global event, such as checking for + * deadlock after some idle time has passed. + * ------------------------------------------------------------------------- */ - // Post-condition: pMutex is held; cap is still held, but will be - // passed to the target thread when next released. +void +prodOneCapability (void) +{ + prodCapabilities(rtsFalse); } +/* ---------------------------------------------------------------------------- + * shutdownCapability + * + * At shutdown time, we want to let everything exit as cleanly as + * possible. For each capability, we let its run queue drain, and + * allow the workers to stop. + * + * This function should be called when interrupted and + * shutting_down_scheduler = rtsTrue, thus any worker that wakes up + * will exit the scheduler and call taskStop(), and any bound thread + * that wakes up will return to its caller. Runnable threads are + * killed. + * + * ------------------------------------------------------------------------- */ + void -passCapabilityToWorker( void ) +shutdownCapability (Capability *cap, Task *task) { - // Pre-condition: pMutex is held and cap is held by the current thread + nat i; - passTarget = NULL; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker")); + ASSERT(interrupted && shutting_down_scheduler); - // Post-condition: pMutex is held; cap is still held, but will be - // passed to a worker thread when next released. -} + task->cap = cap; -#endif /* RTS_SUPPORTS_THREADS */ + for (i = 0; i < 50; i++) { + IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task) { + RELEASE_LOCK(&cap->lock); + IF_DEBUG(scheduler, sched_belch("not owner, yielding")); + yieldThread(); + continue; + } + cap->running_task = task; + if (!emptyRunQueue(cap) || cap->spare_workers) { + IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding")); + releaseCapability_(cap); // this will wake up a worker + RELEASE_LOCK(&cap->lock); + yieldThread(); + continue; + } + IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no)); + RELEASE_LOCK(&cap->lock); + break; + } + // we now have the Capability, its run queue and spare workers + // list are both empty. +} /* ---------------------------------------------------------------------------- - threadRunnable() - - Signals that a thread has been placed on the run queue, so a worker - might need to be woken up to run it. - - ToDo: should check whether the thread at the front of the queue is - bound, and if so wake up the appropriate worker. - -------------------------------------------------------------------------- */ + * tryGrabCapability + * + * Attempt to gain control of a Capability if it is free. + * + * ------------------------------------------------------------------------- */ -void -threadRunnable ( void ) +rtsBool +tryGrabCapability (Capability *cap, Task *task) { -#if defined(RTS_SUPPORTS_THREADS) - if ( !noCapabilities && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) { - signalCondition(&thread_ready_cond); + if (cap->running_task != NULL) return rtsFalse; + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + RELEASE_LOCK(&cap->lock); + return rtsFalse; } - startSchedulerTaskIfNecessary(); -#endif + task->cap = cap; + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + return rtsTrue; } -/* ------------------------------------------------------------------------- */ -#if defined(SMP) -/* - * Function: initCapabilities_(nat) - * - * Purpose: upon startup, allocate and fill in table - * holding 'n' Capabilities. Only for SMP, since - * it is the only build that supports multiple - * capabilities within the RTS. - */ -static void -initCapabilities_(nat n) -{ - nat i; - Capability *cap, *prev; - cap = NULL; - prev = NULL; - for (i = 0; i < n; i++) { - cap = stgMallocBytes(sizeof(Capability), "initCapabilities"); - initCapability(cap); - cap->link = prev; - prev = cap; - } - free_capabilities = cap; - rts_n_free_capabilities = n; - returning_capabilities = NULL; - IF_DEBUG(scheduler, - sched_belch("allocated %d capabilities", n_free_capabilities)); -} -#endif /* SMP */ +#endif /* THREADED_RTS */ +