From b61f70ce5ff947642c96b1ad980351691bb1e07a Mon Sep 17 00:00:00 2001 From: tharris Date: Thu, 18 Nov 2004 09:57:01 +0000 Subject: [PATCH] [project @ 2004-11-18 09:56:07 by tharris] Support for atomic memory transactions and associated regression tests conc041-048 --- ghc/compiler/codeGen/CgPrimOp.hs | 1 + ghc/compiler/prelude/PrelNames.lhs | 1 + ghc/compiler/prelude/TysPrim.lhs | 15 + ghc/compiler/prelude/primops.txt.pp | 62 ++- ghc/includes/ClosureTypes.h | 11 +- ghc/includes/Closures.h | 87 ++++ ghc/includes/Constants.h | 13 +- ghc/includes/RtsFlags.h | 3 +- ghc/includes/STM.h | 211 +++++++++ ghc/includes/StgMiscClosures.h | 22 + ghc/includes/TSO.h | 5 +- ghc/includes/mkDerivedConstants.c | 18 + ghc/rts/ClosureFlags.c | 7 + ghc/rts/Exception.cmm | 48 +- ghc/rts/GC.c | 186 ++++++++ ghc/rts/GCCompact.c | 52 +++ ghc/rts/HSprel.def | 1 + ghc/rts/Linker.c | 7 + ghc/rts/Makefile | 1 + ghc/rts/Prelude.h | 2 + ghc/rts/PrimOps.cmm | 432 ++++++++++++++++++ ghc/rts/Printer.c | 8 +- ghc/rts/RtsFlags.c | 6 + ghc/rts/RtsStartup.c | 3 + ghc/rts/STM.c | 817 +++++++++++++++++++++++++++++++++++ ghc/rts/Sanity.c | 49 ++- ghc/rts/Schedule.c | 149 +++++++ ghc/rts/Schedule.h | 3 + ghc/rts/StgMiscClosures.cmm | 31 ++ ghc/rts/package.conf.in | 2 + ghc/utils/genprimopcode/Main.hs | 2 + 31 files changed, 2234 insertions(+), 21 deletions(-) create mode 100644 ghc/includes/STM.h create mode 100644 ghc/rts/STM.c diff --git a/ghc/compiler/codeGen/CgPrimOp.hs b/ghc/compiler/codeGen/CgPrimOp.hs index 65ad0cc..5c01903 100644 --- a/ghc/compiler/codeGen/CgPrimOp.hs +++ b/ghc/compiler/codeGen/CgPrimOp.hs @@ -486,6 +486,7 @@ translateOp SameMutVarOp = Just mo_wordEq translateOp SameMVarOp = Just mo_wordEq translateOp SameMutableArrayOp = Just mo_wordEq translateOp SameMutableByteArrayOp = Just mo_wordEq +translateOp SameTVarOp = Just mo_wordEq translateOp EqForeignObj = Just mo_wordEq translateOp EqStablePtrOp = Just mo_wordEq diff --git a/ghc/compiler/prelude/PrelNames.lhs 
b/ghc/compiler/prelude/PrelNames.lhs index 893fed2..f534abe 100644 --- a/ghc/compiler/prelude/PrelNames.lhs +++ b/ghc/compiler/prelude/PrelNames.lhs @@ -789,6 +789,7 @@ threadIdPrimTyConKey = mkPreludeTyConUnique 72 bcoPrimTyConKey = mkPreludeTyConUnique 73 ptrTyConKey = mkPreludeTyConUnique 74 funPtrTyConKey = mkPreludeTyConUnique 75 +tVarPrimTyConKey = mkPreludeTyConUnique 76 -- Generic Type Constructors crossTyConKey = mkPreludeTyConUnique 79 diff --git a/ghc/compiler/prelude/TysPrim.lhs b/ghc/compiler/prelude/TysPrim.lhs index 0cc59d9..155fdf8 100644 --- a/ghc/compiler/prelude/TysPrim.lhs +++ b/ghc/compiler/prelude/TysPrim.lhs @@ -28,6 +28,7 @@ module TysPrim( mutVarPrimTyCon, mkMutVarPrimTy, mVarPrimTyCon, mkMVarPrimTy, + tVarPrimTyCon, mkTVarPrimTy, stablePtrPrimTyCon, mkStablePtrPrimTy, stableNamePrimTyCon, mkStableNamePrimTy, bcoPrimTyCon, bcoPrimTy, @@ -87,6 +88,7 @@ primTyCons , mutableArrayPrimTyCon , mutableByteArrayPrimTyCon , mVarPrimTyCon + , tVarPrimTyCon , mutVarPrimTyCon , realWorldTyCon , stablePtrPrimTyCon @@ -124,6 +126,7 @@ mutableArrayPrimTyConName = mkPrimTc FSLIT("MutableArray#") mutableArrayPrim mutableByteArrayPrimTyConName = mkPrimTc FSLIT("MutableByteArray#") mutableByteArrayPrimTyConKey mutableByteArrayPrimTyCon mutVarPrimTyConName = mkPrimTc FSLIT("MutVar#") mutVarPrimTyConKey mutVarPrimTyCon mVarPrimTyConName = mkPrimTc FSLIT("MVar#") mVarPrimTyConKey mVarPrimTyCon +tVarPrimTyConName = mkPrimTc FSLIT("TVar#") tVarPrimTyConKey tVarPrimTyCon stablePtrPrimTyConName = mkPrimTc FSLIT("StablePtr#") stablePtrPrimTyConKey stablePtrPrimTyCon stableNamePrimTyConName = mkPrimTc FSLIT("StableName#") stableNamePrimTyConKey stableNamePrimTyCon foreignObjPrimTyConName = mkPrimTc FSLIT("ForeignObj#") foreignObjPrimTyConKey foreignObjPrimTyCon @@ -314,6 +317,18 @@ mkMVarPrimTy s elt = mkTyConApp mVarPrimTyCon [s, elt] %************************************************************************ %* * +\subsection[TysPrim-stm-var]{The transactional 
variable type} +%* * +%************************************************************************ + +\begin{code} +tVarPrimTyCon = pcPrimTyCon tVarPrimTyConName vrcsZP PtrRep + +mkTVarPrimTy s elt = mkTyConApp tVarPrimTyCon [s, elt] +\end{code} + +%************************************************************************ +%* * \subsection[TysPrim-stable-ptrs]{The stable-pointer type} %* * %************************************************************************ diff --git a/ghc/compiler/prelude/primops.txt.pp b/ghc/compiler/prelude/primops.txt.pp index 4d7d4d9..04a7885 100644 --- a/ghc/compiler/prelude/primops.txt.pp +++ b/ghc/compiler/prelude/primops.txt.pp @@ -1,5 +1,5 @@ ----------------------------------------------------------------------- --- $Id: primops.txt.pp,v 1.30 2003/10/01 10:57:39 wolfgang Exp $ +-- $Id: primops.txt.pp,v 1.31 2004/11/18 09:56:15 tharris Exp $ -- -- Primitive Operations -- @@ -1334,6 +1334,66 @@ primop UnblockAsyncExceptionsOp "unblockAsyncExceptions#" GenPrimOp out_of_line = True ------------------------------------------------------------------------ +section "STM-accessible Mutable Variables" +------------------------------------------------------------------------ + +primop AtomicallyOp "atomically#" GenPrimOp + (State# RealWorld -> (# State# RealWorld, a #) ) + -> State# RealWorld -> (# State# RealWorld, a #) + with + out_of_line = True + has_side_effects = True + +primop RetryOp "retry#" GenPrimOp + State# RealWorld -> (# State# RealWorld, a #) + with + out_of_line = True + has_side_effects = True + +primop CatchRetryOp "catchRetry#" GenPrimOp + (State# RealWorld -> (# State# RealWorld, a #) ) + -> (State# RealWorld -> (# State# RealWorld, a #) ) + -> (State# RealWorld -> (# State# RealWorld, a #) ) + with + out_of_line = True + has_side_effects = True + +primop CatchSTMOp "catchSTM#" GenPrimOp + (State# RealWorld -> (# State# RealWorld, a #) ) + -> (b -> State# RealWorld -> (# State# RealWorld, a #) ) + -> (State# RealWorld -> (# 
State# RealWorld, a #) ) + with + out_of_line = True + has_side_effects = True + +primop NewTVarOp "newTVar#" GenPrimOp + a + -> State# s -> (# State# s, TVar# s a #) + {Create a new Tar\# holding a specified initial value.} + with + out_of_line = True + +primop ReadTVarOp "readTVar#" GenPrimOp + TVar# s a + -> State# s -> (# State# s, a #) + {Read contents of TVar\#. Result is not yet evaluated.} + with + out_of_line = True + +primop WriteTVarOp "writeTVar#" GenPrimOp + TVar# s a + -> a + -> State# s -> State# s + {Write contents of TVar\#.} + with + out_of_line = True + has_side_effects = True + +primop SameTVarOp "sameTVar#" GenPrimOp + TVar# s a -> TVar# s a -> Bool + + +------------------------------------------------------------------------ section "Synchronized Mutable Variables" {Operations on MVar\#s, which are shared mutable variables ({\it not} the same as MutVar\#s!). (Note: in a non-concurrent implementation, diff --git a/ghc/includes/ClosureTypes.h b/ghc/includes/ClosureTypes.h index c384ded..f727fc7 100644 --- a/ghc/includes/ClosureTypes.h +++ b/ghc/includes/ClosureTypes.h @@ -1,5 +1,5 @@ /* ---------------------------------------------------------------------------- - * $Id: ClosureTypes.h,v 1.18 2002/12/11 15:36:37 simonmar Exp $ + * $Id: ClosureTypes.h,v 1.19 2004/11/18 09:56:17 tharris Exp $ * * (c) The GHC Team, 1998-1999 * @@ -79,6 +79,13 @@ #define RBH 63 #define EVACUATED 64 #define REMOTE_REF 65 -#define N_CLOSURE_TYPES 66 +#define TVAR_WAIT_QUEUE 66 +#define TVAR 67 +#define TREC_CHUNK 68 +#define TREC_HEADER 69 +#define ATOMICALLY_FRAME 70 +#define CATCH_RETRY_FRAME 71 +#define CATCH_STM_FRAME 72 +#define N_CLOSURE_TYPES 73 #endif /* CLOSURETYPES_H */ diff --git a/ghc/includes/Closures.h b/ghc/includes/Closures.h index d546792..d160ac5 100644 --- a/ghc/includes/Closures.h +++ b/ghc/includes/Closures.h @@ -308,6 +308,93 @@ typedef struct { StgClosure* value; } StgMVar; +/* STM data structures + * + * StgTVar defines the only type that can 
be updated through the STM + * interface. + * + * Note that various optimisations may be possible in order to use less + * space for these data structures at the cost of more complexity in the + * implementation: + * + * - In StgTVar, current_value and first_wait_queue_entry could be held in + * the same field: if any thread is waiting then its expected_value for + * the tvar is the current value. + * + * - In StgTRecHeader, it might be worthwhile having separate chunks + * of read-only and read-write locations. This would save a + * new_value field in the read-only locations. + */ + +typedef struct StgTVarWaitQueue_ { + StgHeader header; + struct StgTSO_ *waiting_tso; + StgMutClosure *mut_link; + struct StgTVarWaitQueue_ *next_queue_entry; + struct StgTVarWaitQueue_ *prev_queue_entry; +} StgTVarWaitQueue; + +typedef struct { + StgHeader header; + StgClosure *current_value; + StgMutClosure *mut_link; + StgTVarWaitQueue *first_wait_queue_entry; +} StgTVar; + +// new_value == expected_value for read-only accesses +// new_value is a StgTVarWaitQueue entry when trec in state TREC_WAITING +typedef struct { + StgTVar *tvar; + StgClosure *expected_value; + StgClosure *new_value; +} TRecEntry; + +#define TREC_CHUNK_NUM_ENTRIES 256 + +typedef struct StgTRecChunk_ { + StgHeader header; + struct StgTRecChunk_ *prev_chunk; + StgMutClosure *mut_link; + StgWord next_entry_idx; + TRecEntry entries[TREC_CHUNK_NUM_ENTRIES]; +} StgTRecChunk; + +typedef enum { + TREC_ACTIVE, // Transaction in progress, outcome undecided + TREC_CANNOT_COMMIT, // Transaction in progress, inconsistent writes performed + TREC_MUST_ABORT, // Transaction in progress, inconsistent / out of date reads + TREC_COMMITTED, // Transaction has committed, now updating tvars + TREC_ABORTED, // Transaction has aborted, now reverting tvars + TREC_WAITING, // Transaction currently waiting +} TRecState; + +typedef struct StgTRecHeader_ { + StgHeader header; + TRecState state; + StgMutClosure *mut_link; + struct 
StgTRecHeader_ *enclosing_trec; + StgTRecChunk *current_chunk; +} StgTRecHeader; + +typedef struct { + StgHeader header; + StgBool waiting; + StgClosure *code; +} StgAtomicallyFrame; + +typedef struct { + StgHeader header; + StgClosure *handler; +} StgCatchSTMFrame; + +typedef struct { + StgHeader header; + StgBool running_alt_code; + StgClosure *first_code; + StgClosure *alt_code; + StgTRecHeader *first_code_trec; +} StgCatchRetryFrame; + #if defined(PAR) || defined(GRAN) /* StgBlockingQueueElement is a ``collective type'' representing the types diff --git a/ghc/includes/Constants.h b/ghc/includes/Constants.h index 2d99ae9..579705e 100644 --- a/ghc/includes/Constants.h +++ b/ghc/includes/Constants.h @@ -1,5 +1,5 @@ /* ---------------------------------------------------------------------------- - * $Id: Constants.h,v 1.26 2004/08/13 13:09:13 simonmar Exp $ + * $Id: Constants.h,v 1.27 2004/11/18 09:56:19 tharris Exp $ * * (c) The GHC Team, 1998-2002 * @@ -222,18 +222,19 @@ #define BlockedOnRead 4 #define BlockedOnWrite 5 #define BlockedOnDelay 6 +#define BlockedOnSTM 7 /* Win32 only: */ -#define BlockedOnDoProc 7 +#define BlockedOnDoProc 8 /* Only relevant for PAR: */ /* blocked on a remote closure represented by a Global Address: */ -#define BlockedOnGA 8 +#define BlockedOnGA 9 /* same as above but without sending a Fetch message */ -#define BlockedOnGA_NoSend 9 +#define BlockedOnGA_NoSend 10 /* Only relevant for RTS_SUPPORTS_THREADS: */ -#define BlockedOnCCall 10 -#define BlockedOnCCall_NoUnblockExc 11 +#define BlockedOnCCall 11 +#define BlockedOnCCall_NoUnblockExc 12 /* same as above but don't unblock async exceptions in resumeThread() */ /* diff --git a/ghc/includes/RtsFlags.h b/ghc/includes/RtsFlags.h index 1d45748..a94bf7b 100644 --- a/ghc/includes/RtsFlags.h +++ b/ghc/includes/RtsFlags.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: RtsFlags.h,v 1.46 2004/08/13 13:09:29 simonmar Exp $ + * $Id: 
RtsFlags.h,v 1.47 2004/11/18 09:56:20 tharris Exp $ * * (c) The GHC Team, 1998-1999 * @@ -60,6 +60,7 @@ struct DEBUG_FLAGS { rtsBool par; // 'P' rtsBool linker; // 'l' the object linker rtsBool apply; // 'a' + rtsBool stm; // 'm' }; struct COST_CENTRE_FLAGS { diff --git a/ghc/includes/STM.h b/ghc/includes/STM.h new file mode 100644 index 0000000..fc3f29a --- /dev/null +++ b/ghc/includes/STM.h @@ -0,0 +1,211 @@ +/*---------------------------------------------------------------------- + * + * (c) The GHC Team, 1998-2004 + * + * STM interface definition + * + *---------------------------------------------------------------------- + + STM.h defines the C-level interface to the STM. + + The interface is designed so that all of the operations return + directly: if the specified StgTSO should block then the Haskell + scheduler's data structures are updated within the STM + implementation, rather than blocking the native thread. + + This interface can be supported by many different implementations, + in particular it is left unspecified: + + - Whether nested transactions are fully supported. + + A simple implementation would count the number of + stmStartTransaction operations that a thread invokes and only + attempt to really commit it to the heap when the corresponding + number of stmCommitTransaction calls have been made. This + prevents enclosed transactions from being aborted without also + aborting all of the outer ones. + + The current implementation does support proper nesting. + + - Whether stmWait and stmReWait are blocking. + + A simple implementation would always return 'false' from these + operations, signalling that the calling thread should immediately + retry its transaction. + + A fuller implementation would block the thread and return 'True' + when it is safe for the thread to block. + + The current implementation does provide stmWait and stmReWait + operations which can block the caller's TSO. 
+ + - Whether the transactional read, write, commit and validate + operations are blocking or non-blocking. + + A simple implementation would use an internal lock to prevent + concurrent execution of any STM operations. (This does not + prevent multiple threads having concurrent transactions, merely + the concurrent execution of say stmCommitTransaction by two + threads at the same time). + + A fuller implementation would offer obstruction-free or lock-free + progress guarantees, as in our OOPSLA 2003 paper. + + The current implementation is lock-free for simple uncontended + operations, but uses an internal lock on SMP systems in some + cases. This aims to provide good performance on uniprocessors: + it substantially streamlines the design, when compared with the + OOPSLA paper, and on a uniprocessor we can be sure that threads + are never pre-empted within STM operations. +*/ + +#ifndef STM_H +#define STM_H + +#ifdef __cplusplus +extern "C" { +#endif + +/*---------------------------------------------------------------------- + + Start of day + ------------ + +*/ + +extern void initSTM(void); + +extern void stmPreGCHook(void); + +/*---------------------------------------------------------------------- + + Transaction context management + ------------------------------ + +*/ + +// Create and enter a new transaction context + +extern StgTRecHeader *stmStartTransaction(StgTRecHeader *outer); + +// Exit the current transaction context, abandoning any read/write +// operations performed within it and removing the thread from any +// tvar wait queues if it was waitin. Note that if nested transactions +// are not fully supported then this may leave the enclosing +// transaction contexts doomed to abort. + +extern void stmAbortTransaction(StgTRecHeader *trec); + +// Return the trec within which the specified trec was created (not +// valid if trec==NO_TREC). 
+ +extern StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec); + +/*---------------------------------------------------------------------- + + Validate/commit/wait/rewait operations + -------------------------------------- + + + These four operations return boolean results which should be interpreted + as follows: + + true => The transaction context was definitely valid + + false => The transaction context may not have been valid + + The user of the STM should ensure that it is always safe to assume that a + transaction context is not valid when in fact it is (i.e. to return false in + place of true, with side-effects as defined below). This may cause + needless retries of transactions (in the case of validate and commit), or it + may cause needless spinning instead of blocking (in the case of wait and + rewait). + + In defining the behaviour of wait and rewait we distinguish between two + different aspects of a thread's runnability: + + - We say that a thread is "blocked" when it is not running or + runnable as far as the scheduler is concerned. + + - We say that a thread is "waiting" when its StgTRecHeader is linked on an + tvar's wait queue. + + Considering only STM operations, (blocked) => (waiting). The user of the STM + should ensure that they are prepared for threads to be unblocked spuriously + and for wait/reWait to return false even when the previous transaction context + is actually still valid. +*/ + +// Test whether the current transaction context is valid, i.e. whether +// it is still possible for it to commit successfully. Note: we assume that +// once stmValidateTransaction has returned FALSE for a given transaction then +// that transaction will never again be valid -- we rely on this in Schedule.c when +// kicking invalid threads at GC (in case they are stuck looping) + +extern StgBool stmValidateTransaction(StgTRecHeader *trec); + +// Test whether the current transaction context is valid and, if so, +// commit its memory accesses to the heap. 
stmCommitTransaction must +// unblock any threads which are waiting on tvars that updates have +// been committed to. + +extern StgBool stmCommitTransaction(StgTRecHeader *trec); + +// Test whether the current transaction context is valid and, if so, +// start the thread waiting for updates to any of the tvars it has +// ready from and mark it as blocked. It is an error to call stmWait +// if the thread is already waiting. + +extern StgBool stmWait(StgTSO *tso, StgTRecHeader *trec); + +// Test whether the current transaction context is valid and, if so, +// leave the thread waiting and mark it as blocked again. If the +// transaction context is no longer valid then stop the thread waiting +// and leave it as unblocked. It is an error to call stmReWait if the +// thread is not waiting. + +extern StgBool stmReWait(StgTRecHeader *trec); + +// Merge the accesses made so far in the second trec into the first trec. +// Note that the resulting trec is only intended to be used in wait operations. +// This avoids defining what happens if "trec" and "other" contain conflicting +// updates. + +extern StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other); + + +/*---------------------------------------------------------------------- + + Data access operations + ---------------------- +*/ + +// Return the logical contents of 'tvar' within the context of the +// thread's current transaction. + +extern StgClosure *stmReadTVar(StgTRecHeader *trec, + StgTVar *tvar); + +// Update the logical contents of 'tvar' within the context of the +// thread's current transaction. 
+ +extern void stmWriteTVar(StgTRecHeader *trec, + StgTVar *tvar, + StgClosure *new_value); + +/*----------------------------------------------------------------------*/ + +// NULLs + +#define END_STM_WAIT_QUEUE ((StgTVarWaitQueue *)(void *)&stg_END_STM_WAIT_QUEUE_closure) +#define END_STM_CHUNK_LIST ((StgTRecChunk *)(void *)&stg_END_STM_CHUNK_LIST_closure) +#define NO_TREC ((StgTRecHeader *)(void *)&stg_NO_TREC_closure) + +/*----------------------------------------------------------------------*/ + +#ifdef __cplusplus +} +#endif + +#endif /* STM_H */ + diff --git a/ghc/includes/StgMiscClosures.h b/ghc/includes/StgMiscClosures.h index 0cdcccc..38ad6f0 100644 --- a/ghc/includes/StgMiscClosures.h +++ b/ghc/includes/StgMiscClosures.h @@ -129,6 +129,13 @@ RTS_INFO(stg_AP_info); RTS_INFO(stg_AP_STACK_info); RTS_INFO(stg_dummy_ret_info); RTS_INFO(stg_raise_info); +RTS_INFO(stg_TVAR_WAIT_QUEUE_info); +RTS_INFO(stg_TVAR_info); +RTS_INFO(stg_TREC_CHUNK_info); +RTS_INFO(stg_TREC_HEADER_info); +RTS_INFO(stg_END_STM_WAIT_QUEUE_info); +RTS_INFO(stg_END_STM_CHUNK_LIST_info); +RTS_INFO(stg_NO_TREC_info); RTS_ENTRY(stg_IND_entry); RTS_ENTRY(stg_IND_direct_entry); @@ -182,6 +189,9 @@ RTS_ENTRY(stg_AP_entry); RTS_ENTRY(stg_AP_STACK_entry); RTS_ENTRY(stg_dummy_ret_entry); RTS_ENTRY(stg_raise_entry); +RTS_ENTRY(stg_END_STM_WAIT_QUEUE_entry); +RTS_ENTRY(stg_END_STM_CHUNK_LIST_entry); +RTS_ENTRY(stg_NO_TREC_entry); RTS_ENTRY(stg_unblockAsyncExceptionszh_ret_ret); @@ -198,6 +208,10 @@ RTS_CLOSURE(stg_NO_FINALIZER_closure); RTS_CLOSURE(stg_dummy_ret_closure); RTS_CLOSURE(stg_forceIO_closure); +RTS_CLOSURE(stg_END_STM_WAIT_QUEUE_closure); +RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure); +RTS_CLOSURE(stg_NO_TREC_closure); + RTS_ENTRY(stg_NO_FINALIZER_entry); RTS_ENTRY(stg_END_EXCEPTION_LIST_entry); RTS_ENTRY(stg_EXCEPTION_CONS_entry); @@ -544,4 +558,12 @@ RTS_FUN(mkForeignObjzh_fast); RTS_FUN(newBCOzh_fast); RTS_FUN(mkApUpd0zh_fast); +RTS_FUN(retryzh_fast); +RTS_FUN(catchRetryzh_fast); 
+RTS_FUN(catchSTMzh_fast); +RTS_FUN(atomicallyzh_fast); +RTS_FUN(newTVarzh_fast); +RTS_FUN(readTVarzh_fast); +RTS_FUN(writeTVarzh_fast); + #endif /* STGMISCCLOSURES_H */ diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h index 958527c..f68bf80 100644 --- a/ghc/includes/TSO.h +++ b/ghc/includes/TSO.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: TSO.h,v 1.38 2004/11/10 02:13:12 wolfgang Exp $ + * $Id: TSO.h,v 1.39 2004/11/18 09:56:22 tharris Exp $ * * (c) The GHC Team, 1998-1999 * @@ -136,6 +136,7 @@ typedef struct StgTSO_ { StgThreadID id; int saved_errno; struct StgMainThread_* main; + struct StgTRecHeader_ *trec; // STM transaction record #ifdef TICKY_TICKY // TICKY-specific stuff would go here. @@ -183,6 +184,8 @@ typedef struct StgTSO_ { BlockedOnBlackHole the BLACKHOLE_BQ the BLACKHOLE_BQ's queue BlockedOnMVar the MVAR the MVAR's queue + + BlockedOnSTM END_TSO_QUEUE STM wait queue(s) BlockedOnException the TSO TSO->blocked_exception diff --git a/ghc/includes/mkDerivedConstants.c b/ghc/includes/mkDerivedConstants.c index a1b59f7..0499331 100644 --- a/ghc/includes/mkDerivedConstants.c +++ b/ghc/includes/mkDerivedConstants.c @@ -253,6 +253,7 @@ main(int argc, char *argv[]) closure_field(StgTSO, blocked_exceptions); closure_field(StgTSO, id); closure_field(StgTSO, saved_errno); + closure_field(StgTSO, trec); closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS); tso_field(StgTSO, sp); tso_offset(StgTSO, stack); @@ -298,6 +299,19 @@ main(int argc, char *argv[]) closure_size(StgMutVar); closure_field(StgMutVar, var); + closure_size(StgAtomicallyFrame); + closure_field(StgAtomicallyFrame, waiting); + closure_field(StgAtomicallyFrame, code); + + closure_size(StgCatchSTMFrame); + closure_field(StgCatchSTMFrame, handler); + + closure_size(StgCatchRetryFrame); + closure_field(StgCatchRetryFrame, running_alt_code); + closure_field(StgCatchRetryFrame, first_code); + closure_field(StgCatchRetryFrame, alt_code); + 
closure_field(StgCatchRetryFrame, first_code_trec); + closure_size(StgForeignObj); closure_field(StgForeignObj,data); @@ -315,6 +329,10 @@ main(int argc, char *argv[]) closure_field(StgMVar,tail); closure_field(StgMVar,value); + closure_size(StgTVar); + closure_field(StgTVar,current_value); + closure_field(StgTVar,first_wait_queue_entry); + closure_size(StgBCO); closure_field(StgBCO, instrs); closure_field(StgBCO, literals); diff --git a/ghc/rts/ClosureFlags.c b/ghc/rts/ClosureFlags.c index dc344f2..28876ab 100644 --- a/ghc/rts/ClosureFlags.c +++ b/ghc/rts/ClosureFlags.c @@ -92,5 +92,12 @@ StgWord16 closure_flags[] = { /* RBH = */ ( _NS| _MUT|_UPT ), /* EVACUATED = */ ( 0 ), /* REMOTE_REF = */ (_HNF| _NS| _UPT ), +/* TVAR_WAIT_QUEUE = */ ( _NS| _MUT|_UPT ), +/* TVAR = */ (_HNF| _NS| _MUT|_UPT ), +/* TREC_CHUNK = */ ( _NS| _MUT|_UPT ), +/* TREC_HEADER = */ ( _NS| _MUT|_UPT ), +/* ATOMICALLY_FRAME = */ ( _BTM ), +/* CATCH_RETRY_FRAME = */ ( _BTM ), +/* CATCH_STM_FRAME = */ ( _BTM ), /* STACK = */ (_HNF| _NS| _MUT ) }; diff --git a/ghc/rts/Exception.cmm b/ghc/rts/Exception.cmm index 04f328b..e8cd4cd 100644 --- a/ghc/rts/Exception.cmm +++ b/ghc/rts/Exception.cmm @@ -335,9 +335,34 @@ raisezh_fast } #endif +retry_pop_stack: StgTSO_sp(CurrentTSO) = Sp; frame_type = foreign "C" raiseExceptionHelper(CurrentTSO "ptr", R1 "ptr"); Sp = StgTSO_sp(CurrentTSO); + if (frame_type == ATOMICALLY_FRAME) { + /* The exception has reached the edge of a memory transaction. Check that + * the transaction is valid. 
If not then perhaps the exception should + * not have been thrown: re-run the transaction */ + W_ trec; + W_ r; + trec = StgTSO_trec(CurrentTSO); + r = foreign "C" stmValidateTransaction(trec "ptr"); + foreign "C" stmAbortTransaction(trec "ptr"); + StgTSO_trec(CurrentTSO) = NO_TREC; + if (r) { + // Transaction was valid: continue searching for a catch frame + Sp = Sp + SIZEOF_StgAtomicallyFrame; + goto retry_pop_stack; + } else { + // Transaction was not valid: we retry the exception (otherwise continue + // with a further call to raiseExceptionHelper) + "ptr" trec = foreign "C" stmStartTransaction(NO_TREC "ptr"); + StgTSO_trec(CurrentTSO) = trec; + R1 = StgAtomicallyFrame_code(Sp); + Sp_adj(-1); + jump RET_LBL(stg_ap_v); + } + } if (frame_type == STOP_FRAME) { /* We've stripped the entire stack, the thread is now dead. */ @@ -350,10 +375,14 @@ raisezh_fast jump StgReturn; } - /* Ok, Sp points to the enclosing CATCH_FRAME. Pop everything down to - * and including this frame, update Su, push R1, and enter the handler. + /* Ok, Sp points to the enclosing CATCH_FRAME or CATCH_STM_FRAME. Pop everything + * down to and including this frame, update Su, push R1, and enter the handler. */ - handler = StgCatchFrame_handler(Sp); + if (frame_type == CATCH_FRAME) { + handler = StgCatchFrame_handler(Sp); + } else { + handler = StgCatchSTMFrame_handler(Sp); + } /* Restore the blocked/unblocked state for asynchronous exceptions * at the CATCH_FRAME. @@ -364,11 +393,14 @@ raisezh_fast */ W_ frame; frame = Sp; - Sp = Sp + SIZEOF_StgCatchFrame; - - if (StgCatchFrame_exceptions_blocked(frame) == 0) { - Sp_adj(-1); - Sp(0) = stg_unblockAsyncExceptionszh_ret_info; + if (frame_type == CATCH_FRAME) { + Sp = Sp + SIZEOF_StgCatchFrame; + if (StgCatchFrame_exceptions_blocked(frame) == 0) { + Sp_adj(-1); + Sp(0) = stg_unblockAsyncExceptionszh_ret_info; + } + } else { + Sp = Sp + SIZEOF_StgCatchSTMFrame; } /* Ensure that async excpetions are blocked when running the handler. 
diff --git a/ghc/rts/GC.c b/ghc/rts/GC.c index 25f794f..66c53c4 100644 --- a/ghc/rts/GC.c +++ b/ghc/rts/GC.c @@ -26,6 +26,7 @@ #include "ParTicky.h" // ToDo: move into Rts.h #include "GCCompact.h" #include "Signals.h" +#include "STM.h" #if defined(GRAN) || defined(PAR) # include "GranSimRts.h" # include "ParallelRts.h" @@ -314,6 +315,9 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc ) blockUserSignals(); #endif + // tell the STM to discard any cached closures its hoping to re-use + stmPreGCHook(); + // tell the stats department that we've started a GC stat_startGC(); @@ -1881,6 +1885,9 @@ loop: case UPDATE_FRAME: case STOP_FRAME: case CATCH_FRAME: + case CATCH_STM_FRAME: + case CATCH_RETRY_FRAME: + case ATOMICALLY_FRAME: // shouldn't see these barf("evacuate: stack frame at %p\n", q); @@ -1989,6 +1996,18 @@ loop: return to; #endif + case TREC_HEADER: + return copy(q,sizeofW(StgTRecHeader),stp); + + case TVAR_WAIT_QUEUE: + return copy(q,sizeofW(StgTVarWaitQueue),stp); + + case TVAR: + return copy(q,sizeofW(StgTVar),stp); + + case TREC_CHUNK: + return copy(q,sizeofW(StgTRecChunk),stp); + default: barf("evacuate: strange closure type %d", (int)(info->type)); } @@ -2348,6 +2367,9 @@ scavengeTSO (StgTSO *tso) (StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions); } + // scavange current transaction record + (StgClosure *)tso->trec = evacuate((StgClosure *)tso->trec); + // scavenge this thread's stack scavenge_stack(tso->sp, &(tso->stack[tso->stack_size])); } @@ -2800,6 +2822,65 @@ scavenge(step *stp) } #endif + case TVAR_WAIT_QUEUE: + { + StgTVarWaitQueue *wq = ((StgTVarWaitQueue *) p); + evac_gen = 0; + (StgClosure *)wq->waiting_tso = evacuate((StgClosure*)wq->waiting_tso); + (StgClosure *)wq->next_queue_entry = evacuate((StgClosure*)wq->next_queue_entry); + (StgClosure *)wq->prev_queue_entry = evacuate((StgClosure*)wq->prev_queue_entry); + evac_gen = saved_evac_gen; + recordMutable((StgMutClosure *)wq); + failed_to_evac = rtsFalse; // 
mutable + p += sizeofW(StgTVarWaitQueue); + break; + } + + case TVAR: + { + StgTVar *tvar = ((StgTVar *) p); + evac_gen = 0; + (StgClosure *)tvar->current_value = evacuate((StgClosure*)tvar->current_value); + (StgClosure *)tvar->first_wait_queue_entry = evacuate((StgClosure*)tvar->first_wait_queue_entry); + evac_gen = saved_evac_gen; + recordMutable((StgMutClosure *)tvar); + failed_to_evac = rtsFalse; // mutable + p += sizeofW(StgTVar); + break; + } + + case TREC_HEADER: + { + StgTRecHeader *trec = ((StgTRecHeader *) p); + evac_gen = 0; + (StgClosure *)trec->enclosing_trec = evacuate((StgClosure*)trec->enclosing_trec); + (StgClosure *)trec->current_chunk = evacuate((StgClosure*)trec->current_chunk); + evac_gen = saved_evac_gen; + recordMutable((StgMutClosure *)trec); + failed_to_evac = rtsFalse; // mutable + p += sizeofW(StgTRecHeader); + break; + } + + case TREC_CHUNK: + { + StgWord i; + StgTRecChunk *tc = ((StgTRecChunk *) p); + TRecEntry *e = &(tc -> entries[0]); + evac_gen = 0; + (StgClosure *)tc->prev_chunk = evacuate((StgClosure*)tc->prev_chunk); + for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { + (StgClosure *)e->tvar = evacuate((StgClosure*)e->tvar); + (StgClosure *)e->expected_value = evacuate((StgClosure*)e->expected_value); + (StgClosure *)e->new_value = evacuate((StgClosure*)e->new_value); + } + evac_gen = saved_evac_gen; + recordMutable((StgMutClosure *)tc); + failed_to_evac = rtsFalse; // mutable + p += sizeofW(StgTRecChunk); + break; + } + default: barf("scavenge: unimplemented/strange closure type %d @ %p", info->type, p); @@ -3104,6 +3185,61 @@ linear_scan: } #endif // PAR + case TVAR_WAIT_QUEUE: + { + StgTVarWaitQueue *wq = ((StgTVarWaitQueue *) p); + evac_gen = 0; + (StgClosure *)wq->waiting_tso = evacuate((StgClosure*)wq->waiting_tso); + (StgClosure *)wq->next_queue_entry = evacuate((StgClosure*)wq->next_queue_entry); + (StgClosure *)wq->prev_queue_entry = evacuate((StgClosure*)wq->prev_queue_entry); + evac_gen = saved_evac_gen; + 
recordMutable((StgMutClosure *)wq); + failed_to_evac = rtsFalse; // mutable + break; + } + + case TVAR: + { + StgTVar *tvar = ((StgTVar *) p); + evac_gen = 0; + (StgClosure *)tvar->current_value = evacuate((StgClosure*)tvar->current_value); + (StgClosure *)tvar->first_wait_queue_entry = evacuate((StgClosure*)tvar->first_wait_queue_entry); + evac_gen = saved_evac_gen; + recordMutable((StgMutClosure *)tvar); + failed_to_evac = rtsFalse; // mutable + break; + } + + case TREC_CHUNK: + { + StgWord i; + StgTRecChunk *tc = ((StgTRecChunk *) p); + TRecEntry *e = &(tc -> entries[0]); + evac_gen = 0; + (StgClosure *)tc->prev_chunk = evacuate((StgClosure*)tc->prev_chunk); + for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { + (StgClosure *)e->tvar = evacuate((StgClosure*)e->tvar); + (StgClosure *)e->expected_value = evacuate((StgClosure*)e->expected_value); + (StgClosure *)e->new_value = evacuate((StgClosure*)e->new_value); + } + evac_gen = saved_evac_gen; + recordMutable((StgMutClosure *)tc); + failed_to_evac = rtsFalse; // mutable + break; + } + + case TREC_HEADER: + { + StgTRecHeader *trec = ((StgTRecHeader *) p); + evac_gen = 0; + (StgClosure *)trec->enclosing_trec = evacuate((StgClosure*)trec->enclosing_trec); + (StgClosure *)trec->current_chunk = evacuate((StgClosure*)trec->current_chunk); + evac_gen = saved_evac_gen; + recordMutable((StgMutClosure *)trec); + failed_to_evac = rtsFalse; // mutable + break; + } + default: barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p", info->type, p); @@ -3594,6 +3730,53 @@ scavenge_mutable_list(generation *gen) } #endif + case TVAR_WAIT_QUEUE: + { + StgTVarWaitQueue *wq = ((StgTVarWaitQueue *) p); + (StgClosure *)wq->waiting_tso = evacuate((StgClosure*)wq->waiting_tso); + (StgClosure *)wq->next_queue_entry = evacuate((StgClosure*)wq->next_queue_entry); + (StgClosure *)wq->prev_queue_entry = evacuate((StgClosure*)wq->prev_queue_entry); + p->mut_link = gen->mut_list; + gen->mut_list = p; + continue; + } + + case 
TVAR: + { + StgTVar *tvar = ((StgTVar *) p); + (StgClosure *)tvar->current_value = evacuate((StgClosure*)tvar->current_value); + (StgClosure *)tvar->first_wait_queue_entry = evacuate((StgClosure*)tvar->first_wait_queue_entry); + p->mut_link = gen->mut_list; + gen->mut_list = p; + continue; + } + + case TREC_CHUNK: + { + StgWord i; + StgTRecChunk *tc = ((StgTRecChunk *) p); + TRecEntry *e = &(tc -> entries[0]); + (StgClosure *)tc->prev_chunk = evacuate((StgClosure*)tc->prev_chunk); + for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { + (StgClosure *)e->tvar = evacuate((StgClosure*)e->tvar); + (StgClosure *)e->expected_value = evacuate((StgClosure*)e->expected_value); + (StgClosure *)e->new_value = evacuate((StgClosure*)e->new_value); + } + p->mut_link = gen->mut_list; + gen->mut_list = p; + continue; + } + + case TREC_HEADER: + { + StgTRecHeader *trec = ((StgTRecHeader *) p); + (StgClosure *)trec->enclosing_trec = evacuate((StgClosure*)trec->enclosing_trec); + (StgClosure *)trec->current_chunk = evacuate((StgClosure*)trec->current_chunk); + p->mut_link = gen->mut_list; + gen->mut_list = p; + continue; + } + default: // shouldn't have anything else on the mutables list barf("scavenge_mutable_list: strange object? 
%d", (int)(info->type)); @@ -3760,6 +3943,9 @@ scavenge_stack(StgPtr p, StgPtr stack_end) continue; // small bitmap (< 32 entries, or 64 on a 64-bit machine) + case CATCH_STM_FRAME: + case CATCH_RETRY_FRAME: + case ATOMICALLY_FRAME: case STOP_FRAME: case CATCH_FRAME: case RET_SMALL: diff --git a/ghc/rts/GCCompact.c b/ghc/rts/GCCompact.c index 6dd0131..45836db 100644 --- a/ghc/rts/GCCompact.c +++ b/ghc/rts/GCCompact.c @@ -142,6 +142,14 @@ obj_sizeW( StgClosure *p, StgInfoTable *info ) return tso_sizeW((StgTSO *)p); case BCO: return bco_sizeW((StgBCO *)p); + case TVAR_WAIT_QUEUE: + return sizeofW(StgTVarWaitQueue); + case TVAR: + return sizeofW(StgTVar); + case TREC_CHUNK: + return sizeofW(StgTRecChunk); + case TREC_HEADER: + return sizeofW(StgTRecHeader); default: return sizeW_fromITBL(info); } @@ -289,6 +297,9 @@ thread_stack(StgPtr p, StgPtr stack_end) } // small bitmap (<= 32 entries, or 64 on a 64-bit machine) + case CATCH_RETRY_FRAME: + case CATCH_STM_FRAME: + case ATOMICALLY_FRAME: case UPDATE_FRAME: case STOP_FRAME: case CATCH_FRAME: @@ -424,6 +435,8 @@ thread_TSO (StgTSO *tso) thread((StgPtr)&tso->blocked_exceptions); } + thread((StgPtr)&tso->trec); + thread_stack(tso->sp, &(tso->stack[tso->stack_size])); return (StgPtr)tso + tso_sizeW(tso); } @@ -605,6 +618,45 @@ thread_obj (StgInfoTable *info, StgPtr p) case TSO: return thread_TSO((StgTSO *)p); + case TVAR_WAIT_QUEUE: + { + StgTVarWaitQueue *wq = (StgTVarWaitQueue *)p; + thread((StgPtr)&wq->waiting_tso); + thread((StgPtr)&wq->next_queue_entry); + thread((StgPtr)&wq->prev_queue_entry); + return p + sizeofW(StgTVarWaitQueue); + } + + case TVAR: + { + StgTVar *tvar = (StgTVar *)p; + thread((StgPtr)&tvar->current_value); + thread((StgPtr)&tvar->first_wait_queue_entry); + return p + sizeofW(StgTVar); + } + + case TREC_HEADER: + { + StgTRecHeader *trec = (StgTRecHeader *)p; + thread((StgPtr)&trec->enclosing_trec); + thread((StgPtr)&trec->current_chunk); + return p + sizeofW(StgTRecHeader); + } + + case 
TREC_CHUNK: + { + int i; + StgTRecChunk *tc = (StgTRecChunk *)p; + TRecEntry *e = &(tc -> entries[0]); + thread((StgPtr)&tc->prev_chunk); + for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { + thread((StgPtr)&e->tvar); + thread((StgPtr)&e->expected_value); + thread((StgPtr)&e->new_value); + } + return p + sizeofW(StgTRecChunk); + } + default: barf("update_fwd: unknown/strange object %d", (int)(info->type)); return NULL; diff --git a/ghc/rts/HSprel.def b/ghc/rts/HSprel.def index 073cda3..0ffe00b 100644 --- a/ghc/rts/HSprel.def +++ b/ghc/rts/HSprel.def @@ -22,6 +22,7 @@ PrelStable_StablePtr_static_info DATA PrelPack_unpackCString_closure PrelIOBase_stackOverflow_closure PrelIOBase_BlockedOnDeadMVar_closure +PrelIOBase_BlockedIndefinitely_closure PrelIOBase_NonTermination_closure PrelWeak_runFinalizzerBatch_closure __stginit_Prelude diff --git a/ghc/rts/Linker.c b/ghc/rts/Linker.c index 1f53d38..7768c0b 100644 --- a/ghc/rts/Linker.c +++ b/ghc/rts/Linker.c @@ -388,9 +388,12 @@ typedef struct _RtsSymbolVal { SymX(__int_encodeDouble) \ SymX(__int_encodeFloat) \ SymX(andIntegerzh_fast) \ + SymX(atomicallyzh_fast) \ SymX(barf) \ SymX(blockAsyncExceptionszh_fast) \ SymX(catchzh_fast) \ + SymX(catchRetryzh_fast) \ + SymX(catchSTMzh_fast) \ SymX(closure_flags) \ SymX(cmp_thread) \ SymX(cmpIntegerzh_fast) \ @@ -445,6 +448,7 @@ typedef struct _RtsSymbolVal { SymX_redirect(newCAF, newDynCAF) \ SymX(newMVarzh_fast) \ SymX(newMutVarzh_fast) \ + SymX(newTVarzh_fast) \ SymX(atomicModifyMutVarzh_fast) \ SymX(newPinnedByteArrayzh_fast) \ SymX(orIntegerzh_fast) \ @@ -458,10 +462,12 @@ typedef struct _RtsSymbolVal { SymX(quotRemIntegerzh_fast) \ SymX(raisezh_fast) \ SymX(raiseIOzh_fast) \ + SymX(readTVarzh_fast) \ SymX(remIntegerzh_fast) \ SymX(resetNonBlockingFd) \ SymX(resumeThread) \ SymX(resolveObjs) \ + SymX(retryzh_fast) \ SymX(rts_apply) \ SymX(rts_checkSchedStatus) \ SymX(rts_eval) \ @@ -571,6 +577,7 @@ typedef struct _RtsSymbolVal { SymX(waitReadzh_fast) \ 
SymX(waitWritezh_fast) \ SymX(word2Integerzh_fast) \ + SymX(writeTVarzh_fast) \ SymX(xorIntegerzh_fast) \ SymX(yieldzh_fast) diff --git a/ghc/rts/Makefile b/ghc/rts/Makefile index cf81863..e18fc77 100644 --- a/ghc/rts/Makefile +++ b/ghc/rts/Makefile @@ -300,6 +300,7 @@ SRC_HC_OPTS += \ -\#include Schedule.h \ -\#include Printer.h \ -\#include Sanity.h \ + -\#include STM.h \ -\#include Storage.h \ -\#include SchedAPI.h \ -\#include Timer.h \ diff --git a/ghc/rts/Prelude.h b/ghc/rts/Prelude.h index 613993b..5b4a8c0 100644 --- a/ghc/rts/Prelude.h +++ b/ghc/rts/Prelude.h @@ -38,6 +38,7 @@ extern StgClosure ZCMain_main_closure; PRELUDE_CLOSURE(GHCziIOBase_stackOverflow_closure); PRELUDE_CLOSURE(GHCziIOBase_heapOverflow_closure); PRELUDE_CLOSURE(GHCziIOBase_BlockedOnDeadMVar_closure); +PRELUDE_CLOSURE(GHCziIOBase_BlockedIndefinitely_closure); PRELUDE_CLOSURE(GHCziIOBase_NonTermination_closure); PRELUDE_INFO(GHCziBase_Czh_static_info); @@ -84,6 +85,7 @@ PRELUDE_INFO(GHCziStable_StablePtr_con_info); #define stackOverflow_closure (&GHCziIOBase_stackOverflow_closure) #define heapOverflow_closure (&GHCziIOBase_heapOverflow_closure) #define BlockedOnDeadMVar_closure (&GHCziIOBase_BlockedOnDeadMVar_closure) +#define BlockedIndefinitely_closure (&GHCziIOBase_BlockedIndefinitely_closure) #define NonTermination_closure (&GHCziIOBase_NonTermination_closure) #define Czh_static_info (&GHCziBase_Czh_static_info) diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm index 16a3d17..95ec25d 100644 --- a/ghc/rts/PrimOps.cmm +++ b/ghc/rts/PrimOps.cmm @@ -900,6 +900,438 @@ isCurrentThreadBoundzh_fast RET_N(r); } + +/* ----------------------------------------------------------------------------- + * TVar primitives + * -------------------------------------------------------------------------- */ + +#ifdef REG_R1 +#define SP_OFF 0 +#define IF_NOT_REG_R1(x) +#else +#define SP_OFF 1 +#define IF_NOT_REG_R1(x) x +#endif + +// Catch retry frame 
------------------------------------------------------------ + + +#define CATCH_RETRY_FRAME_ENTRY_TEMPLATE(label,ret) \ + label \ + { \ + W_ r, frame, trec, outer; \ + IF_NOT_REG_R1(W_ rval; rval = Sp(0); Sp_adj(1); ) \ + \ + frame = Sp; \ + trec = StgTSO_trec(CurrentTSO); \ + "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr"); \ + r = foreign "C" stmCommitTransaction(trec "ptr"); \ + if (r) { \ + /* Succeeded (either first branch or second branch) */ \ + StgTSO_trec(CurrentTSO) = outer; \ + Sp = Sp + SIZEOF_StgCatchRetryFrame; \ + IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;) \ + jump ret; \ + } else { \ + /* Did not commit: retry */ \ + W_ new_trec; \ + "ptr" new_trec = foreign "C" stmStartTransaction(outer "ptr"); \ + StgTSO_trec(CurrentTSO) = new_trec; \ + if (StgCatchRetryFrame_running_alt_code(frame)) { \ + R1 = StgCatchRetryFrame_alt_code(frame); \ + } else { \ + R1 = StgCatchRetryFrame_first_code(frame); \ + StgCatchRetryFrame_first_code_trec(frame) = new_trec; \ + } \ + Sp_adj(-1); \ + jump RET_LBL(stg_ap_v); \ + } \ + } + +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_0_ret,%RET_VEC(Sp(SP_OFF),0)) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_1_ret,%RET_VEC(Sp(SP_OFF),1)) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_2_ret,%RET_VEC(Sp(SP_OFF),2)) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_3_ret,%RET_VEC(Sp(SP_OFF),3)) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_4_ret,%RET_VEC(Sp(SP_OFF),4)) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_5_ret,%RET_VEC(Sp(SP_OFF),5)) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_6_ret,%RET_VEC(Sp(SP_OFF),6)) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(stg_catch_retry_frame_7_ret,%RET_VEC(Sp(SP_OFF),7)) + +#if MAX_VECTORED_RTN > 8 +#error MAX_VECTORED_RTN has changed: please modify stg_catch_retry_frame too. 
+#endif + +#if defined(PROFILING) +#define CATCH_RETRY_FRAME_BITMAP 7 +#define CATCH_RETRY_FRAME_WORDS 6 +#else +#define CATCH_RETRY_FRAME_BITMAP 1 +#define CATCH_RETRY_FRAME_WORDS 4 +#endif + +INFO_TABLE_RET(stg_catch_retry_frame, + CATCH_RETRY_FRAME_WORDS, CATCH_RETRY_FRAME_BITMAP, + CATCH_RETRY_FRAME, + stg_catch_retry_frame_0_ret, + stg_catch_retry_frame_1_ret, + stg_catch_retry_frame_2_ret, + stg_catch_retry_frame_3_ret, + stg_catch_retry_frame_4_ret, + stg_catch_retry_frame_5_ret, + stg_catch_retry_frame_6_ret, + stg_catch_retry_frame_7_ret) +CATCH_RETRY_FRAME_ENTRY_TEMPLATE(,%ENTRY_CODE(Sp(SP_OFF))) + + + +// Atomically frame ------------------------------------------------------------- + +#define ATOMICALLY_FRAME_ENTRY_TEMPLATE(label,ret) \ + label \ + { \ + W_ frame, trec, valid; \ + IF_NOT_REG_R1(W_ rval; rval = Sp(0); Sp_adj(1); ) \ + \ + frame = Sp; \ + trec = StgTSO_trec(CurrentTSO); \ + if (StgAtomicallyFrame_waiting(frame)) { \ + /* The TSO is currently waiting: should we stop waiting? 
*/ \ + valid = foreign "C" stmReWait(trec "ptr"); \ + if (valid) { \ + /* Previous attempt is still valid: no point trying again yet */ \ + IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;) \ + jump stg_block_noregs; \ + } else { \ + /* Previous attempt is no longer valid: try again */ \ + "ptr" trec = foreign "C" stmStartTransaction(NO_TREC "ptr"); \ + StgTSO_trec(CurrentTSO) = trec; \ + StgAtomicallyFrame_waiting(frame) = 0; /* false; */ \ + R1 = StgAtomicallyFrame_code(frame); \ + Sp_adj(-1); \ + jump RET_LBL(stg_ap_v); \ + } \ + } else { \ + /* The TSO is not currently waiting: try to commit the transaction */ \ + valid = foreign "C" stmCommitTransaction(trec "ptr"); \ + if (valid) { \ + /* Transaction was valid: commit succeeded */ \ + StgTSO_trec(CurrentTSO) = NO_TREC; \ + Sp = Sp + SIZEOF_StgAtomicallyFrame; \ + IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;) \ + jump ret; \ + } else { \ + /* Transaction was not valid: try again */ \ + "ptr" trec = foreign "C" stmStartTransaction(NO_TREC "ptr"); \ + StgTSO_trec(CurrentTSO) = trec; \ + R1 = StgAtomicallyFrame_code(frame); \ + Sp_adj(-1); \ + jump RET_LBL(stg_ap_v); \ + } \ + } \ + } + +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_0_ret,%RET_VEC(Sp(SP_OFF),0)) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_1_ret,%RET_VEC(Sp(SP_OFF),1)) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_2_ret,%RET_VEC(Sp(SP_OFF),2)) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_3_ret,%RET_VEC(Sp(SP_OFF),3)) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_4_ret,%RET_VEC(Sp(SP_OFF),4)) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_5_ret,%RET_VEC(Sp(SP_OFF),5)) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_6_ret,%RET_VEC(Sp(SP_OFF),6)) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(stg_atomically_frame_7_ret,%RET_VEC(Sp(SP_OFF),7)) + +#if MAX_VECTORED_RTN > 8 +#error MAX_VECTORED_RTN has changed: please modify stg_atomically_frame too. 
+#endif + +#if defined(PROFILING) +#define ATOMICALLY_FRAME_BITMAP 7 +#define ATOMICALLY_FRAME_WORDS 4 +#else +#define ATOMICALLY_FRAME_BITMAP 1 +#define ATOMICALLY_FRAME_WORDS 2 +#endif + +INFO_TABLE_RET(stg_atomically_frame, + ATOMICALLY_FRAME_WORDS, ATOMICALLY_FRAME_BITMAP, + ATOMICALLY_FRAME, + stg_atomically_frame_0_ret, + stg_atomically_frame_1_ret, + stg_atomically_frame_2_ret, + stg_atomically_frame_3_ret, + stg_atomically_frame_4_ret, + stg_atomically_frame_5_ret, + stg_atomically_frame_6_ret, + stg_atomically_frame_7_ret) +ATOMICALLY_FRAME_ENTRY_TEMPLATE(,%ENTRY_CODE(Sp(SP_OFF))) + + +// STM catch frame -------------------------------------------------------------- + +#define CATCH_STM_FRAME_ENTRY_TEMPLATE(label,ret) \ + label \ + { \ + IF_NOT_REG_R1(W_ rval; rval = Sp(0); Sp_adj(1); ) \ + Sp = Sp + SIZEOF_StgCatchSTMFrame; \ + IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;) \ + jump ret; \ + } + +#ifdef REG_R1 +#define SP_OFF 0 +#else +#define SP_OFF 1 +#endif + +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_0_ret,%RET_VEC(Sp(SP_OFF),0)) +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_1_ret,%RET_VEC(Sp(SP_OFF),1)) +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_2_ret,%RET_VEC(Sp(SP_OFF),2)) +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_3_ret,%RET_VEC(Sp(SP_OFF),3)) +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_4_ret,%RET_VEC(Sp(SP_OFF),4)) +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_5_ret,%RET_VEC(Sp(SP_OFF),5)) +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_6_ret,%RET_VEC(Sp(SP_OFF),6)) +CATCH_STM_FRAME_ENTRY_TEMPLATE(stg_catch_stm_frame_7_ret,%RET_VEC(Sp(SP_OFF),7)) + +#if MAX_VECTORED_RTN > 8 +#error MAX_VECTORED_RTN has changed: please modify stg_catch_stm_frame too. 
+#endif + +#if defined(PROFILING) +#define CATCH_STM_FRAME_BITMAP 3 +#define CATCH_STM_FRAME_WORDS 3 +#else +#define CATCH_STM_FRAME_BITMAP 0 +#define CATCH_STM_FRAME_WORDS 1 +#endif + +/* Catch frames are very similar to update frames, but when entering + * one we just pop the frame off the stack and perform the correct + * kind of return to the activation record underneath us on the stack. + */ + +INFO_TABLE_RET(stg_catch_stm_frame, + CATCH_STM_FRAME_WORDS, CATCH_STM_FRAME_BITMAP, + CATCH_STM_FRAME, + stg_catch_stm_frame_0_ret, + stg_catch_stm_frame_1_ret, + stg_catch_stm_frame_2_ret, + stg_catch_stm_frame_3_ret, + stg_catch_stm_frame_4_ret, + stg_catch_stm_frame_5_ret, + stg_catch_stm_frame_6_ret, + stg_catch_stm_frame_7_ret) +CATCH_STM_FRAME_ENTRY_TEMPLATE(,%ENTRY_CODE(Sp(SP_OFF))) + + +// Primop definition ------------------------------------------------------------ + +atomicallyzh_fast +{ + W_ frame; + W_ old_trec; + W_ new_trec; + + /* Args: R1 = m :: STM a */ + STK_CHK_GEN(SIZEOF_StgAtomicallyFrame + WDS(1), R1_PTR, atomicallyzh_fast); + + /* Set up the atomically frame */ + Sp = Sp - SIZEOF_StgAtomicallyFrame; + frame = Sp; + + SET_HDR(frame,stg_atomically_frame_info,CCCS); + StgAtomicallyFrame_waiting(frame) = 0; // False + StgAtomicallyFrame_code(frame) = R1; + + /* Start the memory transcation */ + old_trec = StgTSO_trec(CurrentTSO); + "ptr" new_trec = foreign "C" stmStartTransaction(old_trec "ptr"); + StgTSO_trec(CurrentTSO) = new_trec; + + /* Apply R1 to the realworld token */ + Sp_adj(-1); + jump RET_LBL(stg_ap_v); +} + + +catchSTMzh_fast +{ + W_ frame; + + /* Args: R1 :: STM a */ + /* Args: R2 :: Exception -> STM a */ + STK_CHK_GEN(SIZEOF_StgCatchSTMFrame + WDS(1), R1_PTR & R2_PTR, catchSTMzh_fast); + + /* Set up the catch frame */ + Sp = Sp - SIZEOF_StgCatchSTMFrame; + frame = Sp; + + SET_HDR(frame, stg_catch_stm_frame_info, CCCS); + StgCatchSTMFrame_handler(frame) = R2; + + /* Apply R1 to the realworld token */ + Sp_adj(-1); + jump 
RET_LBL(stg_ap_v); +} + + +catchRetryzh_fast +{ + W_ frame; + W_ new_trec; + W_ trec; + + /* Args: R1 :: STM a */ + /* Args: R2 :: STM a */ + STK_CHK_GEN(SIZEOF_StgCatchRetryFrame + WDS(1), R1_PTR & R2_PTR, catchRetryzh_fast); + + /* Start a nested transaction within which to run the first code */ + trec = StgTSO_trec(CurrentTSO); + "ptr" new_trec = foreign "C" stmStartTransaction(trec "ptr"); + StgTSO_trec(CurrentTSO) = new_trec; + + /* Set up the catch-retry frame */ + Sp = Sp - SIZEOF_StgCatchRetryFrame; + frame = Sp; + + SET_HDR(frame, stg_catch_retry_frame_info, CCCS); + StgCatchRetryFrame_running_alt_code(frame) = 0; // false; + StgCatchRetryFrame_first_code(frame) = R1; + StgCatchRetryFrame_alt_code(frame) = R2; + StgCatchRetryFrame_first_code_trec(frame) = new_trec; + + /* Apply R1 to the realworld token */ + Sp_adj(-1); + jump RET_LBL(stg_ap_v); +} + + +retryzh_fast +{ + W_ frame_type; + W_ frame; + W_ trec; + W_ outer; + W_ r; + + MAYBE_GC (NO_PTRS, readTVarzh_fast); // STM operations may allocate + + // Find the enclosing ATOMICALLY_FRAME or CATCH_RETRY_FRAME +retry_pop_stack: + trec = StgTSO_trec(CurrentTSO); + "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr"); + StgTSO_sp(CurrentTSO) = Sp; + frame_type = foreign "C" findRetryFrameHelper(CurrentTSO "ptr"); + Sp = StgTSO_sp(CurrentTSO); + frame = Sp; + + if (frame_type == CATCH_RETRY_FRAME) { + // The retry reaches a CATCH_RETRY_FRAME before the atomic frame + ASSERT(outer != NO_TREC); + if (!StgCatchRetryFrame_running_alt_code(frame)) { + // Retry in the first code: try the alternative + "ptr" trec = foreign "C" stmStartTransaction(outer "ptr"); + StgTSO_trec(CurrentTSO) = trec; + StgCatchRetryFrame_running_alt_code(frame) = 1; // true; + R1 = StgCatchRetryFrame_alt_code(frame); + Sp_adj(-1); + jump RET_LBL(stg_ap_v); + } else { + // Retry in the alternative code: propagate + W_ other_trec; + other_trec = StgCatchRetryFrame_first_code_trec(frame); + r = foreign "C" stmMergeForWaiting(trec "ptr", 
other_trec "ptr"); + if (r) { + // Merge between siblings succeeded: commit it back to enclosing transaction + // and then propagate the retry + r = foreign "C" stmCommitTransaction(trec "ptr"); + StgTSO_trec(CurrentTSO) = outer; + Sp = Sp + SIZEOF_StgCatchRetryFrame; + goto retry_pop_stack; + } else { + // Merge failed: we musn't propagate the retry. Try both paths again. + "ptr" trec = foreign "C" stmStartTransaction(outer "ptr"); + StgCatchRetryFrame_first_code_trec(frame) = trec; + StgCatchRetryFrame_running_alt_code(frame) = 0; // false; + StgTSO_trec(CurrentTSO) = trec; + R1 = StgCatchRetryFrame_first_code(frame); + Sp_adj(-1); + jump RET_LBL(stg_ap_v); + } + } + } + + // We've reached the ATOMICALLY_FRAME: attempt to wait + ASSERT(frame_type == ATOMICALLY_FRAME); + ASSERT(outer == NO_TREC); + r = foreign "C" stmWait(CurrentTSO "ptr", trec "ptr"); + if (r) { + // Transaction was valid: stmWait put us on the TVars' queues, we now block + StgAtomicallyFrame_waiting(frame) = 1; // true + Sp = frame; + jump stg_block_noregs; + } else { + // Transaction was not valid: retry immediately + "ptr" trec = foreign "C" stmStartTransaction(outer "ptr"); + StgTSO_trec(CurrentTSO) = trec; + R1 = StgAtomicallyFrame_code(frame); + Sp = frame; + Sp_adj(-1); + jump RET_LBL(stg_ap_v); + } +} + + +newTVarzh_fast +{ + W_ tv; + + /* Args: R1 = initialisation value */ + + ALLOC_PRIM( SIZEOF_StgTVar, R1_PTR, newTVarzh_fast); + tv = Hp - SIZEOF_StgTVar + WDS(1); + SET_HDR(tv,stg_TVAR_info,W_[CCCS]); + StgTVar_current_value(tv) = R1; + StgTVar_first_wait_queue_entry(tv) = stg_END_STM_WAIT_QUEUE_closure; + + RET_P(tv); +} + + +readTVarzh_fast +{ + W_ trec; + W_ tvar; + W_ result; + + /* Args: R1 = TVar closure */ + + MAYBE_GC (R1_PTR, readTVarzh_fast); // Call to stmReadTVar may allocate + trec = StgTSO_trec(CurrentTSO); + tvar = R1; + "ptr" result = foreign "C" stmReadTVar(trec "ptr", tvar "ptr"); + + RET_P(result); +} + + +writeTVarzh_fast +{ + W_ trec; + W_ tvar; + W_ new_value; + + 
/* Args: R1 = TVar closure */ + /* R2 = New value */ + + MAYBE_GC (R1_PTR & R2_PTR, writeTVarzh_fast); // Call to stmWriteTVar may allocate + trec = StgTSO_trec(CurrentTSO); + tvar = R1; + new_value = R2; + foreign "C" stmWriteTVar(trec "ptr", tvar "ptr", new_value "ptr"); + + jump %ENTRY_CODE(Sp(0)); +} + + /* ----------------------------------------------------------------------------- * MVar primitives * diff --git a/ghc/rts/Printer.c b/ghc/rts/Printer.c index 9e8d090..67ca672 100644 --- a/ghc/rts/Printer.c +++ b/ghc/rts/Printer.c @@ -718,7 +718,13 @@ static char *closure_type_names[] = { "FETCH_ME_BQ", "RBH", "EVACUATED", - "REMOTE_REF" + "REMOTE_REF", + "TVAR_WAIT_QUEUE", + "TVAR", + "TREC_CHUNK", + "TREC_HEADER", + "ATOMICALLY_FRAME", + "CATCH_RETRY_FRAME" }; diff --git a/ghc/rts/RtsFlags.c b/ghc/rts/RtsFlags.c index c12ee8b..2f01d5e 100644 --- a/ghc/rts/RtsFlags.c +++ b/ghc/rts/RtsFlags.c @@ -1,3 +1,4 @@ + /* ----------------------------------------------------------------------------- * * (c) The AQUA Project, Glasgow University, 1994-1997 @@ -183,6 +184,7 @@ void initRtsFlagsDefaults(void) RtsFlags.DebugFlags.block_alloc = rtsFalse; RtsFlags.DebugFlags.sanity = rtsFalse; RtsFlags.DebugFlags.stable = rtsFalse; + RtsFlags.DebugFlags.stm = rtsFalse; RtsFlags.DebugFlags.prof = rtsFalse; RtsFlags.DebugFlags.gran = rtsFalse; RtsFlags.DebugFlags.par = rtsFalse; @@ -424,6 +426,7 @@ usage_text[] = { " -Dr DEBUG: gran", " -DP DEBUG: par", " -Dl DEBUG: linker", +" -Dm DEBUG: stm", "", #endif // DEBUG #if defined(SMP) @@ -729,6 +732,9 @@ error = rtsTrue; case 'a': RtsFlags.DebugFlags.apply = rtsTrue; break; + case 'm': + RtsFlags.DebugFlags.stm = rtsTrue; + break; default: bad_option( rts_argv[arg] ); } diff --git a/ghc/rts/RtsStartup.c b/ghc/rts/RtsStartup.c index 04bad6d..3b6f050 100644 --- a/ghc/rts/RtsStartup.c +++ b/ghc/rts/RtsStartup.c @@ -14,6 +14,7 @@ #include "Storage.h" /* initStorage, exitStorage */ #include "Schedule.h" /* initScheduler */ #include 
"Stats.h" /* initStats */ +#include "STM.h" /* initSTM */ #include "Signals.h" #include "Timer.h" /* startTimer, stopTimer */ #include "Weak.h" @@ -219,6 +220,8 @@ hs_init(int *argc, char **argv[]) startupAsyncIO(); #endif + initSTM(); + #ifdef RTS_GTK_FRONTPANEL if (RtsFlags.GcFlags.frontpanel) { initFrontPanel(); diff --git a/ghc/rts/STM.c b/ghc/rts/STM.c new file mode 100644 index 0000000..f56bd1f --- /dev/null +++ b/ghc/rts/STM.c @@ -0,0 +1,817 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 1998-2004 + * + * STM implementation. + * + * This implementation is designed for a many-threads, few-CPUs case. This leads + * to a number of design choices: + * + * - We use a simple design which does not aim to be lock-free -- SMP builds use + * a mutex to protect all the TVars and STM datastructures, non-SMP builds + * do not require any locking. The goal is to make fast-path uncontended + * operations fast because, with few CPUs, contention betwen operations on the + * STM interface is expected rarely. + * + * - Each thread is responsible for adding/removing itself to/from the queues + * associated with tvars. This reduces the work that is necessary when a + * large number of threads are waiting on a single tvar and where the update + * to that tvar is really only releasing a single thread. + * + * Ideas for future experimentation: + * + * - Read/write operations here involve a linear search of the trec. Consider + * adding a cache to map tvars to existing entries in the trec. + * + * - Consider whether to defer unparking more than one thread. On a uniprocessor + * the deferment could be made until a thread switch from the first thread + * released in the hope that it restores the location to a value on which + * other threads were waiting. That would avoid a stampede on e.g. multiple + * threads blocked reading from a single-cell shared buffer. 
+ * + * - Consider whether to provide a link from a StgTVarWaitQueue to the TRecEntry + * associated with the waiter. This would allow unpark_waiters_on to be + * more selective and avoid unparking threads whose expected value for that + * tvar co-incides with the value now stored there. Does this happen often? + * + * + * ---------------------------------------------------------------------------*/ + +#include "PosixSource.h" +#include "Rts.h" +#include "RtsFlags.h" +#include "RtsUtils.h" +#include "Schedule.h" +#include "STM.h" +#include "Storage.h" + +#include +#include + +#define FALSE 0 +#define TRUE 1 + +#if defined(DEBUG) +#define SHAKE +#define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x )) +#else +#define TRACE(_x...) /*Nothing*/ +#endif + +// If SHAKE is defined then validation will sometime spuriously fail. They helps test +// unusualy code paths if genuine contention is rare + +#ifdef SHAKE +static const int do_shake = TRUE; +#else +static const int do_shake = FALSE; +#endif +static int shake_ctr = 0; + +/*......................................................................*/ + +static int shake(void) { + if (do_shake) { + if (((shake_ctr++) % 47) == 0) { + return TRUE; + } + return FALSE; + } else { + return FALSE; + } +} + +/*......................................................................*/ + +// Helper macros for iterating over entries within a transaction +// record + +#define FOR_EACH_ENTRY(_t,_x,CODE) do { \ + StgTRecHeader *__t = (_t); \ + StgTRecChunk *__c = __t -> current_chunk; \ + StgWord __limit = __c -> next_entry_idx; \ + TRACE("trec=%p chunk=%p limit=%d\n", __t, __c, __limit); \ + while (__c != END_STM_CHUNK_LIST) { \ + StgWord __i; \ + for (__i = 0; __i < __limit; __i ++) { \ + TRecEntry *_x = &(__c -> entries[__i]); \ + do { CODE } while (0); \ + } \ + __c = __c -> prev_chunk; \ + __limit = TREC_CHUNK_NUM_ENTRIES; \ + } \ + exit_for_each: \ + if (FALSE) goto exit_for_each; \ +} while (0) + +#define BREAK_FOR_EACH goto 
exit_for_each + +/*......................................................................*/ + +// Private cache of must-be-unreachable trec headers and chunks + +static StgTRecHeader *cached_trec_headers = NO_TREC; +static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST; +static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE; + +static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) { + if (shake()) { + TRACE("Shake: not re-using wait queue %p\n", q); + return; + } + + q -> next_queue_entry = cached_tvar_wait_queues; + cached_tvar_wait_queues = q; +} + +static void recycle_closures_from_trec (StgTRecHeader *t) { + if (shake()) { + TRACE("Shake: not re-using closures from %p\n", t); + return; + } + + t -> enclosing_trec = cached_trec_headers; + cached_trec_headers = t; + t -> enclosing_trec = NO_TREC; + + while (t -> current_chunk != END_STM_CHUNK_LIST) { + StgTRecChunk *c = t -> current_chunk; + t -> current_chunk = c -> prev_chunk; + c -> prev_chunk = cached_trec_chunks; + cached_trec_chunks = c; + } +} + +/*......................................................................*/ + +// Helper functions for managing internal STM state. This lock is only held +// for a 'short' time, in the sense that it is never held when any of the +// external functions returns. 
+ +static void lock_stm(void) { + // Nothing +} + +static void unlock_stm(void) { + // Nothing +} + +/*......................................................................*/ + +// Helper functions for thread blocking and unblocking + +static void park_tso(StgTSO *tso) { + ASSERT(tso -> why_blocked == NotBlocked); + tso -> why_blocked = BlockedOnSTM; + tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE; + TRACE("park_tso on tso=%p\n", tso); +} + +static void unpark_tso(StgTSO *tso) { + // We will continue unparking threads while they remain on one of the wait + // queues: it's up to the thread itself to remove it from the wait queues + // if it decides to do so when it is scheduled. + if (tso -> why_blocked == BlockedOnSTM) { + TRACE("unpark_tso on tso=%p\n", tso); + tso -> why_blocked = NotBlocked; + PUSH_ON_RUN_QUEUE(tso); + } else { + TRACE("spurious unpark_tso on tso=%p\n", tso); + } +} + +static void unpark_waiters_on(StgTVar *s) { + StgTVarWaitQueue *q; + TRACE("unpark_waiters_on tvar=%p\n", s); + for (q = s -> first_wait_queue_entry; + q != END_STM_WAIT_QUEUE; + q = q -> next_queue_entry) { + unpark_tso(q -> waiting_tso); + } +} + +/*......................................................................*/ + +// Helper functions for allocation and initialization + +static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgTSO *waiting_tso) { + StgTVarWaitQueue *result; + if (cached_tvar_wait_queues != END_STM_WAIT_QUEUE) { + result = cached_tvar_wait_queues; + cached_tvar_wait_queues = result -> next_queue_entry; + } else { + result = (StgTVarWaitQueue *)allocate(sizeofW(StgTVarWaitQueue)); + SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM); + } + result -> waiting_tso = waiting_tso; + return result; +} + +static StgTRecChunk *new_stg_trec_chunk(void) { + StgTRecChunk *result; + if (cached_trec_chunks != END_STM_CHUNK_LIST) { + result = cached_trec_chunks; + cached_trec_chunks = result -> prev_chunk; + } else { + result = (StgTRecChunk 
*)allocate(sizeofW(StgTRecChunk)); + SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM); + } + result -> prev_chunk = END_STM_CHUNK_LIST; + result -> next_entry_idx = 0; + TRACE("prev from %p is %p\n", result, result -> prev_chunk); + return result; +} + +static StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) { + StgTRecHeader *result; + if (cached_trec_headers != NO_TREC) { + result = cached_trec_headers; + cached_trec_headers = result -> enclosing_trec; + } else { + result = (StgTRecHeader *) allocate(sizeofW(StgTRecHeader)); + SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM); + } + result -> enclosing_trec = enclosing_trec; + result -> current_chunk = new_stg_trec_chunk(); + + if (enclosing_trec == NO_TREC) { + result -> state = TREC_ACTIVE; + } else { + ASSERT(enclosing_trec -> state == TREC_ACTIVE || + enclosing_trec -> state == TREC_MUST_ABORT || + enclosing_trec -> state == TREC_CANNOT_COMMIT); + result -> state = enclosing_trec -> state; + } + + TRACE("new_stg_trec_header creating %p nidx=%d chunk=%p enclosing_trec=%p state=%d\n", + result, result->current_chunk->next_entry_idx, result -> current_chunk, enclosing_trec, result->state); + return result; +} + +/*......................................................................*/ + +// Helper functions for managing waiting lists + +static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) { + ASSERT(trec != NO_TREC); + ASSERT(trec -> enclosing_trec == NO_TREC); + ASSERT(trec -> state == TREC_ACTIVE || trec -> state == TREC_CANNOT_COMMIT); + FOR_EACH_ENTRY(trec, e, { + StgTVar *s; + StgTVarWaitQueue *q; + StgTVarWaitQueue *fq; + s = e -> tvar; + TRACE("Adding tso=%p to wait queue for tvar=%p\n", tso, s); + ASSERT(s -> current_value == e -> expected_value); + fq = s -> first_wait_queue_entry; + q = new_stg_tvar_wait_queue(tso); + q -> next_queue_entry = fq; + q -> prev_queue_entry = END_STM_WAIT_QUEUE; + if (fq != END_STM_WAIT_QUEUE) { + fq -> prev_queue_entry = q; + } + s 
-> first_wait_queue_entry = q; + e -> new_value = (StgClosure *) q; + }); +} + +static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) { + ASSERT(trec != NO_TREC); + ASSERT(trec -> enclosing_trec == NO_TREC); + ASSERT(trec -> state == TREC_WAITING); + TRACE("stop_tsos_waiting in state=%d\n", trec -> state); + FOR_EACH_ENTRY(trec, e, { + StgTVar *s; + StgTVarWaitQueue *pq; + StgTVarWaitQueue *nq; + StgTVarWaitQueue *q; + s = e -> tvar; + q = (StgTVarWaitQueue *) (e -> new_value); + TRACE("Removing tso=%p from wait queue for tvar=%p\n", q -> waiting_tso, s); + nq = q -> next_queue_entry; + pq = q -> prev_queue_entry; + TRACE("pq=%p nq=%p q=%p\n", pq, nq, q); + if (nq != END_STM_WAIT_QUEUE) { + nq -> prev_queue_entry = pq; + } + if (pq != END_STM_WAIT_QUEUE) { + pq -> next_queue_entry = nq; + } else { + ASSERT (s -> first_wait_queue_entry == q); + s -> first_wait_queue_entry = nq; + } + recycle_tvar_wait_queue(q); + }); +} + +/*......................................................................*/ + +static TRecEntry *get_new_entry(StgTRecHeader *t) { + TRecEntry *result; + StgTRecChunk *c; + int i; + + c = t -> current_chunk; + i = c -> next_entry_idx; + ASSERT(c != END_STM_CHUNK_LIST); + + if (i < TREC_CHUNK_NUM_ENTRIES) { + // Continue to use current chunk + result = &(c -> entries[i]); + c -> next_entry_idx ++; + } else { + // Current chunk is full: allocate a fresh one + StgTRecChunk *nc; + nc = new_stg_trec_chunk(); + nc -> prev_chunk = c; + nc -> next_entry_idx = 1; + t -> current_chunk = nc; + result = &(nc -> entries[0]); + } + + return result; +} + +/*......................................................................*/ + +static void merge_update_into(StgTRecHeader *t, + StgTVar *tvar, + StgClosure *expected_value, + StgClosure *new_value, + int merging_sibling) { + int found; + + // Look for an entry in this trec + found = FALSE; + FOR_EACH_ENTRY(t, e, { + StgTVar *s; + s = e -> tvar; + if (s == tvar) { + found = TRUE; + if (merging_sibling) { + 
if (e -> expected_value != expected_value) { + // Must abort if the two entries start from different values + TRACE("Siblings inconsistent at %p (%p vs %p)\n", + tvar, e -> expected_value, expected_value); + t -> state = TREC_MUST_ABORT; + } else if (e -> new_value != new_value) { + // Cannot commit if the two entries lead to different values (wait still OK) + TRACE("Siblings trying conflicting writes to %p (%p vs %p)\n", + tvar, e -> new_value, new_value); + t -> state = TREC_CANNOT_COMMIT; + } + } else { + // Otherwise merging child back into parent + ASSERT (e -> new_value == expected_value); + } + TRACE(" trec=%p exp=%p new=%p\n", t, e->expected_value, e->new_value); + e -> new_value = new_value; + BREAK_FOR_EACH; + } + }); + + if (!found) { + // No entry so far in this trec + TRecEntry *ne; + ne = get_new_entry(t); + ne -> tvar = tvar; + ne -> expected_value = expected_value; + ne -> new_value = new_value; + } +} + +/*......................................................................*/ + +static StgClosure *read_current_value_seen_from(StgTRecHeader *t, + StgTVar *tvar) { + int found; + StgClosure *result = NULL; + + // Look for any relevent trec entries + found = FALSE; + while (t != NO_TREC) { + FOR_EACH_ENTRY(t, e, { + StgTVar *s; + s = e -> tvar; + if (s == tvar) { + found = TRUE; + result = e -> new_value; + BREAK_FOR_EACH; + } + }); + if (found) break; + t = t -> enclosing_trec; + } + + if (!found) { + // Value not yet held in a trec + result = tvar -> current_value; + } + + return result; +} + +/*......................................................................*/ + +static int transaction_is_valid (StgTRecHeader *t) { + StgTRecHeader *et; + int result; + + if (shake()) { + TRACE("Shake: pretending transaction trec=%p is invalid when it may not be\n", t); + return FALSE; + } + + et = t -> enclosing_trec; + ASSERT ((t -> state == TREC_ACTIVE) || + (t -> state == TREC_WAITING) || + (t -> state == TREC_MUST_ABORT) || + (t -> state == 
TREC_CANNOT_COMMIT)); + result = !((t -> state) == TREC_MUST_ABORT); + if (result) { + FOR_EACH_ENTRY(t, e, { + StgTVar *s; + s = e -> tvar; + if (e -> expected_value != read_current_value_seen_from(et, s)) { + result = FALSE; + BREAK_FOR_EACH; + } + }); + } + return result; +} + +/************************************************************************/ + +/* + * External functions below this point are responsible for: + * + * - acquiring/releasing the STM lock + * + * - all updates to the trec status field + ASSERT(t != NO_TREC); + + * By convention we increment entry_count when starting a new + * transaction and we decrement it at the point where we can discard + * the contents of the trec when exiting the outermost transaction. + * This means that stmWait and stmRewait decrement the count whenever + * they return FALSE (they do so exactly once for each transaction + * that doesn't remain blocked forever). + */ + +/************************************************************************/ + +void stmPreGCHook() { + TRACE("stmPreGCHook\n"); + cached_trec_headers = NO_TREC; + cached_trec_chunks = END_STM_CHUNK_LIST; + cached_tvar_wait_queues = END_STM_WAIT_QUEUE; +} + +/************************************************************************/ + +void initSTM() { + TRACE("initSTM, NO_TREC=%p\n", NO_TREC); + /* Nothing */ +} + +/*......................................................................*/ + +StgTRecHeader *stmStartTransaction(StgTRecHeader *outer) { + StgTRecHeader *t; + TRACE("stmStartTransaction current-trec=%p\n", outer); + t = new_stg_trec_header(outer); + TRACE("stmStartTransaction new-trec=%p\n", t); + return t; +} + +/*......................................................................*/ + +void stmAbortTransaction(StgTRecHeader *trec) { + TRACE("stmAbortTransaction trec=%p\n", trec); + ASSERT (trec != NO_TREC); + ASSERT ((trec -> state == TREC_ACTIVE) || + (trec -> state == TREC_MUST_ABORT) || + (trec -> state == TREC_WAITING) || + (trec -> 
state == TREC_CANNOT_COMMIT)); + if (trec -> state == TREC_WAITING) { + ASSERT (trec -> enclosing_trec == NO_TREC); + TRACE("stmAbortTransaction aborting waiting transaction\n"); + stop_tsos_waiting_on_trec(trec); + } + trec -> state = TREC_ABORTED; + + // Outcome now reflected by status field; no need for log + recycle_closures_from_trec(trec); + + TRACE("stmAbortTransaction trec=%p done\n", trec); +} + +/*......................................................................*/ + +StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) { + StgTRecHeader *outer; + TRACE("stmGetEnclosingTRec trec=%p\n", trec); + outer = trec -> enclosing_trec; + TRACE("stmGetEnclosingTRec outer=%p\n", outer); + return outer; +} + +/*......................................................................*/ + +StgBool stmValidateTransaction(StgTRecHeader *trec) { + int result; + TRACE("stmValidateTransaction trec=%p\n", trec); + ASSERT(trec != NO_TREC); + ASSERT((trec -> state == TREC_ACTIVE) || + (trec -> state == TREC_MUST_ABORT) || + (trec -> state == TREC_CANNOT_COMMIT) || + (trec -> state == TREC_WAITING)); + + lock_stm(); + result = transaction_is_valid(trec); + + if (!result && trec -> state != TREC_WAITING) { + trec -> state = TREC_MUST_ABORT; + } + + unlock_stm(); + + TRACE("stmValidateTransaction trec=%p result=%d\n", trec, result); + return result; +} + +/*......................................................................*/ + +StgBool stmCommitTransaction(StgTRecHeader *trec) { + StgTRecHeader *et; + int result; + TRACE("stmCommitTransaction trec=%p trec->enclosing_trec=%p\n", trec, trec->enclosing_trec); + ASSERT (trec != NO_TREC); + ASSERT ((trec -> state == TREC_ACTIVE) || + (trec -> state == TREC_MUST_ABORT) || + (trec -> state == TREC_CANNOT_COMMIT)); + + lock_stm(); + result = transaction_is_valid(trec); + if (result) { + et = trec -> enclosing_trec; + if (trec -> state == TREC_CANNOT_COMMIT && et == NO_TREC) { + TRACE("Cannot commit trec=%p at top level\n", trec); 
+ trec -> state = TREC_MUST_ABORT; + result = FALSE; + } else { + if (et == NO_TREC) { + TRACE("Non-nesting commit, NO_TREC=%p\n", NO_TREC); + } else { + TRACE("Nested commit into %p, NO_TREC=%p\n", et, NO_TREC); + } + + FOR_EACH_ENTRY(trec, e, { + StgTVar *s; + s = e -> tvar; + if (et == NO_TREC) { + s -> current_value = e -> new_value; + unpark_waiters_on(s); + } else { + merge_update_into(et, s, e -> expected_value, e -> new_value, FALSE); + } + }); + + + if (trec->state == TREC_CANNOT_COMMIT && et -> state == TREC_ACTIVE) { + TRACE("Propagating TREC_CANNOT_COMMIT into %p\n", et); + et -> state = TREC_CANNOT_COMMIT; + } + } + } + + // Outcome now reflected by status field; no need for log + recycle_closures_from_trec(trec); + + unlock_stm(); + + TRACE("stmCommitTransaction trec=%p result=%d\n", trec, result); + + return result; +} + +/*......................................................................*/ + +StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other) { + int result; + TRACE("stmMergeForWaiting trec=%p (%d) other=%p (%d)\n", trec, trec -> state, other, other->state); + ASSERT(trec != NO_TREC); + ASSERT(other != NO_TREC); + ASSERT((trec -> state == TREC_ACTIVE) || + (trec -> state == TREC_MUST_ABORT) || + (trec -> state == TREC_CANNOT_COMMIT)); + ASSERT((other -> state == TREC_ACTIVE) || + (other -> state == TREC_MUST_ABORT) || + (other -> state == TREC_CANNOT_COMMIT)); + + lock_stm(); + result = (transaction_is_valid(trec)); + TRACE("stmMergeForWaiting initial result=%d\n", result); + if (result) { + result = transaction_is_valid(other); + TRACE("stmMergeForWaiting after both result=%d\n", result); + if (result) { + // Individually the two transactions may be valid. Now copy entries from + // "other" into "trec". 
This may cause "trec" to become invalid if it + // contains an update that conflicts with one from "other" + FOR_EACH_ENTRY(other, e, { + StgTVar *s = e -> tvar; + TRACE("Merging trec=%p exp=%p new=%p\n", other, e->expected_value, e->new_value); + merge_update_into(trec, s, e-> expected_value, e -> new_value, TRUE); + }); + result = (trec -> state != TREC_MUST_ABORT); + } + } + + if (!result) { + trec -> state = TREC_MUST_ABORT; + } + + unlock_stm(); + + TRACE("stmMergeForWaiting result=%d\n", result); + return result; +} + +/*......................................................................*/ + +StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) { + int result; + TRACE("stmWait tso=%p trec=%p\n", tso, trec); + ASSERT (trec != NO_TREC); + ASSERT (trec -> enclosing_trec == NO_TREC); + ASSERT ((trec -> state == TREC_ACTIVE) || + (trec -> state == TREC_MUST_ABORT) || + (trec -> state == TREC_CANNOT_COMMIT)); + + lock_stm(); + result = transaction_is_valid(trec); + if (result) { + // The transaction is valid so far so we can actually start waiting. + // (Otherwise the transaction was not valid and the thread will have to + // retry it). 
+ start_tso_waiting_on_trec(tso, trec); + park_tso(tso); + trec -> state = TREC_WAITING; + } else { + // Outcome now reflected by status field; no need for log + recycle_closures_from_trec(trec); + } + unlock_stm(); + + TRACE("stmWait trec=%p result=%d\n", trec, result); + return result; +} + +/*......................................................................*/ + +StgBool stmReWait(StgTRecHeader *trec) { + int result; + TRACE("stmReWait trec=%p\n", trec); + ASSERT (trec != NO_TREC); + ASSERT (trec -> enclosing_trec == NO_TREC); + ASSERT (trec -> state == TREC_WAITING); + + lock_stm(); + result = transaction_is_valid(trec); + TRACE("stmReWait trec=%p result=%d\n", trec, result); + if (result) { + // The transaction remains valid -- do nothing because it is already on + // the wait queues + ASSERT (trec -> state == TREC_WAITING); + } else { + // The transaction has become invalid. We can now remove it from the wait + // queues. + stop_tsos_waiting_on_trec (trec); + + // Outcome now reflected by status field; no need for log + recycle_closures_from_trec(trec); + } + unlock_stm(); + + TRACE("stmReWait trec=%p result=%d\n", trec, result); + return result; +} + +/*......................................................................*/ + +StgClosure *stmReadTVar(StgTRecHeader *trec, + StgTVar *tvar) { + StgTRecHeader *et; + StgClosure *result = NULL; // Suppress unassignment warning + int found = FALSE; + TRecEntry *ne = NULL; + + TRACE("stmReadTVar trec=%p tvar=%p\n", trec, tvar); + ASSERT (trec != NO_TREC); + ASSERT (trec -> state == TREC_ACTIVE || + trec -> state == TREC_MUST_ABORT || + trec -> state == TREC_CANNOT_COMMIT); + + lock_stm(); + found = FALSE; + + // Look for an existing entry in our trec or in an enclosing trec + et = trec; + while (et != NO_TREC) { + FOR_EACH_ENTRY(et, e, { + TRACE("testing e=%p\n", e); + if (e -> tvar == tvar) { + found = TRUE; + result = e -> new_value; + BREAK_FOR_EACH; + } + }); + if (found) break; + et = et -> enclosing_trec; 
+ } + + if (found && et != trec) { + // Entry found in another trec + ASSERT (result != NULL); + TRACE("duplicating entry\n"); + ne = get_new_entry(trec); + ne -> tvar = tvar; + ne -> expected_value = result; + ne -> new_value = result; + } else if (!found) { + // No entry found + ASSERT (result == NULL); + TRACE("need new entry\n"); + ne = get_new_entry(trec); + TRACE("got ne=%p\n", ne); + result = tvar -> current_value; + ne -> tvar = tvar; + ne -> expected_value = result; + ne -> new_value = result; + } + + unlock_stm(); + ASSERT (result != NULL); + TRACE("stmReadTVar trec=%p result=%p\n", trec, result); + + return result; +} + +/*......................................................................*/ + +void stmWriteTVar(StgTRecHeader *trec, + StgTVar *tvar, + StgClosure *new_value) { + StgTRecHeader *et; + TRecEntry *ne; + TRecEntry *entry = NULL; + int found; + TRACE("stmWriteTVar trec=%p tvar=%p new_value=%p\n", trec, tvar, new_value); + ASSERT (trec != NO_TREC); + ASSERT (trec -> state == TREC_ACTIVE || + trec -> state == TREC_MUST_ABORT || + trec -> state == TREC_CANNOT_COMMIT); + + lock_stm(); + found = FALSE; + + // Look for an existing entry in our trec or in an enclosing trec + et = trec; + while (et != NO_TREC) { + FOR_EACH_ENTRY(et, e, { + if (e -> tvar == tvar) { + found = TRUE; + entry = e; + BREAK_FOR_EACH; + } + }); + if (found) break; + et = et -> enclosing_trec; + } + + if (found && et == trec) { + // Entry found in our trec + entry -> new_value = new_value; + } else if (found) { + // Entry found in another trec + ne = get_new_entry(trec); + ne -> tvar = tvar; + ne -> expected_value = entry -> new_value; + ne -> new_value = new_value; + } else { + // No entry found + ne = get_new_entry(trec); + ne -> tvar = tvar; + ne -> expected_value = tvar -> current_value; + ne -> new_value = new_value; + } + + unlock_stm(); + TRACE("stmWriteTVar trec=%p done\n", trec); +} + + +/*......................................................................*/ + 
diff --git a/ghc/rts/Sanity.c b/ghc/rts/Sanity.c index 43e7b5a..5941329 100644 --- a/ghc/rts/Sanity.c +++ b/ghc/rts/Sanity.c @@ -130,6 +130,9 @@ checkStackFrame( StgPtr c ) case UPDATE_FRAME: ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgUpdateFrame*)c)->updatee)); + case ATOMICALLY_FRAME: + case CATCH_RETRY_FRAME: + case CATCH_STM_FRAME: case CATCH_FRAME: // small bitmap cases (<= 32 entries) case STOP_FRAME: @@ -182,7 +185,7 @@ checkStackFrame( StgPtr c ) } default: - barf("checkStackFrame: weird activation record found on stack (%p).",c); + barf("checkStackFrame: weird activation record found on stack (%p %d).",c,info->i.type); } } @@ -341,6 +344,9 @@ checkClosure( StgClosure* p ) case UPDATE_FRAME: case STOP_FRAME: case CATCH_FRAME: + case ATOMICALLY_FRAME: + case CATCH_RETRY_FRAME: + case CATCH_STM_FRAME: barf("checkClosure: stack frame"); case AP: /* we can treat this as being the same as a PAP */ @@ -433,6 +439,44 @@ checkClosure( StgClosure* p ) // sizeW_fromITBL(REVERT_INFOPTR(get_itbl((StgClosure *)p))); #endif + + case TVAR_WAIT_QUEUE: + { + StgTVarWaitQueue *wq = (StgTVarWaitQueue *)p; + ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->next_queue_entry)); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->prev_queue_entry)); + return sizeofW(StgTVarWaitQueue); + } + + case TVAR: + { + StgTVar *tv = (StgTVar *)p; + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->current_value)); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_wait_queue_entry)); + return sizeofW(StgTVar); + } + + case TREC_CHUNK: + { + nat i; + StgTRecChunk *tc = (StgTRecChunk *)p; + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tc->prev_chunk)); + for (i = 0; i < tc -> next_entry_idx; i ++) { + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tc->entries[i].tvar)); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tc->entries[i].expected_value)); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(tc->entries[i].new_value)); + } + return sizeofW(StgTRecChunk); + } + + case TREC_HEADER: + { + StgTRecHeader *trec = (StgTRecHeader *)p; + ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> enclosing_trec)); + 
ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> current_chunk)); + return sizeofW(StgTRecHeader); + } + case EVACUATED: barf("checkClosure: found EVACUATED closure %d", @@ -620,6 +664,9 @@ checkTSO(StgTSO *tso) case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type==MVAR); break; + case BlockedOnSTM: + ASSERT(tso->block_info.closure == END_TSO_QUEUE); + break; default: /* Could check other values of why_blocked but I am more diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 851a957..09c4602 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -55,6 +55,7 @@ #include "Signals.h" #include "Sanity.h" #include "Stats.h" +#include "STM.h" #include "Timer.h" #include "Prelude.h" #include "ThreadLabels.h" @@ -1326,6 +1327,54 @@ run_thread: #endif if (ready_to_gc) { + /* Kick any transactions which are invalid back to their atomically frames. + * When next scheduled they will try to commit, this commit will fail and + * they will retry. */ + for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) { + if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (!stmValidateTransaction (t -> trec)) { + StgRetInfoTable *info; + StgPtr sp = t -> sp; + + IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); + + if (sp[0] == (W_)&stg_enter_info) { + sp++; + } else { + sp--; + sp[0] = (W_)&stg_dummy_ret_closure; + } + + // Look up the stack for its atomically frame + StgPtr frame; + frame = sp + 1; + info = get_ret_itbl((StgClosure *)frame); + + while (info->i.type != ATOMICALLY_FRAME && + info->i.type != STOP_FRAME && + info->i.type != UPDATE_FRAME) { + if (info -> i.type == CATCH_RETRY_FRAME) { + IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame")); + stmAbortTransaction(t -> trec); + t -> trec = stmGetEnclosingTRec(t -> trec); + } + frame += stack_frame_sizeW((StgClosure *)frame); + info = get_ret_itbl((StgClosure *)frame); + } + + if (info -> i.type != ATOMICALLY_FRAME) { + barf("Could not find ATOMICALLY_FRAME for 
unvalidatable thread"); + } + + // Cause the thread to enter its atomically frame again when + // scheduled -- this will attempt stmCommitTransaction or stmReWait + // which will fail triggering re-rexecution. + t->sp = frame; + t->what_next = ThreadRunGHC; + } + } + } + /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1734,6 +1783,8 @@ createThread(nat size) - TSO_STRUCT_SIZEW; tso->sp = (P_)&(tso->stack) + stack_size; + tso->trec = NO_TREC; + #ifdef PROFILING tso->prof.CCCS = CCS_MAIN; #endif @@ -2659,6 +2710,14 @@ unblockThread(StgTSO *tso) case NotBlocked: return; /* not blocked */ + case BlockedOnSTM: + // Be careful: nothing to do here! We tell the scheduler that the thread + // is runnable and we leave it to the stack-walking code to abort the + // transaction while unwinding the stack. We should perhaps have a debugging + // test to make sure that this really happens and that the 'zombie' transaction + // does not get committed. + goto done; + case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -2792,6 +2851,14 @@ unblockThread(StgTSO *tso) switch (tso->why_blocked) { + case BlockedOnSTM: + // Be careful: nothing to do here! We tell the scheduler that the thread + // is runnable and we leave it to the stack-walking code to abort the + // transaction while unwinding the stack. We should perhaps have a debugging + // test to make sure that this really happens and that the 'zombie' transaction + // does not get committed. + goto done; + case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -3024,6 +3091,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception) // top of the stack applied to the exception. // // 5. If it's a STOP_FRAME, then kill the thread. 
+ // + // NB: if we pass an ATOMICALLY_FRAME then abort the associated + // transaction + StgPtr frame; @@ -3033,6 +3104,18 @@ raiseAsync(StgTSO *tso, StgClosure *exception) while (info->i.type != UPDATE_FRAME && (info->i.type != CATCH_FRAME || exception == NULL) && info->i.type != STOP_FRAME) { + if (info->i.type == ATOMICALLY_FRAME) { + // IF we find an ATOMICALLY_FRAME then we abort the + // current transaction and propagate the exception. In + // this case (unlike ordinary exceptions) we do not care + // whether the transaction is valid or not because its + // possible validity cannot have caused the exception + // and will not be visible after the abort. + IF_DEBUG(stm, + debugBelch("Found atomically block delivering async exception\n")); + stmAbortTransaction(tso -> trec); + tso -> trec = stmGetEnclosingTRec(tso -> trec); + } frame += stack_frame_sizeW((StgClosure *)frame); info = get_ret_itbl((StgClosure *)frame); } @@ -3202,15 +3285,26 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure); p = next; continue; + + case ATOMICALLY_FRAME: + IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p)); + tso->sp = p; + return ATOMICALLY_FRAME; case CATCH_FRAME: tso->sp = p; return CATCH_FRAME; + + case CATCH_STM_FRAME: + IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p)); + tso->sp = p; + return CATCH_STM_FRAME; case STOP_FRAME: tso->sp = p; return STOP_FRAME; + case CATCH_RETRY_FRAME: default: p = next; continue; @@ -3218,6 +3312,55 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) } } + +/* ----------------------------------------------------------------------------- + findRetryFrameHelper + + This function is called by the retry# primitive. It traverses the stack + leaving tso->sp referring to the frame which should handle the retry. 
+ + This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) + or should be a ATOMICALLY_FRAME (if the retry# reaches the top level). + + We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions, + despite the similar implementation. + + We should not expect to see CATCH_FRAME or STOP_FRAME because those should + not be created within memory transactions. + -------------------------------------------------------------------------- */ + +StgWord +findRetryFrameHelper (StgTSO *tso) +{ + StgPtr p, next; + StgRetInfoTable *info; + + p = tso -> sp; + while (1) { + info = get_ret_itbl((StgClosure *)p); + next = p + stack_frame_sizeW((StgClosure *)p); + switch (info->i.type) { + + case ATOMICALLY_FRAME: + IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p)); + tso->sp = p; + return ATOMICALLY_FRAME; + + case CATCH_RETRY_FRAME: + IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p)); + tso->sp = p; + return CATCH_RETRY_FRAME; + + case CATCH_STM_FRAME: + default: + ASSERT(info->i.type != CATCH_FRAME); + ASSERT(info->i.type != STOP_FRAME); + p = next; + continue; + } + } +} + /* ----------------------------------------------------------------------------- resurrectThreads is called after garbage collection on the list of threads found to be garbage. 
Each of these threads will be woken @@ -3248,6 +3391,9 @@ resurrectThreads( StgTSO *threads ) case BlockedOnBlackHole: raiseAsync(tso,(StgClosure *)NonTermination_closure); break; + case BlockedOnSTM: + raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure); + break; case NotBlocked: /* This might happen if the thread was blocked on a black hole * belonging to a thread that we've just woken up (raiseAsync @@ -3314,6 +3460,9 @@ printThreadBlockage(StgTSO *tso) case BlockedOnCCall_NoUnblockExc: debugBelch("is blocked on an external call (exceptions were already blocked)"); break; + case BlockedOnSTM: + debugBelch("is blocked on an STM operation"); + break; default: barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", tso->why_blocked, tso->id, tso); diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index 516cabf..ddc7f56 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -63,6 +63,9 @@ void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception); /* raiseExceptionHelper */ StgWord raiseExceptionHelper (StgTSO *tso, StgClosure *exception); +/* findRetryFrameHelper */ +StgWord findRetryFrameHelper (StgTSO *tso); + /* awaitEvent(rtsBool wait) * * Checks for blocked threads that need to be woken. 
diff --git a/ghc/rts/StgMiscClosures.cmm b/ghc/rts/StgMiscClosures.cmm index 78eef91..b71b13d 100644 --- a/ghc/rts/StgMiscClosures.cmm +++ b/ghc/rts/StgMiscClosures.cmm @@ -567,6 +567,37 @@ INFO_TABLE(stg_FULL_MVAR,4,0,MVAR,"MVAR","MVAR") INFO_TABLE(stg_EMPTY_MVAR,4,0,MVAR,"MVAR","MVAR") { foreign "C" barf("EMPTY_MVAR object entered!"); } +/* ----------------------------------------------------------------------------- + STM + -------------------------------------------------------------------------- */ + +INFO_TABLE(stg_TVAR, 0, 0, TVAR, "TVAR", "TVAR") +{ foreign "C" barf("TVAR object entered!"); } + +INFO_TABLE(stg_TVAR_WAIT_QUEUE, 0, 0, TVAR_WAIT_QUEUE, "TVAR_WAIT_QUEUE", "TVAR_WAIT_QUEUE") +{ foreign "C" barf("TVAR_WAIT_QUEUE object entered!"); } + +INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK") +{ foreign "C" barf("TREC_CHUNK object entered!"); } + +INFO_TABLE(stg_TREC_HEADER, 0, 0, TREC_HEADER, "TREC_HEADER", "TREC_HEADER") +{ foreign "C" barf("TREC_HEADER object entered!"); } + +INFO_TABLE_CONSTR(stg_END_STM_WAIT_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WAIT_QUEUE","END_STM_WAIT_QUEUE") +{ foreign "C" barf("END_STM_WAIT_QUEUE object entered!"); } + +INFO_TABLE_CONSTR(stg_END_STM_CHUNK_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_CHUNK_LIST","END_STM_CHUNK_LIST") +{ foreign "C" barf("END_STM_CHUNK_LIST object entered!"); } + +INFO_TABLE_CONSTR(stg_NO_TREC,0,0,0,CONSTR_NOCAF_STATIC,"NO_TREC","NO_TREC") +{ foreign "C" barf("NO_TREC object entered!"); } + +CLOSURE(stg_END_STM_WAIT_QUEUE_closure,stg_END_STM_WAIT_QUEUE); + +CLOSURE(stg_END_STM_CHUNK_LIST_closure,stg_END_STM_CHUNK_LIST); + +CLOSURE(stg_NO_TREC_closure,stg_NO_TREC); + /* ---------------------------------------------------------------------------- END_TSO_QUEUE diff --git a/ghc/rts/package.conf.in b/ghc/rts/package.conf.in index e1af341..736452a 100644 --- a/ghc/rts/package.conf.in +++ b/ghc/rts/package.conf.in @@ -94,6 +94,7 @@ Package { , "-u", 
"_GHCziIOBase_heapOverflow_closure" , "-u", "_GHCziIOBase_NonTermination_closure" , "-u", "_GHCziIOBase_BlockedOnDeadMVar_closure" + , "-u", "_GHCziIOBase_BlockedIndefinitely_closure" , "-u", "_GHCziIOBase_Deadlock_closure" , "-u", "_GHCziWeak_runFinalizzerBatch_closure" , "-u", "___stginit_Prelude" @@ -127,6 +128,7 @@ Package { , "-u", "GHCziIOBase_heapOverflow_closure" , "-u", "GHCziIOBase_NonTermination_closure" , "-u", "GHCziIOBase_BlockedOnDeadMVar_closure" + , "-u", "GHCziIOBase_BlockedIndefinitely_closure" , "-u", "GHCziIOBase_Deadlock_closure" , "-u", "GHCziWeak_runFinalizzerBatch_closure" , "-u", "__stginit_Prelude" diff --git a/ghc/utils/genprimopcode/Main.hs b/ghc/utils/genprimopcode/Main.hs index e486403..cc29e6d 100644 --- a/ghc/utils/genprimopcode/Main.hs +++ b/ghc/utils/genprimopcode/Main.hs @@ -411,6 +411,8 @@ ppType (TyApp "StableName#" [x]) = "mkStableNamePrimTy " ++ ppType x ppType (TyApp "MVar#" [x,y]) = "mkMVarPrimTy " ++ ppType x ++ " " ++ ppType y +ppType (TyApp "TVar#" [x,y]) = "mkTVarPrimTy " ++ ppType x + ++ " " ++ ppType y ppType (TyUTup ts) = "(mkTupleTy Unboxed " ++ show (length ts) ++ " " ++ listify (map ppType ts) ++ ")" -- 1.7.10.4