Fix race condition in wakeupThreadOnCapability() (#2574)
authorSimon Marlow <marlowsd@gmail.com>
Tue, 9 Sep 2008 13:32:23 +0000 (13:32 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Tue, 9 Sep 2008 13:32:23 +0000 (13:32 +0000)
wakeupThreadOnCapbility() is used to signal another capability that
there is a thread waiting to be added to its run queue.  It adds the
thread to the (locked) wakeup queue on the remote capability.  In
order to do this, it has to modify the TSO's link field, which has a
write barrier.  The write barrier might put the TSO on the mutable
list, and the bug was that it was using the mutable list of the
*target* capability, which we do not have exclusive access to.  We
should be using the current Capabilty's mutable list in this case.

rts/Capability.c
rts/Capability.h
rts/Schedule.c
rts/Schedule.h
rts/Threads.c

index 0f03621..1f1a1ae 100644 (file)
@@ -540,57 +540,40 @@ yieldCapability (Capability** pCap, Task *task)
  * ------------------------------------------------------------------------- */
 
 void
  * ------------------------------------------------------------------------- */
 
 void
-wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+wakeupThreadOnCapability (Capability *my_cap, 
+                          Capability *other_cap, 
+                          StgTSO *tso)
 {
 {
-    ASSERT(tso->cap == cap);
-    ASSERT(tso->bound ? tso->bound->cap == cap : 1);
-    ASSERT_LOCK_HELD(&cap->lock);
+    ACQUIRE_LOCK(&other_cap->lock);
 
 
-    tso->cap = cap;
+    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+    if (tso->bound) {
+       ASSERT(tso->bound->cap == tso->cap);
+       tso->bound->cap = other_cap;
+    }
+    tso->cap = other_cap;
+
+    ASSERT(tso->bound ? tso->bound->cap == other_cap : 1);
 
 
-    if (cap->running_task == NULL) {
+    if (other_cap->running_task == NULL) {
        // nobody is running this Capability, we can add our thread
        // directly onto the run queue and start up a Task to run it.
        // nobody is running this Capability, we can add our thread
        // directly onto the run queue and start up a Task to run it.
-       appendToRunQueue(cap,tso);
 
 
-       // start it up
-       cap->running_task = myTask(); // precond for releaseCapability_()
-       trace(TRACE_sched, "resuming capability %d", cap->no);
-       releaseCapability_(cap);
+       other_cap->running_task = myTask(); 
+            // precond for releaseCapability_() and appendToRunQueue()
+
+       appendToRunQueue(other_cap,tso);
+
+       trace(TRACE_sched, "resuming capability %d", other_cap->no);
+       releaseCapability_(other_cap);
     } else {
     } else {
-       appendToWakeupQueue(cap,tso);
+       appendToWakeupQueue(my_cap,other_cap,tso);
        // someone is running on this Capability, so it cannot be
        // freed without first checking the wakeup queue (see
        // releaseCapability_).
     }
        // someone is running on this Capability, so it cannot be
        // freed without first checking the wakeup queue (see
        // releaseCapability_).
     }
-}
 
 
-void
-wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
-{
-    ACQUIRE_LOCK(&cap->lock);
-    migrateThreadToCapability (cap, tso);
-    RELEASE_LOCK(&cap->lock);
-}
-
-void
-migrateThreadToCapability (Capability *cap, StgTSO *tso)
-{
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-       ASSERT(tso->bound->cap == tso->cap);
-       tso->bound->cap = cap;
-    }
-    tso->cap = cap;
-    wakeupThreadOnCapability(cap,tso);
-}
-
-void
-migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
-{
-    ACQUIRE_LOCK(&cap->lock);
-    migrateThreadToCapability (cap, tso);
-    RELEASE_LOCK(&cap->lock);
+    RELEASE_LOCK(&other_cap->lock);
 }
 
 /* ----------------------------------------------------------------------------
 }
 
 /* ----------------------------------------------------------------------------
@@ -818,7 +801,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta)
        }
 
 #if defined(THREADED_RTS)
        }
 
 #if defined(THREADED_RTS)
-        markSparkQueue (evac, user, cap);
+        traverseSparkQueue (evac, user, cap);
 #endif
     }
 
 #endif
     }
 
index f13afe2..94306eb 100644 (file)
@@ -202,11 +202,8 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
 // Wakes up a thread on a Capability (probably a different Capability
 // from the one held by the current Task).
 //
 // Wakes up a thread on a Capability (probably a different Capability
 // from the one held by the current Task).
 //
-void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
-void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso);
-
-void migrateThreadToCapability (Capability *cap, StgTSO *tso);
-void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso);
+void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
+                               StgTSO *tso);
 
 // Wakes up a worker thread on just one Capability, used when we
 // need to service some global event.
 
 // Wakes up a worker thread on just one Capability, used when we
 // need to service some global event.
@@ -252,6 +249,8 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
 {
     bdescr *bd;
 
 {
     bdescr *bd;
 
+    // We must own this Capability in order to modify its mutable list.
+    ASSERT(cap->running_task == myTask());
     bd = cap->mut_lists[gen];
     if (bd->free >= bd->start + BLOCK_SIZE_W) {
        bdescr *new_bd;
     bd = cap->mut_lists[gen];
     if (bd->free >= bd->start + BLOCK_SIZE_W) {
        bdescr *new_bd;
index 1b5afef..f8a8748 100644 (file)
@@ -1871,7 +1871,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
-       migrateThreadToCapability_lock(&capabilities[cpu],tso);
+       wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
     }
 #else
     appendToRunQueue(cap,tso);
     }
 #else
     appendToRunQueue(cap,tso);
@@ -2312,8 +2312,6 @@ checkBlackHoles (Capability *cap)
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
            IF_DEBUG(sanity,checkTSO(t));
            t = unblockOne(cap, t);
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
            IF_DEBUG(sanity,checkTSO(t));
            t = unblockOne(cap, t);
-           // urk, the threads migrate to the current capability
-           // here, but we'd like to keep them on the original one.
            *prev = t;
            any_woke_up = rtsTrue;
        } else {
            *prev = t;
            any_woke_up = rtsTrue;
        } else {
index 59bdb9e..45aa000 100644 (file)
@@ -237,16 +237,21 @@ appendToBlockedQueue(StgTSO *tso)
 #endif
 
 #if defined(THREADED_RTS)
 #endif
 
 #if defined(THREADED_RTS)
+// Assumes: my_cap is owned by the current Task.  We hold
+// other_cap->lock, but we do not necessarily own other_cap; another
+// Task may be running on it.
 INLINE_HEADER void
 INLINE_HEADER void
-appendToWakeupQueue (Capability *cap, StgTSO *tso)
+appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso)
 {
     ASSERT(tso->_link == END_TSO_QUEUE);
 {
     ASSERT(tso->_link == END_TSO_QUEUE);
-    if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
-       cap->wakeup_queue_hd = tso;
+    if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) {
+       other_cap->wakeup_queue_hd = tso;
     } else {
     } else {
-       setTSOLink(cap, cap->wakeup_queue_tl, tso);
+        // my_cap is passed to setTSOLink() because it may need to
+        // write to the mutable list.
+       setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso);
     }
     }
-    cap->wakeup_queue_tl = tso;
+    other_cap->wakeup_queue_tl = tso;
 }
 #endif
 
 }
 #endif
 
index b7f62c8..281cb65 100644 (file)
@@ -510,7 +510,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       context_switch = 1;
   } else {
       // we'll try to wake it up on the Capability it was last on.
       context_switch = 1;
   } else {
       // we'll try to wake it up on the Capability it was last on.
-      wakeupThreadOnCapability_lock(tso->cap, tso);
+      wakeupThreadOnCapability(cap, tso->cap, tso);
   }
 #else
   appendToRunQueue(cap,tso);
   }
 #else
   appendToRunQueue(cap,tso);