fix bug in previous patch to this file
[ghc-hetmet.git] / ghc / rts / Capability.c
index 9e28a16..8c40b63 100644 (file)
@@ -1,5 +1,6 @@
 /* ---------------------------------------------------------------------------
- * (c) The GHC Team, 2003
+ *
+ * (c) The GHC Team, 2003-2006
  *
  * Capabilities
  *
  * and all the state an OS thread/task needs to run Haskell code:
  * its STG registers, a pointer to its TSO, a nursery etc. During
  * STG execution, a pointer to the capabilitity is kept in a
- * register (BaseReg).
+ * register (BaseReg; actually it is a pointer to cap->r).
+ *
+ * Only in an THREADED_RTS build will there be multiple capabilities,
+ * for non-threaded builds there is only one global capability, namely
+ * MainCapability.
  *
- * Only in an SMP build will there be multiple capabilities, for
- * the threaded RTS and other non-threaded builds, there is only
- * one global capability, namely MainCapability.
- * 
  * --------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
 #include "Rts.h"
 #include "RtsUtils.h"
 #include "RtsFlags.h"
+#include "STM.h"
 #include "OSThreads.h"
 #include "Capability.h"
-#include "Schedule.h"  /* to get at EMPTY_RUN_QUEUE() */
-#if defined(SMP)
-#include "Hash.h"
-#endif
+#include "Schedule.h"
+#include "Sparks.h"
 
-#if !defined(SMP)
-Capability MainCapability;     /* for non-SMP, we have one global capability */
-#endif
+// one global capability, this is the Capability for non-threaded
+// builds, and for +RTS -N1
+Capability MainCapability;
 
+nat n_capabilities;
 Capability *capabilities = NULL;
-nat rts_n_free_capabilities;
-
-#if defined(RTS_SUPPORTS_THREADS)
-
-/* returning_worker_cond: when a worker thread returns from executing an
- * external call, it needs to wait for an RTS Capability before passing
- * on the result of the call to the Haskell thread that made it.
- * 
- * returning_worker_cond is signalled in Capability.releaseCapability().
- *
- */
-Condition returning_worker_cond = INIT_COND_VAR;
-
-/*
- * To avoid starvation of threads blocked on worker_thread_cond,
- * the task(s) that enter the Scheduler will check to see whether
- * there are one or more worker threads blocked waiting on
- * returning_worker_cond.
- */
-nat rts_n_waiting_workers = 0;
-
-/* thread_ready_cond: when signalled, a thread has become runnable for a
- * task to execute.
- *
- * In the non-SMP case, it also implies that the thread that is woken up has
- * exclusive access to the RTS and all its data structures (that are not
- * locked by the Scheduler's mutex).
- *
- * thread_ready_cond is signalled whenever
- *      !noCapabilities && !EMPTY_RUN_QUEUE().
- */
-Condition thread_ready_cond = INIT_COND_VAR;
-
-/*
- * To be able to make an informed decision about whether or not 
- * to create a new task when making an external call, keep track of
- * the number of tasks currently blocked waiting on thread_ready_cond.
- * (if > 0 => no need for a new task, just unblock an existing one).
- *
- * waitForWorkCapability() takes care of keeping it up-to-date;
- * Task.startTask() uses its current value.
- */
-nat rts_n_waiting_tasks = 0;
-#endif
 
-#if defined(SMP)
-/*
- * Free capability list. 
- */
-Capability *free_capabilities;
+// Holds the Capability which last became free.  This is used so that
+// an in-call has a chance of quickly finding a free Capability.
+// Maintaining a global free list of Capabilities would require global
+// locking, so we don't do that.
+Capability *last_free_capability;
 
-/* 
- * Maps OSThreadId to Capability *
- */
-HashTable *capability_hash;
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+globalWorkToDo (void)
+{
+    return blackholes_need_checking
+       || interrupted
+       ;
+}
 #endif
 
-#ifdef SMP
-#define UNUSED_IF_NOT_SMP
-#else
-#define UNUSED_IF_NOT_SMP STG_UNUSED
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+anyWorkForMe( Capability *cap, Task *task )
+{
+    if (task->tso != NULL) {
+       // A bound task only runs if its thread is on the run queue of
+       // the capability on which it was woken up.  Otherwise, we
+       // can't be sure that we have the right capability: the thread
+       // might be woken up on some other capability, and task->cap
+       // could change under our feet.
+       return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
+    } else {
+       // A vanilla worker task runs if either there is a lightweight
+       // thread at the head of the run queue, or the run queue is
+       // empty and (there are sparks to execute, or there is some
+       // other global condition to check, such as threads blocked on
+       // blackholes).
+       if (emptyRunQueue(cap)) {
+           return !emptySparkPoolCap(cap) || globalWorkToDo();
+       } else
+           return cap->run_queue_hd->bound == NULL;
+    }
+}
 #endif
 
+/* -----------------------------------------------------------------------------
+ * Manage the returning_tasks lists.
+ *
+ * These functions require cap->lock
+ * -------------------------------------------------------------------------- */
 
-#if defined(RTS_SUPPORTS_THREADS)
-INLINE_HEADER rtsBool
-ANY_WORK_FOR_ME( Condition *cond )
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+newReturningTask (Capability *cap, Task *task)
 {
-    // If the run queue is not empty, then we only wake up the guy who
-    // can run the thread at the head, even if there is some other
-    // reason for this task to run (eg. interrupted=rtsTrue).
-    if (!EMPTY_RUN_QUEUE()) {
-       if (run_queue_hd->main == NULL) {
-           return (cond == NULL);
-       } else {
-           return (&run_queue_hd->main->bound_thread_cond == cond);
-       }
+    ASSERT_LOCK_HELD(&cap->lock);
+    ASSERT(task->return_link == NULL);
+    if (cap->returning_tasks_hd) {
+       ASSERT(cap->returning_tasks_tl->return_link == NULL);
+       cap->returning_tasks_tl->return_link = task;
+    } else {
+       cap->returning_tasks_hd = task;
     }
-
-    return blackholes_need_checking
-       || interrupted
-#if defined(RTS_USER_SIGNALS)
-       || signals_pending()
-#endif
-       ;
+    cap->returning_tasks_tl = task;
 }
-#endif
 
-INLINE_HEADER rtsBool
-ANY_WORK_TO_DO(void) 
+STATIC_INLINE Task *
+popReturningTask (Capability *cap)
 {
-    return (!EMPTY_RUN_QUEUE() 
-           || interrupted
-           || blackholes_need_checking
-#if defined(RTS_USER_SIGNALS)
-           || signals_pending()
-#endif
-       );
+    ASSERT_LOCK_HELD(&cap->lock);
+    Task *task;
+    task = cap->returning_tasks_hd;
+    ASSERT(task);
+    cap->returning_tasks_hd = task->return_link;
+    if (!cap->returning_tasks_hd) {
+       cap->returning_tasks_tl = NULL;
+    }
+    task->return_link = NULL;
+    return task;
 }
+#endif
 
 /* ----------------------------------------------------------------------------
-   Initialisation
-   ------------------------------------------------------------------------- */
+ * Initialisation
+ *
+ * The Capability is initially marked not free.
+ * ------------------------------------------------------------------------- */
 
 static void
-initCapability( Capability *cap )
+initCapability( Capability *cap, nat i )
 {
-    cap->r.rInHaskell      = rtsFalse;
+    nat g;
+
+    cap->no = i;
+    cap->in_haskell        = rtsFalse;
+
+    cap->run_queue_hd      = END_TSO_QUEUE;
+    cap->run_queue_tl      = END_TSO_QUEUE;
+
+#if defined(THREADED_RTS)
+    initMutex(&cap->lock);
+    cap->running_task      = NULL; // indicates cap is free
+    cap->spare_workers     = NULL;
+    cap->suspended_ccalling_tasks = NULL;
+    cap->returning_tasks_hd = NULL;
+    cap->returning_tasks_tl = NULL;
+#endif
+
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
     cap->f.stgGCFun        = (F_)__stg_gc_fun;
+
+    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
+                                    RtsFlags.GcFlags.generations,
+                                    "initCapability");
+
+    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+       cap->mut_lists[g] = NULL;
+    }
+
+    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+    cap->free_trec_chunks = END_STM_CHUNK_LIST;
+    cap->free_trec_headers = NO_TREC;
+    cap->transaction_tokens = 0;
 }
 
 /* ---------------------------------------------------------------------------
  * Function:  initCapabilities()
  *
- * Purpose:   set up the Capability handling. For the SMP build,
+ * Purpose:   set up the Capability handling. For the THREADED_RTS build,
  *            we keep a table of them, the size of which is
- *            controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes
+ *            controlled by the user via the RTS flag -N.
  *
  * ------------------------------------------------------------------------- */
 void
 initCapabilities( void )
 {
-#if defined(SMP)
-    nat i,n;
+#if defined(THREADED_RTS)
+    nat i;
+
+#ifndef REG_Base
+    // We can't support multiple CPUs if BaseReg is not a register
+    if (RtsFlags.ParFlags.nNodes > 1) {
+       errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
+       RtsFlags.ParFlags.nNodes = 1;
+    }
+#endif
 
-    n = RtsFlags.ParFlags.nNodes;
-    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
+    n_capabilities = RtsFlags.ParFlags.nNodes;
 
-    for (i = 0; i < n; i++) {
-       initCapability(&capabilities[i]);
-       capabilities[i].link = &capabilities[i+1];
+    if (n_capabilities == 1) {
+       capabilities = &MainCapability;
+       // THREADED_RTS must work on builds that don't have a mutable
+       // BaseReg (eg. unregisterised), so in this case
+       // capabilities[0] must coincide with &MainCapability.
+    } else {
+       capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
+                                     "initCapabilities");
     }
-    capabilities[n-1].link = NULL;
-    
-    free_capabilities = &capabilities[0];
-    rts_n_free_capabilities = n;
-
-    capability_hash = allocHashTable();
 
-    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
-#else
-    capabilities = &MainCapability;
-    initCapability(&MainCapability);
-    rts_n_free_capabilities = 1;
-#endif
+    for (i = 0; i < n_capabilities; i++) {
+       initCapability(&capabilities[i], i);
+    }
 
-#if defined(RTS_SUPPORTS_THREADS)
-    initCondition(&returning_worker_cond);
-    initCondition(&thread_ready_cond);
-#endif
-}
+    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", 
+                                   n_capabilities));
 
-/* ----------------------------------------------------------------------------
-   grabCapability( Capability** )
+#else /* !THREADED_RTS */
 
-   (only externally visible when !RTS_SUPPORTS_THREADS.  In the
-   threaded RTS, clients must use waitFor*Capability()).
-   ------------------------------------------------------------------------- */
+    n_capabilities = 1;
+    capabilities = &MainCapability;
+    initCapability(&MainCapability, 0);
 
-#if defined(RTS_SUPPORTS_THREADS)
-static
-#endif
-void
-grabCapability( Capability** cap )
-{
-#if defined(SMP)
-  ASSERT(rts_n_free_capabilities > 0);
-  *cap = free_capabilities;
-  free_capabilities = (*cap)->link;
-  rts_n_free_capabilities--;
-  insertHashTable(capability_hash, osThreadId(), *cap);
-#else
-# if defined(RTS_SUPPORTS_THREADS)
-  ASSERT(rts_n_free_capabilities == 1);
-  rts_n_free_capabilities = 0;
-# endif
-  *cap = &MainCapability;
-#endif
-#if defined(RTS_SUPPORTS_THREADS)
-  IF_DEBUG(scheduler, sched_belch("worker: got capability"));
 #endif
+
+    // There are no free capabilities to begin with.  We will start
+    // a worker Task to each Capability, which will quickly put the
+    // Capability on the free list when it finds nothing to do.
+    last_free_capability = &capabilities[0];
 }
 
 /* ----------------------------------------------------------------------------
- * Function:  myCapability(void)
+ * Give a Capability to a Task.  The task must currently be sleeping
+ * on its condition variable.
+ *
+ * Requires cap->lock (modifies cap->running_task).
+ *
+ * When migrating a Task, the migrater must take task->lock before
+ * modifying task->cap, to synchronise with the waking up Task.
+ * Additionally, the migrater should own the Capability (when
+ * migrating the run queue), or cap->lock (when migrating
+ * returning_workers).
  *
- * Purpose:   Return the capability owned by the current thread.
- *            Should not be used if the current thread does not 
- *            hold a Capability.
  * ------------------------------------------------------------------------- */
-Capability *
-myCapability (void)
+
+#if defined(THREADED_RTS)
+STATIC_INLINE void
+giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
 {
-#if defined(SMP)
-    return lookupHashTable(capability_hash, osThreadId());
-#else
-    return &MainCapability;
-#endif
+    ASSERT_LOCK_HELD(&cap->lock);
+    ASSERT(task->cap == cap);
+    IF_DEBUG(scheduler,
+            sched_belch("passing capability %d to %s %p",
+                        cap->no, task->tso ? "bound task" : "worker",
+                        (void *)task->id));
+    ACQUIRE_LOCK(&task->lock);
+    task->wakeup = rtsTrue;
+    // the wakeup flag is needed because signalCondition() doesn't
+    // flag the condition if the thread is already runniing, but we want
+    // it to be sticky.
+    signalCondition(&task->cond);
+    RELEASE_LOCK(&task->lock);
 }
+#endif
 
 /* ----------------------------------------------------------------------------
  * Function:  releaseCapability(Capability*)
@@ -240,215 +251,382 @@ myCapability (void)
  *            to wake up, in that order.
  * ------------------------------------------------------------------------- */
 
+#if defined(THREADED_RTS)
 void
-releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
+releaseCapability_ (Capability* cap)
 {
-    // Precondition: sched_mutex is held.
-#if defined(RTS_SUPPORTS_THREADS)
-#if !defined(SMP)
-    ASSERT(rts_n_free_capabilities == 0);
-#endif
-#if defined(SMP)
-    cap->link = free_capabilities;
-    free_capabilities = cap;
-    ASSERT(myCapability() == cap);
-    removeHashTable(capability_hash, osThreadId(), NULL);
-#endif
+    Task *task;
+
+    task = cap->running_task;
+
+    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
+
+    cap->running_task = NULL;
+
     // Check to see whether a worker thread can be given
     // the go-ahead to return the result of an external call..
-    if (rts_n_waiting_workers > 0) {
-       // Decrement the counter here to avoid livelock where the
-       // thread that is yielding its capability will repeatedly
-       // signal returning_worker_cond.
-       rts_n_waiting_workers--;
-       signalCondition(&returning_worker_cond);
-       IF_DEBUG(scheduler, 
-                sched_belch("worker: released capability to returning worker"));
-    } else {
-       rts_n_free_capabilities++;
-       IF_DEBUG(scheduler, sched_belch("worker: released capability"));
-       threadRunnable();
+    if (cap->returning_tasks_hd != NULL) {
+       giveCapabilityToTask(cap,cap->returning_tasks_hd);
+       // The Task pops itself from the queue (see waitForReturnCapability())
+       return;
     }
-#endif
-    return;
+
+    // If the next thread on the run queue is a bound thread,
+    // give this Capability to the appropriate Task.
+    if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
+       // Make sure we're not about to try to wake ourselves up
+       ASSERT(task != cap->run_queue_hd->bound);
+       task = cap->run_queue_hd->bound;
+       giveCapabilityToTask(cap,task);
+       return;
+    }
+
+    if (!cap->spare_workers) {
+       // Create a worker thread if we don't have one.  If the system
+       // is interrupted, we only create a worker task if there
+       // are threads that need to be completed.  If the system is
+       // shutting down, we never create a new worker.
+       if (!shutting_down_scheduler) {
+           IF_DEBUG(scheduler,
+                    sched_belch("starting new worker on capability %d", cap->no));
+           startWorkerTask(cap, workerStart);
+           return;
+       }
+    }
+
+    // If we have an unbound thread on the run queue, or if there's
+    // anything else to do, give the Capability to a worker thread.
+    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+       if (cap->spare_workers) {
+           giveCapabilityToTask(cap,cap->spare_workers);
+           // The worker Task pops itself from the queue;
+           return;
+       }
+    }
+
+    last_free_capability = cap;
+    IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
 }
 
-#if defined(RTS_SUPPORTS_THREADS)
-/*
- * When a native thread has completed the execution of an external
- * call, it needs to communicate the result back. This is done
- * as follows:
- *
- *  - in resumeThread(), the thread calls waitForReturnCapability().
- *  - If no capabilities are readily available, waitForReturnCapability()
- *    increments a counter rts_n_waiting_workers, and blocks
- *    waiting for the condition returning_worker_cond to become
- *    signalled.
- *  - upon entry to the Scheduler, a worker thread checks the
- *    value of rts_n_waiting_workers. If > 0, the worker thread
- *    will yield its capability to let a returning worker thread
- *    proceed with returning its result -- this is done via
- *    yieldToReturningWorker().
- *  - the worker thread that yielded its capability then tries
- *    to re-grab a capability and re-enter the Scheduler.
- */
+void
+releaseCapability (Capability* cap USED_IF_THREADS)
+{
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap);
+    RELEASE_LOCK(&cap->lock);
+}
+
+static void
+releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
+{
+    Task *task;
+
+    ACQUIRE_LOCK(&cap->lock);
+
+    task = cap->running_task;
+
+    // If the current task is a worker, save it on the spare_workers
+    // list of this Capability.  A worker can mark itself as stopped,
+    // in which case it is not replaced on the spare_worker queue.
+    // This happens when the system is shutting down (see
+    // Schedule.c:workerStart()).
+    // Also, be careful to check that this task hasn't just exited
+    // Haskell to do a foreign call (task->suspended_tso).
+    if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) {
+       task->next = cap->spare_workers;
+       cap->spare_workers = task;
+    }
+    // Bound tasks just float around attached to their TSOs.
+
+    releaseCapability_(cap);
+
+    RELEASE_LOCK(&cap->lock);
+}
+#endif
 
 /* ----------------------------------------------------------------------------
- * waitForReturnCapability( Mutext *pMutex, Capability** )
+ * waitForReturnCapability( Task *task )
  *
  * Purpose:  when an OS thread returns from an external call,
- * it calls grabReturnCapability() (via Schedule.resumeThread())
- * to wait for permissions to enter the RTS & communicate the
+ * it calls waitForReturnCapability() (via Schedule.resumeThread())
+ * to wait for permission to enter the RTS & communicate the
  * result of the external call back to the Haskell thread that
  * made it.
  *
  * ------------------------------------------------------------------------- */
-
 void
-waitForReturnCapability( Mutex* pMutex, Capability** pCap )
+waitForReturnCapability (Capability **pCap, Task *task)
 {
-    // Pre-condition: pMutex is held.
-
-    IF_DEBUG(scheduler, 
-            sched_belch("worker: returning; workers waiting: %d",
-                        rts_n_waiting_workers));
-
-    if ( noCapabilities() ) {
-       rts_n_waiting_workers++;
-       context_switch = 1;     // make sure it's our turn soon
-       waitCondition(&returning_worker_cond, pMutex);
-#if defined(SMP)
-       *pCap = free_capabilities;
-       free_capabilities = (*pCap)->link;
-       ASSERT(pCap != NULL);
-       insertHashTable(capability_hash, osThreadId(), *pCap);
+#if !defined(THREADED_RTS)
+
+    MainCapability.running_task = task;
+    task->cap = &MainCapability;
+    *pCap = &MainCapability;
+
 #else
-       *pCap = &MainCapability;
-       ASSERT(rts_n_free_capabilities == 0);
-#endif
+    Capability *cap = *pCap;
+
+    if (cap == NULL) {
+       // Try last_free_capability first
+       cap = last_free_capability;
+       if (!cap->running_task) {
+           nat i;
+           // otherwise, search for a free capability
+           for (i = 0; i < n_capabilities; i++) {
+               cap = &capabilities[i];
+               if (!cap->running_task) {
+                   break;
+               }
+           }
+           // Can't find a free one, use last_free_capability.
+           cap = last_free_capability;
+       }
+
+       // record the Capability as the one this Task is now assocated with.
+       task->cap = cap;
+
     } else {
-       grabCapability(pCap);
+       ASSERT(task->cap == cap);
     }
 
-    // Post-condition: pMutex is held, pCap points to a capability
-    // which is now held by the current thread.
-    return;
-}
+    ACQUIRE_LOCK(&cap->lock);
 
+    IF_DEBUG(scheduler,
+            sched_belch("returning; I want capability %d", cap->no));
+
+    if (!cap->running_task) {
+       // It's free; just grab it
+       cap->running_task = task;
+       RELEASE_LOCK(&cap->lock);
+    } else {
+       newReturningTask(cap,task);
+       RELEASE_LOCK(&cap->lock);
+
+       for (;;) {
+           ACQUIRE_LOCK(&task->lock);
+           // task->lock held, cap->lock not held
+           if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+           cap = task->cap;
+           task->wakeup = rtsFalse;
+           RELEASE_LOCK(&task->lock);
+
+           // now check whether we should wake up...
+           ACQUIRE_LOCK(&cap->lock);
+           if (cap->running_task == NULL) {
+               if (cap->returning_tasks_hd != task) {
+                   giveCapabilityToTask(cap,cap->returning_tasks_hd);
+                   RELEASE_LOCK(&cap->lock);
+                   continue;
+               }
+               cap->running_task = task;
+               popReturningTask(cap);
+               RELEASE_LOCK(&cap->lock);
+               break;
+           }
+           RELEASE_LOCK(&cap->lock);
+       }
+
+    }
+
+    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
+    IF_DEBUG(scheduler,
+            sched_belch("returning; got capability %d", cap->no));
+
+    *pCap = cap;
+#endif
+}
 
+#if defined(THREADED_RTS)
 /* ----------------------------------------------------------------------------
- * yieldCapability( Mutex* pMutex, Capability** pCap )
+ * yieldCapability
  * ------------------------------------------------------------------------- */
 
 void
-yieldCapability( Capability** pCap, Condition *cond )
+yieldCapability (Capability** pCap, Task *task)
 {
-    // Pre-condition:  pMutex is assumed held, the current thread
-    // holds the capability pointed to by pCap.
-
-    if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) {
-       IF_DEBUG(scheduler, 
-                if (rts_n_waiting_workers > 0) {
-                    sched_belch("worker: giving up capability (returning wkr)");
-                } else if (!EMPTY_RUN_QUEUE()) {
-                    sched_belch("worker: giving up capability (passing capability)");
-                } else {
-                    sched_belch("worker: giving up capability (no threads to run)");
-                }
-           );
-       releaseCapability(*pCap);
-       *pCap = NULL;
+    Capability *cap = *pCap;
+
+    // The fast path has no locking, if we don't enter this while loop
+
+    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
+       IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
+
+       // We must now release the capability and wait to be woken up
+       // again.
+       task->wakeup = rtsFalse;
+       releaseCapabilityAndQueueWorker(cap);
+
+       for (;;) {
+           ACQUIRE_LOCK(&task->lock);
+           // task->lock held, cap->lock not held
+           if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+           cap = task->cap;
+           task->wakeup = rtsFalse;
+           RELEASE_LOCK(&task->lock);
+
+           IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
+           ACQUIRE_LOCK(&cap->lock);
+           if (cap->running_task != NULL) {
+               IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
+               RELEASE_LOCK(&cap->lock);
+               continue;
+           }
+
+           if (task->tso == NULL) {
+               ASSERT(cap->spare_workers != NULL);
+               // if we're not at the front of the queue, release it
+               // again.  This is unlikely to happen.
+               if (cap->spare_workers != task) {
+                   giveCapabilityToTask(cap,cap->spare_workers);
+                   RELEASE_LOCK(&cap->lock);
+                   continue;
+               }
+               cap->spare_workers = task->next;
+               task->next = NULL;
+           }
+           cap->running_task = task;
+           RELEASE_LOCK(&cap->lock);
+           break;
+       }
+
+       IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no));
+       ASSERT(cap->running_task == task);
     }
 
-    // Post-condition:  either:
-    //
-    //  1. *pCap is NULL, in which case the current thread does not
-    //     hold a capability now, or
-    //  2. *pCap is not NULL, in which case the current thread still
-    //     holds the capability.
-    //
+    *pCap = cap;
+
+    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
     return;
 }
 
-
 /* ----------------------------------------------------------------------------
- * waitForCapability( Mutex*, Capability**, Condition* )
+ * prodCapabilities
  *
- * Purpose:  wait for a Capability to become available. In
- *           the process of doing so, updates the number
- *           of tasks currently blocked waiting for a capability/more
- *           work. That counter is used when deciding whether or
- *           not to create a new worker thread when an external
- *           call is made.
- *           If pThreadCond is not NULL, a capability can be specifically
- *           passed to this thread.
+ * Used to indicate that the interrupted flag is now set, or some
+ * other global condition that might require waking up a Task on each
+ * Capability.
  * ------------------------------------------------------------------------- */
-void
-waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
+
+static void
+prodCapabilities(rtsBool all)
 {
-    // Pre-condition: pMutex is held.
-
-    while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) {
-       IF_DEBUG(scheduler,
-                sched_belch("worker: wait for capability (cond: %p)",
-                            pThreadCond));
-
-       if (pThreadCond != NULL) {
-           waitCondition(pThreadCond, pMutex);
-           IF_DEBUG(scheduler, sched_belch("worker: get passed capability"));
-       } else {
-           rts_n_waiting_tasks++;
-           waitCondition(&thread_ready_cond, pMutex);
-           rts_n_waiting_tasks--;
-           IF_DEBUG(scheduler, sched_belch("worker: get normal capability"));
+    nat i;
+    Capability *cap;
+    Task *task;
+
+    for (i=0; i < n_capabilities; i++) {
+       cap = &capabilities[i];
+       ACQUIRE_LOCK(&cap->lock);
+       if (!cap->running_task) {
+           if (cap->spare_workers) {
+               task = cap->spare_workers;
+               ASSERT(!task->stopped);
+               giveCapabilityToTask(cap,task);
+               if (!all) {
+                   RELEASE_LOCK(&cap->lock);
+                   return;
+               }
+           }
        }
+       RELEASE_LOCK(&cap->lock);
     }
-    grabCapability(pCap);
-
-    // Post-condition: pMutex is held and *pCap is held by the current thread
     return;
 }
 
-#endif /* RTS_SUPPORTS_THREADS */
+void
+prodAllCapabilities (void)
+{
+    prodCapabilities(rtsTrue);
+}
 
 /* ----------------------------------------------------------------------------
-   threadRunnable()
+ * prodOneCapability
+ *
+ * Like prodAllCapabilities, but we only require a single Task to wake
+ * up in order to service some global event, such as checking for
+ * deadlock after some idle time has passed.
+ * ------------------------------------------------------------------------- */
 
-   Signals that a thread has been placed on the run queue, so a worker
-   might need to be woken up to run it.
+void
+prodOneCapability (void)
+{
+    prodCapabilities(rtsFalse);
+}
+
+/* ----------------------------------------------------------------------------
+ * shutdownCapability
+ *
+ * At shutdown time, we want to let everything exit as cleanly as
+ * possible.  For each capability, we let its run queue drain, and
+ * allow the workers to stop.
+ *
+ * This function should be called when interrupted and
+ * shutting_down_scheduler = rtsTrue, thus any worker that wakes up
+ * will exit the scheduler and call taskStop(), and any bound thread
+ * that wakes up will return to its caller.  Runnable threads are
+ * killed.
+ *
+ * ------------------------------------------------------------------------- */
 
-   ToDo: should check whether the thread at the front of the queue is
-   bound, and if so wake up the appropriate worker.
-   -------------------------------------------------------------------------- */
 void
-threadRunnable ( void )
+shutdownCapability (Capability *cap, Task *task)
 {
-#if defined(RTS_SUPPORTS_THREADS)
-    if ( !noCapabilities() && ANY_WORK_TO_DO() ) {
-       if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) {
-           signalCondition(&run_queue_hd->main->bound_thread_cond);
-           return;
+    nat i;
+
+    ASSERT(interrupted && shutting_down_scheduler);
+
+    task->cap = cap;
+
+    for (i = 0; i < 50; i++) {
+       IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i));
+       ACQUIRE_LOCK(&cap->lock);
+       if (cap->running_task) {
+           RELEASE_LOCK(&cap->lock);
+           IF_DEBUG(scheduler, sched_belch("not owner, yielding"));
+           yieldThread();
+           continue;
        }
-       if (rts_n_waiting_tasks > 0) {
-           signalCondition(&thread_ready_cond);
-       } else {
-           startSchedulerTaskIfNecessary();
+       cap->running_task = task;
+       if (!emptyRunQueue(cap) || cap->spare_workers) {
+           IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding"));
+           releaseCapability_(cap); // this will wake up a worker
+           RELEASE_LOCK(&cap->lock);
+           yieldThread();
+           continue;
        }
+       IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no));
+       RELEASE_LOCK(&cap->lock);
+       break;
     }
-#endif
+    // we now have the Capability, its run queue and spare workers
+    // list are both empty.
 }
 
-
 /* ----------------------------------------------------------------------------
-   prodWorker()
+ * tryGrabCapability
+ *
+ * Attempt to gain control of a Capability if it is free.
+ *
+ * ------------------------------------------------------------------------- */
 
-   Wake up... time to die.
-   -------------------------------------------------------------------------- */
-void
-prodWorker ( void )
+rtsBool
+tryGrabCapability (Capability *cap, Task *task)
 {
-#if defined(RTS_SUPPORTS_THREADS)
-    signalCondition(&thread_ready_cond);
-#endif
+    if (cap->running_task != NULL) return rtsFalse;
+    ACQUIRE_LOCK(&cap->lock);
+    if (cap->running_task != NULL) {
+       RELEASE_LOCK(&cap->lock);
+       return rtsFalse;
+    }
+    task->cap = cap;
+    cap->running_task = task;
+    RELEASE_LOCK(&cap->lock);
+    return rtsTrue;
 }
+
+
+#endif /* THREADED_RTS */
+
+