/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 2003-2005
+ * (c) The GHC Team, 2003-2006
*
* Capabilities
*
 * STG execution, a pointer to the capability is kept in a
* register (BaseReg; actually it is a pointer to cap->r).
*
- * Only in an SMP build will there be multiple capabilities, for
- * the threaded RTS and other non-threaded builds, there is only
- * one global capability, namely MainCapability.
+ * Only in a THREADED_RTS build will there be multiple capabilities,
+ * for non-threaded builds there is only one global capability, namely
+ * MainCapability.
*
* --------------------------------------------------------------------------*/
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
+#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
+#include "Sparks.h"
-#if !defined(SMP)
-Capability MainCapability; // for non-SMP, we have one global capability
-#endif
+// one global capability, this is the Capability for non-threaded
+// builds, and for +RTS -N1
+Capability MainCapability;
nat n_capabilities;
Capability *capabilities = NULL;
// locking, so we don't do that.
Capability *last_free_capability;
-#ifdef SMP
-#define UNUSED_IF_NOT_SMP
-#else
-#define UNUSED_IF_NOT_SMP STG_UNUSED
-#endif
-
-#ifdef RTS_USER_SIGNALS
-#define UNUSED_IF_NOT_THREADS
-#else
-#define UNUSED_IF_NOT_THREADS STG_UNUSED
-#endif
-
-
+#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
return blackholes_need_checking
- || interrupted
-#if defined(RTS_USER_SIGNALS)
- || signals_pending()
-#endif
+ || sched_state >= SCHED_INTERRUPTING
;
}
+#endif
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
- // If the run queue is not empty, then we only wake up the guy who
- // can run the thread at the head, even if there is some other
- // reason for this task to run (eg. interrupted=rtsTrue).
- if (!emptyRunQueue(cap)) {
- if (cap->run_queue_hd->bound == NULL) {
- return (task->tso == NULL);
- } else {
- return (cap->run_queue_hd->bound == task);
- }
+ if (task->tso != NULL) {
+ // A bound task only runs if its thread is on the run queue of
+ // the capability on which it was woken up. Otherwise, we
+ // can't be sure that we have the right capability: the thread
+ // might be woken up on some other capability, and task->cap
+ // could change under our feet.
+ return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
+ } else {
+ // A vanilla worker task runs if either there is a lightweight
+ // thread at the head of the run queue, or the run queue is
+ // empty and (there are sparks to execute, or there is some
+ // other global condition to check, such as threads blocked on
+ // blackholes).
+ if (emptyRunQueue(cap)) {
+ return !emptySparkPoolCap(cap) || globalWorkToDo();
+ } else
+ return cap->run_queue_hd->bound == NULL;
}
- return globalWorkToDo();
}
#endif
static void
initCapability( Capability *cap, nat i )
{
- nat g;
+ nat g;
cap->no = i;
cap->in_haskell = rtsFalse;
cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
RtsFlags.GcFlags.generations,
"initCapability");
- for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
- cap->mut_lists[g] = NULL;
+
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ cap->mut_lists[g] = NULL;
}
+
+ cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+ cap->free_trec_chunks = END_STM_CHUNK_LIST;
+ cap->free_trec_headers = NO_TREC;
+ cap->transaction_tokens = 0;
}
/* ---------------------------------------------------------------------------
* Function: initCapabilities()
*
- * Purpose: set up the Capability handling. For the SMP build,
+ * Purpose: set up the Capability handling. For the THREADED_RTS build,
* we keep a table of them, the size of which is
* controlled by the user via the RTS flag -N.
*
void
initCapabilities( void )
{
-#if defined(SMP)
- nat i,n;
+#if defined(THREADED_RTS)
+ nat i;
- n_capabilities = n = RtsFlags.ParFlags.nNodes;
- capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
+#ifndef REG_Base
+ // We can't support multiple CPUs if BaseReg is not a register
+ if (RtsFlags.ParFlags.nNodes > 1) {
+ errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
+ RtsFlags.ParFlags.nNodes = 1;
+ }
+#endif
+
+ n_capabilities = RtsFlags.ParFlags.nNodes;
+
+ if (n_capabilities == 1) {
+ capabilities = &MainCapability;
+ // THREADED_RTS must work on builds that don't have a mutable
+ // BaseReg (eg. unregisterised), so in this case
+ // capabilities[0] must coincide with &MainCapability.
+ } else {
+ capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
+ "initCapabilities");
+ }
- for (i = 0; i < n; i++) {
+ for (i = 0; i < n_capabilities; i++) {
initCapability(&capabilities[i], i);
}
- IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
-#else
+ IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
+ n_capabilities));
+
+#else /* !THREADED_RTS */
+
n_capabilities = 1;
capabilities = &MainCapability;
initCapability(&MainCapability, 0);
+
#endif
// There are no free capabilities to begin with. We will start
#if defined(THREADED_RTS)
STATIC_INLINE void
-giveCapabilityToTask (Capability *cap, Task *task)
+giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
- // We are not modifying task->cap, so we do not need to take task->lock.
IF_DEBUG(scheduler,
sched_belch("passing capability %d to %s %p",
cap->no, task->tso ? "bound task" : "worker",
task = cap->running_task;
- ASSERT_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
cap->running_task = NULL;
return;
}
- // If we have an unbound thread on the run queue, or if there's
- // anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || globalWorkToDo()) {
- if (cap->spare_workers) {
- giveCapabilityToTask(cap,cap->spare_workers);
- // The worker Task pops itself from the queue;
- return;
- }
-
+ if (!cap->spare_workers) {
// Create a worker thread if we don't have one. If the system
// is interrupted, we only create a worker task if there
// are threads that need to be completed. If the system is
// shutting down, we never create a new worker.
- if (!shutting_down_scheduler) {
+ if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
IF_DEBUG(scheduler,
sched_belch("starting new worker on capability %d", cap->no));
startWorkerTask(cap, workerStart);
}
}
+ // If we have an unbound thread on the run queue, or if there's
+ // anything else to do, give the Capability to a worker thread.
+ if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (cap->spare_workers) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ // The worker Task pops itself from the queue;
+ return;
+ }
+ }
+
last_free_capability = cap;
IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
void
-releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
+releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
releaseCapability_(cap);
}
static void
-releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
+releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
Task *task;
*
* ------------------------------------------------------------------------- */
void
-waitForReturnCapability (Capability **pCap,
- Task *task UNUSED_IF_NOT_THREADS)
+waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)
}
- ASSERT_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
IF_DEBUG(scheduler,
sched_belch("returning; got capability %d", cap->no));
{
Capability *cap = *pCap;
- // The fast path; no locking
- if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
- return;
+    // The fast path takes no lock; locking happens only if we enter this while loop
while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
// We must now release the capability and wait to be woken up
// again.
+ task->wakeup = rtsFalse;
releaseCapabilityAndQueueWorker(cap);
for (;;) {
IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
ACQUIRE_LOCK(&cap->lock);
if (cap->running_task != NULL) {
+ IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
RELEASE_LOCK(&cap->lock);
continue;
}
*pCap = cap;
- ASSERT_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
return;
}
}
RELEASE_LOCK(&cap->lock);
}
+ return;
}
void
{
nat i;
- ASSERT(interrupted && shutting_down_scheduler);
+ ASSERT(sched_state == SCHED_SHUTTING_DOWN);
task->cap = cap;