#endif
#if defined(THREADED_RTS)
-rtsBool stealWork( Capability *cap) {
+StgClosure *
+stealWork (Capability *cap)
+{
/* use the normal Sparks.h interface (internally modified to enable
concurrent stealing)
and immediately turn the spark into a thread when successful
*/
Capability *robbed;
- SparkPool *pool;
StgClosurePtr spark;
- rtsBool success = rtsFalse;
+ rtsBool retry;
nat i = 0;
debugTrace(DEBUG_sched,
"cap %d: Trying to steal work from other capabilities",
cap->no);
- if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
+ if (n_capabilities == 1) { return NULL; } // makes no sense...
- /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
- start at a random place instead of 0 as well. */
- for ( i=0 ; i < n_capabilities ; i++ ) {
- robbed = &capabilities[i];
- if (cap == robbed) // ourselves...
- continue;
+ do {
+ retry = rtsFalse;
- if (emptySparkPoolCap(robbed)) // nothing to steal here
- continue;
-
- spark = findSpark(robbed);
+ /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+ start at a random place instead of 0 as well. */
+ for ( i=0 ; i < n_capabilities ; i++ ) {
+ robbed = &capabilities[i];
+ if (cap == robbed) // ourselves...
+ continue;
- if (spark == NULL && !emptySparkPoolCap(robbed)) {
- spark = findSpark(robbed); // lost race in concurrent access, try again
- }
- if (spark != NULL) {
- debugTrace(DEBUG_sched,
- "cap %d: Stole a spark from capability %d",
- cap->no, robbed->no);
+ if (emptySparkPoolCap(robbed)) // nothing to steal here
+ continue;
- createSparkThread(cap,spark);
- success = rtsTrue;
- break; // got one, leave the loop
- }
- // otherwise: no success, try next one
- }
- debugTrace(DEBUG_sched,
- "Leaving work stealing routine (%s)",
- success?"one spark stolen":"thefts did not succeed");
- return success;
+ spark = tryStealSpark(robbed);
+ if (spark == NULL && !emptySparkPoolCap(robbed)) {
+ // we conflicted with another thread while trying to steal;
+ // try again later.
+ retry = rtsTrue;
+ }
+
+ if (spark != NULL) {
+ debugTrace(DEBUG_sched,
+ "cap %d: Stole a spark from capability %d",
+ cap->no, robbed->no);
+ return spark;
+ }
+ // otherwise: no success, try next one
+ }
+ } while (retry);
+
+ debugTrace(DEBUG_sched, "No sparks stolen");
+ return NULL;
}
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
+// Returns True if any spark pool is non-empty at this moment in time
+// The result is only valid for an instant, of course, so in a sense
+// is immediately invalid, and should not be relied upon for
+// correctness.
+rtsBool
+anySparks (void)
{
- if (task->tso != NULL) {
- // A bound task only runs if its thread is on the run queue of
- // the capability on which it was woken up. Otherwise, we
- // can't be sure that we have the right capability: the thread
- // might be woken up on some other capability, and task->cap
- // could change under our feet.
- return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
- } else {
- // A vanilla worker task runs if either there is a lightweight
- // thread at the head of the run queue, or the run queue is
- // empty and (there are sparks to execute, or there is some
- // other global condition to check, such as threads blocked on
- // blackholes).
- if (emptyRunQueue(cap)) {
- return !emptySparkPoolCap(cap)
- || !emptyWakeupQueue(cap)
- || globalWorkToDo()
- || stealWork(cap); /* if all false: try to steal work */
- } else {
- return cap->run_queue_hd->bound == NULL;
- }
+ nat i;
+
+ for (i=0; i < n_capabilities; i++) {
+ if (!emptySparkPoolCap(&capabilities[i])) {
+ return rtsTrue;
+ }
}
+ return rtsFalse;
}
#endif
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->sparks_created = 0;
+ cap->sparks_converted = 0;
+ cap->sparks_pruned = 0;
#endif
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
#if defined(THREADED_RTS)
void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap,
+ rtsBool always_wakeup)
{
Task *task;
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
- || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (always_wakeup ||
+ !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
- releaseCapability_(cap);
+ releaseCapability_(cap, rtsFalse);
+ RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap, rtsTrue);
RELEASE_LOCK(&cap->lock);
}
}
// Bound tasks just float around attached to their TSOs.
- releaseCapability_(cap);
+ releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);
}
{
Capability *cap = *pCap;
- // The fast path has no locking, if we don't enter this while loop
-
- while ( waiting_for_gc
- /* i.e. another capability triggered HeapOverflow, is busy
- getting capabilities (stopping their owning tasks) */
- || cap->returning_tasks_hd != NULL
- /* cap reserved for another task */
- || !anyWorkForMe(cap,task)
- /* cap/task have no work */
- ) {
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
- }
*pCap = cap;
appendToRunQueue(other_cap,tso);
trace(TRACE_sched, "resuming capability %d", other_cap->no);
- releaseCapability_(other_cap);
+ releaseCapability_(other_cap,rtsFalse);
} else {
appendToWakeupQueue(my_cap,other_cap,tso);
other_cap->context_switch = 1;
if (!emptyRunQueue(cap) || cap->spare_workers) {
debugTrace(DEBUG_sched,
"runnable threads or workers still alive, yielding");
- releaseCapability_(cap); // this will wake up a worker
+ releaseCapability_(cap,rtsFalse); // this will wake up a worker
RELEASE_LOCK(&cap->lock);
yieldThread();
continue;
}
debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
- freeCapability(cap);
RELEASE_LOCK(&cap->lock);
break;
}
#endif /* THREADED_RTS */
-void
-freeCapability (Capability *cap) {
+static void
+freeCapability (Capability *cap)
+{
stgFree(cap->mut_lists);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
freeSparkPool(cap->sparks);
#endif
}
+void
+freeCapabilities (void)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ for (i=0; i < n_capabilities; i++) {
+ freeCapability(&capabilities[i]);
+ }
+#else
+ freeCapability(&MainCapability);
+#endif
+}
+
/* ---------------------------------------------------------------------------
Mark everything directly reachable from the Capabilities. When
using multiple GC threads, each GC thread marks all Capabilities
------------------------------------------------------------------------ */
void
-markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
+markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
+ rtsBool prune_sparks USED_IF_THREADS)
{
nat i;
Capability *cap;
}
#if defined(THREADED_RTS)
- traverseSparkQueue (evac, user, cap);
+ if (prune_sparks) {
+ pruneSparkQueue (evac, user, cap);
+ } else {
+ traverseSparkQueue (evac, user, cap);
+ }
#endif
}
#endif
}
-// This function is used by the compacting GC to thread all the
-// pointers from spark queues.
-void
-traverseSparkQueues (evac_fn evac USED_IF_THREADS, void *user USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- nat i;
- for (i = 0; i < n_capabilities; i++) {
- traverseSparkQueue (evac, user, &capabilities[i]);
- }
-#endif // THREADED_RTS
-
-}
-
void
markCapabilities (evac_fn evac, void *user)
{
- markSomeCapabilities(evac, user, 0, 1);
+ markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}