#endif
#if defined(THREADED_RTS)
-rtsBool stealWork( Capability *cap) {
+rtsBool
+stealWork (Capability *cap)
+{
/* use the normal Sparks.h interface (internally modified to enable
concurrent stealing)
and immediately turn the spark into a thread when successful
*/
Capability *robbed;
- SparkPool *pool;
StgClosurePtr spark;
rtsBool success = rtsFalse;
+ rtsBool retry;
nat i = 0;
debugTrace(DEBUG_sched,
if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
- /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
- start at a random place instead of 0 as well. */
- for ( i=0 ; i < n_capabilities ; i++ ) {
- robbed = &capabilities[i];
- if (cap == robbed) // ourselves...
- continue;
+ do {
+ retry = rtsFalse;
- if (emptySparkPoolCap(robbed)) // nothing to steal here
- continue;
-
- spark = findSpark(robbed);
+ /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+ start at a random place instead of 0 as well. */
+ for ( i=0 ; i < n_capabilities ; i++ ) {
+ robbed = &capabilities[i];
+ if (cap == robbed) // ourselves...
+ continue;
- if (spark == NULL && !emptySparkPoolCap(robbed)) {
- spark = findSpark(robbed); // lost race in concurrent access, try again
- }
- if (spark != NULL) {
- debugTrace(DEBUG_sched,
+ if (emptySparkPoolCap(robbed)) // nothing to steal here
+ continue;
+
+ spark = tryStealSpark(robbed->sparks);
+ if (spark == NULL && !emptySparkPoolCap(robbed)) {
+ // we conflicted with another thread while trying to steal;
+ // try again later.
+ retry = rtsTrue;
+ }
+
+ if (spark != NULL) {
+ debugTrace(DEBUG_sched,
"cap %d: Stole a spark from capability %d",
- cap->no, robbed->no);
+ cap->no, robbed->no);
- createSparkThread(cap,spark);
- success = rtsTrue;
- break; // got one, leave the loop
- }
- // otherwise: no success, try next one
- }
- debugTrace(DEBUG_sched,
- "Leaving work stealing routine (%s)",
- success?"one spark stolen":"thefts did not succeed");
- return success;
-}
+ createSparkThread(cap,spark);
+ return rtsTrue;
+ }
+ // otherwise: no success, try next one
+ }
+ } while (retry);
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
-{
- if (task->tso != NULL) {
- // A bound task only runs if its thread is on the run queue of
- // the capability on which it was woken up. Otherwise, we
- // can't be sure that we have the right capability: the thread
- // might be woken up on some other capability, and task->cap
- // could change under our feet.
- return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
- } else {
- // A vanilla worker task runs if either there is a lightweight
- // thread at the head of the run queue, or the run queue is
- // empty and (there are sparks to execute, or there is some
- // other global condition to check, such as threads blocked on
- // blackholes).
- if (emptyRunQueue(cap)) {
- return !emptySparkPoolCap(cap)
- || !emptyWakeupQueue(cap)
- || globalWorkToDo()
- || stealWork(cap); /* if all false: try to steal work */
- } else {
- return cap->run_queue_hd->bound == NULL;
- }
- }
+ debugTrace(DEBUG_sched, "No sparks stolen");
+ return rtsFalse;
}
#endif
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->sparks_created = 0;
+ cap->sparks_converted = 0;
+ cap->sparks_pruned = 0;
#endif
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
#if defined(THREADED_RTS)
void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap,
+ rtsBool always_wakeup)
{
Task *task;
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
- || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (always_wakeup ||
+ !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
- releaseCapability_(cap);
+ releaseCapability_(cap, rtsFalse);
+ RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap, rtsTrue);
RELEASE_LOCK(&cap->lock);
}
}
// Bound tasks just float around attached to their TSOs.
- releaseCapability_(cap);
+ releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);
}
{
Capability *cap = *pCap;
- // The fast path has no locking, if we don't enter this while loop
-
- while ( waiting_for_gc
- /* i.e. another capability triggered HeapOverflow, is busy
- getting capabilities (stopping their owning tasks) */
- || cap->returning_tasks_hd != NULL
- /* cap reserved for another task */
- || !anyWorkForMe(cap,task)
- /* cap/task have no work */
- ) {
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
- }
*pCap = cap;
appendToRunQueue(other_cap,tso);
trace(TRACE_sched, "resuming capability %d", other_cap->no);
- releaseCapability_(other_cap);
+ releaseCapability_(other_cap,rtsFalse);
} else {
appendToWakeupQueue(my_cap,other_cap,tso);
other_cap->context_switch = 1;
if (!emptyRunQueue(cap) || cap->spare_workers) {
debugTrace(DEBUG_sched,
"runnable threads or workers still alive, yielding");
- releaseCapability_(cap); // this will wake up a worker
+ releaseCapability_(cap,rtsFalse); // this will wake up a worker
RELEASE_LOCK(&cap->lock);
yieldThread();
continue;