#include "PosixSource.h"
#include "Rts.h"
-#include "RtsUtils.h"
-#include "RtsFlags.h"
-#include "STM.h"
-#include "OSThreads.h"
+
#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
+#include "sm/GC.h" // for gcWorkerThread()
+#include "STM.h"
+#include "RtsUtils.h"
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;
-nat n_capabilities;
+nat n_capabilities = 0;
Capability *capabilities = NULL;
// Holds the Capability which last became free. This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
-Capability *last_free_capability;
+Capability *last_free_capability = NULL;
+
+/* GC indicator, in scope for the scheduler, init'ed to false */
+volatile StgWord waiting_for_gc = 0;
+
+/* Let foreign code get the current Capability -- assuming there is one!
+ * This is useful for unsafe foreign calls because they are called with
+ * the current Capability held, but they are not passed it. For example,
+ * see see the integer-gmp package which calls allocateLocal() in its
+ * stgAllocForGMP() function (which gets called by gmp functions).
+ * */
+Capability * rts_unsafeGetMyCapability (void)
+{
+#if defined(THREADED_RTS)
+ return myTask()->cap;
+#else
+ return &MainCapability;
+#endif
+}
#if defined(THREADED_RTS)
STATIC_INLINE rtsBool
#endif
#if defined(THREADED_RTS)
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
+StgClosure *
+findSpark (Capability *cap)
{
- if (task->tso != NULL) {
- // A bound task only runs if its thread is on the run queue of
- // the capability on which it was woken up. Otherwise, we
- // can't be sure that we have the right capability: the thread
- // might be woken up on some other capability, and task->cap
- // could change under our feet.
- return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
- } else {
- // A vanilla worker task runs if either there is a lightweight
- // thread at the head of the run queue, or the run queue is
- // empty and (there are sparks to execute, or there is some
- // other global condition to check, such as threads blocked on
- // blackholes).
- if (emptyRunQueue(cap)) {
- return !emptySparkPoolCap(cap)
- || !emptyWakeupQueue(cap)
- || globalWorkToDo();
- } else
- return cap->run_queue_hd->bound == NULL;
+ Capability *robbed;
+ StgClosurePtr spark;
+ rtsBool retry;
+ nat i = 0;
+
+ if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
+ // If there are other threads, don't try to run any new
+ // sparks: sparks might be speculative, we don't want to take
+ // resources away from the main computation.
+ return 0;
+ }
+
+ do {
+ retry = rtsFalse;
+
+ // first try to get a spark from our own pool.
+ // We should be using reclaimSpark(), because it works without
+ // needing any atomic instructions:
+ // spark = reclaimSpark(cap->sparks);
+ // However, measurements show that this makes at least one benchmark
+ // slower (prsa) and doesn't affect the others.
+ spark = tryStealSpark(cap);
+ if (spark != NULL) {
+ cap->sparks_converted++;
+
+ // Post event for running a spark from capability's own pool.
+ traceEventRunSpark(cap, cap->r.rCurrentTSO);
+
+ return spark;
+ }
+ if (!emptySparkPoolCap(cap)) {
+ retry = rtsTrue;
+ }
+
+ if (n_capabilities == 1) { return NULL; } // makes no sense...
+
+ debugTrace(DEBUG_sched,
+ "cap %d: Trying to steal work from other capabilities",
+ cap->no);
+
+ /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+ start at a random place instead of 0 as well. */
+ for ( i=0 ; i < n_capabilities ; i++ ) {
+ robbed = &capabilities[i];
+ if (cap == robbed) // ourselves...
+ continue;
+
+ if (emptySparkPoolCap(robbed)) // nothing to steal here
+ continue;
+
+ spark = tryStealSpark(robbed);
+ if (spark == NULL && !emptySparkPoolCap(robbed)) {
+ // we conflicted with another thread while trying to steal;
+ // try again later.
+ retry = rtsTrue;
+ }
+
+ if (spark != NULL) {
+ cap->sparks_converted++;
+
+ traceEventStealSpark(cap, cap->r.rCurrentTSO, robbed->no);
+
+ return spark;
+ }
+ // otherwise: no success, try next one
+ }
+ } while (retry);
+
+ debugTrace(DEBUG_sched, "No sparks stolen");
+ return NULL;
+}
+
+// Returns True if any spark pool is non-empty at this moment in time
+// The result is only valid for an instant, of course, so in a sense
+// is immediately invalid, and should not be relied upon for
+// correctness.
+rtsBool
+anySparks (void)
+{
+ nat i;
+
+ for (i=0; i < n_capabilities; i++) {
+ if (!emptySparkPoolCap(&capabilities[i])) {
+ return rtsTrue;
+ }
}
+ return rtsFalse;
}
#endif
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->sparks_created = 0;
+ cap->sparks_converted = 0;
+ cap->sparks_pruned = 0;
#endif
- cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
- cap->f.stgGCFun = (F_)__stg_gc_fun;
+ cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
+ cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1;
+ cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun;
cap->mut_lists = stgMallocBytes(sizeof(bdescr *) *
RtsFlags.GcFlags.generations,
"initCapability");
+ cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
+ RtsFlags.GcFlags.generations,
+ "initCapability");
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
cap->mut_lists[g] = NULL;
cap->free_trec_chunks = END_STM_CHUNK_LIST;
cap->free_trec_headers = NO_TREC;
cap->transaction_tokens = 0;
+ cap->context_switch = 0;
+ cap->pinned_object_block = NULL;
}
/* ---------------------------------------------------------------------------
}
/* ----------------------------------------------------------------------------
+ * setContextSwitches: cause all capabilities to context switch as
+ * soon as possible.
+ * ------------------------------------------------------------------------- */
+
+void setContextSwitches(void)
+{
+ nat i;
+ for (i=0; i < n_capabilities; i++) {
+ contextSwitchCapability(&capabilities[i]);
+ }
+}
+
+/* ----------------------------------------------------------------------------
* Give a Capability to a Task. The task must currently be sleeping
* on its condition variable.
*
{
ASSERT_LOCK_HELD(&cap->lock);
ASSERT(task->cap == cap);
- trace(TRACE_sched | DEBUG_sched,
- "passing capability %d to %s %p",
- cap->no, task->tso ? "bound task" : "worker",
- (void *)task->id);
+ debugTrace(DEBUG_sched, "passing capability %d to %s %p",
+ cap->no, task->tso ? "bound task" : "worker",
+ (void *)task->id);
ACQUIRE_LOCK(&task->lock);
task->wakeup = rtsTrue;
// the wakeup flag is needed because signalCondition() doesn't
#if defined(THREADED_RTS)
void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap,
+ rtsBool always_wakeup)
{
Task *task;
return;
}
+ if (waiting_for_gc == PENDING_GC_SEQ) {
+ last_free_capability = cap; // needed?
+ debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
+ return;
+ }
+
+
// If the next thread on the run queue is a bound thread,
// give this Capability to the appropriate Task.
if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
- || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (always_wakeup ||
+ !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
}
last_free_capability = cap;
- trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no);
+ debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}
void
releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
- releaseCapability_(cap);
+ releaseCapability_(cap, rtsFalse);
+ RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap, rtsTrue);
RELEASE_LOCK(&cap->lock);
}
}
// Bound tasks just float around attached to their TSOs.
- releaseCapability_(cap);
+ releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);
}
if (cap == NULL) {
// Try last_free_capability first
cap = last_free_capability;
- if (!cap->running_task) {
+ if (cap->running_task) {
nat i;
// otherwise, search for a free capability
+ cap = NULL;
for (i = 0; i < n_capabilities; i++) {
- cap = &capabilities[i];
- if (!cap->running_task) {
+ if (!capabilities[i].running_task) {
+ cap = &capabilities[i];
break;
}
}
- // Can't find a free one, use last_free_capability.
- cap = last_free_capability;
+ if (cap == NULL) {
+ // Can't find a free one, use last_free_capability.
+ cap = last_free_capability;
+ }
}
// record the Capability as the one this Task is now assocated with.
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
+ debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
*pCap = cap;
#endif
{
Capability *cap = *pCap;
- // The fast path has no locking, if we don't enter this while loop
+ if (waiting_for_gc == PENDING_GC_PAR) {
+ traceEventGcStart(cap);
+ gcWorkerThread(cap);
+ traceEventGcEnd(cap);
+ return;
+ }
- while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
break;
}
- trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
+ debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
- }
*pCap = cap;
* ------------------------------------------------------------------------- */
void
-wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+wakeupThreadOnCapability (Capability *my_cap,
+ Capability *other_cap,
+ StgTSO *tso)
{
- ASSERT(tso->cap == cap);
- ASSERT(tso->bound ? tso->bound->cap == cap : 1);
- ASSERT_LOCK_HELD(&cap->lock);
-
- tso->cap = cap;
-
- if (cap->running_task == NULL) {
- // nobody is running this Capability, we can add our thread
- // directly onto the run queue and start up a Task to run it.
- appendToRunQueue(cap,tso);
-
- // start it up
- cap->running_task = myTask(); // precond for releaseCapability_()
- trace(TRACE_sched, "resuming capability %d", cap->no);
- releaseCapability_(cap);
- } else {
- appendToWakeupQueue(cap,tso);
- // someone is running on this Capability, so it cannot be
- // freed without first checking the wakeup queue (see
- // releaseCapability_).
- }
-}
+ ACQUIRE_LOCK(&other_cap->lock);
-void
-wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
-{
- ACQUIRE_LOCK(&cap->lock);
- migrateThreadToCapability (cap, tso);
- RELEASE_LOCK(&cap->lock);
-}
-
-void
-migrateThreadToCapability (Capability *cap, StgTSO *tso)
-{
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
ASSERT(tso->bound->cap == tso->cap);
- tso->bound->cap = cap;
+ tso->bound->cap = other_cap;
}
- tso->cap = cap;
- wakeupThreadOnCapability(cap,tso);
-}
+ tso->cap = other_cap;
-void
-migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
-{
- ACQUIRE_LOCK(&cap->lock);
- migrateThreadToCapability (cap, tso);
- RELEASE_LOCK(&cap->lock);
-}
+ ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
-/* ----------------------------------------------------------------------------
- * prodCapabilities
- *
- * Used to indicate that the interrupted flag is now set, or some
- * other global condition that might require waking up a Task on each
- * Capability.
- * ------------------------------------------------------------------------- */
+ if (other_cap->running_task == NULL) {
+ // nobody is running this Capability, we can add our thread
+ // directly onto the run queue and start up a Task to run it.
-static void
-prodCapabilities(rtsBool all)
-{
- nat i;
- Capability *cap;
- Task *task;
+ other_cap->running_task = myTask();
+ // precond for releaseCapability_() and appendToRunQueue()
- for (i=0; i < n_capabilities; i++) {
- cap = &capabilities[i];
- ACQUIRE_LOCK(&cap->lock);
- if (!cap->running_task) {
- if (cap->spare_workers) {
- trace(TRACE_sched, "resuming capability %d", cap->no);
- task = cap->spare_workers;
- ASSERT(!task->stopped);
- giveCapabilityToTask(cap,task);
- if (!all) {
- RELEASE_LOCK(&cap->lock);
- return;
- }
- }
- }
- RELEASE_LOCK(&cap->lock);
+ appendToRunQueue(other_cap,tso);
+
+ releaseCapability_(other_cap,rtsFalse);
+ } else {
+ appendToWakeupQueue(my_cap,other_cap,tso);
+ other_cap->context_switch = 1;
+ // someone is running on this Capability, so it cannot be
+ // freed without first checking the wakeup queue (see
+ // releaseCapability_).
}
- return;
-}
-void
-prodAllCapabilities (void)
-{
- prodCapabilities(rtsTrue);
+ RELEASE_LOCK(&other_cap->lock);
}
/* ----------------------------------------------------------------------------
- * prodOneCapability
+ * prodCapability
*
- * Like prodAllCapabilities, but we only require a single Task to wake
- * up in order to service some global event, such as checking for
- * deadlock after some idle time has passed.
+ * If a Capability is currently idle, wake up a Task on it. Used to
+ * get every Capability into the GC.
* ------------------------------------------------------------------------- */
void
-prodOneCapability (void)
+prodCapability (Capability *cap, Task *task)
{
- prodCapabilities(rtsFalse);
+ ACQUIRE_LOCK(&cap->lock);
+ if (!cap->running_task) {
+ cap->running_task = task;
+ releaseCapability_(cap,rtsTrue);
+ }
+ RELEASE_LOCK(&cap->lock);
}
/* ----------------------------------------------------------------------------
* ------------------------------------------------------------------------- */
void
-shutdownCapability (Capability *cap, Task *task)
+shutdownCapability (Capability *cap, Task *task, rtsBool safe)
{
nat i;
- ASSERT(sched_state == SCHED_SHUTTING_DOWN);
-
task->cap = cap;
// Loop indefinitely until all the workers have exited and there
// isn't safe, for one thing).
for (i = 0; /* i < 50 */; i++) {
+ ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+
debugTrace(DEBUG_sched,
"shutting down capability %d, attempt %d", cap->no, i);
ACQUIRE_LOCK(&cap->lock);
continue;
}
cap->running_task = task;
+
+ if (cap->spare_workers) {
+ // Look for workers that have died without removing
+ // themselves from the list; this could happen if the OS
+ // summarily killed the thread, for example. This
+ // actually happens on Windows when the system is
+ // terminating the program, and the RTS is running in a
+ // DLL.
+ Task *t, *prev;
+ prev = NULL;
+ for (t = cap->spare_workers; t != NULL; t = t->next) {
+ if (!osThreadIsAlive(t->id)) {
+ debugTrace(DEBUG_sched,
+ "worker thread %p has died unexpectedly", (void *)t->id);
+ if (!prev) {
+ cap->spare_workers = t->next;
+ } else {
+ prev->next = t->next;
+ }
+ prev = t;
+ }
+ }
+ }
+
if (!emptyRunQueue(cap) || cap->spare_workers) {
debugTrace(DEBUG_sched,
"runnable threads or workers still alive, yielding");
- releaseCapability_(cap); // this will wake up a worker
+ releaseCapability_(cap,rtsFalse); // this will wake up a worker
RELEASE_LOCK(&cap->lock);
yieldThread();
continue;
}
- debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
- stgFree(cap->mut_lists);
- freeSparkPool(&cap->r.rSparks);
+
+ // If "safe", then busy-wait for any threads currently doing
+ // foreign calls. If we're about to unload this DLL, for
+ // example, we need to be sure that there are no OS threads
+ // that will try to return to code that has been unloaded.
+ // We can be a bit more relaxed when this is a standalone
+ // program that is about to terminate, and let safe=false.
+ if (cap->suspended_ccalling_tasks && safe) {
+ debugTrace(DEBUG_sched,
+ "thread(s) are involved in foreign calls, yielding");
+ cap->running_task = NULL;
+ RELEASE_LOCK(&cap->lock);
+ yieldThread();
+ continue;
+ }
+
+ traceEventShutdown(cap);
RELEASE_LOCK(&cap->lock);
break;
}
#endif /* THREADED_RTS */
+static void
+freeCapability (Capability *cap)
+{
+ stgFree(cap->mut_lists);
+ stgFree(cap->saved_mut_lists);
+#if defined(THREADED_RTS)
+ freeSparkPool(cap->sparks);
+#endif
+}
+
+void
+freeCapabilities (void)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ for (i=0; i < n_capabilities; i++) {
+ freeCapability(&capabilities[i]);
+ }
+#else
+ freeCapability(&MainCapability);
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ Mark everything directly reachable from the Capabilities. When
+ using multiple GC threads, each GC thread marks all Capabilities
+ for which (c `mod` n == 0), for Capability c and thread n.
+ ------------------------------------------------------------------------ */
+void
+markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
+ rtsBool prune_sparks USED_IF_THREADS)
+{
+ nat i;
+ Capability *cap;
+ Task *task;
+
+ // Each GC thread is responsible for following roots from the
+ // Capability of the same number. There will usually be the same
+ // or fewer Capabilities as GC threads, but just in case there
+ // are more, we mark every Capability whose number is the GC
+ // thread's index plus a multiple of the number of GC threads.
+ for (i = i0; i < n_capabilities; i += delta) {
+ cap = &capabilities[i];
+ evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
+ evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+ evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
+ evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
+ for (task = cap->suspended_ccalling_tasks; task != NULL;
+ task=task->next) {
+ evac(user, (StgClosure **)(void *)&task->suspended_tso);
+ }
+
+#if defined(THREADED_RTS)
+ if (prune_sparks) {
+ pruneSparkQueue (evac, user, cap);
+ } else {
+ traverseSparkQueue (evac, user, cap);
+ }
+#endif
+ }
+
+#if !defined(THREADED_RTS)
+ evac(user, (StgClosure **)(void *)&blocked_queue_hd);
+ evac(user, (StgClosure **)(void *)&blocked_queue_tl);
+ evac(user, (StgClosure **)(void *)&sleeping_queue);
+#endif
+}
+
+void
+markCapabilities (evac_fn evac, void *user)
+{
+ markSomeCapabilities(evac, user, 0, 1, rtsFalse);
+}