#endif
#if defined(THREADED_RTS)
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
+rtsBool
+stealWork (Capability *cap)
{
- if (task->tso != NULL) {
- // A bound task only runs if its thread is on the run queue of
- // the capability on which it was woken up. Otherwise, we
- // can't be sure that we have the right capability: the thread
- // might be woken up on some other capability, and task->cap
- // could change under our feet.
- return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
- } else {
- // A vanilla worker task runs if either there is a lightweight
- // thread at the head of the run queue, or the run queue is
- // empty and (there are sparks to execute, or there is some
- // other global condition to check, such as threads blocked on
- // blackholes).
- if (emptyRunQueue(cap)) {
- return !emptySparkPoolCap(cap)
- || !emptyWakeupQueue(cap)
- || globalWorkToDo();
- } else
- return cap->run_queue_hd->bound == NULL;
- }
+ /* use the normal Sparks.h interface (internally modified to enable
+ concurrent stealing)
+ and immediately turn the spark into a thread when successful
+ */
+ Capability *robbed;
+ StgClosurePtr spark;
+ rtsBool retry;
+ nat i = 0;
+
+ debugTrace(DEBUG_sched,
+ "cap %d: Trying to steal work from other capabilities",
+ cap->no);
+
+ if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
+
+ do {
+ retry = rtsFalse;
+
+ /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+ start at a random place instead of 0 as well. */
+ for ( i=0 ; i < n_capabilities ; i++ ) {
+ robbed = &capabilities[i];
+ if (cap == robbed) // ourselves...
+ continue;
+
+ if (emptySparkPoolCap(robbed)) // nothing to steal here
+ continue;
+
+ spark = tryStealSpark(robbed->sparks);
+ if (spark == NULL && !emptySparkPoolCap(robbed)) {
+ // we conflicted with another thread while trying to steal;
+ // try again later.
+ retry = rtsTrue;
+ }
+
+ if (spark != NULL) {
+ debugTrace(DEBUG_sched,
+ "cap %d: Stole a spark from capability %d",
+ cap->no, robbed->no);
+
+ createSparkThread(cap,spark);
+ return rtsTrue;
+ }
+ // otherwise: no success, try next one
+ }
+ } while (retry);
+
+ debugTrace(DEBUG_sched, "No sparks stolen");
+ return rtsFalse;
}
#endif
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->sparks_created = 0;
+ cap->sparks_converted = 0;
+ cap->sparks_pruned = 0;
#endif
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
#if defined(THREADED_RTS)
void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap,
+ rtsBool always_wakeup)
{
Task *task;
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
- || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (always_wakeup ||
+ !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
- releaseCapability_(cap);
+ releaseCapability_(cap, rtsFalse);
+ RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap, rtsTrue);
RELEASE_LOCK(&cap->lock);
}
}
// Bound tasks just float around attached to their TSOs.
- releaseCapability_(cap);
+ releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);
}
{
Capability *cap = *pCap;
- // The fast path has no locking, if we don't enter this while loop
-
- while ( waiting_for_gc
- /* i.e. another capability triggered HeapOverflow, is busy
- getting capabilities (stopping their owning tasks) */
- || cap->returning_tasks_hd != NULL
- /* cap reserved for another task */
- || !anyWorkForMe(cap,task)
- /* cap/task have no work */
- ) {
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
- }
*pCap = cap;
appendToRunQueue(other_cap,tso);
trace(TRACE_sched, "resuming capability %d", other_cap->no);
- releaseCapability_(other_cap);
+ releaseCapability_(other_cap,rtsFalse);
} else {
appendToWakeupQueue(my_cap,other_cap,tso);
other_cap->context_switch = 1;
if (!emptyRunQueue(cap) || cap->spare_workers) {
debugTrace(DEBUG_sched,
"runnable threads or workers still alive, yielding");
- releaseCapability_(cap); // this will wake up a worker
+ releaseCapability_(cap,rtsFalse); // this will wake up a worker
RELEASE_LOCK(&cap->lock);
yieldThread();
continue;
freeCapability (Capability *cap) {
stgFree(cap->mut_lists);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
- freeSparkPool(&cap->r.rSparks);
+ freeSparkPool(cap->sparks);
#endif
}
------------------------------------------------------------------------ */
void
-markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
+markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
+ rtsBool prune_sparks USED_IF_THREADS)
{
nat i;
Capability *cap;
}
#if defined(THREADED_RTS)
- traverseSparkQueue (evac, user, cap);
+ if (prune_sparks) {
+ pruneSparkQueue (evac, user, cap);
+ } else {
+ traverseSparkQueue (evac, user, cap);
+ }
#endif
}
#endif
}
-// This function is used by the compacting GC to thread all the
-// pointers from spark queues.
-void
-traverseSparkQueues (evac_fn evac USED_IF_THREADS, void *user USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- nat i;
- for (i = 0; i < n_capabilities; i++) {
- traverseSparkQueue (evac, user, &capabilities[i]);
- }
-#endif // THREADED_RTS
-
-}
-
void
markCapabilities (evac_fn evac, void *user)
{
- markSomeCapabilities(evac, user, 0, 1);
+ markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}