X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FCapability.c;h=92396e9ef05d2d720d2648f42ee5fc2314cc88ba;hb=2dce32ca5188120ca220a36139ff216b852b389e;hp=4950df63bb1b199ff4c15382e854ee2eddf08961;hpb=f86e7206ea94b48b94fb61007a1c5d55b8c60f45;p=ghc-hetmet.git

diff --git a/rts/Capability.c b/rts/Capability.c
index 4950df6..92396e9 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -40,6 +40,9 @@ Capability *capabilities = NULL;
 // locking, so we don't do that.
 Capability *last_free_capability;
 
+/* GC indicator, visible to the scheduler; initialised to false */
+volatile StgWord waiting_for_gc = 0;
+
 #if defined(THREADED_RTS)
 STATIC_INLINE rtsBool
 globalWorkToDo (void)
@@ -51,29 +54,58 @@ globalWorkToDo (void)
 #endif
 
 #if defined(THREADED_RTS)
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
+rtsBool
+stealWork (Capability *cap)
 {
-    if (task->tso != NULL) {
-        // A bound task only runs if its thread is on the run queue of
-        // the capability on which it was woken up.  Otherwise, we
-        // can't be sure that we have the right capability: the thread
-        // might be woken up on some other capability, and task->cap
-        // could change under our feet.
-        return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
-    } else {
-        // A vanilla worker task runs if either there is a lightweight
-        // thread at the head of the run queue, or the run queue is
-        // empty and (there are sparks to execute, or there is some
-        // other global condition to check, such as threads blocked on
-        // blackholes).
-        if (emptyRunQueue(cap)) {
-            return !emptySparkPoolCap(cap)
-                || !emptyWakeupQueue(cap)
-                || globalWorkToDo();
-        } else
-            return cap->run_queue_hd->bound == NULL;
-    }
+    /* Use the normal Sparks.h interface (internally modified to allow
+       concurrent stealing), and turn a stolen spark into a thread
+       immediately when the steal succeeds.
+    */
+    Capability *robbed;
+    StgClosurePtr spark;
+    rtsBool retry;
+    nat i = 0;
+
+    debugTrace(DEBUG_sched,
+               "cap %d: Trying to steal work from other capabilities",
+               cap->no);
+
+    if (n_capabilities == 1) { return rtsFalse; } // no one to steal from
+
+    do {
+        retry = rtsFalse;
+
+        /* Visit capabilities 0..n-1 in sequence until a steal succeeds.
+           We could just as well start at a random index instead of 0. */
+        for ( i=0 ; i < n_capabilities ; i++ ) {
+            robbed = &capabilities[i];
+            if (cap == robbed) // don't steal from ourselves
+                continue;
+
+            if (emptySparkPoolCap(robbed)) // nothing to steal here
+                continue;
+
+            spark = tryStealSpark(robbed->sparks);
+            if (spark == NULL && !emptySparkPoolCap(robbed)) {
+                // we conflicted with another thread while trying to steal;
+                // try again later.
+                retry = rtsTrue;
+            }
+
+            if (spark != NULL) {
+                debugTrace(DEBUG_sched,
+                           "cap %d: Stole a spark from capability %d",
+                           cap->no, robbed->no);
+
+                createSparkThread(cap,spark);
+                return rtsTrue;
+            }
+            // otherwise: no success, try the next capability
+        }
+    } while (retry);
+
+    debugTrace(DEBUG_sched, "No sparks stolen");
+    return rtsFalse;
 }
 #endif
 
@@ -140,6 +172,9 @@ initCapability( Capability *cap, nat i )
     cap->returning_tasks_tl = NULL;
     cap->wakeup_queue_hd    = END_TSO_QUEUE;
     cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->sparks_created     = 0;
+    cap->sparks_converted   = 0;
+    cap->sparks_pruned      = 0;
 #endif
 
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
@@ -158,6 +193,7 @@ initCapability( Capability *cap, nat i )
     cap->free_trec_chunks = END_STM_CHUNK_LIST;
     cap->free_trec_headers = NO_TREC;
     cap->transaction_tokens = 0;
+    cap->context_switch = 0;
 }
 
 /* ---------------------------------------------------------------------------
@@ -215,6 +251,19 @@ initCapabilities( void )
 }
 
 /* ----------------------------------------------------------------------------
+ * setContextSwitches: cause all capabilities to context switch as
+ * soon as possible.
+ * ------------------------------------------------------------------------- */
+
+void setContextSwitches(void)
+{
+    nat i;
+    for (i=0; i < n_capabilities; i++) {
+        capabilities[i].context_switch = 1;
+    }
+}
+
+/* ----------------------------------------------------------------------------
  * Give a Capability to a Task.  The task must currently be sleeping
  * on its condition variable.
 *
@@ -258,7 +307,8 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
 
 #if defined(THREADED_RTS)
 void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap,
+                    rtsBool always_wakeup)
 {
     Task *task;
 
@@ -276,6 +326,21 @@ releaseCapability_ (Capability* cap)
         return;
     }
 
+    /* If waiting_for_gc was the reason to release the cap, the thread
+       came here via yieldCapability -> releaseCapabilityAndQueueWorker.
+       Unconditionally mark the capability free and return, bypassing
+       the other special cases below.  The thread then waits on its
+       condition variable and re-acquires the same capability after GC
+       (the GC-triggering capability enters the spare_workers case).
+    */
+    if (waiting_for_gc) {
+      last_free_capability = cap; // needed?
+      trace(TRACE_sched | DEBUG_sched,
+            "GC pending, set capability %d free", cap->no);
+      return;
+    }
+
+
     // If the next thread on the run queue is a bound thread,
     // give this Capability to the appropriate Task.
     if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
@@ -301,8 +366,9 @@ releaseCapability_ (Capability* cap)
 
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
-    if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
-        || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+    if (always_wakeup ||
+        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptySparkPoolCap(cap) || globalWorkToDo()) {
         if (cap->spare_workers) {
             giveCapabilityToTask(cap,cap->spare_workers);
             // The worker Task pops itself from the queue;
@@ -318,7 +384,15 @@ void
 releaseCapability (Capability* cap USED_IF_THREADS)
 {
     ACQUIRE_LOCK(&cap->lock);
-    releaseCapability_(cap);
+    releaseCapability_(cap, rtsFalse);
+    RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap, rtsTrue);
     RELEASE_LOCK(&cap->lock);
 }
 
@@ -344,7 +418,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
     }
 
     // Bound tasks just float around attached to their TSOs.
-    releaseCapability_(cap);
+    releaseCapability_(cap,rtsFalse);
 
     RELEASE_LOCK(&cap->lock);
 }
@@ -451,9 +525,6 @@ yieldCapability (Capability** pCap, Task *task)
 {
     Capability *cap = *pCap;
 
-    // The fast path has no locking, if we don't enter this while loop
-
-    while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) {
         debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
 
         // We must now release the capability and wait to be woken up
@@ -498,7 +569,6 @@ yieldCapability (Capability** pCap, Task *task)
 
         trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
         ASSERT(cap->running_task == task);
-    }
 
     *pCap = cap;
 
@@ -515,57 +585,41 @@ yieldCapability (Capability** pCap, Task *task)
  * ------------------------------------------------------------------------- */
 
 void
-wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+wakeupThreadOnCapability (Capability *my_cap,
+                          Capability *other_cap,
+                          StgTSO *tso)
 {
-    ASSERT(tso->cap == cap);
-    ASSERT(tso->bound ? tso->bound->cap == cap : 1);
-    ASSERT_LOCK_HELD(&cap->lock);
+    ACQUIRE_LOCK(&other_cap->lock);
+
+    // other_cap->lock is now held; migrate the TSO to other_cap
+    if (tso->bound) {
+        ASSERT(tso->bound->cap == tso->cap);
+        tso->bound->cap = other_cap;
+    }
+    tso->cap = other_cap;
 
-    tso->cap = cap;
+    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
 
-    if (cap->running_task == NULL) {
+    if (other_cap->running_task == NULL) {
         // nobody is running this Capability, we can add our thread
         // directly onto the run queue and start up a Task to run it.
-        appendToRunQueue(cap,tso);
 
-        // start it up
-        cap->running_task = myTask(); // precond for releaseCapability_()
-        trace(TRACE_sched, "resuming capability %d", cap->no);
-        releaseCapability_(cap);
+        other_cap->running_task = myTask();
+            // precond for releaseCapability_() and appendToRunQueue()
+
+        appendToRunQueue(other_cap,tso);
+
+        trace(TRACE_sched, "resuming capability %d", other_cap->no);
+        releaseCapability_(other_cap,rtsFalse);
     } else {
-        appendToWakeupQueue(cap,tso);
+        appendToWakeupQueue(my_cap,other_cap,tso);
+        other_cap->context_switch = 1;
         // someone is running on this Capability, so it cannot be
         // freed without first checking the wakeup queue (see
         // releaseCapability_).
     }
-}
 
-void
-wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
-{
-    ACQUIRE_LOCK(&cap->lock);
-    migrateThreadToCapability (cap, tso);
-    RELEASE_LOCK(&cap->lock);
-}
-
-void
-migrateThreadToCapability (Capability *cap, StgTSO *tso)
-{
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-        ASSERT(tso->bound->cap == tso->cap);
-        tso->bound->cap = cap;
-    }
-    tso->cap = cap;
-    wakeupThreadOnCapability(cap,tso);
-}
-
-void
-migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
-{
-    ACQUIRE_LOCK(&cap->lock);
-    migrateThreadToCapability (cap, tso);
-    RELEASE_LOCK(&cap->lock);
+    RELEASE_LOCK(&other_cap->lock);
 }
 
 /* ----------------------------------------------------------------------------
@@ -691,7 +745,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
         if (!emptyRunQueue(cap) || cap->spare_workers) {
             debugTrace(DEBUG_sched,
                        "runnable threads or workers still alive, yielding");
-            releaseCapability_(cap); // this will wake up a worker
+            releaseCapability_(cap,rtsFalse); // this will wake up a worker
             RELEASE_LOCK(&cap->lock);
             yieldThread();
             continue;
@@ -755,7 +809,7 @@ void
 freeCapability (Capability *cap) {
     stgFree(cap->mut_lists);
 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-    freeSparkPool(&cap->r.rSparks);
+    freeSparkPool(cap->sparks);
 #endif
 }
 
@@ -766,7 +820,8 @@ freeCapability (Capability *cap) {
    ------------------------------------------------------------------------ */
 
 void
-markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
+markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
+                      rtsBool prune_sparks USED_IF_THREADS)
 {
     nat i;
     Capability *cap;
@@ -793,10 +848,14 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
     }
 
 #if defined(THREADED_RTS)
-    markSparkQueue (evac, user, cap);
+    if (prune_sparks) {
+        pruneSparkQueue (evac, user, cap);
+    } else {
+        traverseSparkQueue (evac, user, cap);
+    }
 #endif
     }
-    
+
 #if !defined(THREADED_RTS)
     evac(user, (StgClosure **)(void *)&blocked_queue_hd);
     evac(user, (StgClosure **)(void *)&blocked_queue_tl);
@@ -807,5 +866,5 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
 void
 markCapabilities (evac_fn evac, void *user)
 {
-    markSomeCapabilities(evac, user, 0, 1);
+    markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 }
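
The retry loop in stealWork above is easiest to see in isolation. The following standalone C toy is not part of this patch and not GHC RTS code; Pool, try_steal and steal_work are invented names, and the single-threaded demo only models a racing thief via a flag. It mirrors the shape of the loop: a failed steal from a victim that is still non-empty counts as a lost race and forces another pass over all capabilities, whereas an empty victim is simply skipped.

#include <stdbool.h>
#include <stdio.h>

#define N_WORKERS 4
#define POOL_SIZE 8

typedef struct {
    int items[POOL_SIZE];
    int top;                 /* index of the next free slot */
} Pool;

static Pool pools[N_WORKERS];

static bool pool_empty(const Pool *p) { return p->top == 0; }

/* In the real RTS, tryStealSpark can fail spuriously when it races
   with another thief; model that here with an explicit flag. */
static bool try_steal(Pool *victim, int *out, bool simulate_conflict)
{
    if (pool_empty(victim) || simulate_conflict) return false;
    *out = victim->items[--victim->top];
    return true;
}

/* Mirrors the shape of stealWork: scan every other worker, and
   remember whether any failure was a lost race (retry) rather than
   plain emptiness (give up on that victim). */
static bool steal_work(int self, int *out)
{
    bool retry;
    do {
        retry = false;
        for (int i = 0; i < N_WORKERS; i++) {
            if (i == self) continue;              /* not ourselves */
            if (pool_empty(&pools[i])) continue;  /* nothing to steal */
            if (try_steal(&pools[i], out, false))
                return true;
            if (!pool_empty(&pools[i]))
                retry = true;  /* lost a race; the victim still has work */
        }
    } while (retry);
    return false;
}

int main(void)
{
    pools[2].items[0] = 42;   /* give worker 2 one item of "work" */
    pools[2].top = 1;

    int item;
    if (steal_work(0, &item))
        printf("worker 0 stole %d\n", item);
    else
        printf("nothing stolen\n");
    return 0;
}

Terminating only when a full pass sees no lost races matches the patch's design choice: an empty pool is final for this round, but a race means the victim still had work a moment ago, so giving up immediately could strand sparks.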