/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 2002
+ * (c) The GHC Team, 2003-2006
*
* Capabilities
*
* and all the state an OS thread/task needs to run Haskell code:
* its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
- * register (BaseReg).
+ * register (BaseReg; actually it is a pointer to cap->r).
+ *
+ * Only in a THREADED_RTS build will there be multiple capabilities,
+ * for non-threaded builds there is only one global capability, namely
+ * MainCapability.
*
- * Only in an SMP build will there be multiple capabilities, for
- * the threaded RTS and other non-threaded builds, there is only
- * one global capability, namely MainCapability.
- *
* --------------------------------------------------------------------------*/
+
#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
+#include "RtsFlags.h"
+#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
-#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */
-#include "Signals.h" /* to get at handleSignalsInThisThread() */
+#include "Schedule.h"
+#include "Sparks.h"
+
+// one global capability, this is the Capability for non-threaded
+// builds, and for +RTS -N1
+Capability MainCapability;
-#if !defined(SMP)
-Capability MainCapability; /* for non-SMP, we have one global capability */
+nat n_capabilities;
+Capability *capabilities = NULL;
+
+// Holds the Capability which last became free. This is used so that
+// an in-call has a chance of quickly finding a free Capability.
+// Maintaining a global free list of Capabilities would require global
+// locking, so we don't do that.
+Capability *last_free_capability;
+
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+globalWorkToDo (void)
+{
+ return blackholes_need_checking
+ || interrupted
+ ;
+}
#endif
-nat rts_n_free_capabilities;
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+anyWorkForMe( Capability *cap, Task *task )
+{
+ if (task->tso != NULL) {
+ // A bound task only runs if its thread is on the run queue of
+ // the capability on which it was woken up. Otherwise, we
+ // can't be sure that we have the right capability: the thread
+ // might be woken up on some other capability, and task->cap
+ // could change under our feet.
+ return (!emptyRunQueue(cap) && cap->run_queue_hd->bound == task);
+ } else {
+ // A vanilla worker task runs if either (a) there is a
+ // lightweight thread at the head of the run queue, or (b)
+ // there are sparks to execute, or (c) there is some other
+ // global condition to check, such as threads blocked on
+ // blackholes.
+ return ((!emptyRunQueue(cap) && cap->run_queue_hd->bound == NULL)
+ || !emptySparkPoolCap(cap)
+ || globalWorkToDo());
+ }
+}
+#endif
-#if defined(RTS_SUPPORTS_THREADS)
-/* returning_worker_cond: when a worker thread returns from executing an
- * external call, it needs to wait for an RTS Capability before passing
- * on the result of the call to the Haskell thread that made it.
- *
- * returning_worker_cond is signalled in Capability.releaseCapability().
- *
- */
-Condition returning_worker_cond = INIT_COND_VAR;
-
-/*
- * To avoid starvation of threads blocked on worker_thread_cond,
- * the task(s) that enter the Scheduler will check to see whether
- * there are one or more worker threads blocked waiting on
- * returning_worker_cond.
- */
-nat rts_n_waiting_workers = 0;
-
-/* thread_ready_cond: when signalled, a thread has become runnable for a
- * task to execute.
- *
- * In the non-SMP case, it also implies that the thread that is woken up has
- * exclusive access to the RTS and all its data structures (that are not
- * locked by the Scheduler's mutex).
- *
- * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
- *
- */
-Condition thread_ready_cond = INIT_COND_VAR;
-
-/*
- * To be able to make an informed decision about whether or not
- * to create a new task when making an external call, keep track of
- * the number of tasks currently blocked waiting on thread_ready_cond.
- * (if > 0 => no need for a new task, just unblock an existing one).
+/* -----------------------------------------------------------------------------
+ * Manage the returning_tasks lists.
*
- * waitForWorkCapability() takes care of keeping it up-to-date;
- * Task.startTask() uses its current value.
- */
-nat rts_n_waiting_tasks = 0;
+ * These functions require cap->lock
+ * -------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+newReturningTask (Capability *cap, Task *task)
+{
+ ASSERT_LOCK_HELD(&cap->lock);
+ ASSERT(task->return_link == NULL);
+ if (cap->returning_tasks_hd) {
+ ASSERT(cap->returning_tasks_tl->return_link == NULL);
+ cap->returning_tasks_tl->return_link = task;
+ } else {
+ cap->returning_tasks_hd = task;
+ }
+ cap->returning_tasks_tl = task;
+}
+
+STATIC_INLINE Task *
+popReturningTask (Capability *cap)
+{
+ ASSERT_LOCK_HELD(&cap->lock);
+ Task *task;
+ task = cap->returning_tasks_hd;
+ ASSERT(task);
+ cap->returning_tasks_hd = task->return_link;
+ if (!cap->returning_tasks_hd) {
+ cap->returning_tasks_tl = NULL;
+ }
+ task->return_link = NULL;
+ return task;
+}
#endif
-/* -----------------------------------------------------------------------------
- Initialisation
- -------------------------------------------------------------------------- */
-static
-void
-initCapability( Capability *cap )
+/* ----------------------------------------------------------------------------
+ * Initialisation
+ *
+ * The Capability is initially marked not free.
+ * ------------------------------------------------------------------------- */
+
+static void
+initCapability( Capability *cap, nat i )
{
+ nat g;
+
+ cap->no = i;
+ cap->in_haskell = rtsFalse;
+
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+#if defined(THREADED_RTS)
+ initMutex(&cap->lock);
+ cap->running_task = NULL; // indicates cap is free
+ cap->spare_workers = NULL;
+ cap->suspended_ccalling_tasks = NULL;
+ cap->returning_tasks_hd = NULL;
+ cap->returning_tasks_tl = NULL;
+#endif
+
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
cap->f.stgGCFun = (F_)__stg_gc_fun;
-}
-#if defined(SMP)
-static void initCapabilities_(nat n);
-#endif
+ cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
+ RtsFlags.GcFlags.generations,
+ "initCapability");
+
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ cap->mut_lists[g] = NULL;
+ }
-/*
+ cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+ cap->free_trec_chunks = END_STM_CHUNK_LIST;
+ cap->free_trec_headers = NO_TREC;
+ cap->transaction_tokens = 0;
+}
+
+/* ---------------------------------------------------------------------------
* Function: initCapabilities()
*
- * Purpose: set up the Capability handling. For the SMP build,
+ * Purpose: set up the Capability handling. For the THREADED_RTS build,
* we keep a table of them, the size of which is
- * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes
+ * controlled by the user via the RTS flag -N.
*
- * Pre-conditions: no locks assumed held.
- */
+ * ------------------------------------------------------------------------- */
void
-initCapabilities()
+initCapabilities( void )
{
-#if defined(RTS_SUPPORTS_THREADS)
- initCondition(&returning_worker_cond);
- initCondition(&thread_ready_cond);
-#endif
+#if defined(THREADED_RTS)
+ nat i;
-#if defined(SMP)
- initCapabilities_(RtsFlags.ParFlags.nNodes);
-#else
- initCapability(&MainCapability);
- rts_n_free_capabilities = 1;
+#ifndef REG_Base
+ // We can't support multiple CPUs if BaseReg is not a register
+ if (RtsFlags.ParFlags.nNodes > 1) {
+ errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
+ RtsFlags.ParFlags.nNodes = 1;
+ }
#endif
- return;
-}
+ n_capabilities = RtsFlags.ParFlags.nNodes;
+
+ if (n_capabilities == 1) {
+ capabilities = &MainCapability;
+ // THREADED_RTS must work on builds that don't have a mutable
+ // BaseReg (eg. unregisterised), so in this case
+ // capabilities[0] must coincide with &MainCapability.
+ } else {
+ capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
+ "initCapabilities");
+ }
+
+ for (i = 0; i < n_capabilities; i++) {
+ initCapability(&capabilities[i], i);
+ }
+
+ IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
+ n_capabilities));
+
+#else /* !THREADED_RTS */
+
+ n_capabilities = 1;
+ capabilities = &MainCapability;
+ initCapability(&MainCapability, 0);
-#if defined(SMP)
-/* Free capability list. */
-static Capability *free_capabilities; /* Available capabilities for running threads */
-static Capability *returning_capabilities;
- /* Capabilities being passed to returning worker threads */
#endif
-/* -----------------------------------------------------------------------------
- Acquiring capabilities
- -------------------------------------------------------------------------- */
-
-/*
- * Function: grabCapability(Capability**)
- *
- * Purpose: the act of grabbing a capability is easy; just
- * remove one from the free capabilities list (which
- * may just have one entry). In threaded builds, worker
- * threads are prevented from doing so willy-nilly
- * via the condition variables thread_ready_cond and
- * returning_worker_cond.
+ // There are no free capabilities to begin with. We will start
+ // a worker Task on each Capability, which will quickly put the
+ // Capability on the free list when it finds nothing to do.
+ last_free_capability = &capabilities[0];
+}
+
+/* ----------------------------------------------------------------------------
+ * Give a Capability to a Task. The task must currently be sleeping
+ * on its condition variable.
+ *
+ * Requires cap->lock (modifies cap->running_task).
+ *
+ * When migrating a Task, the migrator must take task->lock before
+ * modifying task->cap, to synchronise with the waking up Task.
+ * Additionally, the migrator should own the Capability (when
+ * migrating the run queue), or cap->lock (when migrating
+ * returning_workers).
*
- */
-void grabCapability(Capability** cap)
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
-#ifdef RTS_SUPPORTS_THREADS
- ASSERT(rts_n_free_capabilities > 0);
-#endif
-#if !defined(SMP)
- rts_n_free_capabilities = 0;
- *cap = &MainCapability;
- handleSignalsInThisThread();
-#else
- *cap = free_capabilities;
- free_capabilities = (*cap)->link;
- rts_n_free_capabilities--;
-#endif
+ ASSERT_LOCK_HELD(&cap->lock);
+ ASSERT(task->cap == cap);
IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): got capability\n",
- osThreadId()));
+ sched_belch("passing capability %d to %s %p",
+ cap->no, task->tso ? "bound task" : "worker",
+ (void *)task->id));
+ ACQUIRE_LOCK(&task->lock);
+ task->wakeup = rtsTrue;
+ // the wakeup flag is needed because signalCondition() doesn't
+ // flag the condition if the thread is already running, but we want
+ // it to be sticky.
+ signalCondition(&task->cond);
+ RELEASE_LOCK(&task->lock);
}
+#endif
-/*
+/* ----------------------------------------------------------------------------
* Function: releaseCapability(Capability*)
*
* Purpose: Letting go of a capability. Causes a
* 'returning worker' thread or a 'waiting worker'
* to wake up, in that order.
- *
- */
-void releaseCapability(Capability* cap
-#if !defined(SMP)
- STG_UNUSED
-#endif
-)
-{ // Precondition: sched_mutex must be held
-#if defined(RTS_SUPPORTS_THREADS)
-#ifndef SMP
- ASSERT(rts_n_free_capabilities == 0);
-#endif
- /* Check to see whether a worker thread can be given
- the go-ahead to return the result of an external call..*/
- if (rts_n_waiting_workers > 0) {
- /* Decrement the counter here to avoid livelock where the
- * thread that is yielding its capability will repeatedly
- * signal returning_worker_cond.
- */
-#if defined(SMP)
- // SMP variant untested
- cap->link = returning_capabilities;
- returning_capabilities = cap;
-#else
-#endif
- rts_n_waiting_workers--;
- signalCondition(&returning_worker_cond);
- } else /*if ( !EMPTY_RUN_QUEUE() )*/ {
-#if defined(SMP)
- cap->link = free_capabilities;
- free_capabilities = cap;
- rts_n_free_capabilities++;
-#else
- rts_n_free_capabilities = 1;
-#endif
- /* Signal that a capability is available */
- signalCondition(&thread_ready_cond);
- }
-#endif
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): released capability\n",
- osThreadId()));
- return;
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+void
+releaseCapability_ (Capability* cap)
+{
+ Task *task;
+
+ task = cap->running_task;
+
+ ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
+
+ cap->running_task = NULL;
+
+ // Check to see whether a worker thread can be given
+ // the go-ahead to return the result of an external call..
+ if (cap->returning_tasks_hd != NULL) {
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ // The Task pops itself from the queue (see waitForReturnCapability())
+ return;
+ }
+
+ // If the next thread on the run queue is a bound thread,
+ // give this Capability to the appropriate Task.
+ if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
+ // Make sure we're not about to try to wake ourselves up
+ ASSERT(task != cap->run_queue_hd->bound);
+ task = cap->run_queue_hd->bound;
+ giveCapabilityToTask(cap,task);
+ return;
+ }
+
+ if (!cap->spare_workers) {
+ // Create a worker thread if we don't have one. If the system
+ // is interrupted, we only create a worker task if there
+ // are threads that need to be completed. If the system is
+ // shutting down, we never create a new worker.
+ if (!shutting_down_scheduler) {
+ IF_DEBUG(scheduler,
+ sched_belch("starting new worker on capability %d", cap->no));
+ startWorkerTask(cap, workerStart);
+ return;
+ }
+ }
+
+ // If we have an unbound thread on the run queue, or if there's
+ // anything else to do, give the Capability to a worker thread.
+ if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (cap->spare_workers) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ // The worker Task pops itself from the queue;
+ return;
+ }
+ }
+
+ last_free_capability = cap;
+ IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
-#if defined(RTS_SUPPORTS_THREADS)
-/*
- * When a native thread has completed the execution of an external
- * call, it needs to communicate the result back. This is done
- * as follows:
- *
- * - in resumeThread(), the thread calls grabReturnCapability().
- * - If no capabilities are readily available, grabReturnCapability()
- * increments a counter rts_n_waiting_workers, and blocks
- * waiting for the condition returning_worker_cond to become
- * signalled.
- * - upon entry to the Scheduler, a worker thread checks the
- * value of rts_n_waiting_workers. If > 0, the worker thread
- * will yield its capability to let a returning worker thread
- * proceed with returning its result -- this is done via
- * yieldToReturningWorker().
- * - the worker thread that yielded its capability then tries
- * to re-grab a capability and re-enter the Scheduler.
- */
-
-/*
- * Function: grabReturnCapability(Capability**)
+void
+releaseCapability (Capability* cap USED_IF_THREADS)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap);
+ RELEASE_LOCK(&cap->lock);
+}
+
+static void
+releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
+{
+ Task *task;
+
+ ACQUIRE_LOCK(&cap->lock);
+
+ task = cap->running_task;
+
+ // If the current task is a worker, save it on the spare_workers
+ // list of this Capability. A worker can mark itself as stopped,
+ // in which case it is not replaced on the spare_worker queue.
+ // This happens when the system is shutting down (see
+ // Schedule.c:workerStart()).
+ // Also, be careful to check that this task hasn't just exited
+ // Haskell to do a foreign call (task->suspended_tso).
+ if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
+ task->next = cap->spare_workers;
+ cap->spare_workers = task;
+ }
+ // Bound tasks just float around attached to their TSOs.
+
+ releaseCapability_(cap);
+
+ RELEASE_LOCK(&cap->lock);
+}
+#endif
+
+/* ----------------------------------------------------------------------------
+ * waitForReturnCapability( Task *task )
*
* Purpose: when an OS thread returns from an external call,
- * it calls grabReturnCapability() (via Schedule.resumeThread())
- * to wait for permissions to enter the RTS & communicate the
+ * it calls waitForReturnCapability() (via Schedule.resumeThread())
+ * to wait for permission to enter the RTS & communicate the
* result of the external call back to the Haskell thread that
* made it.
*
- * Pre-condition: pMutex is held.
- * Post-condition: pMutex is still held and a capability has
- * been assigned to the worker thread.
- */
+ * ------------------------------------------------------------------------- */
void
-grabReturnCapability(Mutex* pMutex, Capability** pCap)
+waitForReturnCapability (Capability **pCap, Task *task)
{
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId()));
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker (%p): returning; workers waiting: %d\n",
- osThreadId(), rts_n_waiting_workers));
- if ( noCapabilities() ) {
- rts_n_waiting_workers++;
- wakeBlockedWorkerThread();
- context_switch = 1; // make sure it's our turn soon
- waitCondition(&returning_worker_cond, pMutex);
-#if defined(SMP)
- *pCap = returning_capabilities;
- returning_capabilities = (*pCap)->link;
-#else
+#if !defined(THREADED_RTS)
+
+ MainCapability.running_task = task;
+ task->cap = &MainCapability;
*pCap = &MainCapability;
- ASSERT(rts_n_free_capabilities == 0);
- handleSignalsInThisThread();
+
+#else
+ Capability *cap = *pCap;
+
+ if (cap == NULL) {
+ // Try last_free_capability first
+ cap = last_free_capability;
+ if (!cap->running_task) {
+ nat i;
+ // otherwise, search for a free capability
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ if (!cap->running_task) {
+ break;
+ }
+ }
+ // Can't find a free one, use last_free_capability.
+ cap = last_free_capability;
+ }
+
+ // record the Capability as the one this Task is now associated with.
+ task->cap = cap;
+
+ } else {
+ ASSERT(task->cap == cap);
+ }
+
+ ACQUIRE_LOCK(&cap->lock);
+
+ IF_DEBUG(scheduler,
+ sched_belch("returning; I want capability %d", cap->no));
+
+ if (!cap->running_task) {
+ // It's free; just grab it
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ } else {
+ newReturningTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ // now check whether we should wake up...
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task == NULL) {
+ if (cap->returning_tasks_hd != task) {
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->running_task = task;
+ popReturningTask(cap);
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+
+ }
+
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
+ IF_DEBUG(scheduler,
+ sched_belch("returning; got capability %d", cap->no));
+
+ *pCap = cap;
#endif
- } else {
- grabCapability(pCap);
- }
- return;
}
+#if defined(THREADED_RTS)
+/* ----------------------------------------------------------------------------
+ * yieldCapability
+ * ------------------------------------------------------------------------- */
-/* -----------------------------------------------------------------------------
- Yielding/waiting for capabilities
- -------------------------------------------------------------------------- */
+void
+yieldCapability (Capability** pCap, Task *task)
+{
+ Capability *cap = *pCap;
-/*
- * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*)
- *
- * Purpose: when, upon entry to the Scheduler, an OS worker thread
- * spots that one or more threads are blocked waiting for
- * permission to return back their result, it gives up
- * its Capability.
- * Immediately afterwards, it tries to reaquire the Capabilty
- * using waitForWorkCapability.
- *
+ // The fast path has no locking, if we don't enter this while loop
+
+ while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
+ IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
+
+ // We must now release the capability and wait to be woken up
+ // again.
+ task->wakeup = rtsFalse;
+ releaseCapabilityAndQueueWorker(cap);
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task != NULL) {
+ IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+
+ if (task->tso == NULL) {
+ ASSERT(cap->spare_workers != NULL);
+ // if we're not at the front of the queue, release it
+ // again. This is unlikely to happen.
+ if (cap->spare_workers != task) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->spare_workers = task->next;
+ task->next = NULL;
+ }
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+
+ IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
+ ASSERT(cap->running_task == task);
+ }
+
+ *pCap = cap;
+
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
+ return;
+}
+
+/* ----------------------------------------------------------------------------
+ * prodCapabilities
*
- * Pre-condition: pMutex is assumed held and the thread possesses
- * a Capability.
- * Post-condition: pMutex is held and the thread possesses
- * a Capability.
- */
-void
-yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
+ * Used to indicate that the interrupted flag is now set, or some
+ * other global condition that might require waking up a Task on each
+ * Capability.
+ * ------------------------------------------------------------------------- */
+
+static void
+prodCapabilities(rtsBool all)
{
- if ( rts_n_waiting_workers > 0 ) {
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId()));
- releaseCapability(*pCap);
- /* And wait for work */
- waitForWorkCapability(pMutex, pCap, pThreadCond);
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n",
- osThreadId()));
- }
- return;
+ nat i;
+ Capability *cap;
+ Task *task;
+
+ for (i=0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ ACQUIRE_LOCK(&cap->lock);
+ if (!cap->running_task) {
+ if (cap->spare_workers) {
+ task = cap->spare_workers;
+ ASSERT(!task->stopped);
+ giveCapabilityToTask(cap,task);
+ if (!all) {
+ RELEASE_LOCK(&cap->lock);
+ return;
+ }
+ }
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+ return;
}
+void
+prodAllCapabilities (void)
+{
+ prodCapabilities(rtsTrue);
+}
-/*
- * Function: waitForWorkCapability(Mutex*, Capability**, Condition*)
- *
- * Purpose: wait for a Capability to become available. In
- * the process of doing so, updates the number
- * of tasks currently blocked waiting for a capability/more
- * work. That counter is used when deciding whether or
- * not to create a new worker thread when an external
- * call is made.
- * If pThreadCond is not NULL, a capability can be specifically
- * passed to this thread using passCapability.
+/* ----------------------------------------------------------------------------
+ * prodOneCapability
*
- * Pre-condition: pMutex is held.
- * Post-condition: pMutex is held and *pCap is held by the current thread
- */
-
-static Condition *passTarget = NULL;
-
-void
-waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
+ * Like prodAllCapabilities, but we only require a single Task to wake
+ * up in order to service some global event, such as checking for
+ * deadlock after some idle time has passed.
+ * ------------------------------------------------------------------------- */
+
+void
+prodOneCapability (void)
{
-#ifdef SMP
- #error SMP version not implemented
-#endif
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
- osThreadId(),pThreadCond));
- while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
- || (!pThreadCond && passTarget)) {
- if(pThreadCond)
- {
- waitCondition(pThreadCond, pMutex);
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): get passed capability\n",
- osThreadId()));
- }
- else
- {
- rts_n_waiting_tasks++;
- waitCondition(&thread_ready_cond, pMutex);
- rts_n_waiting_tasks--;
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): get normal capability\n",
- osThreadId()));
- }
- }
- passTarget = NULL;
- grabCapability(pCap);
- return;
+ prodCapabilities(rtsFalse);
}
-/*
- * Function: passCapability(Mutex*, Capability*, Condition*)
+/* ----------------------------------------------------------------------------
+ * shutdownCapability
+ *
+ * At shutdown time, we want to let everything exit as cleanly as
+ * possible. For each capability, we let its run queue drain, and
+ * allow the workers to stop.
*
- * Purpose: Let go of the capability and make sure the thread associated
- * with the Condition pTargetThreadCond gets it next.
+ * This function should be called when interrupted and
+ * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
+ * will exit the scheduler and call taskStop(), and any bound thread
+ * that wakes up will return to its caller. Runnable threads are
+ * killed.
*
- * Pre-condition: pMutex is held and cap is held by the current thread
- * Post-condition: pMutex is held; cap will be grabbed by the "target"
- * thread when pMutex is released.
- */
+ * ------------------------------------------------------------------------- */
void
-passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
+shutdownCapability (Capability *cap, Task *task)
{
-#ifdef SMP
- #error SMP version not implemented
-#endif
- rts_n_free_capabilities = 1;
- signalCondition(pTargetThreadCond);
- passTarget = pTargetThreadCond;
- IF_DEBUG(scheduler,
- fprintf(stderr,"worker thread (%p): passCapability\n",
- osThreadId()));
-}
+ nat i;
+ ASSERT(interrupted && shutting_down_scheduler);
-#endif /* RTS_SUPPORTS_THREADS */
+ task->cap = cap;
-#if defined(SMP)
-/*
- * Function: initCapabilities_(nat)
+ for (i = 0; i < 50; i++) {
+ IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task) {
+ RELEASE_LOCK(&cap->lock);
+ IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
+ yieldThread();
+ continue;
+ }
+ cap->running_task = task;
+ if (!emptyRunQueue(cap) || cap->spare_workers) {
+ IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
+ releaseCapability_(cap); // this will wake up a worker
+ RELEASE_LOCK(&cap->lock);
+ yieldThread();
+ continue;
+ }
+ IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+ // we now have the Capability, its run queue and spare workers
+ // list are both empty.
+}
+
+/* ----------------------------------------------------------------------------
+ * tryGrabCapability
*
- * Purpose: upon startup, allocate and fill in table
- * holding 'n' Capabilities. Only for SMP, since
- * it is the only build that supports multiple
- * capabilities within the RTS.
- */
-static void
-initCapabilities_(nat n)
+ * Attempt to gain control of a Capability if it is free.
+ *
+ * ------------------------------------------------------------------------- */
+
+rtsBool
+tryGrabCapability (Capability *cap, Task *task)
{
- nat i;
- Capability *cap, *prev;
- cap = NULL;
- prev = NULL;
- for (i = 0; i < n; i++) {
- cap = stgMallocBytes(sizeof(Capability), "initCapabilities");
- initCapability(cap);
- cap->link = prev;
- prev = cap;
- }
- free_capabilities = cap;
- rts_n_free_capabilities = n;
- returning_capabilities = NULL;
- IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities););
+ if (cap->running_task != NULL) return rtsFalse;
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task != NULL) {
+ RELEASE_LOCK(&cap->lock);
+ return rtsFalse;
+ }
+ task->cap = cap;
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ return rtsTrue;
}
-#endif /* SMP */
+
+
+#endif /* THREADED_RTS */
+