X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSTM.c;h=d3283a92f04fbf9f89437aaa3443d6794c475232;hb=3eacdc7faf0d0e87a7201253f9f12c1fb4db7249;hp=d4ae64388d8b6d1e1f09c7d12fbc663883e9ee02;hpb=e5e1449183fc7e5fcc774612fc3fa0ec0e5b2570;p=ghc-hetmet.git

diff --git a/ghc/rts/STM.c b/ghc/rts/STM.c
index d4ae643..d3283a9 100644
--- a/ghc/rts/STM.c
+++ b/ghc/rts/STM.c
@@ -1,39 +1,84 @@
 /* -----------------------------------------------------------------------------
- *
- * (c) The GHC Team 1998-2004
+ * (c) The GHC Team 1998-2005
  *
  * STM implementation.
  *
- * This implementation is designed for a many-threads, few-CPUs case.  This leads
- * to a number of design choices:
+ * Overview
+ * --------
+ *
+ * See the PPoPP 2005 paper "Composable memory transactions".  In summary,
+ * each transaction has a TRec (transaction record) holding entries for each of the
+ * TVars (transactional variables) that it has accessed.  Each entry records
+ * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
+ * the transaction wants to write to the TVar, (d) during commit, the identity of
+ * the TRec that wrote the expected value.
+ *
+ * Separate TRecs are used for each level in a nest of transactions.  This allows
+ * a nested transaction to be aborted without condemning its enclosing transactions.
+ * This is needed in the implementation of catchRetry.  Note that the "expected value"
+ * in a nested transaction's TRec is the value expected to be *held in memory* if
+ * the transaction commits -- not the "new value" stored in one of the enclosing
+ * transactions.  This means that validation can be done without searching through
+ * a nest of TRecs.
+ *
+ * Concurrency control
+ * -------------------
+ *
+ * Three different concurrency control schemes can be built according to the settings
+ * in STM.h:
+ *
+ * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
+ * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
+ *
+ * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
+ * an invocation on the STM interface.  Note that this does not mean that
+ * transactions are simply serialized -- the lock is only held *within* the
+ * implementation of stmCommitTransaction, stmWait etc.
+ *
+ * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
+ * and, when committing a transaction, no locks are acquired for TVars that have
+ * been read but not updated.
+ *
+ * Concurrency control is implemented in the functions:
  *
- * - We use a simple design which does not aim to be lock-free -- SMP builds use
- *   a mutex to protect all the TVars and STM datastructures, non-SMP builds
- *   do not require any locking.  The goal is to make fast-path uncontended
- *   operations fast because, with few CPUs, contention betwen operations on the
- *   STM interface is expected rarely.
+ *    lock_stm
+ *    unlock_stm
+ *    lock_tvar / cond_lock_tvar
+ *    unlock_tvar
  *
- * - Each thread is responsible for adding/removing itself to/from the queues
- *   associated with tvars.  This reduces the work that is necessary when a
- *   large number of threads are waiting on a single tvar and where the update
- *   to that tvar is really only releasing a single thread.
+ * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the
+ * implementation of these functions.
 *
- * Ideas for future experimentation:
+ * lock_stm & unlock_stm are straightforward: under STM_CG_LOCK they acquire
+ * a simple spin-lock, and otherwise they are no-ops.
  *
- * - Read/write operations here involve a linear search of the trec.  Consider
- *   adding a cache to map tvars to existing entries in the trec.
+ * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they
+ * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
+ * as the actual business of manipulating a lock (present only in STM_FG_LOCKS
+ * builds).  This is because locking a TVar is implemented by writing the lock
+ * holder's TRec into the TVar's current_value field:
  *
- * - Consider whether to defer unparking more than one thread.  On a uniprocessor
- *   the deferment could be made until a thread switch from the first thread
- *   released in the hope that it restores the location to a value on which
- *   other threads were waiting.  That would avoid a stampede on e.g. multiple
- *   threads blocked reading from a single-cell shared buffer.
+ *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value
+ *               it contained.
  *
- * - Consider whether to provide a link from a StgTVarWaitQueue to the TRecEntry
- *   associated with the waiter.  This would allow unpark_waiters_on to be
- *   more selective and avoid unparking threads whose expected value for that
- *   tvar co-incides with the value now stored there.  Does this happen often?
- *
+ *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it
+ *               contains a specified value.  Return TRUE if this succeeds,
+ *               FALSE otherwise.
+ *
+ *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
+ *               storing a specified value in place of the lock entry.
+ *
+ * Using these operations, the typical pattern of a commit/validate/wait operation
+ * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that
+ * the TVars that were only read from still contain their expected values,
+ * (d) release the locks on the TVars, writing updates to them in the case of a
+ * commit, (e) unlock the STM.  (An illustrative sketch of this sequence appears
+ * after the locking primitives below.)
+ *
+ * Queues of waiting threads hang off the first_wait_queue_entry field of each
+ * TVar.  This may only be manipulated when holding that TVar's lock.  In
+ * particular, when a thread is putting itself to sleep, it mustn't release
+ * the TVar's lock until it has added itself to the wait queue and marked its
+ * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
  *
  * ---------------------------------------------------------------------------*/
 
@@ -42,37 +87,56 @@
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "Schedule.h"
+#include "SMP.h"
 #include "STM.h"
 #include "Storage.h"
 
 #include <stdlib.h>
 #include <stdio.h>
 
+#define TRUE 1
 #define FALSE 0
-#define TRUE 1
+
+// ACQ_ASSERT is used for assertions which are only required for
+// THREADED_RTS builds with fine-grained locking.
+
+#if defined(STM_FG_LOCKS)
+#define ACQ_ASSERT(_X) ASSERT(_X)
+#define NACQ_ASSERT(_X) /*Nothing*/
+#else
+#define ACQ_ASSERT(_X) /*Nothing*/
+#define NACQ_ASSERT(_X) ASSERT(_X)
+#endif
+
+/*......................................................................*/
+
+// If SHAKE is defined then validation will sometimes spuriously fail.  This
+// helps test unusual code paths when genuine contention is rare.
 
 #if defined(DEBUG)
 #define SHAKE
+#if defined(THREADED_RTS)
+#define TRACE(_x...) IF_DEBUG(stm, debugBelch("STM  (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()); debugBelch ( _x ))
+#else
 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
+#endif
 #else
 #define TRACE(_x...) /*Nothing*/
 #endif
 
-// If SHAKE is defined then validation will sometime spuriously fail.  They helps test
-// unusualy code paths if genuine contention is rare
-
 #ifdef SHAKE
 static const int do_shake = TRUE;
 #else
 static const int do_shake = FALSE;
 #endif
 static int shake_ctr = 0;
-
-/*......................................................................*/
+static int shake_lim = 1;
 
 static int shake(void) {
   if (do_shake) {
-    if (((shake_ctr++) % 47) == 0) {
+    if (((shake_ctr++) % shake_lim) == 0) {
+      shake_ctr = 1;
+      shake_lim ++;
       return TRUE;
     }
     return FALSE;
@@ -86,79 +150,176 @@ static int shake(void) {
 // Helper macros for iterating over entries within a transaction
 // record
 
-#define FOR_EACH_ENTRY(_t,_x,CODE) do {                  \
-  StgTRecHeader *__t = (_t);                             \
-  StgTRecChunk *__c = __t -> current_chunk;              \
-  StgWord __limit = __c -> next_entry_idx;               \
-  TRACE("trec=%p chunk=%p limit=%ld\n", __t, __c, __limit); \
-  while (__c != END_STM_CHUNK_LIST) {                    \
-    StgWord __i;                                         \
-    for (__i = 0; __i < __limit; __i ++) {               \
-      TRecEntry *_x = &(__c -> entries[__i]);            \
-      do { CODE } while (0);                             \
-    }                                                    \
-    __c = __c -> prev_chunk;                             \
-    __limit = TREC_CHUNK_NUM_ENTRIES;                    \
-  }                                                      \
- exit_for_each:                                          \
-  if (FALSE) goto exit_for_each;                         \
+#define FOR_EACH_ENTRY(_t,_x,CODE) do {                  \
+  StgTRecHeader *__t = (_t);                             \
+  StgTRecChunk *__c = __t -> current_chunk;              \
+  StgWord __limit = __c -> next_entry_idx;               \
+  TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
+  while (__c != END_STM_CHUNK_LIST) {                    \
+    StgWord __i;                                         \
+    for (__i = 0; __i < __limit; __i ++) {               \
+      TRecEntry *_x = &(__c -> entries[__i]);            \
+      do { CODE } while (0);                             \
+    }                                                    \
+    __c = __c -> prev_chunk;                             \
+    __limit = TREC_CHUNK_NUM_ENTRIES;                    \
+  }                                                      \
+ exit_for_each:                                          \
+  if (FALSE) goto exit_for_each;                         \
 } while (0)
 
 #define BREAK_FOR_EACH goto exit_for_each
 
 /*......................................................................*/
 
-// Private cache of must-be-unreachable trec headers and chunks
+// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
+// and wait queue entries without GC
 
-static StgTRecHeader *cached_trec_headers = NO_TREC;
-static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
-static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
+#define REUSE_MEMORY
 
-static void recycle_tvar_wait_queue(StgTVarWaitQueue *q STG_UNUSED) {
-#if 0
-  if (shake()) {
-    TRACE("Shake: not re-using wait queue %p\n", q);
-    return;
+/*......................................................................*/
+
+#define IF_STM_UNIPROC(__X)  do { } while (0)
+#define IF_STM_CG_LOCK(__X)  do { } while (0)
+#define IF_STM_FG_LOCKS(__X) do { } while (0)
+
+#if defined(STM_UNIPROC)
+#undef IF_STM_UNIPROC
+#define IF_STM_UNIPROC(__X) do { __X } while (0)
+static const StgBool use_read_phase = FALSE;
+
+static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
+  TRACE("%p : lock_stm()\n", trec);
+}
+
+static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
+  TRACE("%p : unlock_stm()\n", trec);
+}
+
+static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
+                             StgTVar *s STG_UNUSED) {
+  StgClosure *result;
+  TRACE("%p : lock_tvar(%p)\n", trec, s);
+  result = s -> current_value;
+  return result;
+}
+
+static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
+                        StgTVar *s STG_UNUSED,
+                        StgClosure *c,
+                        StgBool force_update) {
+  TRACE("%p : unlock_tvar(%p)\n", trec, s);
+  if (force_update) {
+    s -> current_value = c;
   }
+}
 
-  q -> next_queue_entry = cached_tvar_wait_queues;
-  cached_tvar_wait_queues = q;
+static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
+                              StgTVar *s STG_UNUSED,
+                              StgClosure *expected) {
+  StgClosure *result;
+  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
+  result = s -> current_value;
+  TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
+  return (result == expected);
+}
 #endif
+
+#if defined(STM_CG_LOCK) /*........................................*/
+
+#undef IF_STM_CG_LOCK
+#define IF_STM_CG_LOCK(__X) do { __X } while (0)
+static const StgBool use_read_phase = FALSE;
+static volatile StgTRecHeader *smp_locked = NULL;
+
+static void lock_stm(StgTRecHeader *trec) {
+  while (cas(&smp_locked, NULL, trec) != NULL) { }
+  TRACE("%p : lock_stm()\n", trec);
 }
 
-static void recycle_closures_from_trec (StgTRecHeader *t STG_UNUSED) {
-#if 0
-  if (shake()) {
-    TRACE("Shake: not re-using closures from %p\n", t);
-    return;
-  }
+static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
+  TRACE("%p : unlock_stm()\n", trec);
+  ASSERT (smp_locked == trec);
+  smp_locked = NULL;
+}
 
-  t -> enclosing_trec = cached_trec_headers;
-  cached_trec_headers = t;
-  t -> enclosing_trec = NO_TREC;
+static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED,
+                             StgTVar *s STG_UNUSED) {
+  StgClosure *result;
+  TRACE("%p : lock_tvar(%p)\n", trec, s);
+  ASSERT (smp_locked == trec);
+  result = s -> current_value;
+  return result;
+}
 
-  while (t -> current_chunk != END_STM_CHUNK_LIST) {
-    StgTRecChunk *c = t -> current_chunk;
-    t -> current_chunk = c -> prev_chunk;
-    c -> prev_chunk = cached_trec_chunks;
-    cached_trec_chunks = c;
+static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
+                        StgTVar *s STG_UNUSED,
+                        StgClosure *c,
+                        StgBool force_update) {
+  TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
+  ASSERT (smp_locked == trec);
+  if (force_update) {
+    s -> current_value = c;
   }
+}
+
+static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED,
+                              StgTVar *s STG_UNUSED,
+                              StgClosure *expected) {
+  StgClosure *result;
+  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
+  ASSERT (smp_locked == trec);
+  result = s -> current_value;
+  TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
+  return (result == expected);
+}
 #endif
+
+#if defined(STM_FG_LOCKS) /*...................................*/
+
+#undef IF_STM_FG_LOCKS
+#define IF_STM_FG_LOCKS(__X) do { __X } while (0)
+static const StgBool use_read_phase = TRUE;
+
+static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
+  TRACE("%p : lock_stm()\n", trec);
 }
 
-/*......................................................................*/
+static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
+  TRACE("%p : unlock_stm()\n", trec);
+}
 
-// Helper functions for managing internal STM state.  This lock is only held
-// for a 'short' time, in the sense that it is never held when any of the
-// external functions returns.
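
[Editorial sketch -- not part of the patch.  The commit pattern (a)-(e)
described in the header comment can be illustrated with the operations
defined above.  This is a simplification: it locks every entry rather than
using the read phase, assumes an STM_FG_LOCKS build (tvar_is_locked is
introduced further down, for that build only), and omits the num_updates
bookkeeping that the real stmCommitTransaction performs.]

    // (b)/(c): lock each TVar only if it still holds its expected value.
    static StgBool acquire_all_entries(StgTRecHeader *trec) {
      StgBool ok = TRUE;
      FOR_EACH_ENTRY(trec, e, {
        if (!cond_lock_tvar(trec, e -> tvar, e -> expected_value)) {
          ok = FALSE;      // some TVar no longer holds its expected value
          BREAK_FOR_EACH;
        }
      });
      return ok;
    }

    // (d): release whatever locks were taken, writing new values on success.
    static void release_all_entries(StgTRecHeader *trec, StgBool commit) {
      FOR_EACH_ENTRY(trec, e, {
        if (tvar_is_locked(e -> tvar, trec)) {
          unlock_tvar(trec, e -> tvar,
                      commit ? e -> new_value : e -> expected_value, TRUE);
        }
      });
    }

    static StgBool commit_sketch(StgTRecHeader *trec) {
      StgBool ok;
      lock_stm(trec);                    // (a) lock the STM
      ok = acquire_all_entries(trec);    // (b) lock TVars, (c) validate them
      release_all_entries(trec, ok);     // (d) unlock, updating on success
      unlock_stm(trec);                  // (e) unlock the STM
      return ok;
    }
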
+static StgClosure *lock_tvar(StgTRecHeader *trec, + StgTVar *s STG_UNUSED) { + StgClosure *result; + TRACE("%p : lock_tvar(%p)\n", trec, s); + do { + do { + result = s -> current_value; + } while (GET_INFO(result) == &stg_TREC_HEADER_info); + } while (cas(&(s -> current_value), result, trec) != result); + return result; +} -static void lock_stm(void) { - // Nothing +static void unlock_tvar(StgTRecHeader *trec STG_UNUSED, + StgTVar *s, + StgClosure *c, + StgBool force_update STG_UNUSED) { + TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c); + ASSERT(s -> current_value == trec); + s -> current_value = c; } -static void unlock_stm(void) { - // Nothing +static StgBool cond_lock_tvar(StgTRecHeader *trec, + StgTVar *s, + StgClosure *expected) { + StgClosure *result; + TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected); + result = cas(&(s -> current_value), expected, trec); + TRACE("%p : %s\n", trec, result ? "success" : "failure"); + return (result == expected); } +#endif /*......................................................................*/ @@ -171,104 +332,176 @@ static void park_tso(StgTSO *tso) { TRACE("park_tso on tso=%p\n", tso); } -static void unpark_tso(StgTSO *tso) { +static void unpark_tso(Capability *cap, StgTSO *tso) { // We will continue unparking threads while they remain on one of the wait // queues: it's up to the thread itself to remove it from the wait queues // if it decides to do so when it is scheduled. if (tso -> why_blocked == BlockedOnSTM) { TRACE("unpark_tso on tso=%p\n", tso); - tso -> why_blocked = NotBlocked; - PUSH_ON_RUN_QUEUE(tso); + unblockOne(cap,tso); } else { TRACE("spurious unpark_tso on tso=%p\n", tso); } } -static void unpark_waiters_on(StgTVar *s) { +static void unpark_waiters_on(Capability *cap, StgTVar *s) { StgTVarWaitQueue *q; TRACE("unpark_waiters_on tvar=%p\n", s); for (q = s -> first_wait_queue_entry; q != END_STM_WAIT_QUEUE; q = q -> next_queue_entry) { - unpark_tso(q -> waiting_tso); + unpark_tso(cap, q -> waiting_tso); } } /*......................................................................*/ -// Helper functions for allocation and initialization +// Helper functions for downstream allocation and initialization -static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgTSO *waiting_tso) { +static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap, + StgTSO *waiting_tso) { StgTVarWaitQueue *result; - if (cached_tvar_wait_queues != END_STM_WAIT_QUEUE) { - result = cached_tvar_wait_queues; - cached_tvar_wait_queues = result -> next_queue_entry; - } else { - result = (StgTVarWaitQueue *)allocate(sizeofW(StgTVarWaitQueue)); - SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM); - } + result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue)); + SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM); result -> waiting_tso = waiting_tso; return result; } -static StgTRecChunk *new_stg_trec_chunk(void) { +static StgTRecChunk *new_stg_trec_chunk(Capability *cap) { StgTRecChunk *result; - if (cached_trec_chunks != END_STM_CHUNK_LIST) { - result = cached_trec_chunks; - cached_trec_chunks = result -> prev_chunk; - } else { - result = (StgTRecChunk *)allocate(sizeofW(StgTRecChunk)); - SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM); - } + result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk)); + SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM); result -> prev_chunk = END_STM_CHUNK_LIST; result -> next_entry_idx = 0; - TRACE("prev from %p is %p\n", result, result -> prev_chunk); return result; } -static 
StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) { +static StgTRecHeader *new_stg_trec_header(Capability *cap, + StgTRecHeader *enclosing_trec) { StgTRecHeader *result; - if (cached_trec_headers != NO_TREC) { - result = cached_trec_headers; - cached_trec_headers = result -> enclosing_trec; - } else { - result = (StgTRecHeader *) allocate(sizeofW(StgTRecHeader)); - SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM); - } + result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader)); + SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM); + result -> enclosing_trec = enclosing_trec; - result -> current_chunk = new_stg_trec_chunk(); + result -> current_chunk = new_stg_trec_chunk(cap); if (enclosing_trec == NO_TREC) { result -> state = TREC_ACTIVE; } else { ASSERT(enclosing_trec -> state == TREC_ACTIVE || - enclosing_trec -> state == TREC_MUST_ABORT || - enclosing_trec -> state == TREC_CANNOT_COMMIT); + enclosing_trec -> state == TREC_CONDEMNED); result -> state = enclosing_trec -> state; } - TRACE("new_stg_trec_header creating %p nidx=%ld chunk=%p enclosing_trec=%p state=%d\n", - result, result->current_chunk->next_entry_idx, result -> current_chunk, enclosing_trec, result->state); return result; } /*......................................................................*/ +// Allocation / deallocation functions that retain per-capability lists +// of closures that can be re-used + +static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap, + StgTSO *waiting_tso) { + StgTVarWaitQueue *result = NULL; + if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) { + result = new_stg_tvar_wait_queue(cap, waiting_tso); + } else { + result = cap -> free_tvar_wait_queues; + result -> waiting_tso = waiting_tso; + cap -> free_tvar_wait_queues = result -> next_queue_entry; + } + return result; +} + +static void free_stg_tvar_wait_queue(Capability *cap, + StgTVarWaitQueue *wq) { +#if defined(REUSE_MEMORY) + wq -> next_queue_entry = cap -> free_tvar_wait_queues; + cap -> free_tvar_wait_queues = wq; +#endif +} + +static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) { + StgTRecChunk *result = NULL; + if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) { + result = new_stg_trec_chunk(cap); + } else { + result = cap -> free_trec_chunks; + cap -> free_trec_chunks = result -> prev_chunk; + result -> prev_chunk = END_STM_CHUNK_LIST; + result -> next_entry_idx = 0; + } + return result; +} + +static void free_stg_trec_chunk(Capability *cap, + StgTRecChunk *c) { +#if defined(REUSE_MEMORY) + c -> prev_chunk = cap -> free_trec_chunks; + cap -> free_trec_chunks = c; +#endif +} + +static StgTRecHeader *alloc_stg_trec_header(Capability *cap, + StgTRecHeader *enclosing_trec) { + StgTRecHeader *result = NULL; + if (cap -> free_trec_headers == NO_TREC) { + result = new_stg_trec_header(cap, enclosing_trec); + } else { + result = cap -> free_trec_headers; + cap -> free_trec_headers = result -> enclosing_trec; + result -> enclosing_trec = enclosing_trec; + result -> current_chunk -> next_entry_idx = 0; + if (enclosing_trec == NO_TREC) { + result -> state = TREC_ACTIVE; + } else { + ASSERT(enclosing_trec -> state == TREC_ACTIVE || + enclosing_trec -> state == TREC_CONDEMNED); + result -> state = enclosing_trec -> state; + } + } + return result; +} + +static void free_stg_trec_header(Capability *cap, + StgTRecHeader *trec) { +#if defined(REUSE_MEMORY) + StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk; + while (chunk != END_STM_CHUNK_LIST) { + StgTRecChunk *prev_chunk = chunk -> 
prev_chunk;
+    free_stg_trec_chunk(cap, chunk);
+    chunk = prev_chunk;
+  }
+  trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
+  trec -> enclosing_trec = cap -> free_trec_headers;
+  cap -> free_trec_headers = trec;
+#endif
+}
+
+/*......................................................................*/
+
 // Helper functions for managing waiting lists
 
-static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
+static void build_wait_queue_entries_for_trec(Capability *cap,
+                                              StgTSO *tso,
+                                              StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> enclosing_trec == NO_TREC);
-  ASSERT(trec -> state == TREC_ACTIVE || trec -> state == TREC_CANNOT_COMMIT);
+  ASSERT(trec -> state == TREC_ACTIVE);
+
+  TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
+
   FOR_EACH_ENTRY(trec, e, {
     StgTVar *s;
     StgTVarWaitQueue *q;
     StgTVarWaitQueue *fq;
     s = e -> tvar;
-    TRACE("Adding tso=%p to wait queue for tvar=%p\n", tso, s);
-    ASSERT(s -> current_value == e -> expected_value);
+    TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
+    ACQ_ASSERT(s -> current_value == trec);
+    NACQ_ASSERT(s -> current_value == e -> expected_value);
     fq = s -> first_wait_queue_entry;
-    q = new_stg_tvar_wait_queue(tso);
+    q = alloc_stg_tvar_wait_queue(cap, tso);
     q -> next_queue_entry = fq;
     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
     if (fq != END_STM_WAIT_QUEUE) {
@@ -279,23 +512,27 @@ static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
   });
 }
 
-static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
+static void remove_wait_queue_entries_for_trec(Capability *cap,
+                                               StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> enclosing_trec == NO_TREC);
   ASSERT(trec -> state == TREC_WAITING ||
-         trec -> state == TREC_MUST_ABORT);
-  TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
+         trec -> state == TREC_CONDEMNED);
+
+  TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
+
   FOR_EACH_ENTRY(trec, e, {
     StgTVar *s;
     StgTVarWaitQueue *pq;
     StgTVarWaitQueue *nq;
     StgTVarWaitQueue *q;
+    StgClosure *saw;
     s = e -> tvar;
+    saw = lock_tvar(trec, s);
     q = (StgTVarWaitQueue *) (e -> new_value);
-    TRACE("Removing tso=%p from wait queue for tvar=%p\n", q -> waiting_tso, s);
+    TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
+    ACQ_ASSERT(s -> current_value == trec);
     nq = q -> next_queue_entry;
     pq = q -> prev_queue_entry;
-    TRACE("pq=%p nq=%p q=%p\n", pq, nq, q);
     if (nq != END_STM_WAIT_QUEUE) {
       nq -> prev_queue_entry = pq;
     }
@@ -305,13 +542,15 @@ static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
       ASSERT (s -> first_wait_queue_entry == q);
       s -> first_wait_queue_entry = nq;
     }
-    recycle_tvar_wait_queue(q);
+    free_stg_tvar_wait_queue(cap, q);
+    unlock_tvar(trec, s, saw, FALSE);
   });
 }
 
 /*......................................................................*/
 
-static TRecEntry *get_new_entry(StgTRecHeader *t) {
+static TRecEntry *get_new_entry(Capability *cap,
+                                StgTRecHeader *t) {
   TRecEntry *result;
   StgTRecChunk *c;
   int i;
@@ -327,7 +566,7 @@ static TRecEntry *get_new_entry(StgTRecHeader *t) {
   } else {
     // Current chunk is full: allocate a fresh one
     StgTRecChunk *nc;
-    nc = new_stg_trec_chunk();
+    nc = alloc_stg_trec_chunk(cap);
     nc -> prev_chunk = c;
     nc -> next_entry_idx = 1;
     t -> current_chunk = nc;
@@ -339,11 +578,11 @@ static TRecEntry *get_new_entry(StgTRecHeader *t) {
 
 /*......................................................................*/
 
-static void merge_update_into(StgTRecHeader *t,
+static void
merge_update_into(Capability *cap, + StgTRecHeader *t, StgTVar *tvar, StgClosure *expected_value, - StgClosure *new_value, - int merging_sibling) { + StgClosure *new_value) { int found; // Look for an entry in this trec @@ -353,23 +592,12 @@ static void merge_update_into(StgTRecHeader *t, s = e -> tvar; if (s == tvar) { found = TRUE; - if (merging_sibling) { - if (e -> expected_value != expected_value) { - // Must abort if the two entries start from different values - TRACE("Siblings inconsistent at %p (%p vs %p)\n", - tvar, e -> expected_value, expected_value); - t -> state = TREC_MUST_ABORT; - } else if (e -> new_value != new_value) { - // Cannot commit if the two entries lead to different values (wait still OK) - TRACE("Siblings trying conflicting writes to %p (%p vs %p)\n", - tvar, e -> new_value, new_value); - t -> state = TREC_CANNOT_COMMIT; - } - } else { - // Otherwise merging child back into parent - ASSERT (e -> new_value == expected_value); - } - TRACE(" trec=%p exp=%p new=%p\n", t, e->expected_value, e->new_value); + if (e -> expected_value != expected_value) { + // Must abort if the two entries start from different values + TRACE("%p : entries inconsistent at %p (%p vs %p)\n", + t, tvar, e -> expected_value, expected_value); + t -> state = TREC_CONDEMNED; + } e -> new_value = new_value; BREAK_FOR_EACH; } @@ -378,7 +606,7 @@ static void merge_update_into(StgTRecHeader *t, if (!found) { // No entry so far in this trec TRecEntry *ne; - ne = get_new_entry(t); + ne = get_new_entry(cap, t); ne -> tvar = tvar; ne -> expected_value = expected_value; ne -> new_value = new_value; @@ -387,464 +615,647 @@ static void merge_update_into(StgTRecHeader *t, /*......................................................................*/ -static StgClosure *read_current_value_seen_from(StgTRecHeader *t, - StgTVar *tvar) { - int found; - StgClosure *result = NULL; +static StgBool entry_is_update(TRecEntry *e) { + StgBool result; + result = (e -> expected_value != e -> new_value); + return result; +} - // Look for any relevent trec entries - found = FALSE; - while (t != NO_TREC) { - FOR_EACH_ENTRY(t, e, { +#if defined(STM_FG_LOCKS) +static StgBool entry_is_read_only(TRecEntry *e) { + StgBool result; + result = (e -> expected_value == e -> new_value); + return result; +} + +static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) { + StgClosure *c; + StgBool result; + c = s -> current_value; + result = (c == (StgClosure *) h); + return result; +} +#endif + +// revert_ownership : release a lock on a TVar, storing back +// the value that it held when the lock was acquired. "revert_all" +// is set in stmWait and stmReWait when we acquired locks on all of +// the TVars involved. "revert_all" is not set in commit operations +// where we don't lock TVars that have been read from but not updated. 
+
+static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
+                             StgBool revert_all STG_UNUSED) {
+#if defined(STM_FG_LOCKS)
+  FOR_EACH_ENTRY(trec, e, {
+    if (revert_all || entry_is_update(e)) {
       StgTVar *s;
       s = e -> tvar;
-      if (s == tvar) {
-        found = TRUE;
-        result = e -> new_value;
-        BREAK_FOR_EACH;
+      if (tvar_is_locked(s, trec)) {
+        unlock_tvar(trec, s, e -> expected_value, TRUE);
       }
-    });
-    if (found) break;
-    t = t -> enclosing_trec;
-  }
-
-  if (!found) {
-    // Value not yet held in a trec
-    result = tvar -> current_value;
-  }
-
-  return result;
+    }
+  });
+#endif
 }
-
+
 /*......................................................................*/
 
-static int transaction_is_valid (StgTRecHeader *t) {
-  StgTRecHeader *et;
-  int result;
+// validate_and_acquire_ownership : this performs the twin functions
+// of checking that the TVars referred to by entries in trec hold the
+// expected values and:
+//
+//   - locking the TVar (on updated TVars during commit, or all TVars
+//     during wait)
+//
+//   - recording the identity of the TRec that wrote the value seen in the
+//     TVar (on non-updated TVars during commit).  These values are
+//     stashed in the TRec entries and are then checked in check_read_only
+//     to ensure that an atomic snapshot of all of these locations has been
+//     seen.
+
+static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
+                                               int acquire_all,
+                                               int retain_ownership) {
+  StgBool result;
 
   if (shake()) {
-    TRACE("Shake: pretending transaction trec=%p is invalid when it may not be\n", t);
+    TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
     return FALSE;
   }
 
-  et = t -> enclosing_trec;
-  ASSERT ((t -> state == TREC_ACTIVE) ||
-          (t -> state == TREC_WAITING) ||
-          (t -> state == TREC_MUST_ABORT) ||
-          (t -> state == TREC_CANNOT_COMMIT));
-  result = !((t -> state) == TREC_MUST_ABORT);
+  ASSERT ((trec -> state == TREC_ACTIVE) ||
+          (trec -> state == TREC_WAITING) ||
+          (trec -> state == TREC_CONDEMNED));
+  result = !((trec -> state) == TREC_CONDEMNED);
   if (result) {
-    FOR_EACH_ENTRY(t, e, {
+    FOR_EACH_ENTRY(trec, e, {
       StgTVar *s;
       s = e -> tvar;
-      if (e -> expected_value != read_current_value_seen_from(et, s)) {
-        result = FALSE;
-        BREAK_FOR_EACH;
+      if (acquire_all || entry_is_update(e)) {
+        TRACE("%p : trying to acquire %p\n", trec, s);
+        if (!cond_lock_tvar(trec, s, e -> expected_value)) {
+          TRACE("%p : failed to acquire %p\n", trec, s);
+          result = FALSE;
+          BREAK_FOR_EACH;
+        }
+      } else {
+        ASSERT(use_read_phase);
+        IF_STM_FG_LOCKS({
+          TRACE("%p : will need to check %p\n", trec, s);
+          if (s -> current_value != e -> expected_value) {
+            TRACE("%p : doesn't match\n", trec);
+            result = FALSE;
+            BREAK_FOR_EACH;
+          }
+          e -> num_updates = s -> num_updates;
+          if (s -> current_value != e -> expected_value) {
+            TRACE("%p : doesn't match (race)\n", trec);
+            result = FALSE;
+            BREAK_FOR_EACH;
+          } else {
+            TRACE("%p : need to check version %d\n", trec, e -> num_updates);
+          }
+        });
       }
     });
   }
+
+  if ((!result) || (!retain_ownership)) {
+    revert_ownership(trec, acquire_all);
+  }
+
   return result;
 }
 
-/************************************************************************/
+// check_read_only : check that we've seen an atomic snapshot of the
+// non-updated TVars accessed by a trec.  This checks that the last TRec to
+// commit an update to the TVar is unchanged since the value was stashed in
+// validate_and_acquire_ownership.  If no update is seen to any TVar then
+// all of them contained their expected values at the start of the call to
+// check_read_only.
+//
+// The paper "Concurrent programming without locks" (under submission), or
+// Keir Fraser's PhD dissertation "Practical lock-free programming", discusses
+// this kind of algorithm.
+
+static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
+  StgBool result = TRUE;
+
+  ASSERT (use_read_phase);
+  IF_STM_FG_LOCKS({
+    FOR_EACH_ENTRY(trec, e, {
+      StgTVar *s;
+      s = e -> tvar;
+      if (entry_is_read_only(e)) {
+        TRACE("%p : check_read_only for TVar %p, saw %d\n", trec, s, e -> num_updates);
+        if (s -> num_updates != e -> num_updates) {
+          // ||s -> current_value != e -> expected_value) {
+          TRACE("%p : mismatch\n", trec);
+          result = FALSE;
+          BREAK_FOR_EACH;
+        }
+      }
+    });
+  });
 
-/*
- * External functions below this point are repsonsible for:
- *
- * - acquiring/releasing the STM lock
- *
- * - all updates to the trec status field
- *  ASSERT(t != NO_TREC);
+  return result;
+}
 
- * By convention we increment entry_count when starting a new
- * transaction and we decrement it at the point where we can discard
- * the contents of the trec when exiting the outermost transaction.
- * This means that stmWait and stmRewait decrement the count whenever
- * they return FALSE (they do so exactly once for each transaction
- * that doesn't remain blocked forever).
- */
 
 /************************************************************************/
 
 void stmPreGCHook() {
+  nat i;
+
+  lock_stm(NO_TREC);
   TRACE("stmPreGCHook\n");
-  cached_trec_headers = NO_TREC;
-  cached_trec_chunks = END_STM_CHUNK_LIST;
-  cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
+  for (i = 0; i < n_capabilities; i ++) {
+    Capability *cap = &capabilities[i];
+    cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+    cap -> free_trec_chunks = END_STM_CHUNK_LIST;
+    cap -> free_trec_headers = NO_TREC;
+  }
+  unlock_stm(NO_TREC);
 }
 
 /************************************************************************/
 
-void initSTM() {
-  TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
-  /* Nothing */
+// check_read_only relies on version numbers held in TVars' "num_updates"
+// fields not wrapping around while a transaction is committed.  The version
+// number is incremented each time an update is committed to the TVar.
+// This is unlikely to wrap around when 32-bit integers are used for the counts,
+// but to ensure correctness we maintain a shared count on the maximum
+// number of commit operations that may occur and check that this has
+// not increased by more than 2^32 during a commit.
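
[Editorial sketch -- not part of the patch.  It restates, as a standalone
predicate, the overflow check that stmCommitTransaction performs below.
Every commit consumes one token, and tokens are drawn from max_commits in
batches of TOKEN_BATCH_SIZE (both defined just below), so the number of
commits that can overlap a given transaction is bounded by the growth of
max_commits while it ran plus one cached batch per capability.  Only while
that bound stays under 2^32 can the 32-bit num_updates version numbers be
trusted not to have wrapped around.]

    static StgBool version_numbers_are_sound(StgInt64 max_commits_at_start,
                                             StgInt64 max_commits_at_end,
                                             nat n_caps) {
      StgInt64 max_concurrent_commits;
      max_concurrent_commits = (max_commits_at_end - max_commits_at_start)
                             + ((StgInt64)n_caps * TOKEN_BATCH_SIZE);
      // Sound only if fewer than 2^32 commits could have interleaved.
      return ((max_concurrent_commits >> 32) == 0);
    }
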
+ +#define TOKEN_BATCH_SIZE 1024 + +static volatile StgInt64 max_commits = 0; + +static volatile StgBool token_locked = FALSE; + +#if defined(THREADED_RTS) +static void getTokenBatch(Capability *cap) { + while (cas(&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ } + max_commits += TOKEN_BATCH_SIZE; + cap -> transaction_tokens = TOKEN_BATCH_SIZE; + token_locked = FALSE; } +static void getToken(Capability *cap) { + if (cap -> transaction_tokens == 0) { + getTokenBatch(cap); + } + cap -> transaction_tokens --; +} +#else +static void getToken(Capability *cap STG_UNUSED) { + // Nothing +} +#endif + /*......................................................................*/ -StgTRecHeader *stmStartTransaction(StgTRecHeader *outer) { +StgTRecHeader *stmStartTransaction(Capability *cap, + StgTRecHeader *outer) { StgTRecHeader *t; - TRACE("stmStartTransaction current-trec=%p\n", outer); - t = new_stg_trec_header(outer); - TRACE("stmStartTransaction new-trec=%p\n", t); + TRACE("%p : stmStartTransaction with %d tokens\n", + outer, + cap -> transaction_tokens); + + getToken(cap); + + t = alloc_stg_trec_header(cap, outer); + TRACE("%p : stmStartTransaction()=%p\n", outer, t); return t; } /*......................................................................*/ -void stmAbortTransaction(StgTRecHeader *trec) { - TRACE("stmAbortTransaction trec=%p\n", trec); +void stmAbortTransaction(Capability *cap, + StgTRecHeader *trec) { + TRACE("%p : stmAbortTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || - (trec -> state == TREC_MUST_ABORT) || (trec -> state == TREC_WAITING) || - (trec -> state == TREC_CANNOT_COMMIT)); + (trec -> state == TREC_CONDEMNED)); + + lock_stm(trec); if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); - TRACE("stmAbortTransaction aborting waiting transaction\n"); - stop_tsos_waiting_on_trec(trec); + TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec); + remove_wait_queue_entries_for_trec(cap, trec); } trec -> state = TREC_ABORTED; + unlock_stm(trec); - // Outcome now reflected by status field; no need for log - recycle_closures_from_trec(trec); + free_stg_trec_header(cap, trec); - TRACE("stmAbortTransaction trec=%p done\n", trec); + TRACE("%p : stmAbortTransaction done\n", trec); } /*......................................................................*/ -void stmCondemnTransaction(StgTRecHeader *trec) { - TRACE("stmCondemnTransaction trec=%p\n", trec); +void stmCondemnTransaction(Capability *cap, + StgTRecHeader *trec) { + TRACE("%p : stmCondemnTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || - (trec -> state == TREC_MUST_ABORT) || (trec -> state == TREC_WAITING) || - (trec -> state == TREC_CANNOT_COMMIT)); + (trec -> state == TREC_CONDEMNED)); + lock_stm(trec); if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); - TRACE("stmCondemnTransaction condemning waiting transaction\n"); - stop_tsos_waiting_on_trec(trec); + TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec); + remove_wait_queue_entries_for_trec(cap, trec); } + trec -> state = TREC_CONDEMNED; + unlock_stm(trec); - trec -> state = TREC_MUST_ABORT; - - TRACE("stmCondemnTransaction trec=%p done\n", trec); + TRACE("%p : stmCondemnTransaction done\n", trec); } /*......................................................................*/ StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) { StgTRecHeader *outer; - TRACE("stmGetEnclosingTRec 
trec=%p\n", trec);
+  TRACE("%p : stmGetEnclosingTRec\n", trec);
   outer = trec -> enclosing_trec;
-  TRACE("stmGetEnclosingTRec outer=%p\n", outer);
+  TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
   return outer;
 }
 
 /*......................................................................*/
 
-StgBool stmValidateTransaction(StgTRecHeader *trec) {
-  int result;
-  TRACE("stmValidateTransaction trec=%p\n", trec);
+StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
+  StgTRecHeader *t;
+  StgBool result;
+
+  TRACE("%p : stmValidateNestOfTransactions\n", trec);
   ASSERT(trec != NO_TREC);
   ASSERT((trec -> state == TREC_ACTIVE) ||
-         (trec -> state == TREC_MUST_ABORT) ||
-         (trec -> state == TREC_CANNOT_COMMIT) ||
-         (trec -> state == TREC_WAITING));
+         (trec -> state == TREC_WAITING) ||
+         (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = transaction_is_valid(trec);
+  lock_stm(trec);
+
+  t = trec;
+  result = TRUE;
+  while (t != NO_TREC) {
+    result &= validate_and_acquire_ownership(t, TRUE, FALSE);
+    t = t -> enclosing_trec;
+  }
 
   if (!result && trec -> state != TREC_WAITING) {
-    trec -> state = TREC_MUST_ABORT;
+    trec -> state = TREC_CONDEMNED;
   }
 
-  unlock_stm();
+  unlock_stm(trec);
 
-  TRACE("stmValidateTransaction trec=%p result=%d\n", trec, result);
+  TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
   return result;
 }
 
 /*......................................................................*/
 
-StgBool stmCommitTransaction(StgTRecHeader *trec) {
-  StgTRecHeader *et;
+StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   int result;
-  TRACE("stmCommitTransaction trec=%p trec->enclosing_trec=%p\n", trec, trec->enclosing_trec);
+  StgInt64 max_commits_at_start = max_commits;
+
+  TRACE("%p : stmCommitTransaction()\n", trec);
   ASSERT (trec != NO_TREC);
+
+  lock_stm(trec);
+
+  ASSERT (trec -> enclosing_trec == NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) ||
-          (trec -> state == TREC_MUST_ABORT) ||
-          (trec -> state == TREC_CANNOT_COMMIT));
+          (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = transaction_is_valid(trec);
+  result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
   if (result) {
-    et = trec -> enclosing_trec;
-    if (trec -> state == TREC_CANNOT_COMMIT && et == NO_TREC) {
-      TRACE("Cannot commit trec=%p at top level\n", trec);
-      trec -> state = TREC_MUST_ABORT;
-      result = FALSE;
-    } else {
-      if (et == NO_TREC) {
-        TRACE("Non-nesting commit, NO_TREC=%p\n", NO_TREC);
-      } else {
-        TRACE("Nested commit into %p, NO_TREC=%p\n", et, NO_TREC);
+    // We now know that all the updated locations hold their expected values.
+    ASSERT (trec -> state == TREC_ACTIVE);
+
+    if (use_read_phase) {
+      StgInt64 max_commits_at_end;
+      StgInt64 max_concurrent_commits;
+
+      TRACE("%p : doing read check\n", trec);
+      result = check_read_only(trec);
+      TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
+
+      max_commits_at_end = max_commits;
+      max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
+                                (n_capabilities * TOKEN_BATCH_SIZE));
+      if (((max_concurrent_commits >> 32) > 0) || shake()) {
+        result = FALSE;
       }
+    }
+
+    if (result) {
+      // We now know that all of the read-only locations held their expected values
+      // at the end of the call to validate_and_acquire_ownership.  This forms the
+      // linearization point of the commit.
+
       FOR_EACH_ENTRY(trec, e, {
         StgTVar *s;
         s = e -> tvar;
-        if (et == NO_TREC) {
-          s -> current_value = e -> new_value;
-          unpark_waiters_on(s);
-        } else {
-          merge_update_into(et, s, e -> expected_value, e -> new_value, FALSE);
-        }
+        if (e -> new_value != e -> expected_value) {
+          // Entry is an update: write the value back to the TVar, unlocking it if
+          // necessary.
+
+          ACQ_ASSERT(tvar_is_locked(s, trec));
+          TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
+          unpark_waiters_on(cap,s);
+          IF_STM_FG_LOCKS({
+            s -> num_updates ++;
+          });
+          unlock_tvar(trec, s, e -> new_value, TRUE);
+        }
+        ACQ_ASSERT(!tvar_is_locked(s, trec));
       });
-
-
-      if (trec->state == TREC_CANNOT_COMMIT && et -> state == TREC_ACTIVE) {
-        TRACE("Propagating TREC_CANNOT_COMMIT into %p\n", et);
-        et -> state = TREC_CANNOT_COMMIT;
-      }
+    } else {
+      revert_ownership(trec, FALSE);
     }
   }
 
-  // Outcome now reflected by status field; no need for log
-  recycle_closures_from_trec(trec);
-
-  unlock_stm();
+  unlock_stm(trec);
+
+  free_stg_trec_header(cap, trec);
 
-  TRACE("stmCommitTransaction trec=%p result=%d\n", trec, result);
+  TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
   return result;
 }
 
 /*......................................................................*/
 
-StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other) {
+StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
+  StgTRecHeader *et;
   int result;
-  TRACE("stmMergeForWaiting trec=%p (%d) other=%p (%d)\n", trec, trec -> state, other, other->state);
-  ASSERT(trec != NO_TREC);
-  ASSERT(other != NO_TREC);
-  ASSERT((trec -> state == TREC_ACTIVE) ||
-         (trec -> state == TREC_MUST_ABORT) ||
-         (trec -> state == TREC_CANNOT_COMMIT));
-  ASSERT((other -> state == TREC_ACTIVE) ||
-         (other -> state == TREC_MUST_ABORT) ||
-         (other -> state == TREC_CANNOT_COMMIT));
+  ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
+  TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
+  ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = (transaction_is_valid(trec));
-  TRACE("stmMergeForWaiting initial result=%d\n", result);
+  lock_stm(trec);
+
+  et = trec -> enclosing_trec;
+  result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
   if (result) {
-    result = transaction_is_valid(other);
-    TRACE("stmMergeForWaiting after both result=%d\n", result);
+    // We now know that all the updated locations hold their expected values.
+
+    if (use_read_phase) {
+      TRACE("%p : doing read check\n", trec);
+      result = check_read_only(trec);
+    }
     if (result) {
-      // Individually the two transactions may be valid.  Now copy entries from
-      // "other" into "trec".  This may cause "trec" to become invalid if it
-      // contains an update that conflicts with one from "other"
-      FOR_EACH_ENTRY(other, e, {
-        StgTVar *s = e -> tvar;
-        TRACE("Merging trec=%p exp=%p new=%p\n", other, e->expected_value, e->new_value);
-        merge_update_into(trec, s, e-> expected_value, e -> new_value, TRUE);
-      });
-      result = (trec -> state != TREC_MUST_ABORT);
-    }
-  }
+      // We now know that all of the read-only locations held their expected values
+      // at the end of the call to validate_and_acquire_ownership.  This forms the
+      // linearization point of the commit.
+
+      if (result) {
+        TRACE("%p : read-check succeeded\n", trec);
+        FOR_EACH_ENTRY(trec, e, {
+          // Merge each entry into the enclosing transaction record, release all
+          // locks.
+
+          StgTVar *s;
+          s = e -> tvar;
+          if (entry_is_update(e)) {
+            unlock_tvar(trec, s, e -> expected_value, FALSE);
+          }
+          merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
+          ACQ_ASSERT(s -> current_value != trec);
+        });
+      } else {
+        revert_ownership(trec, FALSE);
+      }
+    }
   }
 
-  if (!result) {
-    trec -> state = TREC_MUST_ABORT;
-  }
+  unlock_stm(trec);
 
-  unlock_stm();
+  free_stg_trec_header(cap, trec);
+
+  TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
 
-  TRACE("stmMergeForWaiting result=%d\n", result);
   return result;
 }
 
 /*......................................................................*/
 
-StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
+StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
   int result;
-  TRACE("stmWait tso=%p trec=%p\n", tso, trec);
+
+  TRACE("%p : stmWait(%p)\n", trec, tso);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> enclosing_trec == NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) ||
-          (trec -> state == TREC_MUST_ABORT) ||
-          (trec -> state == TREC_CANNOT_COMMIT));
+          (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = transaction_is_valid(trec);
+  lock_stm(trec);
+  result = validate_and_acquire_ownership(trec, TRUE, TRUE);
   if (result) {
     // The transaction is valid so far so we can actually start waiting.
     // (Otherwise the transaction was not valid and the thread will have to
     // retry it).
-    start_tso_waiting_on_trec(tso, trec);
+
+    // Put ourselves to sleep.  We retain locks on all the TVars involved
+    // until we are sound asleep: (a) on the wait queues, (b) BlockedOnSTM
+    // in the TSO, (c) TREC_WAITING in the TRec.
+    build_wait_queue_entries_for_trec(cap, tso, trec);
     park_tso(tso);
     trec -> state = TREC_WAITING;
-  } else {
-    // Outcome now reflected by status field; no need for log
-    recycle_closures_from_trec(trec);
+
+    // We haven't released ownership of the transaction yet.  The TSO
+    // has been put on the wait queue for the TVars it is waiting for,
+    // but we haven't yet tidied up the TSO's stack and made it safe
+    // to wake up the TSO.  Therefore, we must wait until the TSO is
+    // safe to wake up before we release ownership - when all is well,
+    // the runtime will call stmWaitUnlock() below, with the same
+    // TRec.
+
+  } else {
+    unlock_stm(trec);
+    free_stg_trec_header(cap, trec);
   }
 
-  unlock_stm();
-
-  TRACE("stmWait trec=%p result=%d\n", trec, result);
+  TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
   return result;
 }
 
+
+void
+stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
+  revert_ownership(trec, TRUE);
+  unlock_stm(trec);
+}
+
 /*......................................................................*/
 
-StgBool stmReWait(StgTSO *tso) {
+StgBool stmReWait(Capability *cap, StgTSO *tso) {
   int result;
   StgTRecHeader *trec = tso->trec;
-  TRACE("stmReWait trec=%p\n", trec);
+
+  TRACE("%p : stmReWait\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> enclosing_trec == NO_TREC);
   ASSERT ((trec -> state == TREC_WAITING) ||
-          (trec -> state == TREC_MUST_ABORT));
+          (trec -> state == TREC_CONDEMNED));
 
-  lock_stm();
-  result = transaction_is_valid(trec);
-  TRACE("stmReWait trec=%p result=%d\n", trec, result);
+  lock_stm(trec);
+  result = validate_and_acquire_ownership(trec, TRUE, TRUE);
+  TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
   if (result) {
     // The transaction remains valid -- do nothing because it is already on
     // the wait queues
     ASSERT (trec -> state == TREC_WAITING);
     park_tso(tso);
+    revert_ownership(trec, TRUE);
   } else {
-    // The transcation has become invalid.  We can now remove it from the wait
+    // The transaction has become invalid.  We can now remove it from the wait
     // queues.
-    if (trec -> state != TREC_MUST_ABORT) {
-      stop_tsos_waiting_on_trec (trec);
-
-      // Outcome now reflected by status field; no need for log
-      recycle_closures_from_trec(trec);
+    if (trec -> state != TREC_CONDEMNED) {
+      remove_wait_queue_entries_for_trec (cap, trec);
     }
-
+    free_stg_trec_header(cap, trec);
   }
 
-  unlock_stm();
+  unlock_stm(trec);
 
-  TRACE("stmReWait trec=%p result=%d\n", trec, result);
+  TRACE("%p : stmReWait()=%d\n", trec, result);
   return result;
 }
 
 /*......................................................................*/
 
-StgClosure *stmReadTVar(StgTRecHeader *trec,
-                        StgTVar *tvar) {
-  StgTRecHeader *et;
-  StgClosure *result = NULL; // Suppress unassignment warning
-  int found = FALSE;
-  TRecEntry *ne = NULL;
-
-  TRACE("stmReadTVar trec=%p tvar=%p\n", trec, tvar);
-  ASSERT (trec != NO_TREC);
-  ASSERT (trec -> state == TREC_ACTIVE ||
-          trec -> state == TREC_MUST_ABORT ||
-          trec -> state == TREC_CANNOT_COMMIT);
+static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
+  TRecEntry *result = NULL;
 
-  lock_stm();
-  found = FALSE;
+  TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
+  ASSERT(trec != NO_TREC);
 
-  // Look for an existing entry in our trec or in an enclosing trec
-  et = trec;
-  while (et != NO_TREC) {
-    FOR_EACH_ENTRY(et, e, {
-      TRACE("testing e=%p\n", e);
+  do {
+    FOR_EACH_ENTRY(trec, e, {
       if (e -> tvar == tvar) {
-        found = TRUE;
-        result = e -> new_value;
+        result = e;
+        if (in != NULL) {
+          *in = trec;
+        }
         BREAK_FOR_EACH;
       }
     });
-    if (found) break;
-    et = et -> enclosing_trec;
-  }
+    trec = trec -> enclosing_trec;
+  } while (result == NULL && trec != NO_TREC);
 
-  if (found && et != trec) {
-    // Entry found in another trec
-    ASSERT (result != NULL);
-    TRACE("duplicating entry\n");
-    ne = get_new_entry(trec);
-    ne -> tvar = tvar;
-    ne -> expected_value = result;
-    ne -> new_value = result;
-  } else if (!found) {
-    // No entry found
-    ASSERT (result == NULL);
-    TRACE("need new entry\n");
-    ne = get_new_entry(trec);
-    TRACE("got ne=%p\n", ne);
+  return result;
+}
+
+static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
+  StgClosure *result;
+  result = tvar -> current_value;
+
+#if defined(STM_FG_LOCKS)
+  while (GET_INFO(result) == &stg_TREC_HEADER_info) {
+    TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
     result = tvar -> current_value;
-    ne -> tvar = tvar;
-    ne -> expected_value = result;
-    ne -> new_value = result;
   }
+#endif
+
+  TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
+  return result;
+}
+
+/*......................................................................*/
 
-  unlock_stm();
-  ASSERT (result != NULL);
-  TRACE("stmReadTVar trec=%p result=%p\n", trec, result);
+StgClosure *stmReadTVar(Capability *cap,
+                        StgTRecHeader *trec,
+                        StgTVar *tvar) {
+  StgTRecHeader *entry_in;
+  StgClosure *result = NULL;
+  TRecEntry *entry = NULL;
+
+  TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
+  ASSERT (trec != NO_TREC);
+  ASSERT (trec -> state == TREC_ACTIVE ||
+          trec -> state == TREC_CONDEMNED);
 
+  entry = get_entry_for(trec, tvar, &entry_in);
+
+  if (entry != NULL) {
+    if (entry_in == trec) {
+      // Entry found in our trec
+      result = entry -> new_value;
+    } else {
+      // Entry found in another trec
+      TRecEntry *new_entry = get_new_entry(cap, trec);
+      new_entry -> tvar = tvar;
+      new_entry -> expected_value = entry -> expected_value;
+      new_entry -> new_value = entry -> new_value;
+      result = new_entry -> new_value;
+    }
+  } else {
+    // No entry found
+    StgClosure *current_value = read_current_value(trec, tvar);
+    TRecEntry *new_entry = get_new_entry(cap, trec);
+    new_entry -> tvar = tvar;
+    new_entry -> expected_value = current_value;
+    new_entry -> new_value = current_value;
+    result = current_value;
+  }
+
+  TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
   return result;
 }
 
 /*......................................................................*/
 
-void stmWriteTVar(StgTRecHeader *trec,
+void stmWriteTVar(Capability *cap,
+                  StgTRecHeader *trec,
                   StgTVar *tvar,
                   StgClosure *new_value) {
-  StgTRecHeader *et;
-  TRecEntry *ne;
+
+  StgTRecHeader *entry_in;
   TRecEntry *entry = NULL;
-  int found;
-  TRACE("stmWriteTVar trec=%p tvar=%p new_value=%p\n", trec, tvar, new_value);
+
+  TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> state == TREC_ACTIVE ||
-          trec -> state == TREC_MUST_ABORT ||
-          trec -> state == TREC_CANNOT_COMMIT);
-
-  lock_stm();
-  found = FALSE;
+          trec -> state == TREC_CONDEMNED);
 
-  // Look for an existing entry in our trec or in an enclosing trec
-  et = trec;
-  while (et != NO_TREC) {
-    FOR_EACH_ENTRY(et, e, {
-      if (e -> tvar == tvar) {
-        found = TRUE;
-        entry = e;
-        BREAK_FOR_EACH;
-      }
-    });
-    if (found) break;
-    et = et -> enclosing_trec;
-  }
+  entry = get_entry_for(trec, tvar, &entry_in);
 
-  if (found && et == trec) {
-    // Entry found in our trec
-    entry -> new_value = new_value;
-  } else if (found) {
-    // Entry found in another trec
-    ne = get_new_entry(trec);
-    ne -> tvar = tvar;
-    ne -> expected_value = entry -> new_value;
-    ne -> new_value = new_value;
+  if (entry != NULL) {
+    if (entry_in == trec) {
+      // Entry found in our trec
+      entry -> new_value = new_value;
+    } else {
+      // Entry found in another trec
+      TRecEntry *new_entry = get_new_entry(cap, trec);
+      new_entry -> tvar = tvar;
+      new_entry -> expected_value = entry -> expected_value;
+      new_entry -> new_value = new_value;
+    }
   } else {
     // No entry found
-    ne = get_new_entry(trec);
-    ne -> tvar = tvar;
-    ne -> expected_value = tvar -> current_value;
-    ne -> new_value = new_value;
+    StgClosure *current_value = read_current_value(trec, tvar);
+    TRecEntry *new_entry = get_new_entry(cap, trec);
+    new_entry -> tvar = tvar;
+    new_entry -> expected_value = current_value;
+    new_entry -> new_value = new_value;
   }
 
-  unlock_stm();
-  TRACE("stmWriteTVar trec=%p done\n", trec);
+  TRACE("%p : stmWriteTVar done\n", trec);
 }
 
 /*......................................................................*/
 
+StgTVar *stmNewTVar(Capability *cap,
+                    StgClosure *new_value) {
+  StgTVar *result;
+  result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
+  SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
+  result -> current_value = new_value;
+  result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
+#if defined(THREADED_RTS)
+  result -> num_updates = 0;
+#endif
+  return result;
+}
+
+/*......................................................................*/
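
[Editorial sketch -- not part of the patch.  A minimal caller of the external
interface above: start a top-level transaction, read and update a TVar, and
commit, restarting from scratch when validation fails.  `make_updated` is a
hypothetical callback standing in for allocation of the new closure; in the
real RTS this driver loop is spread across the scheduler and the STM primops
rather than written out by hand like this.]

    static void example_update(Capability *cap, StgTVar *tvar,
                               StgClosure *(*make_updated)(Capability *,
                                                           StgClosure *)) {
      StgTRecHeader *trec;
      StgClosure *old_value;
      do {
        trec = stmStartTransaction(cap, NO_TREC);
        old_value = stmReadTVar(cap, trec, tvar);
        stmWriteTVar(cap, trec, tvar, make_updated(cap, old_value));
      } while (!stmCommitTransaction(cap, trec));   // FALSE => start again
    }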