From ef6e8211dee59eb7fa80a242391b89b52bd57f80 Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Wed, 7 Apr 2010 09:28:24 +0000 Subject: [PATCH] Fix for derefing ThreadRelocated TSOs in MVar operations --- rts/PrimOps.cmm | 98 ++++++++++++++++++++++++++++++++++-------------------- rts/RaiseAsync.c | 4 +-- rts/Threads.c | 8 +++-- rts/Threads.h | 3 ++ 4 files changed, 73 insertions(+), 40 deletions(-) diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index d09a856..e5427c7 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -1211,22 +1211,29 @@ loop: // There are putMVar(s) waiting... wake up the first thread on the queue tso = StgMVarTSOQueue_tso(q); - ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); - ASSERT(StgTSO_block_info(tso) == mvar); - // actually perform the putMVar for the thread that we just woke up - PerformPut(tso,StgMVar_value(mvar)); - StgMVar_head(mvar) = StgMVarTSOQueue_link(q); if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } - - // indicate that the putMVar has now completed: + +loop2: + if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) { + tso = StgTSO__link(tso); + goto loop2; + } + + ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); + ASSERT(StgTSO_block_info(tso) == mvar); + + // actually perform the putMVar for the thread that we just woke up + PerformPut(tso,StgMVar_value(mvar)); + + // indicate that the MVar operation has now completed. StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; // no need to mark the TSO dirty, we have only written END_TSO_QUEUE. - foreign "C" tryWakeupThread(MyCapability() "ptr", tso) []; + foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) []; unlockClosure(mvar, stg_MVAR_DIRTY_info); RET_P(val); @@ -1283,22 +1290,29 @@ loop: // There are putMVar(s) waiting... wake up the first thread on the queue tso = StgMVarTSOQueue_tso(q); - ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); - ASSERT(StgTSO_block_info(tso) == mvar); - // actually perform the putMVar for the thread that we just woke up - PerformPut(tso,StgMVar_value(mvar)); - StgMVar_head(mvar) = StgMVarTSOQueue_link(q); if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; } - - // indicate that the putMVar has now completed: + +loop2: + if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) { + tso = StgTSO__link(tso); + goto loop2; + } + + ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); + ASSERT(StgTSO_block_info(tso) == mvar); + + // actually perform the putMVar for the thread that we just woke up + PerformPut(tso,StgMVar_value(mvar)); + + // indicate that the MVar operation has now completed. StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; // no need to mark the TSO dirty, we have only written END_TSO_QUEUE. - foreign "C" tryWakeupThread(MyCapability() "ptr", tso) []; + foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) []; unlockClosure(mvar, stg_MVAR_DIRTY_info); RET_P(val); @@ -1368,24 +1382,31 @@ loop: // There are takeMVar(s) waiting: wake up the first one tso = StgMVarTSOQueue_tso(q); + StgMVar_head(mvar) = StgMVarTSOQueue_link(q); + if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; + } + +loop2: + if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) { + tso = StgTSO__link(tso); + goto loop2; + } + ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); ASSERT(StgTSO_block_info(tso) == mvar); + // actually perform the takeMVar PerformTake(tso, val); + // indicate that the MVar operation has now completed. + StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; + if (TO_W_(StgTSO_dirty(tso)) == 0) { foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") []; } - StgMVar_head(mvar) = StgMVarTSOQueue_link(q); - if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { - StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; - } - - // indicate that the takeMVar has now completed: - StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; - - foreign "C" tryWakeupThread(MyCapability() "ptr", tso) []; + foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) []; unlockClosure(mvar, stg_MVAR_DIRTY_info); jump %ENTRY_CODE(Sp(0)); @@ -1431,29 +1452,34 @@ loop: goto loop; } - /* There are takeMVar(s) waiting: wake up the first one - */ // There are takeMVar(s) waiting: wake up the first one tso = StgMVarTSOQueue_tso(q); + StgMVar_head(mvar) = StgMVarTSOQueue_link(q); + if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { + StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; + } + +loop2: + if (TO_W_(StgTSO_what_next(tso)) == ThreadRelocated) { + tso = StgTSO__link(tso); + goto loop2; + } + ASSERT(StgTSO_why_blocked(tso) == BlockedOnMVar::I16); ASSERT(StgTSO_block_info(tso) == mvar); + // actually perform the takeMVar PerformTake(tso, val); + // indicate that the MVar operation has now completed. + StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; + if (TO_W_(StgTSO_dirty(tso)) == 0) { foreign "C" dirty_TSO(MyCapability() "ptr", tso "ptr") []; } - StgMVar_head(mvar) = StgMVarTSOQueue_link(q); - if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) { - StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure; - } - - // indicate that the takeMVar has now completed: - StgTSO__link(tso) = stg_END_TSO_QUEUE_closure; - - foreign "C" tryWakeupThread(MyCapability() "ptr", tso) []; + foreign "C" tryWakeupThread_(MyCapability() "ptr", tso) []; unlockClosure(mvar, stg_MVAR_DIRTY_info); jump %ENTRY_CODE(Sp(0)); diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index bebbcd4..df7429a 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -271,7 +271,7 @@ check_target: // might as well just do it now. The message will // be a no-op when it arrives. unlockClosure((StgClosure*)m, i); - tryWakeupThread(cap, target); + tryWakeupThread_(cap, target); goto retry; } @@ -337,7 +337,7 @@ check_target: // thread now anyway and ignore the message when it // arrives. unlockClosure((StgClosure *)mvar, info); - tryWakeupThread(cap, target); + tryWakeupThread_(cap, target); goto retry; } diff --git a/rts/Threads.c b/rts/Threads.c index 5723eac..25241c7 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -215,10 +215,14 @@ removeThreadFromDeQueue (Capability *cap, ------------------------------------------------------------------------- */ void -tryWakeupThread (Capability *cap, StgTSO *tso_) +tryWakeupThread (Capability *cap, StgTSO *tso) { - StgTSO *tso = deRefTSO(tso_); + tryWakeupThread_(cap, deRefTSO(tso)); +} +void +tryWakeupThread_ (Capability *cap, StgTSO *tso) +{ traceEventThreadWakeup (cap, tso, tso->cap->no); #ifdef THREADED_RTS diff --git a/rts/Threads.h b/rts/Threads.h index e3680f2..bf16dcd 100644 --- a/rts/Threads.h +++ b/rts/Threads.h @@ -21,6 +21,9 @@ void wakeBlockingQueue (Capability *cap, StgBlockingQueue *bq); void tryWakeupThread (Capability *cap, StgTSO *tso); void migrateThread (Capability *from, StgTSO *tso, Capability *to); +// like tryWakeupThread(), but assumes the TSO is not ThreadRelocated +void tryWakeupThread_ (Capability *cap, StgTSO *tso); + // Wakes up a thread on a Capability (probably a different Capability // from the one held by the current Task). // -- 1.7.10.4