X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=8c40b639927b3af1202bdc36d0b130a9ddece7c3;hp=a1c7ea3269fac363ed296bd8f6051a65da0f1479;hb=2f861d149686a96d7783ce984afa7c263a39c355;hpb=20593d1d1cf47050d9430895a1c2ada6c39dfb98 diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index a1c7ea3..8c40b63 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,6 +1,6 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 2002 + * (c) The GHC Team, 2003-2006 * * Capabilities * @@ -8,441 +8,625 @@ * and all the state an OS thread/task needs to run Haskell code: * its STG registers, a pointer to its TSO, a nursery etc. During * STG execution, a pointer to the capabilitity is kept in a - * register (BaseReg). + * register (BaseReg; actually it is a pointer to cap->r). + * + * Only in an THREADED_RTS build will there be multiple capabilities, + * for non-threaded builds there is only one global capability, namely + * MainCapability. * - * Only in an SMP build will there be multiple capabilities, for - * the threaded RTS and other non-threaded builds, there is only - * one global capability, namely MainCapability. - * * --------------------------------------------------------------------------*/ + #include "PosixSource.h" #include "Rts.h" #include "RtsUtils.h" +#include "RtsFlags.h" +#include "STM.h" #include "OSThreads.h" #include "Capability.h" -#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ -#include "Signals.h" /* to get at handleSignalsInThisThread() */ +#include "Schedule.h" +#include "Sparks.h" + +// one global capability, this is the Capability for non-threaded +// builds, and for +RTS -N1 +Capability MainCapability; -#if !defined(SMP) -Capability MainCapability; /* for non-SMP, we have one global capability */ +nat n_capabilities; +Capability *capabilities = NULL; + +// Holds the Capability which last became free. This is used so that +// an in-call has a chance of quickly finding a free Capability. +// Maintaining a global free list of Capabilities would require global +// locking, so we don't do that. +Capability *last_free_capability; + +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +globalWorkToDo (void) +{ + return blackholes_need_checking + || interrupted + ; +} #endif -nat rts_n_free_capabilities; +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +anyWorkForMe( Capability *cap, Task *task ) +{ + if (task->tso != NULL) { + // A bound task only runs if its thread is on the run queue of + // the capability on which it was woken up. Otherwise, we + // can't be sure that we have the right capability: the thread + // might be woken up on some other capability, and task->cap + // could change under our feet. + return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task; + } else { + // A vanilla worker task runs if either there is a lightweight + // thread at the head of the run queue, or the run queue is + // empty and (there are sparks to execute, or there is some + // other global condition to check, such as threads blocked on + // blackholes). + if (emptyRunQueue(cap)) { + return !emptySparkPoolCap(cap) || globalWorkToDo(); + } else + return cap->run_queue_hd->bound == NULL; + } +} +#endif -#if defined(RTS_SUPPORTS_THREADS) -/* returning_worker_cond: when a worker thread returns from executing an - * external call, it needs to wait for an RTS Capability before passing - * on the result of the call to the Haskell thread that made it. - * - * returning_worker_cond is signalled in Capability.releaseCapability(). - * - */ -Condition returning_worker_cond = INIT_COND_VAR; - -/* - * To avoid starvation of threads blocked on worker_thread_cond, - * the task(s) that enter the Scheduler will check to see whether - * there are one or more worker threads blocked waiting on - * returning_worker_cond. - */ -nat rts_n_waiting_workers = 0; - -/* thread_ready_cond: when signalled, a thread has become runnable for a - * task to execute. - * - * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its data structures (that are not - * locked by the Scheduler's mutex). - * - * thread_ready_cond is signalled whenever noCapabilities doesn't hold. - * - */ -Condition thread_ready_cond = INIT_COND_VAR; - -/* - * To be able to make an informed decision about whether or not - * to create a new task when making an external call, keep track of - * the number of tasks currently blocked waiting on thread_ready_cond. - * (if > 0 => no need for a new task, just unblock an existing one). +/* ----------------------------------------------------------------------------- + * Manage the returning_tasks lists. * - * waitForWorkCapability() takes care of keeping it up-to-date; - * Task.startTask() uses its current value. - */ -nat rts_n_waiting_tasks = 0; + * These functions require cap->lock + * -------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +newReturningTask (Capability *cap, Task *task) +{ + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->return_link == NULL); + if (cap->returning_tasks_hd) { + ASSERT(cap->returning_tasks_tl->return_link == NULL); + cap->returning_tasks_tl->return_link = task; + } else { + cap->returning_tasks_hd = task; + } + cap->returning_tasks_tl = task; +} + +STATIC_INLINE Task * +popReturningTask (Capability *cap) +{ + ASSERT_LOCK_HELD(&cap->lock); + Task *task; + task = cap->returning_tasks_hd; + ASSERT(task); + cap->returning_tasks_hd = task->return_link; + if (!cap->returning_tasks_hd) { + cap->returning_tasks_tl = NULL; + } + task->return_link = NULL; + return task; +} #endif -/* ----------------------------------------------------------------------------- - Initialisation - -------------------------------------------------------------------------- */ -static -void -initCapability( Capability *cap ) +/* ---------------------------------------------------------------------------- + * Initialisation + * + * The Capability is initially marked not free. + * ------------------------------------------------------------------------- */ + +static void +initCapability( Capability *cap, nat i ) { + nat g; + + cap->no = i; + cap->in_haskell = rtsFalse; + + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + +#if defined(THREADED_RTS) + initMutex(&cap->lock); + cap->running_task = NULL; // indicates cap is free + cap->spare_workers = NULL; + cap->suspended_ccalling_tasks = NULL; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; +#endif + cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; cap->f.stgGCFun = (F_)__stg_gc_fun; -} -#if defined(SMP) -static void initCapabilities_(nat n); -#endif + cap->mut_lists = stgMallocBytes(sizeof(bdescr *) * + RtsFlags.GcFlags.generations, + "initCapability"); + + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + cap->mut_lists[g] = NULL; + } -/* + cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_trec_chunks = END_STM_CHUNK_LIST; + cap->free_trec_headers = NO_TREC; + cap->transaction_tokens = 0; +} + +/* --------------------------------------------------------------------------- * Function: initCapabilities() * - * Purpose: set up the Capability handling. For the SMP build, + * Purpose: set up the Capability handling. For the THREADED_RTS build, * we keep a table of them, the size of which is - * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes + * controlled by the user via the RTS flag -N. * - * Pre-conditions: no locks assumed held. - */ + * ------------------------------------------------------------------------- */ void -initCapabilities() +initCapabilities( void ) { -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&returning_worker_cond); - initCondition(&thread_ready_cond); -#endif +#if defined(THREADED_RTS) + nat i; -#if defined(SMP) - initCapabilities_(RtsFlags.ParFlags.nNodes); -#else - initCapability(&MainCapability); - rts_n_free_capabilities = 1; +#ifndef REG_Base + // We can't support multiple CPUs if BaseReg is not a register + if (RtsFlags.ParFlags.nNodes > 1) { + errorBelch("warning: multiple CPUs not supported in this build, reverting to 1"); + RtsFlags.ParFlags.nNodes = 1; + } #endif - return; -} + n_capabilities = RtsFlags.ParFlags.nNodes; + + if (n_capabilities == 1) { + capabilities = &MainCapability; + // THREADED_RTS must work on builds that don't have a mutable + // BaseReg (eg. unregisterised), so in this case + // capabilities[0] must coincide with &MainCapability. + } else { + capabilities = stgMallocBytes(n_capabilities * sizeof(Capability), + "initCapabilities"); + } + + for (i = 0; i < n_capabilities; i++) { + initCapability(&capabilities[i], i); + } + + IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", + n_capabilities)); + +#else /* !THREADED_RTS */ + + n_capabilities = 1; + capabilities = &MainCapability; + initCapability(&MainCapability, 0); -#if defined(SMP) -/* Free capability list. */ -static Capability *free_capabilities; /* Available capabilities for running threads */ -static Capability *returning_capabilities; - /* Capabilities being passed to returning worker threads */ #endif -/* ----------------------------------------------------------------------------- - Acquiring capabilities - -------------------------------------------------------------------------- */ - -/* - * Function: grabCapability(Capability**) - * - * Purpose: the act of grabbing a capability is easy; just - * remove one from the free capabilities list (which - * may just have one entry). In threaded builds, worker - * threads are prevented from doing so willy-nilly - * via the condition variables thread_ready_cond and - * returning_worker_cond. + // There are no free capabilities to begin with. We will start + // a worker Task to each Capability, which will quickly put the + // Capability on the free list when it finds nothing to do. + last_free_capability = &capabilities[0]; +} + +/* ---------------------------------------------------------------------------- + * Give a Capability to a Task. The task must currently be sleeping + * on its condition variable. + * + * Requires cap->lock (modifies cap->running_task). + * + * When migrating a Task, the migrater must take task->lock before + * modifying task->cap, to synchronise with the waking up Task. + * Additionally, the migrater should own the Capability (when + * migrating the run queue), or cap->lock (when migrating + * returning_workers). * - */ -void grabCapability(Capability** cap) + * ------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) { -#ifdef RTS_SUPPORTS_THREADS - ASSERT(rts_n_free_capabilities > 0); -#endif -#if !defined(SMP) - rts_n_free_capabilities = 0; - *cap = &MainCapability; - handleSignalsInThisThread(); -#else - *cap = free_capabilities; - free_capabilities = (*cap)->link; - rts_n_free_capabilities--; -#endif -#ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): got capability\n", - osThreadId())); -#endif + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->cap == cap); + IF_DEBUG(scheduler, + sched_belch("passing capability %d to %s %p", + cap->no, task->tso ? "bound task" : "worker", + (void *)task->id)); + ACQUIRE_LOCK(&task->lock); + task->wakeup = rtsTrue; + // the wakeup flag is needed because signalCondition() doesn't + // flag the condition if the thread is already runniing, but we want + // it to be sticky. + signalCondition(&task->cond); + RELEASE_LOCK(&task->lock); } +#endif -/* +/* ---------------------------------------------------------------------------- * Function: releaseCapability(Capability*) * * Purpose: Letting go of a capability. Causes a * 'returning worker' thread or a 'waiting worker' * to wake up, in that order. - * - */ -void releaseCapability(Capability* cap -#if !defined(SMP) - STG_UNUSED -#endif -) -{ // Precondition: sched_mutex must be held -#if defined(RTS_SUPPORTS_THREADS) -#ifndef SMP - ASSERT(rts_n_free_capabilities == 0); -#endif - /* Check to see whether a worker thread can be given - the go-ahead to return the result of an external call..*/ - if (rts_n_waiting_workers > 0) { - /* Decrement the counter here to avoid livelock where the - * thread that is yielding its capability will repeatedly - * signal returning_worker_cond. - */ -#if defined(SMP) - // SMP variant untested - cap->link = returning_capabilities; - returning_capabilities = cap; -#else -#endif - rts_n_waiting_workers--; - signalCondition(&returning_worker_cond); - } else /*if ( !EMPTY_RUN_QUEUE() )*/ { -#if defined(SMP) - cap->link = free_capabilities; - free_capabilities = cap; - rts_n_free_capabilities++; -#else - rts_n_free_capabilities = 1; -#endif - /* Signal that a capability is available */ - signalCondition(&thread_ready_cond); - startSchedulerTaskIfNecessary(); // if there is more work to be done, - // we'll need a new thread - } -#endif -#ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): released capability\n", - osThreadId())); -#endif - return; + * ------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +void +releaseCapability_ (Capability* cap) +{ + Task *task; + + task = cap->running_task; + + ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task); + + cap->running_task = NULL; + + // Check to see whether a worker thread can be given + // the go-ahead to return the result of an external call.. + if (cap->returning_tasks_hd != NULL) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + // The Task pops itself from the queue (see waitForReturnCapability()) + return; + } + + // If the next thread on the run queue is a bound thread, + // give this Capability to the appropriate Task. + if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) { + // Make sure we're not about to try to wake ourselves up + ASSERT(task != cap->run_queue_hd->bound); + task = cap->run_queue_hd->bound; + giveCapabilityToTask(cap,task); + return; + } + + if (!cap->spare_workers) { + // Create a worker thread if we don't have one. If the system + // is interrupted, we only create a worker task if there + // are threads that need to be completed. If the system is + // shutting down, we never create a new worker. + if (!shutting_down_scheduler) { + IF_DEBUG(scheduler, + sched_belch("starting new worker on capability %d", cap->no)); + startWorkerTask(cap, workerStart); + return; + } + } + + // If we have an unbound thread on the run queue, or if there's + // anything else to do, give the Capability to a worker thread. + if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { + if (cap->spare_workers) { + giveCapabilityToTask(cap,cap->spare_workers); + // The worker Task pops itself from the queue; + return; + } + } + + last_free_capability = cap; + IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); } -#if defined(RTS_SUPPORTS_THREADS) -/* - * When a native thread has completed the execution of an external - * call, it needs to communicate the result back. This is done - * as follows: - * - * - in resumeThread(), the thread calls grabReturnCapability(). - * - If no capabilities are readily available, grabReturnCapability() - * increments a counter rts_n_waiting_workers, and blocks - * waiting for the condition returning_worker_cond to become - * signalled. - * - upon entry to the Scheduler, a worker thread checks the - * value of rts_n_waiting_workers. If > 0, the worker thread - * will yield its capability to let a returning worker thread - * proceed with returning its result -- this is done via - * yieldToReturningWorker(). - * - the worker thread that yielded its capability then tries - * to re-grab a capability and re-enter the Scheduler. - */ - -/* - * Function: grabReturnCapability(Capability**) +void +releaseCapability (Capability* cap USED_IF_THREADS) +{ + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap); + RELEASE_LOCK(&cap->lock); +} + +static void +releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) +{ + Task *task; + + ACQUIRE_LOCK(&cap->lock); + + task = cap->running_task; + + // If the current task is a worker, save it on the spare_workers + // list of this Capability. A worker can mark itself as stopped, + // in which case it is not replaced on the spare_worker queue. + // This happens when the system is shutting down (see + // Schedule.c:workerStart()). + // Also, be careful to check that this task hasn't just exited + // Haskell to do a foreign call (task->suspended_tso). + if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) { + task->next = cap->spare_workers; + cap->spare_workers = task; + } + // Bound tasks just float around attached to their TSOs. + + releaseCapability_(cap); + + RELEASE_LOCK(&cap->lock); +} +#endif + +/* ---------------------------------------------------------------------------- + * waitForReturnCapability( Task *task ) * * Purpose: when an OS thread returns from an external call, - * it calls grabReturnCapability() (via Schedule.resumeThread()) - * to wait for permissions to enter the RTS & communicate the + * it calls waitForReturnCapability() (via Schedule.resumeThread()) + * to wait for permission to enter the RTS & communicate the * result of the external call back to the Haskell thread that * made it. * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is still held and a capability has - * been assigned to the worker thread. - */ + * ------------------------------------------------------------------------- */ void -grabReturnCapability(Mutex* pMutex, Capability** pCap) +waitForReturnCapability (Capability **pCap, Task *task) { - IF_DEBUG(scheduler, - fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId())); - IF_DEBUG(scheduler, - fprintf(stderr,"worker (%p): returning; workers waiting: %d\n", - osThreadId(), rts_n_waiting_workers)); - if ( noCapabilities() ) { - rts_n_waiting_workers++; - wakeBlockedWorkerThread(); - context_switch = 1; // make sure it's our turn soon - waitCondition(&returning_worker_cond, pMutex); -#if defined(SMP) - *pCap = returning_capabilities; - returning_capabilities = (*pCap)->link; -#else +#if !defined(THREADED_RTS) + + MainCapability.running_task = task; + task->cap = &MainCapability; *pCap = &MainCapability; - ASSERT(rts_n_free_capabilities == 0); - handleSignalsInThisThread(); -#endif - } else { - grabCapability(pCap); - } - return; -} +#else + Capability *cap = *pCap; -/* ----------------------------------------------------------------------------- - Yielding/waiting for capabilities - -------------------------------------------------------------------------- */ + if (cap == NULL) { + // Try last_free_capability first + cap = last_free_capability; + if (!cap->running_task) { + nat i; + // otherwise, search for a free capability + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + if (!cap->running_task) { + break; + } + } + // Can't find a free one, use last_free_capability. + cap = last_free_capability; + } + + // record the Capability as the one this Task is now assocated with. + task->cap = cap; + + } else { + ASSERT(task->cap == cap); + } + + ACQUIRE_LOCK(&cap->lock); -/* - * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*) - * - * Purpose: when, upon entry to the Scheduler, an OS worker thread - * spots that one or more threads are blocked waiting for - * permission to return back their result, it gives up - * its Capability. - * Immediately afterwards, it tries to reaquire the Capabilty - * using waitForWorkCapability. - * - * - * Pre-condition: pMutex is assumed held and the thread possesses - * a Capability. - * Post-condition: pMutex is held and the thread possesses - * a Capability. - */ -void -yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) -{ - if ( rts_n_waiting_workers > 0 ) { IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId())); - releaseCapability(*pCap); - /* And wait for work */ - waitForWorkCapability(pMutex, pCap, pThreadCond); + sched_belch("returning; I want capability %d", cap->no)); + + if (!cap->running_task) { + // It's free; just grab it + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + } else { + newReturningTask(cap,task); + RELEASE_LOCK(&cap->lock); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + // now check whether we should wake up... + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + if (cap->returning_tasks_hd != task) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->running_task = task; + popReturningTask(cap); + RELEASE_LOCK(&cap->lock); + break; + } + RELEASE_LOCK(&cap->lock); + } + + } + + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n", - osThreadId())); - } - return; + sched_belch("returning; got capability %d", cap->no)); + + *pCap = cap; +#endif } +#if defined(THREADED_RTS) +/* ---------------------------------------------------------------------------- + * yieldCapability + * ------------------------------------------------------------------------- */ -/* - * Function: waitForWorkCapability(Mutex*, Capability**, Condition*) - * - * Purpose: wait for a Capability to become available. In - * the process of doing so, updates the number - * of tasks currently blocked waiting for a capability/more - * work. That counter is used when deciding whether or - * not to create a new worker thread when an external - * call is made. - * If pThreadCond is not NULL, a capability can be specifically - * passed to this thread using passCapability. - * - * Pre-condition: pMutex is held. - * Post-condition: pMutex is held and *pCap is held by the current thread - */ - -static Condition *passTarget = NULL; -static rtsBool passingCapability = rtsFalse; - -void -waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) +void +yieldCapability (Capability** pCap, Task *task) { -#ifdef SMP - #error SMP version not implemented -#endif - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n", - osThreadId(),pThreadCond)); - while ( noCapabilities() || (passingCapability && passTarget != pThreadCond)) { - if(pThreadCond) - { - waitCondition(pThreadCond, pMutex); - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): get passed capability\n", - osThreadId())); - } - else - { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): get normal capability\n", - osThreadId())); + Capability *cap = *pCap; + + // The fast path has no locking, if we don't enter this while loop + + while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { + IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); + + // We must now release the capability and wait to be woken up + // again. + task->wakeup = rtsFalse; + releaseCapabilityAndQueueWorker(cap); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no)); + RELEASE_LOCK(&cap->lock); + continue; + } + + if (task->tso == NULL) { + ASSERT(cap->spare_workers != NULL); + // if we're not at the front of the queue, release it + // again. This is unlikely to happen. + if (cap->spare_workers != task) { + giveCapabilityToTask(cap,cap->spare_workers); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->spare_workers = task->next; + task->next = NULL; + } + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + break; + } + + IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no)); + ASSERT(cap->running_task == task); } - } - passingCapability = rtsFalse; - grabCapability(pCap); - return; + + *pCap = cap; + + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + + return; } -/* - * Function: passCapability(Mutex*, Capability*, Condition*) +/* ---------------------------------------------------------------------------- + * prodCapabilities * - * Purpose: Let go of the capability and make sure the thread associated - * with the Condition pTargetThreadCond gets it next. + * Used to indicate that the interrupted flag is now set, or some + * other global condition that might require waking up a Task on each + * Capability. + * ------------------------------------------------------------------------- */ + +static void +prodCapabilities(rtsBool all) +{ + nat i; + Capability *cap; + Task *task; + + for (i=0; i < n_capabilities; i++) { + cap = &capabilities[i]; + ACQUIRE_LOCK(&cap->lock); + if (!cap->running_task) { + if (cap->spare_workers) { + task = cap->spare_workers; + ASSERT(!task->stopped); + giveCapabilityToTask(cap,task); + if (!all) { + RELEASE_LOCK(&cap->lock); + return; + } + } + } + RELEASE_LOCK(&cap->lock); + } + return; +} + +void +prodAllCapabilities (void) +{ + prodCapabilities(rtsTrue); +} + +/* ---------------------------------------------------------------------------- + * prodOneCapability * - * Pre-condition: pMutex is held and cap is held by the current thread - * Post-condition: pMutex is held; cap will be grabbed by the "target" - * thread when pMutex is released. - */ + * Like prodAllCapabilities, but we only require a single Task to wake + * up in order to service some global event, such as checking for + * deadlock after some idle time has passed. + * ------------------------------------------------------------------------- */ void -passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond) +prodOneCapability (void) { -#ifdef SMP - #error SMP version not implemented -#endif - rts_n_free_capabilities = 1; - signalCondition(pTargetThreadCond); - passTarget = pTargetThreadCond; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): passCapability\n", - osThreadId())); + prodCapabilities(rtsFalse); } -/* - * Function: passCapabilityToWorker(Mutex*, Capability*) +/* ---------------------------------------------------------------------------- + * shutdownCapability + * + * At shutdown time, we want to let everything exit as cleanly as + * possible. For each capability, we let its run queue drain, and + * allow the workers to stop. * - * Purpose: Let go of the capability and make sure that a - * "plain" worker thread (not a bound thread) gets it next. + * This function should be called when interrupted and + * shutting_down_scheduler = rtsTrue, thus any worker that wakes up + * will exit the scheduler and call taskStop(), and any bound thread + * that wakes up will return to its caller. Runnable threads are + * killed. * - * Pre-condition: pMutex is held and cap is held by the current thread - * Post-condition: pMutex is held; cap will be grabbed by the "target" - * thread when pMutex is released. - */ + * ------------------------------------------------------------------------- */ void -passCapabilityToWorker(Mutex* pMutex, Capability* cap) +shutdownCapability (Capability *cap, Task *task) { -#ifdef SMP - #error SMP version not implemented -#endif - rts_n_free_capabilities = 1; - signalCondition(&thread_ready_cond); - startSchedulerTaskIfNecessary(); - passTarget = NULL; - passingCapability = rtsTrue; - IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%p): passCapabilityToWorker\n", - osThreadId())); -} + nat i; + ASSERT(interrupted && shutting_down_scheduler); + task->cap = cap; -#endif /* RTS_SUPPORTS_THREADS */ + for (i = 0; i < 50; i++) { + IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task) { + RELEASE_LOCK(&cap->lock); + IF_DEBUG(scheduler, sched_belch("not owner, yielding")); + yieldThread(); + continue; + } + cap->running_task = task; + if (!emptyRunQueue(cap) || cap->spare_workers) { + IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding")); + releaseCapability_(cap); // this will wake up a worker + RELEASE_LOCK(&cap->lock); + yieldThread(); + continue; + } + IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no)); + RELEASE_LOCK(&cap->lock); + break; + } + // we now have the Capability, its run queue and spare workers + // list are both empty. +} -#if defined(SMP) -/* - * Function: initCapabilities_(nat) +/* ---------------------------------------------------------------------------- + * tryGrabCapability * - * Purpose: upon startup, allocate and fill in table - * holding 'n' Capabilities. Only for SMP, since - * it is the only build that supports multiple - * capabilities within the RTS. - */ -static void -initCapabilities_(nat n) + * Attempt to gain control of a Capability if it is free. + * + * ------------------------------------------------------------------------- */ + +rtsBool +tryGrabCapability (Capability *cap, Task *task) { - nat i; - Capability *cap, *prev; - cap = NULL; - prev = NULL; - for (i = 0; i < n; i++) { - cap = stgMallocBytes(sizeof(Capability), "initCapabilities"); - initCapability(cap); - cap->link = prev; - prev = cap; - } - free_capabilities = cap; - rts_n_free_capabilities = n; - returning_capabilities = NULL; - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities);); + if (cap->running_task != NULL) return rtsFalse; + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + RELEASE_LOCK(&cap->lock); + return rtsFalse; + } + task->cap = cap; + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + return rtsTrue; } -#endif /* SMP */ + + +#endif /* THREADED_RTS */ +