X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=2288ca63c162e0a117a7e5a60b90706442a99b1f;hb=03a9ff01812afc81eb5236fd3063cbec44cf469e;hp=9e28a16e49ccfff6a310095446b6872d695cc502;hpb=63e8af080a7e779a48e812e6caa9ea519b046260;p=ghc-hetmet.git diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 9e28a16..2288ca6 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,5 +1,6 @@ /* --------------------------------------------------------------------------- - * (c) The GHC Team, 2003 + * + * (c) The GHC Team, 2003-2005 * * Capabilities * @@ -7,7 +8,7 @@ * and all the state an OS thread/task needs to run Haskell code: * its STG registers, a pointer to its TSO, a nursery etc. During * STG execution, a pointer to the capabilitity is kept in a - * register (BaseReg). + * register (BaseReg; actually it is a pointer to cap->r). * * Only in an SMP build will there be multiple capabilities, for * the threaded RTS and other non-threaded builds, there is only @@ -21,72 +22,20 @@ #include "RtsFlags.h" #include "OSThreads.h" #include "Capability.h" -#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ -#if defined(SMP) -#include "Hash.h" -#endif +#include "Schedule.h" #if !defined(SMP) -Capability MainCapability; /* for non-SMP, we have one global capability */ +Capability MainCapability; // for non-SMP, we have one global capability #endif +nat n_capabilities; Capability *capabilities = NULL; -nat rts_n_free_capabilities; - -#if defined(RTS_SUPPORTS_THREADS) - -/* returning_worker_cond: when a worker thread returns from executing an - * external call, it needs to wait for an RTS Capability before passing - * on the result of the call to the Haskell thread that made it. - * - * returning_worker_cond is signalled in Capability.releaseCapability(). - * - */ -Condition returning_worker_cond = INIT_COND_VAR; - -/* - * To avoid starvation of threads blocked on worker_thread_cond, - * the task(s) that enter the Scheduler will check to see whether - * there are one or more worker threads blocked waiting on - * returning_worker_cond. - */ -nat rts_n_waiting_workers = 0; - -/* thread_ready_cond: when signalled, a thread has become runnable for a - * task to execute. - * - * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its data structures (that are not - * locked by the Scheduler's mutex). - * - * thread_ready_cond is signalled whenever - * !noCapabilities && !EMPTY_RUN_QUEUE(). - */ -Condition thread_ready_cond = INIT_COND_VAR; - -/* - * To be able to make an informed decision about whether or not - * to create a new task when making an external call, keep track of - * the number of tasks currently blocked waiting on thread_ready_cond. - * (if > 0 => no need for a new task, just unblock an existing one). - * - * waitForWorkCapability() takes care of keeping it up-to-date; - * Task.startTask() uses its current value. - */ -nat rts_n_waiting_tasks = 0; -#endif -#if defined(SMP) -/* - * Free capability list. - */ -Capability *free_capabilities; - -/* - * Maps OSThreadId to Capability * - */ -HashTable *capability_hash; -#endif +// Holds the Capability which last became free. This is used so that +// an in-call has a chance of quickly finding a free Capability. +// Maintaining a global free list of Capabilities would require global +// locking, so we don't do that. +Capability *last_free_capability; #ifdef SMP #define UNUSED_IF_NOT_SMP @@ -94,22 +43,16 @@ HashTable *capability_hash; #define UNUSED_IF_NOT_SMP STG_UNUSED #endif +#ifdef RTS_USER_SIGNALS +#define UNUSED_IF_NOT_THREADS +#else +#define UNUSED_IF_NOT_THREADS STG_UNUSED +#endif -#if defined(RTS_SUPPORTS_THREADS) -INLINE_HEADER rtsBool -ANY_WORK_FOR_ME( Condition *cond ) -{ - // If the run queue is not empty, then we only wake up the guy who - // can run the thread at the head, even if there is some other - // reason for this task to run (eg. interrupted=rtsTrue). - if (!EMPTY_RUN_QUEUE()) { - if (run_queue_hd->main == NULL) { - return (cond == NULL); - } else { - return (&run_queue_hd->main->bound_thread_cond == cond); - } - } +STATIC_INLINE rtsBool +globalWorkToDo (void) +{ return blackholes_need_checking || interrupted #if defined(RTS_USER_SIGNALS) @@ -117,28 +60,88 @@ ANY_WORK_FOR_ME( Condition *cond ) #endif ; } -#endif -INLINE_HEADER rtsBool -ANY_WORK_TO_DO(void) +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +anyWorkForMe( Capability *cap, Task *task ) { - return (!EMPTY_RUN_QUEUE() - || interrupted - || blackholes_need_checking -#if defined(RTS_USER_SIGNALS) - || signals_pending() + // If the run queue is not empty, then we only wake up the guy who + // can run the thread at the head, even if there is some other + // reason for this task to run (eg. interrupted=rtsTrue). + if (!emptyRunQueue(cap)) { + if (cap->run_queue_hd->bound == NULL) { + return (task->tso == NULL); + } else { + return (cap->run_queue_hd->bound == task); + } + } + return globalWorkToDo(); +} #endif - ); + +/* ----------------------------------------------------------------------------- + * Manage the returning_tasks lists. + * + * These functions require cap->lock + * -------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +newReturningTask (Capability *cap, Task *task) +{ + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->return_link == NULL); + if (cap->returning_tasks_hd) { + ASSERT(cap->returning_tasks_tl->return_link == NULL); + cap->returning_tasks_tl->return_link = task; + } else { + cap->returning_tasks_hd = task; + } + cap->returning_tasks_tl = task; } +STATIC_INLINE Task * +popReturningTask (Capability *cap) +{ + ASSERT_LOCK_HELD(&cap->lock); + Task *task; + task = cap->returning_tasks_hd; + ASSERT(task); + cap->returning_tasks_hd = task->return_link; + if (!cap->returning_tasks_hd) { + cap->returning_tasks_tl = NULL; + } + task->return_link = NULL; + return task; +} +#endif + /* ---------------------------------------------------------------------------- - Initialisation - ------------------------------------------------------------------------- */ + * Initialisation + * + * The Capability is initially marked not free. + * ------------------------------------------------------------------------- */ static void -initCapability( Capability *cap ) +initCapability( Capability *cap, nat i ) { - cap->r.rInHaskell = rtsFalse; + cap->no = i; + cap->in_haskell = rtsFalse; + + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + +#if defined(THREADED_RTS) + initMutex(&cap->lock); + cap->running_task = NULL; // indicates cap is free + cap->spare_workers = NULL; + cap->suspended_ccalling_tasks = NULL; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; + cap->next = NULL; + cap->prev = NULL; +#endif + cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; cap->f.stgGCFun = (F_)__stg_gc_fun; } @@ -148,7 +151,7 @@ initCapability( Capability *cap ) * * Purpose: set up the Capability handling. For the SMP build, * we keep a table of them, the size of which is - * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes + * controlled by the user via the RTS flag -N. * * ------------------------------------------------------------------------- */ void @@ -157,80 +160,60 @@ initCapabilities( void ) #if defined(SMP) nat i,n; - n = RtsFlags.ParFlags.nNodes; + n_capabilities = n = RtsFlags.ParFlags.nNodes; capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities"); for (i = 0; i < n; i++) { - initCapability(&capabilities[i]); - capabilities[i].link = &capabilities[i+1]; + initCapability(&capabilities[i], i); } - capabilities[n-1].link = NULL; - free_capabilities = &capabilities[0]; - rts_n_free_capabilities = n; - - capability_hash = allocHashTable(); - IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n)); #else + n_capabilities = 1; capabilities = &MainCapability; - initCapability(&MainCapability); - rts_n_free_capabilities = 1; + initCapability(&MainCapability, 0); #endif -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&returning_worker_cond); - initCondition(&thread_ready_cond); -#endif -} - -/* ---------------------------------------------------------------------------- - grabCapability( Capability** ) - - (only externally visible when !RTS_SUPPORTS_THREADS. In the - threaded RTS, clients must use waitFor*Capability()). - ------------------------------------------------------------------------- */ - -#if defined(RTS_SUPPORTS_THREADS) -static -#endif -void -grabCapability( Capability** cap ) -{ -#if defined(SMP) - ASSERT(rts_n_free_capabilities > 0); - *cap = free_capabilities; - free_capabilities = (*cap)->link; - rts_n_free_capabilities--; - insertHashTable(capability_hash, osThreadId(), *cap); -#else -# if defined(RTS_SUPPORTS_THREADS) - ASSERT(rts_n_free_capabilities == 1); - rts_n_free_capabilities = 0; -# endif - *cap = &MainCapability; -#endif -#if defined(RTS_SUPPORTS_THREADS) - IF_DEBUG(scheduler, sched_belch("worker: got capability")); -#endif + // There are no free capabilities to begin with. We will start + // a worker Task to each Capability, which will quickly put the + // Capability on the free list when it finds nothing to do. + last_free_capability = &capabilities[0]; } /* ---------------------------------------------------------------------------- - * Function: myCapability(void) + * Give a Capability to a Task. The task must currently be sleeping + * on its condition variable. + * + * Requires cap->lock (modifies cap->running_task). + * + * When migrating a Task, the migrater must take task->lock before + * modifying task->cap, to synchronise with the waking up Task. + * Additionally, the migrater should own the Capability (when + * migrating the run queue), or cap->lock (when migrating + * returning_workers). * - * Purpose: Return the capability owned by the current thread. - * Should not be used if the current thread does not - * hold a Capability. * ------------------------------------------------------------------------- */ -Capability * -myCapability (void) + +#if defined(THREADED_RTS) +STATIC_INLINE void +giveCapabilityToTask (Capability *cap, Task *task) { -#if defined(SMP) - return lookupHashTable(capability_hash, osThreadId()); -#else - return &MainCapability; -#endif + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->cap == cap); + // We are not modifying task->cap, so we do not need to take task->lock. + IF_DEBUG(scheduler, + sched_belch("passing capability %d to %s %p", + cap->no, task->tso ? "bound task" : "worker", + (void *)task->id)); + ACQUIRE_LOCK(&task->lock); + task->wakeup = rtsTrue; + // the wakeup flag is needed because signalCondition() doesn't + // flag the condition if the thread is already runniing, but we want + // it to be sticky. + signalCondition(&task->cond); + RELEASE_LOCK(&task->lock); } +#endif /* ---------------------------------------------------------------------------- * Function: releaseCapability(Capability*) @@ -240,215 +223,355 @@ myCapability (void) * to wake up, in that order. * ------------------------------------------------------------------------- */ +#if defined(THREADED_RTS) void -releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) +releaseCapability_ (Capability* cap) { - // Precondition: sched_mutex is held. -#if defined(RTS_SUPPORTS_THREADS) -#if !defined(SMP) - ASSERT(rts_n_free_capabilities == 0); -#endif -#if defined(SMP) - cap->link = free_capabilities; - free_capabilities = cap; - ASSERT(myCapability() == cap); - removeHashTable(capability_hash, osThreadId(), NULL); -#endif + Task *task; + + ASSERT(cap->running_task != NULL && myTask() == cap->running_task); + + task = cap->running_task; + cap->running_task = NULL; + + ASSERT(task->id == osThreadId()); + // Check to see whether a worker thread can be given // the go-ahead to return the result of an external call.. - if (rts_n_waiting_workers > 0) { - // Decrement the counter here to avoid livelock where the - // thread that is yielding its capability will repeatedly - // signal returning_worker_cond. - rts_n_waiting_workers--; - signalCondition(&returning_worker_cond); - IF_DEBUG(scheduler, - sched_belch("worker: released capability to returning worker")); - } else { - rts_n_free_capabilities++; - IF_DEBUG(scheduler, sched_belch("worker: released capability")); - threadRunnable(); + if (cap->returning_tasks_hd != NULL) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + // The Task pops itself from the queue (see waitForReturnCapability()) + return; + } + + // If the next thread on the run queue is a bound thread, + // give this Capability to the appropriate Task. + if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) { + // Make sure we're not about to try to wake ourselves up + ASSERT(task != cap->run_queue_hd->bound); + task = cap->run_queue_hd->bound; + giveCapabilityToTask(cap,task); + return; + } + + // If we have an unbound thread on the run queue, or if there's + // anything else to do, give the Capability to a worker thread. + if (!emptyRunQueue(cap) || globalWorkToDo()) { + if (cap->spare_workers) { + giveCapabilityToTask(cap,cap->spare_workers); + // The worker Task pops itself from the queue; + return; + } + + // Create a worker thread if we don't have one. If the system + // is interrupted, we only create a worker task if there + // are threads that need to be completed. If the system is + // shutting down, we never create a new worker. + if (!shutting_down_scheduler) { + IF_DEBUG(scheduler, + sched_belch("starting new worker on capability %d", cap->no)); + startWorkerTask(cap, workerStart); + return; + } } -#endif - return; + + last_free_capability = cap; + IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); } -#if defined(RTS_SUPPORTS_THREADS) -/* - * When a native thread has completed the execution of an external - * call, it needs to communicate the result back. This is done - * as follows: - * - * - in resumeThread(), the thread calls waitForReturnCapability(). - * - If no capabilities are readily available, waitForReturnCapability() - * increments a counter rts_n_waiting_workers, and blocks - * waiting for the condition returning_worker_cond to become - * signalled. - * - upon entry to the Scheduler, a worker thread checks the - * value of rts_n_waiting_workers. If > 0, the worker thread - * will yield its capability to let a returning worker thread - * proceed with returning its result -- this is done via - * yieldToReturningWorker(). - * - the worker thread that yielded its capability then tries - * to re-grab a capability and re-enter the Scheduler. - */ +void +releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS) +{ + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap); + RELEASE_LOCK(&cap->lock); +} + +static void +releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS) +{ + Task *task; + + ACQUIRE_LOCK(&cap->lock); + + task = cap->running_task; + + // If the current task is a worker, save it on the spare_workers + // list of this Capability. A worker can mark itself as stopped, + // in which case it is not replaced on the spare_worker queue. + // This happens when the system is shutting down (see + // Schedule.c:workerStart()). + // Also, be careful to check that this task hasn't just exited + // Haskell to do a foreign call (task->suspended_tso). + if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) { + task->next = cap->spare_workers; + cap->spare_workers = task; + } + // Bound tasks just float around attached to their TSOs. + + releaseCapability_(cap); + + RELEASE_LOCK(&cap->lock); +} +#endif /* ---------------------------------------------------------------------------- - * waitForReturnCapability( Mutext *pMutex, Capability** ) + * waitForReturnCapability( Task *task ) * * Purpose: when an OS thread returns from an external call, - * it calls grabReturnCapability() (via Schedule.resumeThread()) - * to wait for permissions to enter the RTS & communicate the + * it calls waitForReturnCapability() (via Schedule.resumeThread()) + * to wait for permission to enter the RTS & communicate the * result of the external call back to the Haskell thread that * made it. * * ------------------------------------------------------------------------- */ - void -waitForReturnCapability( Mutex* pMutex, Capability** pCap ) +waitForReturnCapability (Capability **pCap, + Task *task UNUSED_IF_NOT_THREADS) { - // Pre-condition: pMutex is held. +#if !defined(THREADED_RTS) - IF_DEBUG(scheduler, - sched_belch("worker: returning; workers waiting: %d", - rts_n_waiting_workers)); + MainCapability.running_task = task; + task->cap = &MainCapability; + *pCap = &MainCapability; - if ( noCapabilities() ) { - rts_n_waiting_workers++; - context_switch = 1; // make sure it's our turn soon - waitCondition(&returning_worker_cond, pMutex); -#if defined(SMP) - *pCap = free_capabilities; - free_capabilities = (*pCap)->link; - ASSERT(pCap != NULL); - insertHashTable(capability_hash, osThreadId(), *pCap); #else - *pCap = &MainCapability; - ASSERT(rts_n_free_capabilities == 0); -#endif + Capability *cap = *pCap; + + if (cap == NULL) { + // Try last_free_capability first + cap = last_free_capability; + if (!cap->running_task) { + nat i; + // otherwise, search for a free capability + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + if (!cap->running_task) { + break; + } + } + // Can't find a free one, use last_free_capability. + cap = last_free_capability; + } + + // record the Capability as the one this Task is now assocated with. + task->cap = cap; + } else { - grabCapability(pCap); + ASSERT(task->cap == cap); } - // Post-condition: pMutex is held, pCap points to a capability - // which is now held by the current thread. - return; -} + ACQUIRE_LOCK(&cap->lock); + IF_DEBUG(scheduler, + sched_belch("returning; I want capability %d", cap->no)); + if (!cap->running_task) { + // It's free; just grab it + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + } else { + newReturningTask(cap,task); + RELEASE_LOCK(&cap->lock); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + // now check whether we should wake up... + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + if (cap->returning_tasks_hd != task) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->running_task = task; + popReturningTask(cap); + RELEASE_LOCK(&cap->lock); + break; + } + RELEASE_LOCK(&cap->lock); + } + + } + + ASSERT(cap->running_task == task); + + IF_DEBUG(scheduler, + sched_belch("returning; got capability %d", cap->no)); + + *pCap = cap; +#endif +} + +#if defined(THREADED_RTS) /* ---------------------------------------------------------------------------- - * yieldCapability( Mutex* pMutex, Capability** pCap ) + * yieldCapability * ------------------------------------------------------------------------- */ void -yieldCapability( Capability** pCap, Condition *cond ) +yieldCapability (Capability** pCap, Task *task) { - // Pre-condition: pMutex is assumed held, the current thread - // holds the capability pointed to by pCap. - - if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) { - IF_DEBUG(scheduler, - if (rts_n_waiting_workers > 0) { - sched_belch("worker: giving up capability (returning wkr)"); - } else if (!EMPTY_RUN_QUEUE()) { - sched_belch("worker: giving up capability (passing capability)"); - } else { - sched_belch("worker: giving up capability (no threads to run)"); - } - ); - releaseCapability(*pCap); - *pCap = NULL; + Capability *cap = *pCap; + + // The fast path; no locking + if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) ) + return; + + while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { + IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); + + // We must now release the capability and wait to be woken up + // again. + releaseCapabilityAndQueueWorker(cap); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + RELEASE_LOCK(&cap->lock); + continue; + } + + if (task->tso == NULL) { + ASSERT(cap->spare_workers != NULL); + // if we're not at the front of the queue, release it + // again. This is unlikely to happen. + if (cap->spare_workers != task) { + giveCapabilityToTask(cap,cap->spare_workers); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->spare_workers = task->next; + task->next = NULL; + } + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + break; + } + + IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no)); + ASSERT(cap->running_task == task); } - // Post-condition: either: - // - // 1. *pCap is NULL, in which case the current thread does not - // hold a capability now, or - // 2. *pCap is not NULL, in which case the current thread still - // holds the capability. - // + *pCap = cap; return; } - /* ---------------------------------------------------------------------------- - * waitForCapability( Mutex*, Capability**, Condition* ) + * prodCapabilities * - * Purpose: wait for a Capability to become available. In - * the process of doing so, updates the number - * of tasks currently blocked waiting for a capability/more - * work. That counter is used when deciding whether or - * not to create a new worker thread when an external - * call is made. - * If pThreadCond is not NULL, a capability can be specifically - * passed to this thread. + * Used to indicate that the interrupted flag is now set, or some + * other global condition that might require waking up a Task on each + * Capability. * ------------------------------------------------------------------------- */ - -void -waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) -{ - // Pre-condition: pMutex is held. - - while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) { - IF_DEBUG(scheduler, - sched_belch("worker: wait for capability (cond: %p)", - pThreadCond)); - if (pThreadCond != NULL) { - waitCondition(pThreadCond, pMutex); - IF_DEBUG(scheduler, sched_belch("worker: get passed capability")); - } else { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); +static void +prodCapabilities(rtsBool all) +{ + nat i; + Capability *cap; + Task *task; + + for (i=0; i < n_capabilities; i++) { + cap = &capabilities[i]; + ACQUIRE_LOCK(&cap->lock); + if (!cap->running_task) { + if (cap->spare_workers) { + task = cap->spare_workers; + ASSERT(!task->stopped); + giveCapabilityToTask(cap,task); + if (!all) { + RELEASE_LOCK(&cap->lock); + return; + } + } } + RELEASE_LOCK(&cap->lock); } - grabCapability(pCap); - - // Post-condition: pMutex is held and *pCap is held by the current thread - return; } -#endif /* RTS_SUPPORTS_THREADS */ +void +prodAllCapabilities (void) +{ + prodCapabilities(rtsTrue); +} /* ---------------------------------------------------------------------------- - threadRunnable() + * prodOneCapability + * + * Like prodAllCapabilities, but we only require a single Task to wake + * up in order to service some global event, such as checking for + * deadlock after some idle time has passed. + * ------------------------------------------------------------------------- */ - Signals that a thread has been placed on the run queue, so a worker - might need to be woken up to run it. +void +prodOneCapability (void) +{ + prodCapabilities(rtsFalse); +} + +/* ---------------------------------------------------------------------------- + * shutdownCapability + * + * At shutdown time, we want to let everything exit as cleanly as + * possible. For each capability, we let its run queue drain, and + * allow the workers to stop. + * + * This function should be called when interrupted and + * shutting_down_scheduler = rtsTrue, thus any worker that wakes up + * will exit the scheduler and call taskStop(), and any bound thread + * that wakes up will return to its caller. Runnable threads are + * killed. + * + * ------------------------------------------------------------------------- */ - ToDo: should check whether the thread at the front of the queue is - bound, and if so wake up the appropriate worker. - -------------------------------------------------------------------------- */ void -threadRunnable ( void ) +shutdownCapability (Capability *cap, Task *task) { -#if defined(RTS_SUPPORTS_THREADS) - if ( !noCapabilities() && ANY_WORK_TO_DO() ) { - if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) { - signalCondition(&run_queue_hd->main->bound_thread_cond); - return; + nat i; + + ASSERT(interrupted && shutting_down_scheduler); + + task->cap = cap; + + for (i = 0; i < 50; i++) { + IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task) { + RELEASE_LOCK(&cap->lock); + IF_DEBUG(scheduler, sched_belch("not owner, yielding")); + yieldThread(); + continue; } - if (rts_n_waiting_tasks > 0) { - signalCondition(&thread_ready_cond); - } else { - startSchedulerTaskIfNecessary(); + cap->running_task = task; + if (!emptyRunQueue(cap) || cap->spare_workers) { + IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding")); + releaseCapability_(cap); // this will wake up a worker + RELEASE_LOCK(&cap->lock); + yieldThread(); + continue; } + IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no)); + RELEASE_LOCK(&cap->lock); + break; } -#endif + // we now have the Capability, its run queue and spare workers + // list are both empty. } +#endif /* THREADED_RTS */ -/* ---------------------------------------------------------------------------- - prodWorker() - Wake up... time to die. - -------------------------------------------------------------------------- */ -void -prodWorker ( void ) -{ -#if defined(RTS_SUPPORTS_THREADS) - signalCondition(&thread_ready_cond); -#endif -}