#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
+#include "GC.h"
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
/* GC indicator, in scope for the scheduler, init'ed to false */
volatile StgWord waiting_for_gc = 0;
+/* Let foreign code get the current Capability -- assuming there is one!
+ * This is useful for unsafe foreign calls because they are called with
+ * the current Capability held, but they are not passed it. For example,
+ * see see the integer-gmp package which calls allocateLocal() in its
+ * stgAllocForGMP() function (which gets called by gmp functions).
+ * */
+Capability * rts_unsafeGetMyCapability (void)
+{
+#if defined(THREADED_RTS)
+ return myTask()->cap;
+#else
+ return &MainCapability;
+#endif
+}
+
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
globalWorkToDo (void)
#endif
#if defined(THREADED_RTS)
-rtsBool
-stealWork (Capability *cap)
+StgClosure *
+findSpark (Capability *cap)
{
- /* use the normal Sparks.h interface (internally modified to enable
- concurrent stealing)
- and immediately turn the spark into a thread when successful
- */
Capability *robbed;
StgClosurePtr spark;
rtsBool retry;
nat i = 0;
+ if (!emptyRunQueue(cap)) {
+ // If there are other threads, don't try to run any new
+ // sparks: sparks might be speculative, we don't want to take
+ // resources away from the main computation.
+ return 0;
+ }
+
+ // first try to get a spark from our own pool.
+ // We should be using reclaimSpark(), because it works without
+ // needing any atomic instructions:
+ // spark = reclaimSpark(cap->sparks);
+ // However, measurements show that this makes at least one benchmark
+ // slower (prsa) and doesn't affect the others.
+ spark = tryStealSpark(cap);
+ if (spark != NULL) {
+ cap->sparks_converted++;
+
+ // Post event for running a spark from capability's own pool.
+ postEvent(cap, EVENT_RUN_SPARK, cap->r.rCurrentTSO->id, 0);
+
+ return spark;
+ }
+
+ if (n_capabilities == 1) { return NULL; } // makes no sense...
+
debugTrace(DEBUG_sched,
"cap %d: Trying to steal work from other capabilities",
cap->no);
- if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
-
do {
retry = rtsFalse;
if (emptySparkPoolCap(robbed)) // nothing to steal here
continue;
- spark = tryStealSpark(robbed->sparks);
+ spark = tryStealSpark(robbed);
if (spark == NULL && !emptySparkPoolCap(robbed)) {
// we conflicted with another thread while trying to steal;
// try again later.
debugTrace(DEBUG_sched,
"cap %d: Stole a spark from capability %d",
cap->no, robbed->no);
+ cap->sparks_converted++;
- createSparkThread(cap,spark);
- return rtsTrue;
+ postEvent(cap, EVENT_STEAL_SPARK,
+ cap->r.rCurrentTSO->id, robbed->no);
+
+
+ return spark;
}
// otherwise: no success, try next one
}
} while (retry);
debugTrace(DEBUG_sched, "No sparks stolen");
- return rtsFalse;
+ return NULL;
+}
+
+// Returns True if any spark pool is non-empty at this moment in time
+// The result is only valid for an instant, of course, so in a sense
+// is immediately invalid, and should not be relied upon for
+// correctness.
+rtsBool
+anySparks (void)
+{
+ nat i;
+
+ for (i=0; i < n_capabilities; i++) {
+ if (!emptySparkPoolCap(&capabilities[i])) {
+ return rtsTrue;
+ }
+ }
+ return rtsFalse;
}
#endif
cap->no = i;
cap->in_haskell = rtsFalse;
+ cap->in_gc = rtsFalse;
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
cap->sparks_pruned = 0;
#endif
+ cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
cap->f.stgGCFun = (F_)__stg_gc_fun;
cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
RtsFlags.GcFlags.generations,
"initCapability");
+ cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
+ RtsFlags.GcFlags.generations,
+ "initCapability");
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
cap->mut_lists[g] = NULL;
void setContextSwitches(void)
{
- nat i;
- for (i=0; i < n_capabilities; i++) {
- capabilities[i].context_switch = 1;
- }
+ nat i;
+ for (i=0; i < n_capabilities; i++) {
+ contextSwitchCapability(&capabilities[i]);
+ }
}
/* ----------------------------------------------------------------------------
{
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
- trace(TRACE_sched | DEBUG_sched,
- "passing capability %d to %s %p",
- cap->no, task->tso ? "bound task" : "worker",
- (void *)task->id);
+ debugTrace(DEBUG_sched, "passing capability %d to %s %p",
+ cap->no, task->tso ? "bound task" : "worker",
+ (void *)task->id);
ACQUIRE_LOCK(&task->lock);
task->wakeup = rtsTrue;
// the wakeup flag is needed because signalCondition() doesn't
return;
}
- /* if waiting_for_gc was the reason to release the cap: thread
- comes from yieldCap->releaseAndQueueWorker. Unconditionally set
- cap. free and return (see default after the if-protected other
- special cases). Thread will wait on cond.var and re-acquire the
- same cap after GC (GC-triggering cap. calls releaseCap and
- enters the spare_workers case)
- */
- if (waiting_for_gc) {
+ if (waiting_for_gc == PENDING_GC_SEQ) {
last_free_capability = cap; // needed?
- trace(TRACE_sched | DEBUG_sched,
- "GC pending, set capability %d free", cap->no);
+ debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
return;
}
}
last_free_capability = cap;
- trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no);
+ debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}
void
if (!cap->running_task) {
nat i;
// otherwise, search for a free capability
+ cap = NULL;
for (i = 0; i < n_capabilities; i++) {
- cap = &capabilities[i];
- if (!cap->running_task) {
+ if (!capabilities[i].running_task) {
+ cap = &capabilities[i];
break;
}
}
- // Can't find a free one, use last_free_capability.
- cap = last_free_capability;
+ if (cap == NULL) {
+ // Can't find a free one, use last_free_capability.
+ cap = last_free_capability;
+ }
}
// record the Capability as the one this Task is now assocated with.
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
+ debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
*pCap = cap;
#endif
{
Capability *cap = *pCap;
+ if (waiting_for_gc == PENDING_GC_PAR) {
+ debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
+ postEvent(cap, EVENT_GC_START, 0, 0);
+ gcWorkerThread(cap);
+ postEvent(cap, EVENT_GC_END, 0, 0);
+ return;
+ }
+
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
break;
}
- trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
+ debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
*pCap = cap;
appendToRunQueue(other_cap,tso);
- trace(TRACE_sched, "resuming capability %d", other_cap->no);
releaseCapability_(other_cap,rtsFalse);
} else {
appendToWakeupQueue(my_cap,other_cap,tso);
}
/* ----------------------------------------------------------------------------
- * prodCapabilities
- *
- * Used to indicate that the interrupted flag is now set, or some
- * other global condition that might require waking up a Task on each
- * Capability.
- * ------------------------------------------------------------------------- */
-
-static void
-prodCapabilities(rtsBool all)
-{
- nat i;
- Capability *cap;
- Task *task;
-
- for (i=0; i < n_capabilities; i++) {
- cap = &capabilities[i];
- ACQUIRE_LOCK(&cap->lock);
- if (!cap->running_task) {
- if (cap->spare_workers) {
- trace(TRACE_sched, "resuming capability %d", cap->no);
- task = cap->spare_workers;
- ASSERT(!task->stopped);
- giveCapabilityToTask(cap,task);
- if (!all) {
- RELEASE_LOCK(&cap->lock);
- return;
- }
- }
- }
- RELEASE_LOCK(&cap->lock);
- }
- return;
-}
-
-void
-prodAllCapabilities (void)
-{
- prodCapabilities(rtsTrue);
-}
-
-/* ----------------------------------------------------------------------------
- * prodOneCapability
+ * prodCapability
*
- * Like prodAllCapabilities, but we only require a single Task to wake
- * up in order to service some global event, such as checking for
- * deadlock after some idle time has passed.
+ * If a Capability is currently idle, wake up a Task on it. Used to
+ * get every Capability into the GC.
* ------------------------------------------------------------------------- */
void
-prodOneCapability (void)
+prodCapability (Capability *cap, Task *task)
{
- prodCapabilities(rtsFalse);
+ ACQUIRE_LOCK(&cap->lock);
+ if (!cap->running_task) {
+ cap->running_task = task;
+ releaseCapability_(cap,rtsTrue);
+ }
+ RELEASE_LOCK(&cap->lock);
}
/* ----------------------------------------------------------------------------
{
nat i;
- ASSERT(sched_state == SCHED_SHUTTING_DOWN);
-
task->cap = cap;
// Loop indefinitely until all the workers have exited and there
// isn't safe, for one thing).
for (i = 0; /* i < 50 */; i++) {
+ ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+
debugTrace(DEBUG_sched,
"shutting down capability %d, attempt %d", cap->no, i);
ACQUIRE_LOCK(&cap->lock);
continue;
}
+ postEvent(cap, EVENT_SHUTDOWN, 0, 0);
debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
- freeCapability(cap);
RELEASE_LOCK(&cap->lock);
break;
}
#endif /* THREADED_RTS */
-void
-freeCapability (Capability *cap) {
+static void
+freeCapability (Capability *cap)
+{
stgFree(cap->mut_lists);
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
freeSparkPool(cap->sparks);
#endif
}
+void
+freeCapabilities (void)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ for (i=0; i < n_capabilities; i++) {
+ freeCapability(&capabilities[i]);
+ }
+#else
+ freeCapability(&MainCapability);
+#endif
+}
+
/* ---------------------------------------------------------------------------
Mark everything directly reachable from the Capabilities. When
using multiple GC threads, each GC thread marks all Capabilities