From: Simon Marlow Date: Tue, 9 Sep 2008 13:32:23 +0000 (+0000) Subject: Fix race condition in wakeupThreadOnCapability() (#2574) X-Git-Tag: 2008-09-12~43 X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=commitdiff_plain;h=d572aed64d9c40dcc38a49b09d18f301555e4efb Fix race condition in wakeupThreadOnCapability() (#2574) wakeupThreadOnCapbility() is used to signal another capability that there is a thread waiting to be added to its run queue. It adds the thread to the (locked) wakeup queue on the remote capability. In order to do this, it has to modify the TSO's link field, which has a write barrier. The write barrier might put the TSO on the mutable list, and the bug was that it was using the mutable list of the *target* capability, which we do not have exclusive access to. We should be using the current Capabilty's mutable list in this case. --- diff --git a/rts/Capability.c b/rts/Capability.c index 0f03621..1f1a1ae 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -540,57 +540,40 @@ yieldCapability (Capability** pCap, Task *task) * ------------------------------------------------------------------------- */ void -wakeupThreadOnCapability (Capability *cap, StgTSO *tso) +wakeupThreadOnCapability (Capability *my_cap, + Capability *other_cap, + StgTSO *tso) { - ASSERT(tso->cap == cap); - ASSERT(tso->bound ? tso->bound->cap == cap : 1); - ASSERT_LOCK_HELD(&cap->lock); + ACQUIRE_LOCK(&other_cap->lock); - tso->cap = cap; + // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) + if (tso->bound) { + ASSERT(tso->bound->cap == tso->cap); + tso->bound->cap = other_cap; + } + tso->cap = other_cap; + + ASSERT(tso->bound ? tso->bound->cap == other_cap : 1); - if (cap->running_task == NULL) { + if (other_cap->running_task == NULL) { // nobody is running this Capability, we can add our thread // directly onto the run queue and start up a Task to run it. - appendToRunQueue(cap,tso); - // start it up - cap->running_task = myTask(); // precond for releaseCapability_() - trace(TRACE_sched, "resuming capability %d", cap->no); - releaseCapability_(cap); + other_cap->running_task = myTask(); + // precond for releaseCapability_() and appendToRunQueue() + + appendToRunQueue(other_cap,tso); + + trace(TRACE_sched, "resuming capability %d", other_cap->no); + releaseCapability_(other_cap); } else { - appendToWakeupQueue(cap,tso); + appendToWakeupQueue(my_cap,other_cap,tso); // someone is running on this Capability, so it cannot be // freed without first checking the wakeup queue (see // releaseCapability_). } -} -void -wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso) -{ - ACQUIRE_LOCK(&cap->lock); - migrateThreadToCapability (cap, tso); - RELEASE_LOCK(&cap->lock); -} - -void -migrateThreadToCapability (Capability *cap, StgTSO *tso) -{ - // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) - if (tso->bound) { - ASSERT(tso->bound->cap == tso->cap); - tso->bound->cap = cap; - } - tso->cap = cap; - wakeupThreadOnCapability(cap,tso); -} - -void -migrateThreadToCapability_lock (Capability *cap, StgTSO *tso) -{ - ACQUIRE_LOCK(&cap->lock); - migrateThreadToCapability (cap, tso); - RELEASE_LOCK(&cap->lock); + RELEASE_LOCK(&other_cap->lock); } /* ---------------------------------------------------------------------------- @@ -818,7 +801,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta) } #if defined(THREADED_RTS) - markSparkQueue (evac, user, cap); + traverseSparkQueue (evac, user, cap); #endif } diff --git a/rts/Capability.h b/rts/Capability.h index f13afe2..94306eb 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -202,11 +202,8 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap); // Wakes up a thread on a Capability (probably a different Capability // from the one held by the current Task). // -void wakeupThreadOnCapability (Capability *cap, StgTSO *tso); -void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso); - -void migrateThreadToCapability (Capability *cap, StgTSO *tso); -void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso); +void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap, + StgTSO *tso); // Wakes up a worker thread on just one Capability, used when we // need to service some global event. @@ -252,6 +249,8 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen) { bdescr *bd; + // We must own this Capability in order to modify its mutable list. + ASSERT(cap->running_task == myTask()); bd = cap->mut_lists[gen]; if (bd->free >= bd->start + BLOCK_SIZE_W) { bdescr *new_bd; diff --git a/rts/Schedule.c b/rts/Schedule.c index 1b5afef..f8a8748 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -1871,7 +1871,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - migrateThreadToCapability_lock(&capabilities[cpu],tso); + wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else appendToRunQueue(cap,tso); @@ -2312,8 +2312,6 @@ checkBlackHoles (Capability *cap) if (type != BLACKHOLE && type != CAF_BLACKHOLE) { IF_DEBUG(sanity,checkTSO(t)); t = unblockOne(cap, t); - // urk, the threads migrate to the current capability - // here, but we'd like to keep them on the original one. *prev = t; any_woke_up = rtsTrue; } else { diff --git a/rts/Schedule.h b/rts/Schedule.h index 59bdb9e..45aa000 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -237,16 +237,21 @@ appendToBlockedQueue(StgTSO *tso) #endif #if defined(THREADED_RTS) +// Assumes: my_cap is owned by the current Task. We hold +// other_cap->lock, but we do not necessarily own other_cap; another +// Task may be running on it. INLINE_HEADER void -appendToWakeupQueue (Capability *cap, StgTSO *tso) +appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso) { ASSERT(tso->_link == END_TSO_QUEUE); - if (cap->wakeup_queue_hd == END_TSO_QUEUE) { - cap->wakeup_queue_hd = tso; + if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) { + other_cap->wakeup_queue_hd = tso; } else { - setTSOLink(cap, cap->wakeup_queue_tl, tso); + // my_cap is passed to setTSOLink() because it may need to + // write to the mutable list. + setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso); } - cap->wakeup_queue_tl = tso; + other_cap->wakeup_queue_tl = tso; } #endif diff --git a/rts/Threads.c b/rts/Threads.c index b7f62c8..281cb65 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -510,7 +510,7 @@ unblockOne_ (Capability *cap, StgTSO *tso, context_switch = 1; } else { // we'll try to wake it up on the Capability it was last on. - wakeupThreadOnCapability_lock(tso->cap, tso); + wakeupThreadOnCapability(cap, tso->cap, tso); } #else appendToRunQueue(cap,tso);