From 55472d778c441b65e013d27f5228283eef85986c Mon Sep 17 00:00:00 2001 From: tharris Date: Fri, 27 May 2005 14:47:10 +0000 Subject: [PATCH] [project @ 2005-05-27 14:47:08 by tharris] Update STM implementation for SMP builds --- ghc/includes/Closures.h | 10 +- ghc/includes/Regs.h | 8 + ghc/includes/SMP.h | 17 + ghc/includes/STM.h | 131 +++-- ghc/includes/StgTypes.h | 1 + ghc/includes/mkDerivedConstants.c | 8 + ghc/rts/Exception.cmm | 4 +- ghc/rts/GC.c | 3 + ghc/rts/GCCompact.c | 1 + ghc/rts/PrimOps.cmm | 262 ++++++---- ghc/rts/STM.c | 1032 ++++++++++++++++++++++--------------- ghc/rts/Schedule.c | 2 +- 12 files changed, 876 insertions(+), 603 deletions(-) diff --git a/ghc/includes/Closures.h b/ghc/includes/Closures.h index 39af80e..c8071fd 100644 --- a/ghc/includes/Closures.h +++ b/ghc/includes/Closures.h @@ -327,6 +327,7 @@ typedef struct { StgClosure* value; } StgMVar; + /* STM data structures * * StgTVar defines the only type that can be updated through the STM @@ -354,8 +355,9 @@ typedef struct StgTVarWaitQueue_ { typedef struct { StgHeader header; - StgClosure *current_value; - StgTVarWaitQueue *first_wait_queue_entry; + StgClosure *volatile current_value; + StgTVarWaitQueue *volatile first_wait_queue_entry; + struct StgTRecHeader_ *volatile last_update_by; } StgTVar; /* new_value == expected_value for read-only accesses */ @@ -364,6 +366,7 @@ typedef struct { StgTVar *tvar; StgClosure *expected_value; StgClosure *new_value; + struct StgTRecHeader_ *saw_update_by; } TRecEntry; #define TREC_CHUNK_NUM_ENTRIES 256 @@ -377,8 +380,7 @@ typedef struct StgTRecChunk_ { typedef enum { TREC_ACTIVE, /* Transaction in progress, outcome undecided */ - TREC_CANNOT_COMMIT, /* Transaction in progress, inconsistent writes performed */ - TREC_MUST_ABORT, /* Transaction in progress, inconsistent / out of date reads */ + TREC_CONDEMNED, /* Transaction in progress, inconsistent / out of date reads */ TREC_COMMITTED, /* Transaction has committed, now updating tvars */ TREC_ABORTED, /* Transaction has aborted, now reverting tvars */ TREC_WAITING, /* Transaction currently waiting */ diff --git a/ghc/includes/Regs.h b/ghc/includes/Regs.h index 5374972..0394257 100644 --- a/ghc/includes/Regs.h +++ b/ghc/includes/Regs.h @@ -22,6 +22,7 @@ #ifndef REGS_H #define REGS_H +#include "gmp.h" // Needs MP_INT definition /* * This is the table that holds shadow-locations for all the STG @@ -90,6 +91,13 @@ typedef struct StgRegTable_ { struct bdescr_ *rCurrentNursery; /* Hp/HpLim point into this block */ struct bdescr_ *rCurrentAlloc; /* for allocation using allocate() */ StgWord rHpAlloc; /* number of *bytes* being allocated in heap */ + // rmp_tmp1..rmp_result2 are only used in SMP builds to avoid per-thread temps + // in bss, but currently always incldue here so we just run mkDerivedConstants once + StgInt rmp_tmp_w; + MP_INT rmp_tmp1; + MP_INT rmp_tmp2; + MP_INT rmp_result1; + MP_INT rmp_result2; #if defined(SMP) || defined(PAR) StgSparkPool rSparks; /* per-task spark pool */ #endif diff --git a/ghc/includes/SMP.h b/ghc/includes/SMP.h index e3916bc..cc95941 100644 --- a/ghc/includes/SMP.h +++ b/ghc/includes/SMP.h @@ -26,6 +26,9 @@ /* * XCHG - the atomic exchange instruction. Used for locking closures * during updates (see LOCK_CLOSURE below) and the MVar primops. + * + * NB: the xchg instruction is implicitly locked, so we do not need + * a lock prefix here. 
*/ INLINE_HEADER StgWord xchg(StgPtr p, StgWord w) @@ -40,6 +43,20 @@ xchg(StgPtr p, StgWord w) return result; } +/* + * CMPXCHG - the single-word atomic compare-and-exchange instruction. Used + * in the STM implementation. + */ +INLINE_HEADER StgWord +cas(StgVolatilePtr p, StgWord o, StgWord n) +{ + __asm__ __volatile__ ( + "lock cmpxchg %3,%1" + :"=a"(o), "=m" (*(volatile unsigned int *)p) + :"0" (o), "r" (n)); + return o; +} + INLINE_HEADER StgInfoTable * lockClosure(StgClosure *p) { diff --git a/ghc/includes/STM.h b/ghc/includes/STM.h index 6b65b0e..cf821dc 100644 --- a/ghc/includes/STM.h +++ b/ghc/includes/STM.h @@ -8,60 +8,38 @@ STM.h defines the C-level interface to the STM. - The interface is designed so that all of the operations return - directly: if the specified StgTSO should block then the Haskell - scheduler's data structures are updated within the STM - implementation, rather than blocking the native thread. + The design follows that of the PPoPP 2005 paper "Composable memory + transactions" extended to include fine-grained locking of TVars. - This interface can be supported by many different implementations, - in particular it is left unspecified: - - - Whether nested transactions are fully supported. + Three different implementations can be built. In overview: + + STM_UNIPROC -- no locking at all: not safe for concurrent invocations - A simple implementation would count the number of - stmStartTransaction operations that a thread invokes and only - attempt to really commit it to the heap when the corresponding - number of stmCommitTransaction calls have been made. This - prevents enclosed transactions from being aborted without also - aborting all of the outer ones. + STM_CG_LOCK -- coarse-grained locking : a single mutex protects all + TVars - The current implementation does support proper nesting. - - - Whether stmWait and stmReWait are blocking. - - A simple implementation would always return 'false' from these - operations, signalling that the calling thread should immediately - retry its transaction. - - A fuller implementation would block the thread and return 'True' - when it is safe for the thread to block. - - The current implementation does provide stmWait and stmReWait - operations which can block the caller's TSO. - - - Whether the transactional read, write, commit and validate - operations are blocking or non-blocking. - - A simple implementation would use an internal lock to prevent - concurrent execution of any STM operations. (This does not - prevent multiple threads having concurrent transactions, merely - the concurrent execution of say stmCommitTransaction by two - threads at the same time). + STM_FG_LOCKS -- per-TVar exclusion : each TVar can be owned by at + most one TRec at any time. This allows dynamically + non-conflicting transactions to commit in parallel. + The implementation treats reads optimisitcally -- + extra versioning information is retained in the + saw_update_by field of the TVars so that they do not + need to be locked for reading. - A fuller implementation would offer obstruction-free or lock-free - progress guarantees, as in our OOPSLA 2003 paper. + STM.C contains more details about the locking schemes used. - The current implementation is lock-free for simple uncontended - operations, but uses an internal lock on SMP systems in some - cases. 
This aims to provide good performance on uniprocessors: - it substantially streamlines the design, when compared with the - OOPSLA paper, and on a uniprocessor we can be sure that threads - are never pre-empted within STM operations. */ #ifndef STM_H #define STM_H +#ifdef SMP +//#define STM_CG_LOCK +#define STM_FG_LOCKS +#else +#define STM_UNIPROC +#endif + #ifdef __cplusplus extern "C" { #endif @@ -86,7 +64,9 @@ extern void stmPreGCHook(void); /* Create and enter a new transaction context */ -extern StgTRecHeader *stmStartTransaction(StgTRecHeader *outer); +extern StgTRecHeader *stmStartTransaction(StgRegTable *reg, StgTRecHeader *outer); +extern StgTRecHeader *stmStartNestedTransaction(StgRegTable *reg, StgTRecHeader *outer +); /* * Exit the current transaction context, abandoning any read/write @@ -118,16 +98,36 @@ extern StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec); /*---------------------------------------------------------------------- - Validate/commit/wait/rewait operations - -------------------------------------- + Validation + ---------- + + Test whether the specified transaction record, and all those within which + it is nested, are still valid. + + Note: the caller can assume that once stmValidateTransaction has + returned FALSE for a given trec then that transaction will never + again be valid -- we rely on this in Schedule.c when kicking invalid + threads at GC (in case they are stuck looping) +*/ + +extern StgBool stmValidateNestOfTransactions(StgTRecHeader *trec); + +/*---------------------------------------------------------------------- + Commit/wait/rewait operations + ----------------------------- These four operations return boolean results which should be interpreted as follows: - true => The transaction context was definitely valid + true => The transaction record was definitely valid + + false => The transaction record may not have been valid - false => The transaction context may not have been valid + Note that, for nested operations, validity here is solely in terms + of the specified trec: it does not say whether those that it may be + nested are themselves valid. Callers can check this with + stmValidateNestOfTransactions. The user of the STM should ensure that it is always safe to assume that a transaction context is not valid when in fact it is (i.e. to return false in @@ -152,23 +152,14 @@ extern StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec); */ /* - * Test whether the current transaction context is valid, i.e. whether - * it is still possible for it to commit successfully. Note: we assume that - * once stmValidateTransaction has returned FALSE for a given transaction then - * that transaction will never again be valid -- we rely on this in Schedule.c when - * kicking invalid threads at GC (in case they are stuck looping) - */ - -extern StgBool stmValidateTransaction(StgTRecHeader *trec); - -/* * Test whether the current transaction context is valid and, if so, * commit its memory accesses to the heap. stmCommitTransaction must * unblock any threads which are waiting on tvars that updates have * been committed to. */ -extern StgBool stmCommitTransaction(StgTRecHeader *trec); +extern StgBool stmCommitTransaction(StgRegTable *reg, StgTRecHeader *trec); +extern StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec); /* * Test whether the current transaction context is valid and, if so, @@ -177,7 +168,9 @@ extern StgBool stmCommitTransaction(StgTRecHeader *trec); * if the thread is already waiting. 
*/ -extern StgBool stmWait(StgTSO *tso, StgTRecHeader *trec); +extern StgBool stmWait(StgRegTable *reg, + StgTSO *tso, + StgTRecHeader *trec); /* * Test whether the current transaction context is valid and, if so, @@ -189,16 +182,6 @@ extern StgBool stmWait(StgTSO *tso, StgTRecHeader *trec); extern StgBool stmReWait(StgTSO *tso); -/* - * Merge the accesses made so far in the second trec into the first trec. - * Note that the resulting trec is only intended to be used in wait operations. - * This avoids defining what happens if "trec" and "other" contain conflicting - * updates. - */ - -extern StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other); - - /*---------------------------------------------------------------------- Data access operations @@ -210,14 +193,16 @@ extern StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other); * thread's current transaction. */ -extern StgClosure *stmReadTVar(StgTRecHeader *trec, +extern StgClosure *stmReadTVar(StgRegTable *reg, + StgTRecHeader *trec, StgTVar *tvar); /* Update the logical contents of 'tvar' within the context of the * thread's current transaction. */ -extern void stmWriteTVar(StgTRecHeader *trec, +extern void stmWriteTVar(StgRegTable *reg, + StgTRecHeader *trec, StgTVar *tvar, StgClosure *new_value); diff --git a/ghc/includes/StgTypes.h b/ghc/includes/StgTypes.h index ba2adb4..ac2f78e 100644 --- a/ghc/includes/StgTypes.h +++ b/ghc/includes/StgTypes.h @@ -114,6 +114,7 @@ typedef void StgVoid; typedef struct StgClosure_ StgClosure; typedef StgClosure* StgClosurePtr; typedef StgWord* StgPtr; /* pointer into closure */ +typedef StgWord volatile* StgVolatilePtr; /* pointer to volatile word */ typedef StgWord StgOffset; /* byte offset within closure */ typedef struct StgTSO_* StgTSOPtr; diff --git a/ghc/includes/mkDerivedConstants.c b/ghc/includes/mkDerivedConstants.c index b7ecda4..4602869 100644 --- a/ghc/includes/mkDerivedConstants.c +++ b/ghc/includes/mkDerivedConstants.c @@ -241,6 +241,13 @@ main(int argc, char *argv[]) field_offset(StgRegTable, rCurrentNursery); field_offset(StgRegTable, rHpAlloc); + // Needed for SMP builds + field_offset(StgRegTable, rmp_tmp_w); + field_offset(StgRegTable, rmp_tmp1); + field_offset(StgRegTable, rmp_tmp2); + field_offset(StgRegTable, rmp_result1); + field_offset(StgRegTable, rmp_result2); + def_offset("stgGCEnter1", FUN_OFFSET(stgGCEnter1)); def_offset("stgGCFun", FUN_OFFSET(stgGCFun)); @@ -370,6 +377,7 @@ main(int argc, char *argv[]) closure_size(StgTVar); closure_field(StgTVar,current_value); closure_field(StgTVar,first_wait_queue_entry); + closure_field(StgTVar,last_update_by); closure_size(StgBCO); closure_field(StgBCO, instrs); diff --git a/ghc/rts/Exception.cmm b/ghc/rts/Exception.cmm index 6192f6d..771af1f 100644 --- a/ghc/rts/Exception.cmm +++ b/ghc/rts/Exception.cmm @@ -345,7 +345,7 @@ retry_pop_stack: W_ trec; W_ r; trec = StgTSO_trec(CurrentTSO); - r = foreign "C" stmValidateTransaction(trec "ptr"); + r = foreign "C" stmValidateNestOfTransactions(trec "ptr"); foreign "C" stmAbortTransaction(trec "ptr"); StgTSO_trec(CurrentTSO) = NO_TREC; if (r) { @@ -355,7 +355,7 @@ retry_pop_stack: } else { // Transaction was not valid: we retry the exception (otherwise continue // with a further call to raiseExceptionHelper) - "ptr" trec = foreign "C" stmStartTransaction(NO_TREC "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr"); StgTSO_trec(CurrentTSO) = trec; R1 = StgAtomicallyFrame_code(Sp); Sp_adj(-1); diff --git a/ghc/rts/GC.c 
b/ghc/rts/GC.c index 791cc4c..db00d81 100644 --- a/ghc/rts/GC.c +++ b/ghc/rts/GC.c @@ -2871,6 +2871,7 @@ scavenge(step *stp) evac_gen = 0; tvar->current_value = evacuate((StgClosure*)tvar->current_value); tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry); + tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by); evac_gen = saved_evac_gen; failed_to_evac = rtsTrue; // mutable p += sizeofW(StgTVar); @@ -3216,6 +3217,7 @@ linear_scan: evac_gen = 0; tvar->current_value = evacuate((StgClosure*)tvar->current_value); tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry); + tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by); evac_gen = saved_evac_gen; failed_to_evac = rtsTrue; // mutable break; @@ -3528,6 +3530,7 @@ scavenge_one(StgPtr p) evac_gen = 0; tvar->current_value = evacuate((StgClosure*)tvar->current_value); tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry); + tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by); evac_gen = saved_evac_gen; failed_to_evac = rtsTrue; // mutable break; diff --git a/ghc/rts/GCCompact.c b/ghc/rts/GCCompact.c index 43768c5..012235e 100644 --- a/ghc/rts/GCCompact.c +++ b/ghc/rts/GCCompact.c @@ -675,6 +675,7 @@ thread_obj (StgInfoTable *info, StgPtr p) StgTVar *tvar = (StgTVar *)p; thread((StgPtr)&tvar->current_value); thread((StgPtr)&tvar->first_wait_queue_entry); + thread((StgPtr)&tvar->last_update_by); return p + sizeofW(StgTVar); } diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm index bf3aa6f..082cb01 100644 --- a/ghc/rts/PrimOps.cmm +++ b/ghc/rts/PrimOps.cmm @@ -527,6 +527,7 @@ word64ToIntegerzh_fast /* ToDo: this is shockingly inefficient */ +#ifndef SMP section "bss" { mp_tmp1: bits8 [SIZEOF_MP_INT]; @@ -538,101 +539,120 @@ section "bss" { } section "bss" { - result1: + mp_result1: bits8 [SIZEOF_MP_INT]; } section "bss" { - result2: + mp_result2: bits8 [SIZEOF_MP_INT]; } +#endif -#define GMP_TAKE2_RET1(name,mp_fun) \ -name \ -{ \ - CInt s1, s2; \ - W_ d1, d2; \ - \ - /* call doYouWantToGC() */ \ - MAYBE_GC(R2_PTR & R4_PTR, name); \ - \ - s1 = W_TO_INT(R1); \ - d1 = R2; \ - s2 = W_TO_INT(R3); \ - d2 = R4; \ - \ - MP_INT__mp_alloc(mp_tmp1) = W_TO_INT(StgArrWords_words(d1)); \ - MP_INT__mp_size(mp_tmp1) = (s1); \ - MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(d1); \ - MP_INT__mp_alloc(mp_tmp2) = W_TO_INT(StgArrWords_words(d2)); \ - MP_INT__mp_size(mp_tmp2) = (s2); \ - MP_INT__mp_d(mp_tmp2) = BYTE_ARR_CTS(d2); \ - \ - foreign "C" mpz_init(result1); \ - \ - /* Perform the operation */ \ - foreign "C" mp_fun(result1,mp_tmp1,mp_tmp2); \ - \ - RET_NP(TO_W_(MP_INT__mp_size(result1)), \ - MP_INT__mp_d(result1) - SIZEOF_StgArrWords); \ -} - -#define GMP_TAKE1_RET1(name,mp_fun) \ -name \ -{ \ - CInt s1; \ - W_ d1; \ - \ - /* call doYouWantToGC() */ \ - MAYBE_GC(R2_PTR, name); \ - \ - d1 = R2; \ - s1 = W_TO_INT(R1); \ - \ - MP_INT__mp_alloc(mp_tmp1) = W_TO_INT(StgArrWords_words(d1)); \ - MP_INT__mp_size(mp_tmp1) = (s1); \ - MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(d1); \ - \ - foreign "C" mpz_init(result1); \ - \ - /* Perform the operation */ \ - foreign "C" mp_fun(result1,mp_tmp1); \ - \ - RET_NP(TO_W_(MP_INT__mp_size(result1)), \ - MP_INT__mp_d(result1) - SIZEOF_StgArrWords); \ -} +#ifdef SMP +#define FETCH_MP_TEMP(X) \ +W_ X; \ +X = BaseReg + (OFFSET_StgRegTable_r ## X); +#else +#define FETCH_MP_TEMP(X) /* Nothing */ +#endif 
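(Editorial aside, not part of the patch: the FETCH_MP_TEMP change above moves the GMP scratch MP_INTs out of a shared bss section and into per-thread slots in the StgRegTable, so that two Haskell threads running an Integer primop on different CPUs cannot clobber each other's temporaries. Below is a minimal plain-C sketch of the same per-thread-scratch pattern; the names MyRegTable and my_gmp_add are hypothetical and only illustrate the idea, they are not RTS code.)

#include <gmp.h>

/* Hypothetical per-thread register table: each OS thread owns one of
 * these, mirroring the rmp_* fields added to StgRegTable above. */
typedef struct {
    mpz_t scratch;                       /* plays the role of rmp_result1 */
} MyRegTable;

/* Because 'reg' is private to the calling thread, no lock is needed
 * around the scratch MP_INT -- unlike a shared static bss temporary. */
static void my_gmp_add(MyRegTable *reg, mpz_t out, const mpz_t a, const mpz_t b)
{
    mpz_init(reg->scratch);              /* mirrors mpz_init(mp_result1) */
    mpz_add(reg->scratch, a, b);
    mpz_set(out, reg->scratch);          /* hand the result back to the caller */
    mpz_clear(reg->scratch);
}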
-#define GMP_TAKE2_RET2(name,mp_fun) \ -name \ -{ \ - CInt s1, s2; \ - W_ d1, d2; \ - \ - /* call doYouWantToGC() */ \ - MAYBE_GC(R2_PTR & R4_PTR, name); \ - \ - s1 = W_TO_INT(R1); \ - d1 = R2; \ - s2 = W_TO_INT(R3); \ - d2 = R4; \ - \ - MP_INT__mp_alloc(mp_tmp1) = W_TO_INT(StgArrWords_words(d1)); \ - MP_INT__mp_size(mp_tmp1) = (s1); \ - MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(d1); \ - MP_INT__mp_alloc(mp_tmp2) = W_TO_INT(StgArrWords_words(d2)); \ - MP_INT__mp_size(mp_tmp2) = (s2); \ - MP_INT__mp_d(mp_tmp2) = BYTE_ARR_CTS(d2); \ - \ - foreign "C" mpz_init(result1); \ - foreign "C" mpz_init(result2); \ - \ - /* Perform the operation */ \ - foreign "C" mp_fun(result1,result2,mp_tmp1,mp_tmp2); \ - \ - RET_NPNP(TO_W_(MP_INT__mp_size(result1)), \ - MP_INT__mp_d(result1) - SIZEOF_StgArrWords, \ - TO_W_(MP_INT__mp_size(result2)), \ - MP_INT__mp_d(result2) - SIZEOF_StgArrWords); \ +#define GMP_TAKE2_RET1(name,mp_fun) \ +name \ +{ \ + CInt s1, s2; \ + W_ d1, d2; \ + FETCH_MP_TEMP(mp_tmp1); \ + FETCH_MP_TEMP(mp_tmp2); \ + FETCH_MP_TEMP(mp_result1) \ + FETCH_MP_TEMP(mp_result2); \ + \ + /* call doYouWantToGC() */ \ + MAYBE_GC(R2_PTR & R4_PTR, name); \ + \ + s1 = W_TO_INT(R1); \ + d1 = R2; \ + s2 = W_TO_INT(R3); \ + d2 = R4; \ + \ + MP_INT__mp_alloc(mp_tmp1) = W_TO_INT(StgArrWords_words(d1)); \ + MP_INT__mp_size(mp_tmp1) = (s1); \ + MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(d1); \ + MP_INT__mp_alloc(mp_tmp2) = W_TO_INT(StgArrWords_words(d2)); \ + MP_INT__mp_size(mp_tmp2) = (s2); \ + MP_INT__mp_d(mp_tmp2) = BYTE_ARR_CTS(d2); \ + \ + foreign "C" mpz_init(mp_result1 "ptr"); \ + \ + /* Perform the operation */ \ + foreign "C" mp_fun(mp_result1 "ptr",mp_tmp1 "ptr",mp_tmp2 "ptr"); \ + \ + RET_NP(TO_W_(MP_INT__mp_size(mp_result1)), \ + MP_INT__mp_d(mp_result1) - SIZEOF_StgArrWords); \ +} + +#define GMP_TAKE1_RET1(name,mp_fun) \ +name \ +{ \ + CInt s1; \ + W_ d1; \ + FETCH_MP_TEMP(mp_tmp1); \ + FETCH_MP_TEMP(mp_result1) \ + \ + /* call doYouWantToGC() */ \ + MAYBE_GC(R2_PTR, name); \ + \ + d1 = R2; \ + s1 = W_TO_INT(R1); \ + \ + MP_INT__mp_alloc(mp_tmp1) = W_TO_INT(StgArrWords_words(d1)); \ + MP_INT__mp_size(mp_tmp1) = (s1); \ + MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(d1); \ + \ + foreign "C" mpz_init(mp_result1 "ptr"); \ + \ + /* Perform the operation */ \ + foreign "C" mp_fun(mp_result1 "ptr",mp_tmp1 "ptr"); \ + \ + RET_NP(TO_W_(MP_INT__mp_size(mp_result1)), \ + MP_INT__mp_d(mp_result1) - SIZEOF_StgArrWords); \ +} + +#define GMP_TAKE2_RET2(name,mp_fun) \ +name \ +{ \ + CInt s1, s2; \ + W_ d1, d2; \ + FETCH_MP_TEMP(mp_tmp1); \ + FETCH_MP_TEMP(mp_tmp2); \ + FETCH_MP_TEMP(mp_result1) \ + FETCH_MP_TEMP(mp_result2) \ + \ + /* call doYouWantToGC() */ \ + MAYBE_GC(R2_PTR & R4_PTR, name); \ + \ + s1 = W_TO_INT(R1); \ + d1 = R2; \ + s2 = W_TO_INT(R3); \ + d2 = R4; \ + \ + MP_INT__mp_alloc(mp_tmp1) = W_TO_INT(StgArrWords_words(d1)); \ + MP_INT__mp_size(mp_tmp1) = (s1); \ + MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(d1); \ + MP_INT__mp_alloc(mp_tmp2) = W_TO_INT(StgArrWords_words(d2)); \ + MP_INT__mp_size(mp_tmp2) = (s2); \ + MP_INT__mp_d(mp_tmp2) = BYTE_ARR_CTS(d2); \ + \ + foreign "C" mpz_init(mp_result1 "ptr"); \ + foreign "C" mpz_init(mp_result2 "ptr"); \ + \ + /* Perform the operation */ \ + foreign "C" mp_fun(mp_result1 "ptr",mp_result2 "ptr",mp_tmp1 "ptr",mp_tmp2 "ptr"); \ + \ + RET_NPNP(TO_W_(MP_INT__mp_size(mp_result1)), \ + MP_INT__mp_d(mp_result1) - SIZEOF_StgArrWords, \ + TO_W_(MP_INT__mp_size(mp_result2)), \ + MP_INT__mp_d(mp_result2) - SIZEOF_StgArrWords); \ } GMP_TAKE2_RET1(plusIntegerzh_fast, mpz_add) @@ -650,17 +670,20 
@@ GMP_TAKE1_RET1(complementIntegerzh_fast, mpz_com) GMP_TAKE2_RET2(quotRemIntegerzh_fast, mpz_tdiv_qr) GMP_TAKE2_RET2(divModIntegerzh_fast, mpz_fdiv_qr) +#ifndef SMP section "bss" { - aa: W_; // NB. aa is really an mp_limb_t + mp_tmp_w: W_; // NB. mp_tmp_w is really an here mp_limb_t } +#endif gcdIntzh_fast { /* R1 = the first Int#; R2 = the second Int# */ W_ r; + FETCH_MP_TEMP(mp_tmp_w); - W_[aa] = R1; - r = foreign "C" mpn_gcd_1(aa, 1, R2); + W_[mp_tmp_w] = R1; + r = foreign "C" mpn_gcd_1(mp_tmp_w "ptr", 1, R2); R1 = r; /* Result parked in R1, return via info-pointer at TOS */ @@ -806,14 +829,12 @@ integer2Wordzh_fast jump %ENTRY_CODE(Sp(0)); } -section "bss" { - exponent: W_; -} - decodeFloatzh_fast { W_ p; F_ arg; + FETCH_MP_TEMP(mp_tmp1); + FETCH_MP_TEMP(mp_tmp_w); /* arguments: F1 = Float# */ arg = F1; @@ -828,10 +849,10 @@ decodeFloatzh_fast MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(p); /* Perform the operation */ - foreign "C" __decodeFloat(mp_tmp1,exponent,arg); + foreign "C" __decodeFloat(mp_tmp1 "ptr",mp_tmp_w "ptr" ,arg); /* returns: (Int# (expn), Int#, ByteArray#) */ - RET_NNP(W_[exponent], TO_W_(MP_INT__mp_size(mp_tmp1)), p); + RET_NNP(W_[mp_tmp_w], TO_W_(MP_INT__mp_size(mp_tmp1)), p); } #define DOUBLE_MANTISSA_SIZE SIZEOF_DOUBLE @@ -841,6 +862,8 @@ decodeDoublezh_fast { D_ arg; W_ p; + FETCH_MP_TEMP(mp_tmp1); + FETCH_MP_TEMP(mp_tmp_w); /* arguments: D1 = Double# */ arg = D1; @@ -855,10 +878,10 @@ decodeDoublezh_fast MP_INT__mp_d(mp_tmp1) = BYTE_ARR_CTS(p); /* Perform the operation */ - foreign "C" __decodeDouble(mp_tmp1,exponent,arg); + foreign "C" __decodeDouble(mp_tmp1 "ptr", mp_tmp_w "ptr",arg); /* returns: (Int# (expn), Int#, ByteArray#) */ - RET_NNP(W_[exponent], TO_W_(MP_INT__mp_size(mp_tmp1)), p); + RET_NNP(W_[mp_tmp_w], TO_W_(MP_INT__mp_size(mp_tmp1)), p); } /* ----------------------------------------------------------------------------- @@ -969,7 +992,7 @@ INFO_TABLE_RET(stg_catch_retry_frame, frame = Sp; trec = StgTSO_trec(CurrentTSO); "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr"); - r = foreign "C" stmCommitTransaction(trec "ptr"); + r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", trec "ptr"); if (r) { /* Succeeded (either first branch or second branch) */ StgTSO_trec(CurrentTSO) = outer; @@ -979,7 +1002,7 @@ INFO_TABLE_RET(stg_catch_retry_frame, } else { /* Did not commit: retry */ W_ new_trec; - "ptr" new_trec = foreign "C" stmStartTransaction(outer "ptr"); + "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr"); StgTSO_trec(CurrentTSO) = new_trec; if (StgCatchRetryFrame_running_alt_code(frame)) { R1 = StgCatchRetryFrame_alt_code(frame); @@ -1049,7 +1072,7 @@ INFO_TABLE_RET(stg_atomically_frame, jump stg_block_noregs; } else { /* Previous attempt is no longer valid: try again */ - "ptr" trec = foreign "C" stmStartTransaction(NO_TREC "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr"); StgTSO_trec(CurrentTSO) = trec; StgAtomicallyFrame_waiting(frame) = 0 :: CInt; /* false; */ R1 = StgAtomicallyFrame_code(frame); @@ -1058,7 +1081,7 @@ INFO_TABLE_RET(stg_atomically_frame, } } else { /* The TSO is not currently waiting: try to commit the transaction */ - valid = foreign "C" stmCommitTransaction(trec "ptr"); + valid = foreign "C" stmCommitTransaction(BaseReg "ptr", trec "ptr"); if (valid) { /* Transaction was valid: commit succeeded */ StgTSO_trec(CurrentTSO) = NO_TREC; @@ -1067,7 +1090,7 @@ INFO_TABLE_RET(stg_atomically_frame, jump %ENTRY_CODE(Sp(SP_OFF)); } else { /* Transaction was not 
valid: try again */ - "ptr" trec = foreign "C" stmStartTransaction(NO_TREC "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr"); StgTSO_trec(CurrentTSO) = trec; R1 = StgAtomicallyFrame_code(frame); Sp_adj(-1); @@ -1142,6 +1165,9 @@ atomicallyzh_fast W_ old_trec; W_ new_trec; + // stmStartTransaction may allocate + MAYBE_GC (R1_PTR, atomicallyzh_fast); + /* Args: R1 = m :: STM a */ STK_CHK_GEN(SIZEOF_StgAtomicallyFrame + WDS(1), R1_PTR, atomicallyzh_fast); @@ -1155,7 +1181,7 @@ atomicallyzh_fast /* Start the memory transcation */ old_trec = StgTSO_trec(CurrentTSO); - "ptr" new_trec = foreign "C" stmStartTransaction(old_trec "ptr"); + "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", old_trec "ptr"); StgTSO_trec(CurrentTSO) = new_trec; /* Apply R1 to the realworld token */ @@ -1191,13 +1217,16 @@ catchRetryzh_fast W_ new_trec; W_ trec; + // stmStartTransaction may allocate + MAYBE_GC (R1_PTR & R2_PTR, catchRetryzh_fast); + /* Args: R1 :: STM a */ /* Args: R2 :: STM a */ STK_CHK_GEN(SIZEOF_StgCatchRetryFrame + WDS(1), R1_PTR & R2_PTR, catchRetryzh_fast); /* Start a nested transaction within which to run the first code */ trec = StgTSO_trec(CurrentTSO); - "ptr" new_trec = foreign "C" stmStartTransaction(trec "ptr"); + "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", trec "ptr"); StgTSO_trec(CurrentTSO) = new_trec; /* Set up the catch-retry frame */ @@ -1240,7 +1269,7 @@ retry_pop_stack: ASSERT(outer != NO_TREC); if (!StgCatchRetryFrame_running_alt_code(frame)) { // Retry in the first code: try the alternative - "ptr" trec = foreign "C" stmStartTransaction(outer "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr"); StgTSO_trec(CurrentTSO) = trec; StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true; R1 = StgCatchRetryFrame_alt_code(frame); @@ -1250,9 +1279,9 @@ retry_pop_stack: // Retry in the alternative code: propagate W_ other_trec; other_trec = StgCatchRetryFrame_first_code_trec(frame); - r = foreign "C" stmMergeForWaiting(trec "ptr", other_trec "ptr"); + r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", other_trec "ptr"); if (r) { - r = foreign "C" stmCommitTransaction(trec "ptr"); + r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", trec "ptr"); } if (r) { // Merge between siblings succeeded: commit it back to enclosing transaction @@ -1262,7 +1291,7 @@ retry_pop_stack: goto retry_pop_stack; } else { // Merge failed: we musn't propagate the retry. Try both paths again. 
- "ptr" trec = foreign "C" stmStartTransaction(outer "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr"); StgCatchRetryFrame_first_code_trec(frame) = trec; StgCatchRetryFrame_running_alt_code(frame) = 0 :: CInt; // false; StgTSO_trec(CurrentTSO) = trec; @@ -1276,7 +1305,7 @@ retry_pop_stack: // We've reached the ATOMICALLY_FRAME: attempt to wait ASSERT(frame_type == ATOMICALLY_FRAME); ASSERT(outer == NO_TREC); - r = foreign "C" stmWait(CurrentTSO "ptr", trec "ptr"); + r = foreign "C" stmWait(BaseReg "ptr", CurrentTSO "ptr", trec "ptr"); if (r) { // Transaction was valid: stmWait put us on the TVars' queues, we now block StgAtomicallyFrame_waiting(frame) = 1 :: CInt; // true @@ -1288,7 +1317,7 @@ retry_pop_stack: jump stg_block_noregs; } else { // Transaction was not valid: retry immediately - "ptr" trec = foreign "C" stmStartTransaction(outer "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr"); StgTSO_trec(CurrentTSO) = trec; R1 = StgAtomicallyFrame_code(frame); Sp = frame; @@ -1301,6 +1330,7 @@ retry_pop_stack: newTVarzh_fast { W_ tv; + W_ trec; /* Args: R1 = initialisation value */ @@ -1309,6 +1339,12 @@ newTVarzh_fast SET_HDR(tv,stg_TVAR_info,W_[CCCS]); StgTVar_current_value(tv) = R1; StgTVar_first_wait_queue_entry(tv) = stg_END_STM_WAIT_QUEUE_closure; +#if defined(SMP) + trec = StgTSO_trec(CurrentTSO); + StgTVar_last_update_by(tv) = trec; +#else + StgTVar_last_update_by(tv) = NO_TREC; +#endif RET_P(tv); } @@ -1325,7 +1361,7 @@ readTVarzh_fast MAYBE_GC (R1_PTR, readTVarzh_fast); // Call to stmReadTVar may allocate trec = StgTSO_trec(CurrentTSO); tvar = R1; - "ptr" result = foreign "C" stmReadTVar(trec "ptr", tvar "ptr"); + "ptr" result = foreign "C" stmReadTVar(BaseReg "ptr", trec "ptr", tvar "ptr"); RET_P(result); } @@ -1344,7 +1380,7 @@ writeTVarzh_fast trec = StgTSO_trec(CurrentTSO); tvar = R1; new_value = R2; - foreign "C" stmWriteTVar(trec "ptr", tvar "ptr", new_value "ptr"); + foreign "C" stmWriteTVar(BaseReg "ptr", trec "ptr", tvar "ptr", new_value "ptr"); jump %ENTRY_CODE(Sp(0)); } diff --git a/ghc/rts/STM.c b/ghc/rts/STM.c index d4ae643..2be5c69 100644 --- a/ghc/rts/STM.c +++ b/ghc/rts/STM.c @@ -1,39 +1,85 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 1998-2004 + * (c) The GHC Team 1998-2005 * * STM implementation. * - * This implementation is designed for a many-threads, few-CPUs case. This leads - * to a number of design choices: + * Overview + * -------- * - * - We use a simple design which does not aim to be lock-free -- SMP builds use - * a mutex to protect all the TVars and STM datastructures, non-SMP builds - * do not require any locking. The goal is to make fast-path uncontended - * operations fast because, with few CPUs, contention betwen operations on the - * STM interface is expected rarely. + * See the PPoPP 2005 paper "Composable memory transactions". In summary, + * each transcation has a TRec (transaction record) holding entries for each of the + * TVars (transactional variables) that it has accessed. Each entry records + * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that + * the transaction wants to write to the TVar, (d) during commit, the identity of + * the TRec that wrote the expected value. * - * - Each thread is responsible for adding/removing itself to/from the queues - * associated with tvars. 
This reduces the work that is necessary when a - * large number of threads are waiting on a single tvar and where the update - * to that tvar is really only releasing a single thread. + * Separate TRecs are used for each level in a nest of transactions. This allows + * a nested transaction to be aborted without condemning its enclosing transactions. + * This is needed in the implementation of catchRetry. Note that the "expected value" + * in a nested transaction's TRec is the value expected to be *held in memory* if + * the transaction commits -- not the "new value" stored in one of the enclosing + * transactions. This means that validation can be done without searching through + * a nest of TRecs. * - * Ideas for future experimentation: + * Concurrency control + * ------------------- * - * - Read/write operations here involve a linear search of the trec. Consider - * adding a cache to map tvars to existing entries in the trec. + * Three different concurrency control schemes can be built according to the settings + * in STM.h: + * + * STM_UNIPROC assumes that the caller serialises invocations on the STM interface. + * In the Haskell RTS this means it is suitable only for non-SMP builds. + * + * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during + * an invocation on the STM interface. Note that this does not mean that + * transactions are simply serialized -- the lock is only held *within* the + * implementation of stmCommitTransaction, stmWait etc. + * + * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis + * and, when committing a transaction, no locks are acquired for TVars that have + * been read but not updated. + * + * Concurrency control is implemented in the functions: + * + * lock_stm + * unlock_stm + * lock_tvar / cond_lock_tvar + * unlock_tvar + * + * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the + * implementation of these functions. + * + * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock + * using STM_CG_LOCK, and otherwise they are no-ops. * - * - Consider whether to defer unparking more than one thread. On a uniprocessor - * the deferment could be made until a thread switch from the first thread - * released in the hope that it restores the location to a value on which - * other threads were waiting. That would avoid a stampede on e.g. multiple - * threads blocked reading from a single-cell shared buffer. + * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they + * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well + * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS + * builds). This is because locking a TVar is implemented by writing the lock + * holder's TRec into the TVar's current_value field: * - * - Consider whether to provide a link from a StgTVarWaitQueue to the TRecEntry - * associated with the waiter. This would allow unpark_waiters_on to be - * more selective and avoid unparking threads whose expected value for that - * tvar co-incides with the value now stored there. Does this happen often? - * + * lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value + * it contained. + * + * cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it + * contains a specified value. Return TRUE if this succeeds, + * FALSE otherwise. + * + * unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only), + * storing a specified value in place of the lock entry. 
+ * + * Using these operations, the typcial pattern of a commit/validate/wait operation + * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that + * the TVars that were only read from still contain their expected values, + * (d) release the locks on the TVars, writing updates to them in the case of a + * commit, (e) unlock the STM. + * + * Queues of waiting threads hang off the first_wait_queue_entry field of each + * TVar. This may only be manipulated when holding that TVar's lock. In + * particular, when a thread is putting itself to sleep, it mustn't release + * the TVar's lock until it has added itself to the wait queue and marked its + * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it. * * ---------------------------------------------------------------------------*/ @@ -42,14 +88,31 @@ #include "RtsFlags.h" #include "RtsUtils.h" #include "Schedule.h" +#include "SMP.h" #include "STM.h" #include "Storage.h" #include #include +#define TRUE 1 #define FALSE 0 -#define TRUE 1 + +// ACQ_ASSERT is used for assertions which are only required for SMP builds with +// fine-grained locking. + +#if defined(STM_FG_LOCKS) +#define ACQ_ASSERT(_X) ASSERT(_X) +#define NACQ_ASSERT(_X) /*Nothing*/ +#else +#define ACQ_ASSERT(_X) /*Nothing*/ +#define NACQ_ASSERT(_X) ASSERT(_X) +#endif + +/*......................................................................*/ + +// If SHAKE is defined then validation will sometime spuriously fail. They helps test +// unusualy code paths if genuine contention is rare #if defined(DEBUG) #define SHAKE @@ -58,21 +121,19 @@ #define TRACE(_x...) /*Nothing*/ #endif -// If SHAKE is defined then validation will sometime spuriously fail. They helps test -// unusualy code paths if genuine contention is rare - #ifdef SHAKE static const int do_shake = TRUE; #else static const int do_shake = FALSE; #endif static int shake_ctr = 0; - -/*......................................................................*/ +static int shake_lim = 1; static int shake(void) { if (do_shake) { - if (((shake_ctr++) % 47) == 0) { + if (((shake_ctr++) % shake_lim) == 0) { + shake_ctr = 1; + shake_lim ++; return TRUE; } return FALSE; @@ -86,88 +147,170 @@ static int shake(void) { // Helper macros for iterating over entries within a transaction // record -#define FOR_EACH_ENTRY(_t,_x,CODE) do { \ - StgTRecHeader *__t = (_t); \ - StgTRecChunk *__c = __t -> current_chunk; \ - StgWord __limit = __c -> next_entry_idx; \ - TRACE("trec=%p chunk=%p limit=%ld\n", __t, __c, __limit); \ - while (__c != END_STM_CHUNK_LIST) { \ - StgWord __i; \ - for (__i = 0; __i < __limit; __i ++) { \ - TRecEntry *_x = &(__c -> entries[__i]); \ - do { CODE } while (0); \ - } \ - __c = __c -> prev_chunk; \ - __limit = TREC_CHUNK_NUM_ENTRIES; \ - } \ - exit_for_each: \ - if (FALSE) goto exit_for_each; \ +#define FOR_EACH_ENTRY(_t,_x,CODE) do { \ + StgTRecHeader *__t = (_t); \ + StgTRecChunk *__c = __t -> current_chunk; \ + StgWord __limit = __c -> next_entry_idx; \ + TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \ + while (__c != END_STM_CHUNK_LIST) { \ + StgWord __i; \ + for (__i = 0; __i < __limit; __i ++) { \ + TRecEntry *_x = &(__c -> entries[__i]); \ + do { CODE } while (0); \ + } \ + __c = __c -> prev_chunk; \ + __limit = TREC_CHUNK_NUM_ENTRIES; \ + } \ + exit_for_each: \ + if (FALSE) goto exit_for_each; \ } while (0) #define BREAK_FOR_EACH goto exit_for_each /*......................................................................*/ -// Private 
cache of must-be-unreachable trec headers and chunks +#if defined(STM_UNIPROC) +static const StgBool use_read_phase = FALSE; -static StgTRecHeader *cached_trec_headers = NO_TREC; -static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST; -static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE; +static void lock_stm(StgTRecHeader *trec STG_UNUSED) { + TRACE("%p : lock_stm()\n", trec); +} -static void recycle_tvar_wait_queue(StgTVarWaitQueue *q STG_UNUSED) { -#if 0 - if (shake()) { - TRACE("Shake: not re-using wait queue %p\n", q); - return; +static void unlock_stm(StgTRecHeader *trec STG_UNUSED) { + TRACE("%p : unlock_stm()\n", trec); +} + +static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s STG_UNUSED) { + StgClosure *result; + TRACE("%p : lock_tvar(%p)\n", trec, s); + result = s -> current_value; + return result; +} + +static void unlock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s STG_UNUSED, + StgClosure *c, + StgBool force_update) { + TRACE("%p : unlock_tvar(%p)\n", trec, s); + if (force_update) { + s -> current_value = c; } +} - q -> next_queue_entry = cached_tvar_wait_queues; - cached_tvar_wait_queues = q; +static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s STG_UNUSED, + StgClosure *expected) { + StgClosure *result; + TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected); + result = s -> current_value; + TRACE("%p : %d\n", (result == expected) ? "success" : "failure"); + return (result == expected); +} #endif + +#if defined(STM_CG_LOCK) /*........................................*/ + +static const StgBool use_read_phase = FALSE; +static volatile StgTRecHeader *smp_locked = NULL; + +static void lock_stm(StgTRecHeader *trec) { + while (cas(&smp_locked, NULL, trec) != NULL) { } + TRACE("%p : lock_stm()\n", trec); } -static void recycle_closures_from_trec (StgTRecHeader *t STG_UNUSED) { -#if 0 - if (shake()) { - TRACE("Shake: not re-using closures from %p\n", t); - return; - } +static void unlock_stm(StgTRecHeader *trec STG_UNUSED) { + TRACE("%p : unlock_stm()\n", trec); + ASSERT (smp_locked == trec); + smp_locked = 0; +} - t -> enclosing_trec = cached_trec_headers; - cached_trec_headers = t; - t -> enclosing_trec = NO_TREC; +static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s STG_UNUSED) { + StgClosure *result; + TRACE("%p : lock_tvar(%p)\n", trec, s); + ASSERT (smp_locked == trec); + result = s -> current_value; + return result; +} - while (t -> current_chunk != END_STM_CHUNK_LIST) { - StgTRecChunk *c = t -> current_chunk; - t -> current_chunk = c -> prev_chunk; - c -> prev_chunk = cached_trec_chunks; - cached_trec_chunks = c; +static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s STG_UNUSED, + StgClosure *c, + StgBool force_update) { + TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c); + ASSERT (smp_locked == trec); + if (force_update) { + s -> current_value = c; } +} + +static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s STG_UNUSED, + StgClosure *expected) { + StgClosure *result; + TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected); + ASSERT (smp_locked == trec); + result = s -> current_value; + TRACE("%p : %d\n", result ? 
"success" : "failure"); + return (result == expected); +} #endif + +#if defined(STM_FG_LOCKS) /*...................................*/ + +static const StgBool use_read_phase = TRUE; + +static void lock_stm(StgTRecHeader *trec STG_UNUSED) { + TRACE("%p : lock_stm()\n", trec); } -/*......................................................................*/ +static void unlock_stm(StgTRecHeader *trec STG_UNUSED) { + TRACE("%p : unlock_stm()\n", trec); +} -// Helper functions for managing internal STM state. This lock is only held -// for a 'short' time, in the sense that it is never held when any of the -// external functions returns. +static StgClosure *lock_tvar(StgTRecHeader *trec, + StgTVar *s STG_UNUSED) { + StgClosure *result; + TRACE("%p : lock_tvar(%p)\n", trec, s); + do { + do { + result = s -> current_value; + } while (GET_INFO(result) == &stg_TREC_HEADER_info); + } while (cas(&(s -> current_value), result, trec) != result); + return result; +} -static void lock_stm(void) { - // Nothing +static void unlock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s, + StgClosure *c, + StgBool force_update STG_UNUSED) { + TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c); + ASSERT(s -> current_value == trec); + s -> current_value = c; } -static void unlock_stm(void) { - // Nothing +static StgBool cond_lock_tvar(StgTRecHeader *trec, + StgTVar *s, + StgClosure *expected) { + StgClosure *result; + TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected); + result = cas(&(s -> current_value), expected, trec); + TRACE("%p : %s\n", trec, result ? "success" : "failure"); + return (result == expected); } +#endif /*......................................................................*/ // Helper functions for thread blocking and unblocking static void park_tso(StgTSO *tso) { + ACQUIRE_LOCK(&sched_mutex); ASSERT(tso -> why_blocked == NotBlocked); tso -> why_blocked = BlockedOnSTM; tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE; + RELEASE_LOCK(&sched_mutex); TRACE("park_tso on tso=%p\n", tso); } @@ -177,8 +320,10 @@ static void unpark_tso(StgTSO *tso) { // if it decides to do so when it is scheduled. 
if (tso -> why_blocked == BlockedOnSTM) { TRACE("unpark_tso on tso=%p\n", tso); + ACQUIRE_LOCK(&sched_mutex); tso -> why_blocked = NotBlocked; PUSH_ON_RUN_QUEUE(tso); + RELEASE_LOCK(&sched_mutex); } else { TRACE("spurious unpark_tso on tso=%p\n", tso); } @@ -198,57 +343,41 @@ static void unpark_waiters_on(StgTVar *s) { // Helper functions for allocation and initialization -static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgTSO *waiting_tso) { +static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgRegTable *reg, + StgTSO *waiting_tso) { StgTVarWaitQueue *result; - if (cached_tvar_wait_queues != END_STM_WAIT_QUEUE) { - result = cached_tvar_wait_queues; - cached_tvar_wait_queues = result -> next_queue_entry; - } else { - result = (StgTVarWaitQueue *)allocate(sizeofW(StgTVarWaitQueue)); - SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM); - } + result = (StgTVarWaitQueue *)allocateLocal(reg, sizeofW(StgTVarWaitQueue)); + SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM); result -> waiting_tso = waiting_tso; return result; } -static StgTRecChunk *new_stg_trec_chunk(void) { +static StgTRecChunk *new_stg_trec_chunk(StgRegTable *reg) { StgTRecChunk *result; - if (cached_trec_chunks != END_STM_CHUNK_LIST) { - result = cached_trec_chunks; - cached_trec_chunks = result -> prev_chunk; - } else { - result = (StgTRecChunk *)allocate(sizeofW(StgTRecChunk)); - SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM); - } + result = (StgTRecChunk *)allocateLocal(reg, sizeofW(StgTRecChunk)); + SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM); result -> prev_chunk = END_STM_CHUNK_LIST; result -> next_entry_idx = 0; - TRACE("prev from %p is %p\n", result, result -> prev_chunk); return result; } -static StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) { +static StgTRecHeader *new_stg_trec_header(StgRegTable *reg, + StgTRecHeader *enclosing_trec) { StgTRecHeader *result; - if (cached_trec_headers != NO_TREC) { - result = cached_trec_headers; - cached_trec_headers = result -> enclosing_trec; - } else { - result = (StgTRecHeader *) allocate(sizeofW(StgTRecHeader)); - SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM); - } + result = (StgTRecHeader *) allocateLocal(reg, sizeofW(StgTRecHeader)); + SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM); + result -> enclosing_trec = enclosing_trec; - result -> current_chunk = new_stg_trec_chunk(); + result -> current_chunk = new_stg_trec_chunk(reg); if (enclosing_trec == NO_TREC) { result -> state = TREC_ACTIVE; } else { ASSERT(enclosing_trec -> state == TREC_ACTIVE || - enclosing_trec -> state == TREC_MUST_ABORT || - enclosing_trec -> state == TREC_CANNOT_COMMIT); + enclosing_trec -> state == TREC_CONDEMNED); result -> state = enclosing_trec -> state; } - TRACE("new_stg_trec_header creating %p nidx=%ld chunk=%p enclosing_trec=%p state=%d\n", - result, result->current_chunk->next_entry_idx, result -> current_chunk, enclosing_trec, result->state); return result; } @@ -256,19 +385,25 @@ static StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) { // Helper functions for managing waiting lists -static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) { +static void build_wait_queue_entries_for_trec(StgRegTable *reg, + StgTSO *tso, + StgTRecHeader *trec) { ASSERT(trec != NO_TREC); ASSERT(trec -> enclosing_trec == NO_TREC); - ASSERT(trec -> state == TREC_ACTIVE || trec -> state == TREC_CANNOT_COMMIT); + ASSERT(trec -> state == TREC_ACTIVE); + + TRACE("%p : build_wait_queue_entries_for_trec()\n", trec); + 
FOR_EACH_ENTRY(trec, e, { StgTVar *s; StgTVarWaitQueue *q; StgTVarWaitQueue *fq; s = e -> tvar; - TRACE("Adding tso=%p to wait queue for tvar=%p\n", tso, s); - ASSERT(s -> current_value == e -> expected_value); + TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s); + ACQ_ASSERT(s -> current_value == trec); + NACQ_ASSERT(s -> current_value == e -> expected_value); fq = s -> first_wait_queue_entry; - q = new_stg_tvar_wait_queue(tso); + q = new_stg_tvar_wait_queue(reg, tso); q -> next_queue_entry = fq; q -> prev_queue_entry = END_STM_WAIT_QUEUE; if (fq != END_STM_WAIT_QUEUE) { @@ -279,23 +414,26 @@ static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) { }); } -static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) { +static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) { ASSERT(trec != NO_TREC); ASSERT(trec -> enclosing_trec == NO_TREC); ASSERT(trec -> state == TREC_WAITING || - trec -> state == TREC_MUST_ABORT); - TRACE("stop_tsos_waiting in state=%d\n", trec -> state); + trec -> state == TREC_CONDEMNED); + + TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec); + FOR_EACH_ENTRY(trec, e, { StgTVar *s; StgTVarWaitQueue *pq; StgTVarWaitQueue *nq; StgTVarWaitQueue *q; s = e -> tvar; + StgClosure *saw = lock_tvar(trec, s); q = (StgTVarWaitQueue *) (e -> new_value); - TRACE("Removing tso=%p from wait queue for tvar=%p\n", q -> waiting_tso, s); + TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s); + ACQ_ASSERT(s -> current_value == trec); nq = q -> next_queue_entry; pq = q -> prev_queue_entry; - TRACE("pq=%p nq=%p q=%p\n", pq, nq, q); if (nq != END_STM_WAIT_QUEUE) { nq -> prev_queue_entry = pq; } @@ -305,13 +443,14 @@ static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) { ASSERT (s -> first_wait_queue_entry == q); s -> first_wait_queue_entry = nq; } - recycle_tvar_wait_queue(q); + unlock_tvar(trec, s, saw, FALSE); }); } /*......................................................................*/ -static TRecEntry *get_new_entry(StgTRecHeader *t) { +static TRecEntry *get_new_entry(StgRegTable *reg, + StgTRecHeader *t) { TRecEntry *result; StgTRecChunk *c; int i; @@ -327,7 +466,7 @@ static TRecEntry *get_new_entry(StgTRecHeader *t) { } else { // Current chunk is full: allocate a fresh one StgTRecChunk *nc; - nc = new_stg_trec_chunk(); + nc = new_stg_trec_chunk(reg); nc -> prev_chunk = c; nc -> next_entry_idx = 1; t -> current_chunk = nc; @@ -339,11 +478,11 @@ static TRecEntry *get_new_entry(StgTRecHeader *t) { /*......................................................................*/ -static void merge_update_into(StgTRecHeader *t, +static void merge_update_into(StgRegTable *reg, + StgTRecHeader *t, StgTVar *tvar, StgClosure *expected_value, - StgClosure *new_value, - int merging_sibling) { + StgClosure *new_value) { int found; // Look for an entry in this trec @@ -353,23 +492,12 @@ static void merge_update_into(StgTRecHeader *t, s = e -> tvar; if (s == tvar) { found = TRUE; - if (merging_sibling) { - if (e -> expected_value != expected_value) { - // Must abort if the two entries start from different values - TRACE("Siblings inconsistent at %p (%p vs %p)\n", - tvar, e -> expected_value, expected_value); - t -> state = TREC_MUST_ABORT; - } else if (e -> new_value != new_value) { - // Cannot commit if the two entries lead to different values (wait still OK) - TRACE("Siblings trying conflicting writes to %p (%p vs %p)\n", - tvar, e -> new_value, new_value); - t -> state = TREC_CANNOT_COMMIT; - } - } 
else { - // Otherwise merging child back into parent - ASSERT (e -> new_value == expected_value); - } - TRACE(" trec=%p exp=%p new=%p\n", t, e->expected_value, e->new_value); + if (e -> expected_value != expected_value) { + // Must abort if the two entries start from different values + TRACE("%p : entries inconsistent at %p (%p vs %p)\n", + t, tvar, e -> expected_value, expected_value); + t -> state = TREC_CONDEMNED; + } e -> new_value = new_value; BREAK_FOR_EACH; } @@ -378,7 +506,7 @@ static void merge_update_into(StgTRecHeader *t, if (!found) { // No entry so far in this trec TRecEntry *ne; - ne = get_new_entry(t); + ne = get_new_entry(reg, t); ne -> tvar = tvar; ne -> expected_value = expected_value; ne -> new_value = new_value; @@ -387,311 +515,392 @@ static void merge_update_into(StgTRecHeader *t, /*......................................................................*/ -static StgClosure *read_current_value_seen_from(StgTRecHeader *t, - StgTVar *tvar) { - int found; - StgClosure *result = NULL; +static StgBool entry_is_update(TRecEntry *e) { + StgBool result; + result = (e -> expected_value != e -> new_value); + return result; +} - // Look for any relevent trec entries - found = FALSE; - while (t != NO_TREC) { - FOR_EACH_ENTRY(t, e, { +static StgBool entry_is_read_only(TRecEntry *e) { + StgBool result; + result = (e -> expected_value == e -> new_value); + return result; +} + +static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) { + StgClosure *c; + StgBool result; + c = s -> current_value; + result = (c == (StgClosure *) h); + return result; +} + +// revert_ownership : release a lock on a TVar, storing back +// the value that it held when the lock was acquired. "revert_all" +// is set in stmWait and stmReWait when we acquired locks on all of +// the TVars involved. "revert_all" is not set in commit operations +// where we don't lock TVars that have been read from but not updated. + +static void revert_ownership(StgTRecHeader *trec STG_UNUSED, + StgBool revert_all STG_UNUSED) { +#if defined(STM_FG_LOCKS) + FOR_EACH_ENTRY(trec, e, { + if (revert_all || entry_is_update(e)) { StgTVar *s; s = e -> tvar; - if (s == tvar) { - found = TRUE; - result = e -> new_value; - BREAK_FOR_EACH; + if (tvar_is_locked(s, trec)) { + unlock_tvar(trec, s, e -> expected_value, TRUE); } - }); - if (found) break; - t = t -> enclosing_trec; - } - - if (!found) { - // Value not yet held in a trec - result = tvar -> current_value; - } - - return result; + } + }); +#endif } - + /*......................................................................*/ -static int transaction_is_valid (StgTRecHeader *t) { - StgTRecHeader *et; - int result; +// validate_and_acquire_ownership : this performs the twin functions +// of checking that the TVars referred to by entries in trec hold the +// expected values and: +// +// - locking the TVar (on updated TVars during commit, or all TVars +// during wait) +// +// - recording the identity of the TRec who wrote the value seen in the +// TVar (on non-updated TVars during commit). These values are +// stashed in the TRec entries and are then checked in check_read_only +// to ensure that an atomic snapshot of all of these locations has been +// seen. 
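(Editorial aside, not part of the patch: the comment above describes the two roles of validate_and_acquire_ownership -- CAS-locking the TVars a transaction has updated, and recording the last writer of the TVars it has only read so that check_read_only can later confirm an atomic snapshot. The plain-C sketch below, using C11 atomics and hypothetical types tvar_t / trec_t / trec_entry_t, shows the shape of that per-entry step; it is an illustration of the technique, not the RTS implementation.)

#include <stdbool.h>
#include <stdatomic.h>

typedef struct trec trec_t;                  /* stands in for StgTRecHeader */

typedef struct {
    _Atomic(void *)   current_value;         /* value, or owning trec_t* when locked */
    _Atomic(trec_t *) last_update_by;        /* like StgTVar.last_update_by */
} tvar_t;

typedef struct {
    tvar_t *tvar;
    void   *expected;                        /* value seen when first accessed */
    void   *new_value;                       /* == expected for read-only entries */
    trec_t *saw_update_by;                   /* like TRecEntry.saw_update_by */
} trec_entry_t;

/* One entry of the validate/acquire step: lock updated TVars by CAS-ing
 * the owner in; for read-only TVars just check the value, remember the
 * last writer, and re-check the value to close the race window. */
static bool validate_entry(trec_t *self, trec_entry_t *e)
{
    if (e->new_value != e->expected) {       /* an update: acquire the lock */
        void *seen = e->expected;
        return atomic_compare_exchange_strong(&e->tvar->current_value,
                                              &seen, (void *) self);
    }
    if (atomic_load(&e->tvar->current_value) != e->expected)
        return false;                        /* value already changed */
    e->saw_update_by = atomic_load(&e->tvar->last_update_by);
    return atomic_load(&e->tvar->current_value) == e->expected;
}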
+ +static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, + int acquire_all, + int retain_ownership) { + StgBool result; if (shake()) { - TRACE("Shake: pretending transaction trec=%p is invalid when it may not be\n", t); + TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec); return FALSE; } - et = t -> enclosing_trec; - ASSERT ((t -> state == TREC_ACTIVE) || - (t -> state == TREC_WAITING) || - (t -> state == TREC_MUST_ABORT) || - (t -> state == TREC_CANNOT_COMMIT)); - result = !((t -> state) == TREC_MUST_ABORT); + ASSERT ((trec -> state == TREC_ACTIVE) || + (trec -> state == TREC_WAITING) || + (trec -> state == TREC_CONDEMNED)); + result = !((trec -> state) == TREC_CONDEMNED); if (result) { - FOR_EACH_ENTRY(t, e, { + FOR_EACH_ENTRY(trec, e, { StgTVar *s; s = e -> tvar; - if (e -> expected_value != read_current_value_seen_from(et, s)) { - result = FALSE; - BREAK_FOR_EACH; + if (acquire_all || entry_is_update(e)) { + TRACE("%p : trying to acquire %p\n", trec, s); + if (!cond_lock_tvar(trec, s, e -> expected_value)) { + TRACE("%p : failed to acquire %p\n", trec, s); + result = FALSE; + BREAK_FOR_EACH; + } + } else { + TRACE("%p : will need to check %p\n", trec, s); + if (s -> current_value != e -> expected_value) { + TRACE("%p : doesn't match\n", trec); + result = FALSE; + BREAK_FOR_EACH; + } + e -> saw_update_by = s -> last_update_by; + if (s -> current_value != e -> expected_value) { + TRACE("%p : doesn't match (race)\n", trec); + result = FALSE; + BREAK_FOR_EACH; + } else { + TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by); + } } }); } + + if ((!result) || (!retain_ownership)) { + revert_ownership(trec, acquire_all); + } + return result; } -/************************************************************************/ +// check_read_only : check that we've seen an atomic snapshot of the +// non-updated TVars accessed by a trec. This checks that the last TRec to +// commit an update to the TVar is unchanged since the value was stashed in +// validate_and_acquire_ownership. If no udpate is seen to any TVar than +// all of them contained their expected values at the start of the call to +// check_read_only. +// +// The paper "Concurrent programming without locks" (under submission), or +// Keir Fraser's PhD dissertation "Practical lock-free programming" discuss +// this kind of algorithm. -/* - * External functions below this point are repsonsible for: - * - * - acquiring/releasing the STM lock - * - * - all updates to the trec status field - * ASSERT(t != NO_TREC); +static StgBool check_read_only(StgTRecHeader *trec) { + StgBool result = TRUE; + + FOR_EACH_ENTRY(trec, e, { + StgTVar *s; + s = e -> tvar; + if (entry_is_read_only(e)) { + TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by); + if (s -> last_update_by != e -> saw_update_by) { + // ||s -> current_value != e -> expected_value) { + TRACE("%p : mismatch\n", trec); + result = FALSE; + BREAK_FOR_EACH; + } + } + }); + + return result; +} - * By convention we increment entry_count when starting a new - * transaction and we decrement it at the point where we can discard - * the contents of the trec when exiting the outermost transaction. - * This means that stmWait and stmRewait decrement the count whenever - * they return FALSE (they do so exactly once for each transaction - * that doesn't remain blocked forever). 
- */ /************************************************************************/ void stmPreGCHook() { + lock_stm(NO_TREC); TRACE("stmPreGCHook\n"); - cached_trec_headers = NO_TREC; - cached_trec_chunks = END_STM_CHUNK_LIST; - cached_tvar_wait_queues = END_STM_WAIT_QUEUE; + unlock_stm(NO_TREC); } /************************************************************************/ void initSTM() { TRACE("initSTM, NO_TREC=%p\n", NO_TREC); - /* Nothing */ } /*......................................................................*/ -StgTRecHeader *stmStartTransaction(StgTRecHeader *outer) { +StgTRecHeader *stmStartTransaction(StgRegTable *reg, + StgTRecHeader *outer) { StgTRecHeader *t; - TRACE("stmStartTransaction current-trec=%p\n", outer); - t = new_stg_trec_header(outer); - TRACE("stmStartTransaction new-trec=%p\n", t); + TRACE("%p : stmStartTransaction\n", outer); + t = new_stg_trec_header(reg, outer); + TRACE("%p : stmStartTransaction()=%p\n", outer, t); return t; } /*......................................................................*/ void stmAbortTransaction(StgTRecHeader *trec) { - TRACE("stmAbortTransaction trec=%p\n", trec); + TRACE("%p : stmAbortTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || - (trec -> state == TREC_MUST_ABORT) || (trec -> state == TREC_WAITING) || - (trec -> state == TREC_CANNOT_COMMIT)); + (trec -> state == TREC_CONDEMNED)); + + lock_stm(trec); if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); - TRACE("stmAbortTransaction aborting waiting transaction\n"); - stop_tsos_waiting_on_trec(trec); + TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec); + remove_wait_queue_entries_for_trec(trec); } trec -> state = TREC_ABORTED; + unlock_stm(trec); - // Outcome now reflected by status field; no need for log - recycle_closures_from_trec(trec); - - TRACE("stmAbortTransaction trec=%p done\n", trec); + TRACE("%p : stmAbortTransaction done\n", trec); } /*......................................................................*/ void stmCondemnTransaction(StgTRecHeader *trec) { - TRACE("stmCondemnTransaction trec=%p\n", trec); + TRACE("%p : stmCondemnTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || - (trec -> state == TREC_MUST_ABORT) || (trec -> state == TREC_WAITING) || - (trec -> state == TREC_CANNOT_COMMIT)); + (trec -> state == TREC_CONDEMNED)); + lock_stm(trec); if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); - TRACE("stmCondemnTransaction condemning waiting transaction\n"); - stop_tsos_waiting_on_trec(trec); + TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec); + remove_wait_queue_entries_for_trec(trec); } + trec -> state = TREC_CONDEMNED; + unlock_stm(trec); - trec -> state = TREC_MUST_ABORT; - - TRACE("stmCondemnTransaction trec=%p done\n", trec); + TRACE("%p : stmCondemnTransaction done\n", trec); } /*......................................................................*/ StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) { StgTRecHeader *outer; - TRACE("stmGetEnclosingTRec trec=%p\n", trec); + TRACE("%p : stmGetEnclosingTRec\n", trec); outer = trec -> enclosing_trec; - TRACE("stmGetEnclosingTRec outer=%p\n", outer); + TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer); return outer; } /*......................................................................*/ -StgBool stmValidateTransaction(StgTRecHeader *trec) { - int result; - TRACE("stmValidateTransaction trec=%p\n", trec); 
+StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
+  StgTRecHeader *t;
+  StgBool result;
+
+  TRACE("%p : stmValidateNestOfTransactions\n", trec);
   ASSERT(trec != NO_TREC);
   ASSERT((trec -> state == TREC_ACTIVE) || 
-         (trec -> state == TREC_MUST_ABORT) ||
-         (trec -> state == TREC_CANNOT_COMMIT) ||
-         (trec -> state == TREC_WAITING));
+         (trec -> state == TREC_WAITING) ||
+         (trec -> state == TREC_CONDEMNED));
+
+  lock_stm(trec);
 
-  lock_stm();
-  result = transaction_is_valid(trec);
+  t = trec;
+  result = TRUE;
+  while (t != NO_TREC) {
+    result &= validate_and_acquire_ownership(t, TRUE, FALSE);
+    t = t -> enclosing_trec;
+  }
+
   if (!result && trec -> state != TREC_WAITING) {
-    trec -> state = TREC_MUST_ABORT;
+    trec -> state = TREC_CONDEMNED;
   } 
-  unlock_stm();
+
+  unlock_stm(trec);
 
-  TRACE("stmValidateTransaction trec=%p result=%d\n", trec, result);
+  TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
   return result;
 }
 
 /*......................................................................*/
 
-StgBool stmCommitTransaction(StgTRecHeader *trec) {
-  StgTRecHeader *et;
+StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
   int result;
-  TRACE("stmCommitTransaction trec=%p trec->enclosing_trec=%p\n", trec, trec->enclosing_trec);
+  TRACE("%p : stmCommitTransaction()\n", trec);
   ASSERT (trec != NO_TREC);
+  ASSERT (trec -> enclosing_trec == NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
-          (trec -> state == TREC_MUST_ABORT) ||
-          (trec -> state == TREC_CANNOT_COMMIT));
+          (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = transaction_is_valid(trec);
+  lock_stm(trec);
+  result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
   if (result) {
-    et = trec -> enclosing_trec;
-    if (trec -> state == TREC_CANNOT_COMMIT && et == NO_TREC) {
-      TRACE("Cannot commit trec=%p at top level\n", trec);
-      trec -> state = TREC_MUST_ABORT;
-      result = FALSE;
-    } else {
-      if (et == NO_TREC) {
-        TRACE("Non-nesting commit, NO_TREC=%p\n", NO_TREC);
-      } else {
-        TRACE("Nested commit into %p, NO_TREC=%p\n", et, NO_TREC);
-      }
+    // We now know that all the updated locations hold their expected values.
+    ASSERT (trec -> state == TREC_ACTIVE);
+
+    if (use_read_phase) {
+      TRACE("%p : doing read check\n", trec);
+      result = check_read_only(trec);
+    }
+
+    if (result) {
+      // We now know that all of the read-only locations held their expected values
+      // at the end of the call to validate_and_acquire_ownership.  This forms the
+      // linearization point of the commit.
+      TRACE("%p : read-check succeeded\n", trec);
       FOR_EACH_ENTRY(trec, e, {
         StgTVar *s;
         s = e -> tvar;
-        if (et == NO_TREC) {
-          s -> current_value = e -> new_value;
+        if (e -> new_value != e -> expected_value) {
+          // Entry is an update: write the value back to the TVar, unlocking it if
+          // necessary.
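+          //
+          // Under STM_FG_LOCKS the TVar is still owned by this trec at this
+          // point (see the ACQ_ASSERT below), so waiters are woken first,
+          // last_update_by is stamped for later read-phase checks, and only
+          // then does unlock_tvar publish the new value and release the TVar.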
+
+          ACQ_ASSERT(tvar_is_locked(s, trec));
+          TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
           unpark_waiters_on(s);
-        } else {
-          merge_update_into(et, s, e -> expected_value, e -> new_value, FALSE);
-        }
+          s -> last_update_by = trec;
+          unlock_tvar(trec, s, e -> new_value, TRUE);
+        }
+        ACQ_ASSERT(!tvar_is_locked(s, trec));
       });
-      
-      
-      if (trec->state == TREC_CANNOT_COMMIT && et -> state == TREC_ACTIVE) {
-        TRACE("Propagating TREC_CANNOT_COMMIT into %p\n", et);
-        et -> state = TREC_CANNOT_COMMIT;
-      }
+    } else {
+      revert_ownership(trec, FALSE);
     }
   } 
 
-  // Outcome now reflected by status field; no need for log
-  recycle_closures_from_trec(trec);
-
-  unlock_stm();
+  unlock_stm(trec);
 
-  TRACE("stmCommitTransaction trec=%p result=%d\n", trec, result);
+  TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
   return result;
 }
 
 /*......................................................................*/
 
-StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other) {
+StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
+  StgTRecHeader *et;
   int result;
-  TRACE("stmMergeForWaiting trec=%p (%d) other=%p (%d)\n", trec, trec -> state, other, other->state);
-  ASSERT(trec != NO_TREC);
-  ASSERT(other != NO_TREC);
-  ASSERT((trec -> state == TREC_ACTIVE) || 
-         (trec -> state == TREC_MUST_ABORT) ||
-         (trec -> state == TREC_CANNOT_COMMIT));
-  ASSERT((other -> state == TREC_ACTIVE) || 
-         (other -> state == TREC_MUST_ABORT) ||
-         (other -> state == TREC_CANNOT_COMMIT));
-
-  lock_stm();
-  result = (transaction_is_valid(trec));
-  TRACE("stmMergeForWaiting initial result=%d\n", result);
+  ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
+  TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
+  ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
+
+  lock_stm(trec);
+
+  et = trec -> enclosing_trec;
+  result = validate_and_acquire_ownership(trec, FALSE, TRUE);
   if (result) {
-    result = transaction_is_valid(other);
-    TRACE("stmMergeForWaiting after both result=%d\n", result);
+    // We now know that all the updated locations hold their expected values.
+
+    if (use_read_phase) {
+      TRACE("%p : doing read check\n", trec);
+      result = check_read_only(trec);
+    }
     if (result) {
-      // Individually the two transactions may be valid.  Now copy entries from
-      // "other" into "trec".  This may cause "trec" to become invalid if it
-      // contains an update that conflicts with one from "other"
-      FOR_EACH_ENTRY(other, e, {
-        StgTVar *s = e -> tvar;
-        TRACE("Merging trec=%p exp=%p new=%p\n", other, e->expected_value, e->new_value);
-        merge_update_into(trec, s, e-> expected_value, e -> new_value, TRUE);
-      });
-      result = (trec -> state != TREC_MUST_ABORT);
-    }
-  } 
+      // We now know that all of the read-only locations held their expected values
+      // at the end of the call to validate_and_acquire_ownership.  This forms the
+      // linearization point of the commit.
+
+      if (result) {
+        TRACE("%p : read-check succeeded\n", trec);
+        FOR_EACH_ENTRY(trec, e, {
+          // Merge each entry into the enclosing transaction record, release all
+          // locks.
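+          //
+          // Locked update entries are released back to their *expected*
+          // value so that the nested transaction's writes remain private;
+          // the tentative new value is carried into the enclosing trec by
+          // merge_update_into.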
+
+          StgTVar *s;
+          s = e -> tvar;
+          if (entry_is_update(e)) {
+            unlock_tvar(trec, s, e -> expected_value, FALSE);
+          }
+          merge_update_into(reg, et, s, e -> expected_value, e -> new_value);
+          ACQ_ASSERT(s -> current_value != trec);
+        });
+      } else {
+        revert_ownership(trec, FALSE);
+      }
+    }
+  } 
 
-  if (!result) {
-    trec -> state = TREC_MUST_ABORT;
-  }
+  unlock_stm(trec);
 
-  unlock_stm();
+  TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
 
-  TRACE("stmMergeForWaiting result=%d\n", result);
   return result;
 }
 
 /*......................................................................*/
 
-StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
+StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) {
   int result;
-  TRACE("stmWait tso=%p trec=%p\n", tso, trec);
+  TRACE("%p : stmWait(%p)\n", trec, tso);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> enclosing_trec == NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
-          (trec -> state == TREC_MUST_ABORT) ||
-          (trec -> state == TREC_CANNOT_COMMIT));
+          (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = transaction_is_valid(trec);
+  lock_stm(trec);
+  result = validate_and_acquire_ownership(trec, TRUE, TRUE);
   if (result) {
     // The transaction is valid so far so we can actually start waiting.
     // (Otherwise the transaction was not valid and the thread will have to
     // retry it).
-    start_tso_waiting_on_trec(tso, trec);
+
+    // Put ourselves to sleep.  We retain locks on all the TVars involved
+    // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
+    // in the TSO, (c) TREC_WAITING in the Trec.
+    build_wait_queue_entries_for_trec(reg, tso, trec);
     park_tso(tso);
     trec -> state = TREC_WAITING;
-  } else {
-    // Outcome now reflected by status field; no need for log
-    recycle_closures_from_trec(trec);
-  }
-  unlock_stm();
-  TRACE("stmWait trec=%p result=%d\n", trec, result);
+
+    // As soon as we start releasing ownership, another thread may find us
+    // and wake us up.  This may happen even before we have finished
+    // releasing ownership.
+    revert_ownership(trec, TRUE);
+  }
+
+  unlock_stm(trec);
+
+  TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
   return result;
 }
 
@@ -701,150 +910,153 @@ StgBool stmReWait(StgTSO *tso) {
   int result;
   StgTRecHeader *trec = tso->trec;
 
-  TRACE("stmReWait trec=%p\n", trec);
+  TRACE("%p : stmReWait\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> enclosing_trec == NO_TREC);
   ASSERT ((trec -> state == TREC_WAITING) || 
-          (trec -> state == TREC_MUST_ABORT));
+          (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = transaction_is_valid(trec);
-  TRACE("stmReWait trec=%p result=%d\n", trec, result);
+  lock_stm(trec);
+  result = validate_and_acquire_ownership(trec, TRUE, TRUE);
+  TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
   if (result) {
     // The transaction remains valid -- do nothing because it is already on
     // the wait queues
     ASSERT (trec -> state == TREC_WAITING);
     park_tso(tso);
+    revert_ownership(trec, TRUE);
   } else {
     // The transaction has become invalid.  We can now remove it from the wait
     // queues.
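+    //
+    // (If the trec was condemned while it was waiting then
+    // stmCondemnTransaction has already removed its wait queue entries,
+    // hence the guard on TREC_CONDEMNED below.)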
- if (trec -> state != TREC_MUST_ABORT) { - stop_tsos_waiting_on_trec (trec); - - // Outcome now reflected by status field; no need for log - recycle_closures_from_trec(trec); + if (trec -> state != TREC_CONDEMNED) { + remove_wait_queue_entries_for_trec (trec); } } - unlock_stm(); + unlock_stm(trec); - TRACE("stmReWait trec=%p result=%d\n", trec, result); + TRACE("%p : stmReWait()=%d\n", trec, result); return result; } /*......................................................................*/ -StgClosure *stmReadTVar(StgTRecHeader *trec, - StgTVar *tvar) { - StgTRecHeader *et; - StgClosure *result = NULL; // Suppress unassignment warning - int found = FALSE; - TRecEntry *ne = NULL; - - TRACE("stmReadTVar trec=%p tvar=%p\n", trec, tvar); - ASSERT (trec != NO_TREC); - ASSERT (trec -> state == TREC_ACTIVE || - trec -> state == TREC_MUST_ABORT || - trec -> state == TREC_CANNOT_COMMIT); +static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) { + TRecEntry *result = NULL; - lock_stm(); - found = FALSE; + TRACE("%p : get_entry_for TVar %p\n", trec, tvar); + ASSERT(trec != NO_TREC); - // Look for an existing entry in our trec or in an enclosing trec - et = trec; - while (et != NO_TREC) { - FOR_EACH_ENTRY(et, e, { - TRACE("testing e=%p\n", e); + do { + FOR_EACH_ENTRY(trec, e, { if (e -> tvar == tvar) { - found = TRUE; - result = e -> new_value; + result = e; + if (in != NULL) { + *in = trec; + } BREAK_FOR_EACH; } }); - if (found) break; - et = et -> enclosing_trec; - } + trec = trec -> enclosing_trec; + } while (result == NULL && trec != NO_TREC); - if (found && et != trec) { - // Entry found in another trec - ASSERT (result != NULL); - TRACE("duplicating entry\n"); - ne = get_new_entry(trec); - ne -> tvar = tvar; - ne -> expected_value = result; - ne -> new_value = result; - } else if (!found) { - // No entry found - ASSERT (result == NULL); - TRACE("need new entry\n"); - ne = get_new_entry(trec); - TRACE("got ne=%p\n", ne); + return result; +} + +static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) { + StgClosure *result; + result = tvar -> current_value; + +#if defined(STM_FG_LOCKS) + while (GET_INFO(result) == &stg_TREC_HEADER_info) { + TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result); result = tvar -> current_value; - ne -> tvar = tvar; - ne -> expected_value = result; - ne -> new_value = result; } +#endif - unlock_stm(); - ASSERT (result != NULL); - TRACE("stmReadTVar trec=%p result=%p\n", trec, result); + TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result); + return result; +} +/*......................................................................*/ + +StgClosure *stmReadTVar(StgRegTable *reg, + StgTRecHeader *trec, + StgTVar *tvar) { + StgTRecHeader *entry_in; + StgClosure *result = NULL; + TRecEntry *entry = NULL; + TRACE("%p : stmReadTVar(%p)\n", trec, tvar); + ASSERT (trec != NO_TREC); + ASSERT (trec -> state == TREC_ACTIVE || + trec -> state == TREC_CONDEMNED); + + entry = get_entry_for(trec, tvar, &entry_in); + + if (entry != NULL) { + if (entry_in == trec) { + // Entry found in our trec + result = entry -> new_value; + } else { + // Entry found in another trec + TRecEntry *new_entry = get_new_entry(reg, trec); + new_entry -> tvar = tvar; + new_entry -> expected_value = entry -> expected_value; + new_entry -> new_value = entry -> new_value; + result = new_entry -> new_value; + } + } else { + // No entry found + StgClosure *current_value = read_current_value(trec, tvar); + TRecEntry *new_entry 
= get_new_entry(reg, trec); + new_entry -> tvar = tvar; + new_entry -> expected_value = current_value; + new_entry -> new_value = current_value; + result = current_value; + } + + TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result); return result; } /*......................................................................*/ -void stmWriteTVar(StgTRecHeader *trec, +void stmWriteTVar(StgRegTable *reg, + StgTRecHeader *trec, StgTVar *tvar, StgClosure *new_value) { - StgTRecHeader *et; - TRecEntry *ne; + + StgTRecHeader *entry_in; TRecEntry *entry = NULL; - int found; - TRACE("stmWriteTVar trec=%p tvar=%p new_value=%p\n", trec, tvar, new_value); + TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value); ASSERT (trec != NO_TREC); ASSERT (trec -> state == TREC_ACTIVE || - trec -> state == TREC_MUST_ABORT || - trec -> state == TREC_CANNOT_COMMIT); - - lock_stm(); - found = FALSE; + trec -> state == TREC_CONDEMNED); - // Look for an existing entry in our trec or in an enclosing trec - et = trec; - while (et != NO_TREC) { - FOR_EACH_ENTRY(et, e, { - if (e -> tvar == tvar) { - found = TRUE; - entry = e; - BREAK_FOR_EACH; - } - }); - if (found) break; - et = et -> enclosing_trec; - } + entry = get_entry_for(trec, tvar, &entry_in); - if (found && et == trec) { - // Entry found in our trec - entry -> new_value = new_value; - } else if (found) { - // Entry found in another trec - ne = get_new_entry(trec); - ne -> tvar = tvar; - ne -> expected_value = entry -> new_value; - ne -> new_value = new_value; + if (entry != NULL) { + if (entry_in == trec) { + // Entry found in our trec + entry -> new_value = new_value; + } else { + // Entry found in another trec + TRecEntry *new_entry = get_new_entry(reg, trec); + new_entry -> tvar = tvar; + new_entry -> expected_value = entry -> expected_value; + new_entry -> new_value = new_value; + } } else { // No entry found - ne = get_new_entry(trec); - ne -> tvar = tvar; - ne -> expected_value = tvar -> current_value; - ne -> new_value = new_value; + StgClosure *current_value = read_current_value(trec, tvar); + TRecEntry *new_entry = get_new_entry(reg, trec); + new_entry -> tvar = tvar; + new_entry -> expected_value = current_value; + new_entry -> new_value = new_value; } - unlock_stm(); - TRACE("stmWriteTVar trec=%p done\n", trec); + TRACE("%p : stmWriteTVar done\n", trec); } - /*......................................................................*/ diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index e646679..8a21d7b 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1927,7 +1927,7 @@ scheduleDoGC( rtsBool force_major ) */ for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) { if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { - if (!stmValidateTransaction (t -> trec)) { + if (!stmValidateNestOfTransactions (t -> trec)) { IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); // strip the stack back to the ATOMICALLY_FRAME, aborting -- 1.7.10.4
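The TVar locking helpers used above -- cond_lock_tvar, unlock_tvar, tvar_is_locked and revert_ownership -- are not shown in the part of STM.c quoted in this hunk.  As a rough illustration of the idea behind the STM_FG_LOCKS build (a sketch only, using hypothetical *_sketch names rather than the patch's exact code), a TVar is "locked" by compare-and-swapping its current_value from the expected value to a pointer to the owning trec, and released by storing the final value back; read_current_value above spins whenever it finds such a trec in current_value:

  // Sketch only: assumes a word-sized compare-and-swap helper cas(p, o, n)
  // returning the previous contents of *p (this patch adds one to SMP.h).
  // The real unlock_tvar in STM.c also takes a flag that the sketch omits.

  static StgBool cond_lock_tvar_sketch(StgTRecHeader *trec,
                                       StgTVar *s,
                                       StgClosure *expected) {
    // Take ownership only if the TVar still holds 'expected'; the lock is
    // represented by current_value pointing at the owning trec.
    StgWord w = cas((StgVolatilePtr)&(s -> current_value),
                    (StgWord)expected,
                    (StgWord)trec);
    return (w == (StgWord)expected);
  }

  static void unlock_tvar_sketch(StgTVar *s, StgClosure *c) {
    // Storing 'c' installs the committed (or reverted) value and, because
    // 'c' is no longer a trec, simultaneously releases the TVar.
    s -> current_value = c;
  }

  static StgBool tvar_is_locked_sketch(StgTVar *s, StgTRecHeader *trec) {
    return (s -> current_value == (StgClosure *)trec);
  }

Because the lock is just the trec itself sitting in current_value, a reader can detect a locked TVar from the info pointer of the closure it fetches (stg_TREC_HEADER_info) and re-read until the owner writes a real value back, so no separate per-TVar lock word is needed.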