X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSTM.c;h=d3283a92f04fbf9f89437aaa3443d6794c475232;hb=3eacdc7faf0d0e87a7201253f9f12c1fb4db7249;hp=2be5c696b4b3e6634201e6be7cb4ccf420754686;hpb=55472d778c441b65e013d27f5228283eef85986c;p=ghc-hetmet.git diff --git a/ghc/rts/STM.c b/ghc/rts/STM.c index 2be5c69..d3283a9 100644 --- a/ghc/rts/STM.c +++ b/ghc/rts/STM.c @@ -1,5 +1,4 @@ /* ----------------------------------------------------------------------------- - * * (c) The GHC Team 1998-2005 * * STM implementation. @@ -29,7 +28,7 @@ * in STM.h: * * STM_UNIPROC assumes that the caller serialises invocations on the STM interface. - * In the Haskell RTS this means it is suitable only for non-SMP builds. + * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds. * * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during * an invocation on the STM interface. Note that this does not mean that @@ -98,8 +97,8 @@ #define TRUE 1 #define FALSE 0 -// ACQ_ASSERT is used for assertions which are only required for SMP builds with -// fine-grained locking. +// ACQ_ASSERT is used for assertions which are only required for +// THREADED_RTS builds with fine-grained locking. #if defined(STM_FG_LOCKS) #define ACQ_ASSERT(_X) ASSERT(_X) @@ -116,7 +115,11 @@ #if defined(DEBUG) #define SHAKE +#if defined(THREADED_RTS) +#define TRACE(_x...) IF_DEBUG(stm, debugBelch("STM (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()); debugBelch ( _x )) +#else #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x )) +#endif #else #define TRACE(_x...) /*Nothing*/ #endif @@ -169,7 +172,20 @@ static int shake(void) { /*......................................................................*/ +// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks, +// and wait queue entries without GC + +#define REUSE_MEMORY + +/*......................................................................*/ + +#define IF_STM_UNIPROC(__X) do { } while (0) +#define IF_STM_CG_LOCK(__X) do { } while (0) +#define IF_STM_FG_LOCKS(__X) do { } while (0) + #if defined(STM_UNIPROC) +#undef IF_STM_UNIPROC +#define IF_STM_UNIPROC(__X) do { __X } while (0) static const StgBool use_read_phase = FALSE; static void lock_stm(StgTRecHeader *trec STG_UNUSED) { @@ -204,13 +220,15 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, StgClosure *result; TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected); result = s -> current_value; - TRACE("%p : %d\n", (result == expected) ? "success" : "failure"); + TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure"); return (result == expected); } #endif #if defined(STM_CG_LOCK) /*........................................*/ +#undef IF_STM_CG_LOCK +#define IF_STM_CG_LOCK(__X) do { __X } while (0) static const StgBool use_read_phase = FALSE; static volatile StgTRecHeader *smp_locked = NULL; @@ -259,6 +277,8 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, #if defined(STM_FG_LOCKS) /*...................................*/ +#undef IF_STM_FG_LOCKS +#define IF_STM_FG_LOCKS(__X) do { __X } while (0) static const StgBool use_read_phase = TRUE; static void lock_stm(StgTRecHeader *trec STG_UNUSED) { @@ -306,69 +326,64 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec, // Helper functions for thread blocking and unblocking static void park_tso(StgTSO *tso) { - ACQUIRE_LOCK(&sched_mutex); ASSERT(tso -> why_blocked == NotBlocked); tso -> why_blocked = BlockedOnSTM; tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE; - RELEASE_LOCK(&sched_mutex); TRACE("park_tso on tso=%p\n", tso); } -static void unpark_tso(StgTSO *tso) { +static void unpark_tso(Capability *cap, StgTSO *tso) { // We will continue unparking threads while they remain on one of the wait // queues: it's up to the thread itself to remove it from the wait queues // if it decides to do so when it is scheduled. if (tso -> why_blocked == BlockedOnSTM) { TRACE("unpark_tso on tso=%p\n", tso); - ACQUIRE_LOCK(&sched_mutex); - tso -> why_blocked = NotBlocked; - PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); + unblockOne(cap,tso); } else { TRACE("spurious unpark_tso on tso=%p\n", tso); } } -static void unpark_waiters_on(StgTVar *s) { +static void unpark_waiters_on(Capability *cap, StgTVar *s) { StgTVarWaitQueue *q; TRACE("unpark_waiters_on tvar=%p\n", s); for (q = s -> first_wait_queue_entry; q != END_STM_WAIT_QUEUE; q = q -> next_queue_entry) { - unpark_tso(q -> waiting_tso); + unpark_tso(cap, q -> waiting_tso); } } /*......................................................................*/ -// Helper functions for allocation and initialization +// Helper functions for downstream allocation and initialization -static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgRegTable *reg, +static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap, StgTSO *waiting_tso) { StgTVarWaitQueue *result; - result = (StgTVarWaitQueue *)allocateLocal(reg, sizeofW(StgTVarWaitQueue)); + result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue)); SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM); result -> waiting_tso = waiting_tso; return result; } -static StgTRecChunk *new_stg_trec_chunk(StgRegTable *reg) { +static StgTRecChunk *new_stg_trec_chunk(Capability *cap) { StgTRecChunk *result; - result = (StgTRecChunk *)allocateLocal(reg, sizeofW(StgTRecChunk)); + result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk)); SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM); result -> prev_chunk = END_STM_CHUNK_LIST; result -> next_entry_idx = 0; return result; } -static StgTRecHeader *new_stg_trec_header(StgRegTable *reg, +static StgTRecHeader *new_stg_trec_header(Capability *cap, StgTRecHeader *enclosing_trec) { StgTRecHeader *result; - result = (StgTRecHeader *) allocateLocal(reg, sizeofW(StgTRecHeader)); + result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader)); SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM); result -> enclosing_trec = enclosing_trec; - result -> current_chunk = new_stg_trec_chunk(reg); + result -> current_chunk = new_stg_trec_chunk(cap); if (enclosing_trec == NO_TREC) { result -> state = TREC_ACTIVE; @@ -383,9 +398,92 @@ static StgTRecHeader *new_stg_trec_header(StgRegTable *reg, /*......................................................................*/ +// Allocation / deallocation functions that retain per-capability lists +// of closures that can be re-used + +static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap, + StgTSO *waiting_tso) { + StgTVarWaitQueue *result = NULL; + if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) { + result = new_stg_tvar_wait_queue(cap, waiting_tso); + } else { + result = cap -> free_tvar_wait_queues; + result -> waiting_tso = waiting_tso; + cap -> free_tvar_wait_queues = result -> next_queue_entry; + } + return result; +} + +static void free_stg_tvar_wait_queue(Capability *cap, + StgTVarWaitQueue *wq) { +#if defined(REUSE_MEMORY) + wq -> next_queue_entry = cap -> free_tvar_wait_queues; + cap -> free_tvar_wait_queues = wq; +#endif +} + +static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) { + StgTRecChunk *result = NULL; + if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) { + result = new_stg_trec_chunk(cap); + } else { + result = cap -> free_trec_chunks; + cap -> free_trec_chunks = result -> prev_chunk; + result -> prev_chunk = END_STM_CHUNK_LIST; + result -> next_entry_idx = 0; + } + return result; +} + +static void free_stg_trec_chunk(Capability *cap, + StgTRecChunk *c) { +#if defined(REUSE_MEMORY) + c -> prev_chunk = cap -> free_trec_chunks; + cap -> free_trec_chunks = c; +#endif +} + +static StgTRecHeader *alloc_stg_trec_header(Capability *cap, + StgTRecHeader *enclosing_trec) { + StgTRecHeader *result = NULL; + if (cap -> free_trec_headers == NO_TREC) { + result = new_stg_trec_header(cap, enclosing_trec); + } else { + result = cap -> free_trec_headers; + cap -> free_trec_headers = result -> enclosing_trec; + result -> enclosing_trec = enclosing_trec; + result -> current_chunk -> next_entry_idx = 0; + if (enclosing_trec == NO_TREC) { + result -> state = TREC_ACTIVE; + } else { + ASSERT(enclosing_trec -> state == TREC_ACTIVE || + enclosing_trec -> state == TREC_CONDEMNED); + result -> state = enclosing_trec -> state; + } + } + return result; +} + +static void free_stg_trec_header(Capability *cap, + StgTRecHeader *trec) { +#if defined(REUSE_MEMORY) + StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk; + while (chunk != END_STM_CHUNK_LIST) { + StgTRecChunk *prev_chunk = chunk -> prev_chunk; + free_stg_trec_chunk(cap, chunk); + chunk = prev_chunk; + } + trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST; + trec -> enclosing_trec = cap -> free_trec_headers; + cap -> free_trec_headers = trec; +#endif +} + +/*......................................................................*/ + // Helper functions for managing waiting lists -static void build_wait_queue_entries_for_trec(StgRegTable *reg, +static void build_wait_queue_entries_for_trec(Capability *cap, StgTSO *tso, StgTRecHeader *trec) { ASSERT(trec != NO_TREC); @@ -403,7 +501,7 @@ static void build_wait_queue_entries_for_trec(StgRegTable *reg, ACQ_ASSERT(s -> current_value == trec); NACQ_ASSERT(s -> current_value == e -> expected_value); fq = s -> first_wait_queue_entry; - q = new_stg_tvar_wait_queue(reg, tso); + q = alloc_stg_tvar_wait_queue(cap, tso); q -> next_queue_entry = fq; q -> prev_queue_entry = END_STM_WAIT_QUEUE; if (fq != END_STM_WAIT_QUEUE) { @@ -414,7 +512,8 @@ static void build_wait_queue_entries_for_trec(StgRegTable *reg, }); } -static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) { +static void remove_wait_queue_entries_for_trec(Capability *cap, + StgTRecHeader *trec) { ASSERT(trec != NO_TREC); ASSERT(trec -> enclosing_trec == NO_TREC); ASSERT(trec -> state == TREC_WAITING || @@ -443,13 +542,14 @@ static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) { ASSERT (s -> first_wait_queue_entry == q); s -> first_wait_queue_entry = nq; } + free_stg_tvar_wait_queue(cap, q); unlock_tvar(trec, s, saw, FALSE); }); } /*......................................................................*/ -static TRecEntry *get_new_entry(StgRegTable *reg, +static TRecEntry *get_new_entry(Capability *cap, StgTRecHeader *t) { TRecEntry *result; StgTRecChunk *c; @@ -466,7 +566,7 @@ static TRecEntry *get_new_entry(StgRegTable *reg, } else { // Current chunk is full: allocate a fresh one StgTRecChunk *nc; - nc = new_stg_trec_chunk(reg); + nc = alloc_stg_trec_chunk(cap); nc -> prev_chunk = c; nc -> next_entry_idx = 1; t -> current_chunk = nc; @@ -478,7 +578,7 @@ static TRecEntry *get_new_entry(StgRegTable *reg, /*......................................................................*/ -static void merge_update_into(StgRegTable *reg, +static void merge_update_into(Capability *cap, StgTRecHeader *t, StgTVar *tvar, StgClosure *expected_value, @@ -506,7 +606,7 @@ static void merge_update_into(StgRegTable *reg, if (!found) { // No entry so far in this trec TRecEntry *ne; - ne = get_new_entry(reg, t); + ne = get_new_entry(cap, t); ne -> tvar = tvar; ne -> expected_value = expected_value; ne -> new_value = new_value; @@ -521,6 +621,7 @@ static StgBool entry_is_update(TRecEntry *e) { return result; } +#if defined(STM_FG_LOCKS) static StgBool entry_is_read_only(TRecEntry *e) { StgBool result; result = (e -> expected_value == e -> new_value); @@ -534,6 +635,7 @@ static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) { result = (c == (StgClosure *) h); return result; } +#endif // revert_ownership : release a lock on a TVar, storing back // the value that it held when the lock was acquired. "revert_all" @@ -597,20 +699,23 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, BREAK_FOR_EACH; } } else { - TRACE("%p : will need to check %p\n", trec, s); - if (s -> current_value != e -> expected_value) { - TRACE("%p : doesn't match\n", trec); - result = FALSE; - BREAK_FOR_EACH; - } - e -> saw_update_by = s -> last_update_by; - if (s -> current_value != e -> expected_value) { - TRACE("%p : doesn't match (race)\n", trec); - result = FALSE; - BREAK_FOR_EACH; - } else { - TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by); - } + ASSERT(use_read_phase); + IF_STM_FG_LOCKS({ + TRACE("%p : will need to check %p\n", trec, s); + if (s -> current_value != e -> expected_value) { + TRACE("%p : doesn't match\n", trec); + result = FALSE; + BREAK_FOR_EACH; + } + e -> num_updates = s -> num_updates; + if (s -> current_value != e -> expected_value) { + TRACE("%p : doesn't match (race)\n", trec); + result = FALSE; + BREAK_FOR_EACH; + } else { + TRACE("%p : need to check version %d\n", trec, e -> num_updates); + } + }); } }); } @@ -633,21 +738,24 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss // this kind of algorithm. -static StgBool check_read_only(StgTRecHeader *trec) { +static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) { StgBool result = TRUE; - FOR_EACH_ENTRY(trec, e, { - StgTVar *s; - s = e -> tvar; - if (entry_is_read_only(e)) { - TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by); - if (s -> last_update_by != e -> saw_update_by) { - // ||s -> current_value != e -> expected_value) { - TRACE("%p : mismatch\n", trec); - result = FALSE; - BREAK_FOR_EACH; + ASSERT (use_read_phase); + IF_STM_FG_LOCKS({ + FOR_EACH_ENTRY(trec, e, { + StgTVar *s; + s = e -> tvar; + if (entry_is_read_only(e)) { + TRACE("%p : check_read_only for TVar %p, saw %d\n", trec, s, e -> num_updates); + if (s -> num_updates != e -> num_updates) { + // ||s -> current_value != e -> expected_value) { + TRACE("%p : mismatch\n", trec); + result = FALSE; + BREAK_FOR_EACH; + } } - } + }); }); return result; @@ -657,31 +765,75 @@ static StgBool check_read_only(StgTRecHeader *trec) { /************************************************************************/ void stmPreGCHook() { + nat i; + lock_stm(NO_TREC); TRACE("stmPreGCHook\n"); + for (i = 0; i < n_capabilities; i ++) { + Capability *cap = &capabilities[i]; + cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap -> free_trec_chunks = END_STM_CHUNK_LIST; + cap -> free_trec_headers = NO_TREC; + } unlock_stm(NO_TREC); } /************************************************************************/ -void initSTM() { - TRACE("initSTM, NO_TREC=%p\n", NO_TREC); +// check_read_only relies on version numbers held in TVars' "num_updates" +// fields not wrapping around while a transaction is committed. The version +// number is incremented each time an update is committed to the TVar +// This is unlikely to wrap around when 32-bit integers are used for the counts, +// but to ensure correctness we maintain a shared count on the maximum +// number of commit operations that may occur and check that this has +// not increased by more than 2^32 during a commit. + +#define TOKEN_BATCH_SIZE 1024 + +static volatile StgInt64 max_commits = 0; + +static volatile StgBool token_locked = FALSE; + +#if defined(THREADED_RTS) +static void getTokenBatch(Capability *cap) { + while (cas(&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ } + max_commits += TOKEN_BATCH_SIZE; + cap -> transaction_tokens = TOKEN_BATCH_SIZE; + token_locked = FALSE; } +static void getToken(Capability *cap) { + if (cap -> transaction_tokens == 0) { + getTokenBatch(cap); + } + cap -> transaction_tokens --; +} +#else +static void getToken(Capability *cap STG_UNUSED) { + // Nothing +} +#endif + /*......................................................................*/ -StgTRecHeader *stmStartTransaction(StgRegTable *reg, +StgTRecHeader *stmStartTransaction(Capability *cap, StgTRecHeader *outer) { StgTRecHeader *t; - TRACE("%p : stmStartTransaction\n", outer); - t = new_stg_trec_header(reg, outer); + TRACE("%p : stmStartTransaction with %d tokens\n", + outer, + cap -> transaction_tokens); + + getToken(cap); + + t = alloc_stg_trec_header(cap, outer); TRACE("%p : stmStartTransaction()=%p\n", outer, t); return t; } /*......................................................................*/ -void stmAbortTransaction(StgTRecHeader *trec) { +void stmAbortTransaction(Capability *cap, + StgTRecHeader *trec) { TRACE("%p : stmAbortTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || @@ -692,17 +844,20 @@ void stmAbortTransaction(StgTRecHeader *trec) { if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec); - remove_wait_queue_entries_for_trec(trec); + remove_wait_queue_entries_for_trec(cap, trec); } trec -> state = TREC_ABORTED; unlock_stm(trec); + free_stg_trec_header(cap, trec); + TRACE("%p : stmAbortTransaction done\n", trec); } /*......................................................................*/ -void stmCondemnTransaction(StgTRecHeader *trec) { +void stmCondemnTransaction(Capability *cap, + StgTRecHeader *trec) { TRACE("%p : stmCondemnTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || @@ -713,7 +868,7 @@ void stmCondemnTransaction(StgTRecHeader *trec) { if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec); - remove_wait_queue_entries_for_trec(trec); + remove_wait_queue_entries_for_trec(cap, trec); } trec -> state = TREC_CONDEMNED; unlock_stm(trec); @@ -764,15 +919,19 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) { /*......................................................................*/ -StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { +StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { int result; + StgInt64 max_commits_at_start = max_commits; + TRACE("%p : stmCommitTransaction()\n", trec); ASSERT (trec != NO_TREC); + + lock_stm(trec); + ASSERT (trec -> enclosing_trec == NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED)); - lock_stm(trec); result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE); if (result) { // We now know that all the updated locations hold their expected values. @@ -781,6 +940,15 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { if (use_read_phase) { TRACE("%p : doing read check\n", trec); result = check_read_only(trec); + TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed"); + + StgInt64 max_commits_at_end = max_commits; + StgInt64 max_concurrent_commits; + max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) + + (n_capabilities * TOKEN_BATCH_SIZE)); + if (((max_concurrent_commits >> 32) > 0) || shake()) { + result = FALSE; + } } if (result) { @@ -788,7 +956,6 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { // at the end of the call to validate_and_acquire_ownership. This forms the // linearization point of the commit. - TRACE("%p : read-check succeeded\n", trec); FOR_EACH_ENTRY(trec, e, { StgTVar *s; s = e -> tvar; @@ -798,8 +965,10 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { ACQ_ASSERT(tvar_is_locked(s, trec)); TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s); - unpark_waiters_on(s); - s -> last_update_by = trec; + unpark_waiters_on(cap,s); + IF_STM_FG_LOCKS({ + s -> num_updates ++; + }); unlock_tvar(trec, s, e -> new_value, TRUE); } ACQ_ASSERT(!tvar_is_locked(s, trec)); @@ -811,6 +980,8 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { unlock_stm(trec); + free_stg_trec_header(cap, trec); + TRACE("%p : stmCommitTransaction()=%d\n", trec, result); return result; @@ -818,7 +989,7 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { /*......................................................................*/ -StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { +StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) { StgTRecHeader *et; int result; ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC); @@ -828,7 +999,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { lock_stm(trec); et = trec -> enclosing_trec; - result = validate_and_acquire_ownership(trec, FALSE, TRUE); + result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE); if (result) { // We now know that all the updated locations hold their expected values. @@ -852,7 +1023,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { if (entry_is_update(e)) { unlock_tvar(trec, s, e -> expected_value, FALSE); } - merge_update_into(reg, et, s, e -> expected_value, e -> new_value); + merge_update_into(cap, et, s, e -> expected_value, e -> new_value); ACQ_ASSERT(s -> current_value != trec); }); } else { @@ -863,6 +1034,8 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { unlock_stm(trec); + free_stg_trec_header(cap, trec); + TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result); return result; @@ -870,7 +1043,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { /*......................................................................*/ -StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) { +StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) { int result; TRACE("%p : stmWait(%p)\n", trec, tso); ASSERT (trec != NO_TREC); @@ -888,25 +1061,37 @@ StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) { // Put ourselves to sleep. We retain locks on all the TVars involved // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM // in the TSO, (c) TREC_WAITING in the Trec. - build_wait_queue_entries_for_trec(reg, tso, trec); + build_wait_queue_entries_for_trec(cap, tso, trec); park_tso(tso); trec -> state = TREC_WAITING; - // As soon as we start releasing ownership, another thread may find us - // and wake us up. This may happen even before we have finished - // releasing ownership. - revert_ownership(trec, TRUE); - } + // We haven't released ownership of the transaction yet. The TSO + // has been put on the wait queue for the TVars it is waiting for, + // but we haven't yet tidied up the TSO's stack and made it safe + // to wake up the TSO. Therefore, we must wait until the TSO is + // safe to wake up before we release ownership - when all is well, + // the runtime will call stmWaitUnlock() below, with the same + // TRec. - unlock_stm(trec); + } else { + unlock_stm(trec); + free_stg_trec_header(cap, trec); + } TRACE("%p : stmWait(%p)=%d\n", trec, tso, result); return result; } + +void +stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) { + revert_ownership(trec, TRUE); + unlock_stm(trec); +} + /*......................................................................*/ -StgBool stmReWait(StgTSO *tso) { +StgBool stmReWait(Capability *cap, StgTSO *tso) { int result; StgTRecHeader *trec = tso->trec; @@ -929,9 +1114,9 @@ StgBool stmReWait(StgTSO *tso) { // The transcation has become invalid. We can now remove it from the wait // queues. if (trec -> state != TREC_CONDEMNED) { - remove_wait_queue_entries_for_trec (trec); + remove_wait_queue_entries_for_trec (cap, trec); } - + free_stg_trec_header(cap, trec); } unlock_stm(trec); @@ -980,7 +1165,7 @@ static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *t /*......................................................................*/ -StgClosure *stmReadTVar(StgRegTable *reg, +StgClosure *stmReadTVar(Capability *cap, StgTRecHeader *trec, StgTVar *tvar) { StgTRecHeader *entry_in; @@ -999,7 +1184,7 @@ StgClosure *stmReadTVar(StgRegTable *reg, result = entry -> new_value; } else { // Entry found in another trec - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = entry -> expected_value; new_entry -> new_value = entry -> new_value; @@ -1008,7 +1193,7 @@ StgClosure *stmReadTVar(StgRegTable *reg, } else { // No entry found StgClosure *current_value = read_current_value(trec, tvar); - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = current_value; new_entry -> new_value = current_value; @@ -1021,7 +1206,7 @@ StgClosure *stmReadTVar(StgRegTable *reg, /*......................................................................*/ -void stmWriteTVar(StgRegTable *reg, +void stmWriteTVar(Capability *cap, StgTRecHeader *trec, StgTVar *tvar, StgClosure *new_value) { @@ -1041,7 +1226,7 @@ void stmWriteTVar(StgRegTable *reg, entry -> new_value = new_value; } else { // Entry found in another trec - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = entry -> expected_value; new_entry -> new_value = new_value; @@ -1049,7 +1234,7 @@ void stmWriteTVar(StgRegTable *reg, } else { // No entry found StgClosure *current_value = read_current_value(trec, tvar); - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = current_value; new_entry -> new_value = new_value; @@ -1060,3 +1245,17 @@ void stmWriteTVar(StgRegTable *reg, /*......................................................................*/ +StgTVar *stmNewTVar(Capability *cap, + StgClosure *new_value) { + StgTVar *result; + result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar)); + SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM); + result -> current_value = new_value; + result -> first_wait_queue_entry = END_STM_WAIT_QUEUE; +#if defined(THREADED_RTS) + result -> num_updates = 0; +#endif + return result; +} + +/*......................................................................*/