/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 2003-2005
+ * (c) The GHC Team, 2003-2006
*
* Capabilities
*
 * STG execution, a pointer to the capability is kept in a
* register (BaseReg; actually it is a pointer to cap->r).
*
- * Only in an SMP build will there be multiple capabilities, for
- * the threaded RTS and other non-threaded builds, there is only
- * one global capability, namely MainCapability.
- *
+ * Only in an THREADED_RTS build will there be multiple capabilities,
+ * for non-threaded builds there is only one global capability, namely
+ * MainCapability.
+ *
* --------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
+#include "STM.h"
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
+#include "Sparks.h"
-#if !defined(SMP)
-Capability MainCapability; // for non-SMP, we have one global capability
-#endif
+// one global capability, this is the Capability for non-threaded
+// builds, and for +RTS -N1
+Capability MainCapability;
nat n_capabilities;
Capability *capabilities = NULL;
// locking, so we don't do that.
Capability *last_free_capability;
-#ifdef SMP
-#define UNUSED_IF_NOT_SMP
-#else
-#define UNUSED_IF_NOT_SMP STG_UNUSED
-#endif
-
-#ifdef RTS_USER_SIGNALS
-#define UNUSED_IF_NOT_THREADS
-#else
-#define UNUSED_IF_NOT_THREADS STG_UNUSED
-#endif
-
-
+#if defined(THREADED_RTS)
+// Returns rtsTrue if there is work that any Capability could pick up:
+// blackholes that need checking, or a pending interrupt.  Note that
+// this patch drops the signals_pending() check — signal polling is no
+// longer routed through globalWorkToDo.
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
    return blackholes_need_checking
	|| interrupted
-#if defined(RTS_USER_SIGNALS)
-	|| signals_pending()
-#endif
	;
}
+#endif
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
} else {
return (cap->run_queue_hd->bound == task);
}
+ } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
+ return rtsTrue;
}
return globalWorkToDo();
}
/* -----------------------------------------------------------------------------
* Manage the returning_tasks lists.
- *
+ *
* These functions require cap->lock
* -------------------------------------------------------------------------- */
+// Initialise a single Capability: its index, basic flags, the GC entry
+// code pointers, a per-generation mutable list, and (added here) the
+// per-capability STM free-list caches.
static void
initCapability( Capability *cap, nat i )
{
+    nat g;
+
    cap->no = i;
    cap->in_haskell = rtsFalse;
    cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
    cap->f.stgGCFun = (F_)__stg_gc_fun;
+
+    // One mutable list per generation, all initially empty.
+    // NOTE(review): presumably this gives each Capability its own
+    // remembered set so the write barrier needs no locking — confirm
+    // against the GC code that consumes cap->mut_lists.
+    cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
+				    RtsFlags.GcFlags.generations,
+				    "initCapability");
+
+    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+	cap->mut_lists[g] = NULL;
+    }
+
+    // STM per-capability free lists start empty (sentinel values from
+    // STM.h, included by this patch).
+    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+    cap->free_trec_chunks = END_STM_CHUNK_LIST;
+    cap->free_trec_headers = NO_TREC;
+    cap->transaction_tokens = 0;
}
/* ---------------------------------------------------------------------------
* Function: initCapabilities()
*
- * Purpose: set up the Capability handling. For the SMP build,
+ * Purpose: set up the Capability handling. For the THREADED_RTS build,
* we keep a table of them, the size of which is
* controlled by the user via the RTS flag -N.
*
void
initCapabilities( void )
{
-#if defined(SMP)
- nat i,n;
+#if defined(THREADED_RTS)
+ nat i;
- n_capabilities = n = RtsFlags.ParFlags.nNodes;
- capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
+#ifndef REG_Base
+ // We can't support multiple CPUs if BaseReg is not a register
+ if (RtsFlags.ParFlags.nNodes > 1) {
+ errorBelch("warning: multiple CPUs not supported in this build, reverting to 1");
+ RtsFlags.ParFlags.nNodes = 1;
+ }
+#endif
+
+ n_capabilities = RtsFlags.ParFlags.nNodes;
+
+ if (n_capabilities == 1) {
+ capabilities = &MainCapability;
+ // THREADED_RTS must work on builds that don't have a mutable
+ // BaseReg (eg. unregisterised), so in this case
+ // capabilities[0] must coincide with &MainCapability.
+ } else {
+ capabilities = stgMallocBytes(n_capabilities * sizeof(Capability),
+ "initCapabilities");
+ }
- for (i = 0; i < n; i++) {
+ for (i = 0; i < n_capabilities; i++) {
initCapability(&capabilities[i], i);
}
-
- IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
-#else
+
+ IF_DEBUG(scheduler, sched_belch("allocated %d capabilities",
+ n_capabilities));
+
+#else /* !THREADED_RTS */
+
n_capabilities = 1;
capabilities = &MainCapability;
initCapability(&MainCapability, 0);
+
#endif
// There are no free capabilities to begin with. We will start
#if defined(THREADED_RTS)
STATIC_INLINE void
-giveCapabilityToTask (Capability *cap, Task *task)
+giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
- // We are not modifying task->cap, so we do not need to take task->lock.
- IF_DEBUG(scheduler,
+ IF_DEBUG(scheduler,
sched_belch("passing capability %d to %s %p",
- cap->no, task->tso ? "bound task" : "worker",
+ cap->no, task->tso ? "bound task" : "worker",
(void *)task->id));
ACQUIRE_LOCK(&task->lock);
task->wakeup = rtsTrue;
task = cap->running_task;
- ASSERT_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
cap->running_task = NULL;
giveCapabilityToTask(cap,cap->returning_tasks_hd);
// The Task pops itself from the queue (see waitForReturnCapability())
return;
- }
+ }
// If the next thread on the run queue is a bound thread,
// give this Capability to the appropriate Task.
task = cap->run_queue_hd->bound;
giveCapabilityToTask(cap,task);
return;
- }
-
- // If we have an unbound thread on the run queue, or if there's
- // anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || globalWorkToDo()) {
- if (cap->spare_workers) {
- giveCapabilityToTask(cap,cap->spare_workers);
- // The worker Task pops itself from the queue;
- return;
- }
+ }
+ if (!cap->spare_workers) {
// Create a worker thread if we don't have one. If the system
// is interrupted, we only create a worker task if there
// are threads that need to be completed. If the system is
// shutting down, we never create a new worker.
if (!shutting_down_scheduler) {
- IF_DEBUG(scheduler,
+ IF_DEBUG(scheduler,
sched_belch("starting new worker on capability %d", cap->no));
startWorkerTask(cap, workerStart);
return;
}
}
+ // If we have an unbound thread on the run queue, or if there's
+ // anything else to do, give the Capability to a worker thread.
+ if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (cap->spare_workers) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ // The worker Task pops itself from the queue;
+ return;
+ }
+ }
+
last_free_capability = cap;
IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no));
}
void
-releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS)
+releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
releaseCapability_(cap);
}
static void
-releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS)
+releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
{
Task *task;
*
* ------------------------------------------------------------------------- */
void
-waitForReturnCapability (Capability **pCap,
- Task *task UNUSED_IF_NOT_THREADS)
+waitForReturnCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)
ACQUIRE_LOCK(&cap->lock);
- IF_DEBUG(scheduler,
+ IF_DEBUG(scheduler,
sched_belch("returning; I want capability %d", cap->no));
if (!cap->running_task) {
}
- ASSERT_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- IF_DEBUG(scheduler,
+ IF_DEBUG(scheduler,
sched_belch("returning; got capability %d", cap->no));
*pCap = cap;
{
Capability *cap = *pCap;
- // The fast path; no locking
- if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) )
- return;
+ // The fast path has no locking, if we don't enter this while loop
while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no));
// We must now release the capability and wait to be woken up
- // again.
+ // again.
+ task->wakeup = rtsFalse;
releaseCapabilityAndQueueWorker(cap);
for (;;) {
IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no));
ACQUIRE_LOCK(&cap->lock);
if (cap->running_task != NULL) {
+ IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no));
RELEASE_LOCK(&cap->lock);
continue;
}
*pCap = cap;
- ASSERT_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
return;
}
nat i;
Capability *cap;
Task *task;
-
+
for (i=0; i < n_capabilities; i++) {
cap = &capabilities[i];
ACQUIRE_LOCK(&cap->lock);
}
RELEASE_LOCK(&cap->lock);
}
+ return;
}
+// Prod a single capability: thin wrapper around prodCapabilities().
+// NOTE(review): rtsFalse presumably means "not all capabilities" —
+// confirm against the prodCapabilities() definition, whose parameter
+// is outside this hunk.
void
prodOneCapability (void)
{
    prodCapabilities(rtsFalse);
-}
+}
/* ----------------------------------------------------------------------------
* shutdownCapability
* will exit the scheduler and call taskStop(), and any bound thread
* that wakes up will return to its caller. Runnable threads are
* killed.
- *
+ *
* ------------------------------------------------------------------------- */
void
// list are both empty.
}
+/* ----------------------------------------------------------------------------
+ * tryGrabCapability
+ *
+ * Attempt to gain control of a Capability if it is free.
+ *
+ * Returns rtsTrue and sets task->cap / cap->running_task on success;
+ * returns rtsFalse without blocking if the Capability is already owned.
+ *
+ * ------------------------------------------------------------------------- */
+
+rtsBool
+tryGrabCapability (Capability *cap, Task *task)
+{
+    // Unlocked fast path: if somebody already owns this Capability,
+    // fail immediately without touching cap->lock.
+    if (cap->running_task != NULL) return rtsFalse;
+    ACQUIRE_LOCK(&cap->lock);
+    // Re-check under the lock: another task may have grabbed the
+    // Capability between the unlocked test and ACQUIRE_LOCK.
+    if (cap->running_task != NULL) {
+	RELEASE_LOCK(&cap->lock);
+	return rtsFalse;
+    }
+    // Take ownership: link the task and the capability both ways.
+    task->cap = cap;
+    cap->running_task = task;
+    RELEASE_LOCK(&cap->lock);
+    return rtsTrue;
+}
+
+
#endif /* THREADED_RTS */