X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FPrimOps.cmm;h=5c575f695bd576b8de19397e03837450cb781891;hp=c146454ab18fb595efb14431dc0f76e974493f04;hb=5d52d9b64c21dcf77849866584744722f8121389;hpb=0417404f5d1230c9d291ea9f73e2831121c8ec99

diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index c146454..5c575f6 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -35,6 +35,9 @@ import base_ControlziExceptionziBase_nestedAtomically_closure;
 import EnterCriticalSection;
 import LeaveCriticalSection;
 import ghczmprim_GHCziBool_False_closure;
+#if !defined(mingw32_HOST_OS)
+import sm_mutex;
+#endif
 
 /*-----------------------------------------------------------------------------
    Array Primitives
@@ -541,9 +544,9 @@ stg_forkzh
 				closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) |
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
 
@@ -571,9 +574,9 @@ stg_forkOnzh
 				closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) |
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
 
@@ -1201,11 +1204,7 @@ stg_takeMVarzh
 	    StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
 	}
 
-#if defined(THREADED_RTS)
 	unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-	SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 	RET_P(val);
     }
     else
@@ -1213,11 +1212,7 @@ stg_takeMVarzh
 	/* No further putMVars, MVar is now empty */
 	StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
 
-#if defined(THREADED_RTS)
 	unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-	SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 
 	RET_P(val);
     }
@@ -1276,21 +1271,13 @@ stg_tryTakeMVarzh
 	if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
 	    StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
 	}
-#if defined(THREADED_RTS)
 	unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-	SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     else
     {
 	/* No further putMVars, MVar is now empty */
 	StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
 	unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-	SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
 
     RET_NP(1, val);
@@ -1357,11 +1344,7 @@ stg_putMVarzh
 	    StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
 	}
 
-#if defined(THREADED_RTS)
 	unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-	SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 	jump %ENTRY_CODE(Sp(0));
     }
     else
@@ -1369,11 +1352,7 @@ stg_putMVarzh
 	/* No further takes, the MVar is now full. */
 	StgMVar_value(mvar) = val;
 
-#if defined(THREADED_RTS)
 	unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-	SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 
 	jump %ENTRY_CODE(Sp(0));
     }
@@ -1426,22 +1405,14 @@ stg_tryPutMVarzh
 	StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
     }
 
-#if defined(THREADED_RTS)
     unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-    SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 }
 else
 {
     /* No further takes, the MVar is now full. */
    StgMVar_value(mvar) = R2;
 
-#if defined(THREADED_RTS)
    unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-    SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 }
 
 RET_N(1);
@@ -1844,12 +1815,71 @@ stg_asyncDoProczh
 }
 #endif
 
-// noDuplicate# tries to ensure that none of the thunks under
-// evaluation by the current thread are also under evaluation by
-// another thread.  It relies on *both* threads doing noDuplicate#;
-// the second one will get blocked if they are duplicating some work.
+/* -----------------------------------------------------------------------------
+ * noDuplicate#
+ *
+ * noDuplicate# tries to ensure that none of the thunks under
+ * evaluation by the current thread are also under evaluation by
+ * another thread.  It relies on *both* threads doing noDuplicate#;
+ * the second one will get blocked if they are duplicating some work.
+ *
+ * The idea is that noDuplicate# is used within unsafePerformIO to
+ * ensure that the IO operation is performed at most once.
+ * noDuplicate# calls threadPaused which acquires an exclusive lock on
+ * all the thunks currently under evaluation by the current thread.
+ *
+ * Consider the following scenario.  There is a thunk A, whose
+ * evaluation requires evaluating thunk B, where thunk B is an
+ * unsafePerformIO.  Two threads, 1 and 2, both enter A.  Thread 2
+ * is pre-empted before it enters B, and claims A by blackholing it
+ * (in threadPaused).  Thread 1 now enters B, and calls noDuplicate#.
+ *
+ *      thread 1                      thread 2
+ *   +-----------+                 +---------------+
+ *   |    -------+-----> A <-------+-------        |
+ *   |  update   |   BLACKHOLE     | marked_update |
+ *   +-----------+                 +---------------+
+ *   |           |                 |               |
+ *        ...                             ...
+ *   |           |                 +---------------+
+ *   +-----------+
+ *   |    ------+-----> B
+ *   |  update  |   BLACKHOLE
+ *   +-----------+
+ *
+ * At this point: A is a blackhole, owned by thread 2.  noDuplicate#
+ * calls threadPaused, which walks up the stack and
+ *  - claims B on behalf of thread 1
+ *  - then it reaches the update frame for A, which it sees is already
+ *    a BLACKHOLE and is therefore owned by another thread.  Since
+ *    thread 1 is duplicating work, the computation up to the update
+ *    frame for A is suspended, including thunk B.
+ *  - thunk B, which is an unsafePerformIO, has now been reverted to
+ *    an AP_STACK which could be duplicated - BAD!
+ *  - The solution is as follows: before calling threadPaused, we
+ *    leave a frame on the stack (stg_noDuplicate_info) that will call
+ *    noDuplicate# again if the current computation is suspended and
+ *    restarted.
+ *
+ * See the test program in concurrent/prog003 for a way to demonstrate
+ * this.  It needs to be run with +RTS -N3 or greater, and the bug
+ * only manifests occasionally (once every 10 runs or so).
+ * -------------------------------------------------------------------------- */
+
+INFO_TABLE_RET(stg_noDuplicate, RET_SMALL)
+{
+    Sp_adj(1);
+    jump stg_noDuplicatezh;
+}
+
 stg_noDuplicatezh
 {
+    STK_CHK_GEN( WDS(1), NO_PTRS, stg_noDuplicatezh );
+    // leave noDuplicate frame in case the current
+    // computation is suspended and restarted (see above).
+    Sp_adj(-1);
+    Sp(0) = stg_noDuplicate_info;
+
     SAVE_THREAD_STATE();
     ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
     foreign "C" threadPaused (MyCapability() "ptr", CurrentTSO "ptr") [];
@@ -1859,10 +1889,18 @@ stg_noDuplicatezh
     } else {
         LOAD_THREAD_STATE();
         ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
+        // remove the stg_noDuplicate frame if it is still there.
+        if (Sp(0) == stg_noDuplicate_info) {
+            Sp_adj(1);
+        }
         jump %ENTRY_CODE(Sp(0));
     }
 }
 
+/* -----------------------------------------------------------------------------
+   Misc. primitives
+   -------------------------------------------------------------------------- */
+
 stg_getApStackValzh
 {
     W_ ap_stack, offset, val, ok;
@@ -1881,10 +1919,6 @@ stg_getApStackValzh
     RET_NP(ok,val);
 }
 
-/* -----------------------------------------------------------------------------
-   Misc. primitives
-   -------------------------------------------------------------------------- */
-
 // Write the cost center stack of the first argument on stderr; return
 // the second.  Possibly only makes sense for already evaluated
 // things?
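As background for the noDuplicate# comment block added in this patch, the following is a minimal Haskell sketch, not GHC's actual GHC.IO source, of how unsafePerformIO can be built from a "dupable" variant plus the noDuplicate# primop. The module name NoDupSketch and the functions unsafePerformIOSketch, unsafeDupablePerformIOSketch and noDuplicateIO are made up for illustration; the imported names (IO, noDuplicate#, realWorld#) are real. The point is that every thread evaluating the shared thunk runs noDuplicate# before the action, so the second thread blocks rather than repeating the work.

{-# LANGUAGE MagicHash, UnboxedTuples #-}
module NoDupSketch (unsafePerformIOSketch) where

import GHC.Exts  (noDuplicate#, realWorld#)
import GHC.Types (IO (..))

-- Run an IO action as a pure value.  Two threads that enter the same
-- thunk built from this may both run the action (duplication allowed).
unsafeDupablePerformIOSketch :: IO a -> a
unsafeDupablePerformIOSketch (IO m) =
    case m realWorld# of (# _, r #) -> r

-- Guard the action with noDuplicate#: the primop calls threadPaused,
-- which claims the thunks this thread is evaluating, so a second
-- thread entering the same thunk blocks instead of re-running the action.
unsafePerformIOSketch :: IO a -> a
unsafePerformIOSketch act =
    unsafeDupablePerformIOSketch (noDuplicateIO >> act)
  where
    noDuplicateIO :: IO ()
    noDuplicateIO = IO (\s -> case noDuplicate# s of s' -> (# s', () #))

GHC's real unsafePerformIO takes additional care with inlining and laziness that this sketch omits; what matters here is only that noDuplicate# runs before the action, which is the behaviour the stg_noDuplicate_info frame in the patch preserves when the computation is suspended and restarted.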