Fix for derefing ThreadRelocated TSOs in MVar operations
authorSimon Marlow <marlowsd@gmail.com>
Wed, 7 Apr 2010 09:28:24 +0000 (09:28 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Wed, 7 Apr 2010 09:28:24 +0000 (09:28 +0000)
rts/PrimOps.cmm
rts/RaiseAsync.c
rts/Threads.c
rts/Threads.h

index d09a856..e5427c7 100644 (file)
@@ -1211,22 +1211,29 @@ loop:
     // There are putMVar(s) waiting... wake up the first thread on the queue
     
     tso = StgMVarTSOQueue_tso(q);
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
-    ASSERT(StgTSO_block_info(tso) == mvar);
-    // actually perform the putMVar for the thread that we just woke up
-    PerformPut(tso,StgMVar_value(mvar));
-    
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
-    
-    // indicate that the putMVar has now completed:
+
+loop2:
+    if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) {
+        tso = StgTSO__link(tso);
+        goto loop2;
+    }
+
+    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+    ASSERT(StgTSO_block_info(tso) == mvar);
+
+    // actually perform the putMVar for the thread that we just woke up
+    PerformPut(tso,StgMVar_value(mvar));
+
+    // indicate that the MVar operation has now completed.
     StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
     
     // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
 
-    foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+    foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) [];
     
     unlockClosure(mvar, stg_MVAR_DIRTY_info);
     RET_P(val);
@@ -1283,22 +1290,29 @@ loop:
     // There are putMVar(s) waiting... wake up the first thread on the queue
     
     tso = StgMVarTSOQueue_tso(q);
-    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
-    ASSERT(StgTSO_block_info(tso) == mvar);
-    // actually perform the putMVar for the thread that we just woke up
-    PerformPut(tso,StgMVar_value(mvar));
-    
     StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
     if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
         StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
-    
-    // indicate that the putMVar has now completed:
+
+loop2:
+    if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) {
+        tso = StgTSO__link(tso);
+        goto loop2;
+    }
+
+    ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
+    ASSERT(StgTSO_block_info(tso) == mvar);
+
+    // actually perform the putMVar for the thread that we just woke up
+    PerformPut(tso,StgMVar_value(mvar));
+
+    // indicate that the MVar operation has now completed.
     StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
     
     // no need to mark the TSO dirty, we have only written END_TSO_QUEUE.
 
-    foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+    foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) [];
     
     unlockClosure(mvar, stg_MVAR_DIRTY_info);
     RET_P(val);
@@ -1368,24 +1382,31 @@ loop:
     // There are takeMVar(s) waiting: wake up the first one
     
     tso = StgMVarTSOQueue_tso(q);
+    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+    if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+    }
+
+loop2:
+    if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) {
+        tso = StgTSO__link(tso);
+        goto loop2;
+    }
+
     ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+
     // actually perform the takeMVar
     PerformTake(tso, val);
 
+    // indicate that the MVar operation has now completed.
+    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+    
     if (TO_W_(StgTSO_dirty(tso)) == 0) {
         foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
     }
     
-    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
-    if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
-    }
-    
-    // indicate that the takeMVar has now completed:
-    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
-    
-    foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+    foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) [];
 
     unlockClosure(mvar, stg_MVAR_DIRTY_info);
     jump %ENTRY_CODE(Sp(0));
@@ -1431,29 +1452,34 @@ loop:
         goto loop;
     }
 
-    /* There are takeMVar(s) waiting: wake up the first one
-     */
     // There are takeMVar(s) waiting: wake up the first one
     
     tso = StgMVarTSOQueue_tso(q);
+    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
+    if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
+        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
+    }
+
+loop2:
+    if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) {
+        tso = StgTSO__link(tso);
+        goto loop2;
+    }
+
     ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16);
     ASSERT(StgTSO_block_info(tso) == mvar);
+
     // actually perform the takeMVar
     PerformTake(tso, val);
 
+    // indicate that the MVar operation has now completed.
+    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
+    
     if (TO_W_(StgTSO_dirty(tso)) == 0) {
         foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") [];
     }
     
-    StgMVar_head(mvar) = StgMVarTSOQueue_link(q);
-    if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
-        StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
-    }
-    
-    // indicate that the takeMVar has now completed:
-    StgTSO__link(tso) = stg_END_TSO_QUEUE_closure;
-    
-    foreign "C" tryWakeupThread(MyCapability() "ptr", tso) [];
+    foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) [];
 
     unlockClosure(mvar, stg_MVAR_DIRTY_info);
     jump %ENTRY_CODE(Sp(0));
index bebbcd4..df7429a 100644 (file)
@@ -271,7 +271,7 @@ check_target:
             // might as well just do it now.  The message will
             // be a no-op when it arrives.
             unlockClosure((StgClosure*)m, i);
-            tryWakeupThread(cap, target);
+            tryWakeupThread_(cap, target);
             goto retry;
         }
 
@@ -337,7 +337,7 @@ check_target:
             // thread now anyway and ignore the message when it
             // arrives.
            unlockClosure((StgClosure *)mvar, info);
-            tryWakeupThread(cap, target);
+            tryWakeupThread_(cap, target);
             goto retry;
         }
 
index 5723eac..25241c7 100644 (file)
@@ -215,10 +215,14 @@ removeThreadFromDeQueue (Capability *cap,
    ------------------------------------------------------------------------- */
 
 void
-tryWakeupThread (Capability *cap, StgTSO *tso_)
+tryWakeupThread (Capability *cap, StgTSO *tso)
 {
-    StgTSO *tso = deRefTSO(tso_);
+    tryWakeupThread_(cap, deRefTSO(tso));
+}
 
+void
+tryWakeupThread_ (Capability *cap, StgTSO *tso)
+{
     traceEventThreadWakeup (cap, tso, tso->cap->no);
 
 #ifdef THREADED_RTS
index e3680f2..bf16dcd 100644 (file)
@@ -21,6 +21,9 @@ void wakeBlockingQueue   (Capability *cap, StgBlockingQueue *bq);
 void tryWakeupThread     (Capability *cap, StgTSO *tso);
 void migrateThread       (Capability *from, StgTSO *tso, Capability *to);
 
+// like tryWakeupThread(), but assumes the TSO is not ThreadRelocated
+void tryWakeupThread_    (Capability *cap, StgTSO *tso);
+
 // Wakes up a thread on a Capability (probably a different Capability
 // from the one held by the current Task).
 //