From: tharris Date: Mon, 21 Nov 2005 15:58:47 +0000 (+0000) Subject: [project @ 2005-11-21 15:58:47 by tharris] X-Git-Tag: Initial_conversion_from_CVS_complete~32 X-Git-Url: http://git.megacz.com/?a=commitdiff_plain;h=afd08a9c06ae4b15e33e26e5a2818801c7fee429;p=ghc-hetmet.git [project @ 2005-11-21 15:58:47 by tharris] Re-use temporary storage in the STM implementation --- diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 5872f42..2379c32 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -20,6 +20,7 @@ #include "Rts.h" #include "RtsUtils.h" #include "RtsFlags.h" +#include "STM.h" #include "OSThreads.h" #include "Capability.h" #include "Schedule.h" @@ -155,6 +156,11 @@ initCapability( Capability *cap, nat i ) for (g = 0; g < RtsFlags.GcFlags.generations; g++) { cap->mut_lists[g] = NULL; } + + cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_trec_chunks = END_STM_CHUNK_LIST; + cap->free_trec_headers = NO_TREC; + cap->transaction_tokens = 0; } /* --------------------------------------------------------------------------- diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index f9ae894..2a04e41 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -81,6 +81,12 @@ struct Capability_ { Task *returning_tasks_hd; // Singly-linked, with head/tail Task *returning_tasks_tl; #endif + + // Per-capability STM-related data + StgTVarWaitQueue *free_tvar_wait_queues; + StgTRecChunk *free_trec_chunks; + StgTRecHeader *free_trec_headers; + nat transaction_tokens; }; // typedef Capability, defined in RtsAPI.h diff --git a/ghc/rts/Exception.cmm b/ghc/rts/Exception.cmm index 4007b78..3fdfdfd 100644 --- a/ghc/rts/Exception.cmm +++ b/ghc/rts/Exception.cmm @@ -360,7 +360,7 @@ retry_pop_stack: W_ r; trec = StgTSO_trec(CurrentTSO); r = foreign "C" stmValidateNestOfTransactions(trec "ptr"); - foreign "C" stmAbortTransaction(trec "ptr"); + foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr"); StgTSO_trec(CurrentTSO) = NO_TREC; if (r) { // Transaction was valid: continue searching for a catch frame @@ -369,7 +369,7 @@ retry_pop_stack: } else { // Transaction was not valid: we retry the exception (otherwise continue // with a further call to raiseExceptionHelper) - "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr"); StgTSO_trec(CurrentTSO) = trec; R1 = StgAtomicallyFrame_code(Sp); Sp_adj(-1); diff --git a/ghc/rts/GC.c b/ghc/rts/GC.c index bc8546a..513d14a 100644 --- a/ghc/rts/GC.c +++ b/ghc/rts/GC.c @@ -3051,9 +3051,6 @@ scavenge(step *stp) evac_gen = 0; tvar->current_value = evacuate((StgClosure*)tvar->current_value); tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry); -#if defined(SMP) - tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by); -#endif evac_gen = saved_evac_gen; failed_to_evac = rtsTrue; // mutable p += sizeofW(StgTVar); @@ -3408,9 +3405,6 @@ linear_scan: evac_gen = 0; tvar->current_value = evacuate((StgClosure*)tvar->current_value); tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry); -#if defined(SMP) - tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by); -#endif evac_gen = saved_evac_gen; failed_to_evac = rtsTrue; // mutable break; @@ -3732,9 +3726,6 @@ scavenge_one(StgPtr p) evac_gen = 0; tvar->current_value = evacuate((StgClosure*)tvar->current_value); tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry); -#if defined(SMP) - tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by); -#endif evac_gen = saved_evac_gen; failed_to_evac = rtsTrue; // mutable break; diff --git a/ghc/rts/GCCompact.c b/ghc/rts/GCCompact.c index 775aa75..e53429c 100644 --- a/ghc/rts/GCCompact.c +++ b/ghc/rts/GCCompact.c @@ -688,9 +688,6 @@ thread_obj (StgInfoTable *info, StgPtr p) StgTVar *tvar = (StgTVar *)p; thread((StgPtr)&tvar->current_value); thread((StgPtr)&tvar->first_wait_queue_entry); -#if defined(SMP) - thread((StgPtr)&tvar->last_update_by); -#endif return p + sizeofW(StgTVar); } diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm index b4e95f3..b25a1e5 100644 --- a/ghc/rts/PrimOps.cmm +++ b/ghc/rts/PrimOps.cmm @@ -1048,7 +1048,7 @@ INFO_TABLE_RET(stg_atomically_frame, trec = StgTSO_trec(CurrentTSO); if (StgAtomicallyFrame_waiting(frame)) { /* The TSO is currently waiting: should we stop waiting? */ - valid = foreign "C" stmReWait(CurrentTSO "ptr"); + valid = foreign "C" stmReWait(MyCapability() "ptr", CurrentTSO "ptr"); if (valid) { /* Previous attempt is still valid: no point trying again yet */ IF_NOT_REG_R1(Sp_adj(-2); @@ -1268,6 +1268,8 @@ retry_pop_stack: r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", other_trec "ptr"); if (r) { r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr"); + } else { + foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr"); } if (r) { // Merge between siblings succeeded: commit it back to enclosing transaction diff --git a/ghc/rts/RtsStartup.c b/ghc/rts/RtsStartup.c index f9b1c85..10cd451 100644 --- a/ghc/rts/RtsStartup.c +++ b/ghc/rts/RtsStartup.c @@ -217,8 +217,6 @@ hs_init(int *argc, char **argv[]) startupAsyncIO(); #endif - initSTM(); - #ifdef RTS_GTK_FRONTPANEL if (RtsFlags.GcFlags.frontpanel) { initFrontPanel(); diff --git a/ghc/rts/STM.c b/ghc/rts/STM.c index 15369cb..bc8e9bf 100644 --- a/ghc/rts/STM.c +++ b/ghc/rts/STM.c @@ -1,5 +1,4 @@ /* ----------------------------------------------------------------------------- - * * (c) The GHC Team 1998-2005 * * STM implementation. @@ -173,6 +172,13 @@ static int shake(void) { /*......................................................................*/ +// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks, +// and wait queue entries without GC + +#define REUSE_MEMORY + +/*......................................................................*/ + #define IF_STM_UNIPROC(__X) do { } while (0) #define IF_STM_CG_LOCK(__X) do { } while (0) #define IF_STM_FG_LOCKS(__X) do { } while (0) @@ -350,7 +356,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) { /*......................................................................*/ -// Helper functions for allocation and initialization +// Helper functions for downstream allocation and initialization static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap, StgTSO *waiting_tso) { @@ -392,6 +398,89 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap, /*......................................................................*/ +// Allocation / deallocation functions that retain per-capability lists +// of closures that can be re-used + +static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap, + StgTSO *waiting_tso) { + StgTVarWaitQueue *result = NULL; + if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) { + result = new_stg_tvar_wait_queue(cap, waiting_tso); + } else { + result = cap -> free_tvar_wait_queues; + result -> waiting_tso = waiting_tso; + cap -> free_tvar_wait_queues = result -> next_queue_entry; + } + return result; +} + +static void free_stg_tvar_wait_queue(Capability *cap, + StgTVarWaitQueue *wq) { +#if defined(REUSE_MEMORY) + wq -> next_queue_entry = cap -> free_tvar_wait_queues; + cap -> free_tvar_wait_queues = wq; +#endif +} + +static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) { + StgTRecChunk *result = NULL; + if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) { + result = new_stg_trec_chunk(cap); + } else { + result = cap -> free_trec_chunks; + cap -> free_trec_chunks = result -> prev_chunk; + result -> prev_chunk = END_STM_CHUNK_LIST; + result -> next_entry_idx = 0; + } + return result; +} + +static void free_stg_trec_chunk(Capability *cap, + StgTRecChunk *c) { +#if defined(REUSE_MEMORY) + c -> prev_chunk = cap -> free_trec_chunks; + cap -> free_trec_chunks = c; +#endif +} + +static StgTRecHeader *alloc_stg_trec_header(Capability *cap, + StgTRecHeader *enclosing_trec) { + StgTRecHeader *result = NULL; + if (cap -> free_trec_headers == NO_TREC) { + result = new_stg_trec_header(cap, enclosing_trec); + } else { + result = cap -> free_trec_headers; + cap -> free_trec_headers = result -> enclosing_trec; + result -> enclosing_trec = enclosing_trec; + result -> current_chunk -> next_entry_idx = 0; + if (enclosing_trec == NO_TREC) { + result -> state = TREC_ACTIVE; + } else { + ASSERT(enclosing_trec -> state == TREC_ACTIVE || + enclosing_trec -> state == TREC_CONDEMNED); + result -> state = enclosing_trec -> state; + } + } + return result; +} + +static void free_stg_trec_header(Capability *cap, + StgTRecHeader *trec) { +#if defined(REUSE_MEMORY) + StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk; + while (chunk != END_STM_CHUNK_LIST) { + StgTRecChunk *prev_chunk = chunk -> prev_chunk; + free_stg_trec_chunk(cap, chunk); + chunk = prev_chunk; + } + trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST; + trec -> enclosing_trec = cap -> free_trec_headers; + cap -> free_trec_headers = trec; +#endif +} + +/*......................................................................*/ + // Helper functions for managing waiting lists static void build_wait_queue_entries_for_trec(Capability *cap, @@ -412,7 +501,7 @@ static void build_wait_queue_entries_for_trec(Capability *cap, ACQ_ASSERT(s -> current_value == trec); NACQ_ASSERT(s -> current_value == e -> expected_value); fq = s -> first_wait_queue_entry; - q = new_stg_tvar_wait_queue(cap, tso); + q = alloc_stg_tvar_wait_queue(cap, tso); q -> next_queue_entry = fq; q -> prev_queue_entry = END_STM_WAIT_QUEUE; if (fq != END_STM_WAIT_QUEUE) { @@ -423,7 +512,8 @@ static void build_wait_queue_entries_for_trec(Capability *cap, }); } -static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) { +static void remove_wait_queue_entries_for_trec(Capability *cap, + StgTRecHeader *trec) { ASSERT(trec != NO_TREC); ASSERT(trec -> enclosing_trec == NO_TREC); ASSERT(trec -> state == TREC_WAITING || @@ -452,6 +542,7 @@ static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) { ASSERT (s -> first_wait_queue_entry == q); s -> first_wait_queue_entry = nq; } + free_stg_tvar_wait_queue(cap, q); unlock_tvar(trec, s, saw, FALSE); }); } @@ -475,7 +566,7 @@ static TRecEntry *get_new_entry(Capability *cap, } else { // Current chunk is full: allocate a fresh one StgTRecChunk *nc; - nc = new_stg_trec_chunk(cap); + nc = alloc_stg_trec_chunk(cap); nc -> prev_chunk = c; nc -> next_entry_idx = 1; t -> current_chunk = nc; @@ -614,13 +705,13 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, result = FALSE; BREAK_FOR_EACH; } - e -> saw_update_by = s -> last_update_by; + e -> num_updates = s -> num_updates; if (s -> current_value != e -> expected_value) { TRACE("%p : doesn't match (race)\n", trec); result = FALSE; BREAK_FOR_EACH; } else { - TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by); + TRACE("%p : need to check version %d\n", trec, e -> num_updates); } }); } @@ -654,8 +745,8 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) { StgTVar *s; s = e -> tvar; if (entry_is_read_only(e)) { - TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by); - if (s -> last_update_by != e -> saw_update_by) { + TRACE("%p : check_read_only for TVar %p, saw %d\n", trec, s, e -> num_updates); + if (s -> num_updates != e -> num_updates) { // ||s -> current_value != e -> expected_value) { TRACE("%p : mismatch\n", trec); result = FALSE; @@ -672,31 +763,75 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) { /************************************************************************/ void stmPreGCHook() { + nat i; + lock_stm(NO_TREC); TRACE("stmPreGCHook\n"); + for (i = 0; i < n_capabilities; i ++) { + Capability *cap = &capabilities[i]; + cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap -> free_trec_chunks = END_STM_CHUNK_LIST; + cap -> free_trec_headers = NO_TREC; + } unlock_stm(NO_TREC); } /************************************************************************/ -void initSTM() { - TRACE("initSTM, NO_TREC=%p\n", NO_TREC); +// check_read_only relies on version numbers held in TVars' "num_updates" +// fields not wrapping around while a transaction is committed. The version +// number is incremented each time an update is committed to the TVar +// This is unlikely to wrap around when 32-bit integers are used for the counts, +// but to ensure correctness we maintain a shared count on the maximum +// number of commit operations that may occur and check that this has +// not increased by more than 2^32 during a commit. + +#define TOKEN_BATCH_SIZE 1024 + +static volatile StgInt64 max_commits = 0; + +static volatile StgBool token_locked = FALSE; + +#if defined(SMP) +static void getTokenBatch(Capability *cap) { + while (cas(&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ } + max_commits += TOKEN_BATCH_SIZE; + cap -> transaction_tokens = TOKEN_BATCH_SIZE; + token_locked = FALSE; +} + +static void getToken(Capability *cap) { + if (cap -> transaction_tokens == 0) { + getTokenBatch(cap); + } + cap -> transaction_tokens --; } +#else +static void getToken(Capability *cap STG_UNUSED) { + // Nothing +} +#endif /*......................................................................*/ StgTRecHeader *stmStartTransaction(Capability *cap, StgTRecHeader *outer) { StgTRecHeader *t; - TRACE("%p : stmStartTransaction\n", outer); - t = new_stg_trec_header(cap, outer); + TRACE("%p : stmStartTransaction with %d tokens\n", + outer, + cap -> transaction_tokens); + + getToken(cap); + + t = alloc_stg_trec_header(cap, outer); TRACE("%p : stmStartTransaction()=%p\n", outer, t); return t; } /*......................................................................*/ -void stmAbortTransaction(StgTRecHeader *trec) { +void stmAbortTransaction(Capability *cap, + StgTRecHeader *trec) { TRACE("%p : stmAbortTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || @@ -707,17 +842,20 @@ void stmAbortTransaction(StgTRecHeader *trec) { if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec); - remove_wait_queue_entries_for_trec(trec); + remove_wait_queue_entries_for_trec(cap, trec); } trec -> state = TREC_ABORTED; unlock_stm(trec); + free_stg_trec_header(cap, trec); + TRACE("%p : stmAbortTransaction done\n", trec); } /*......................................................................*/ -void stmCondemnTransaction(StgTRecHeader *trec) { +void stmCondemnTransaction(Capability *cap, + StgTRecHeader *trec) { TRACE("%p : stmCondemnTransaction\n", trec); ASSERT (trec != NO_TREC); ASSERT ((trec -> state == TREC_ACTIVE) || @@ -728,7 +866,7 @@ void stmCondemnTransaction(StgTRecHeader *trec) { if (trec -> state == TREC_WAITING) { ASSERT (trec -> enclosing_trec == NO_TREC); TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec); - remove_wait_queue_entries_for_trec(trec); + remove_wait_queue_entries_for_trec(cap, trec); } trec -> state = TREC_CONDEMNED; unlock_stm(trec); @@ -781,6 +919,7 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) { StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { int result; + StgInt64 max_commits_at_start = max_commits; TRACE("%p : stmCommitTransaction()\n", trec); ASSERT (trec != NO_TREC); @@ -800,6 +939,14 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { TRACE("%p : doing read check\n", trec); result = check_read_only(trec); TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed"); + + StgInt64 max_commits_at_end = max_commits; + StgInt64 max_concurrent_commits; + max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) + + (n_capabilities * TOKEN_BATCH_SIZE)); + if (((max_concurrent_commits >> 32) > 0) || shake()) { + result = FALSE; + } } if (result) { @@ -818,7 +965,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s); unpark_waiters_on(cap,s); IF_STM_FG_LOCKS({ - s -> last_update_by = trec; + s -> num_updates ++; }); unlock_tvar(trec, s, e -> new_value, TRUE); } @@ -831,6 +978,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { unlock_stm(trec); + free_stg_trec_header(cap, trec); + TRACE("%p : stmCommitTransaction()=%d\n", trec, result); return result; @@ -883,6 +1032,8 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) { unlock_stm(trec); + free_stg_trec_header(cap, trec); + TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result); return result; @@ -922,6 +1073,7 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) { } else { unlock_stm(trec); + free_stg_trec_header(cap, trec); } TRACE("%p : stmWait(%p)=%d\n", trec, tso, result); @@ -930,14 +1082,14 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) { void -stmWaitUnlock(Capability *cap, StgTRecHeader *trec) { +stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) { revert_ownership(trec, TRUE); unlock_stm(trec); } /*......................................................................*/ -StgBool stmReWait(StgTSO *tso) { +StgBool stmReWait(Capability *cap, StgTSO *tso) { int result; StgTRecHeader *trec = tso->trec; @@ -960,9 +1112,9 @@ StgBool stmReWait(StgTSO *tso) { // The transcation has become invalid. We can now remove it from the wait // queues. if (trec -> state != TREC_CONDEMNED) { - remove_wait_queue_entries_for_trec (trec); + remove_wait_queue_entries_for_trec (cap, trec); } - + free_stg_trec_header(cap, trec); } unlock_stm(trec); @@ -1099,7 +1251,7 @@ StgTVar *stmNewTVar(Capability *cap, result -> current_value = new_value; result -> first_wait_queue_entry = END_STM_WAIT_QUEUE; #if defined(SMP) - result -> last_update_by = NO_TREC; + result -> num_updates = 0; #endif return result; } diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 4891bbf..a82b6a7 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -3802,7 +3802,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, case ATOMICALLY_FRAME: if (stop_at_atomically) { ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); - stmCondemnTransaction(tso -> trec); + stmCondemnTransaction(cap, tso -> trec); #ifdef REG_R1 tso->sp = frame; #else @@ -3829,8 +3829,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, // and will not be visible after the abort. IF_DEBUG(stm, debugBelch("Found atomically block delivering async exception\n")); - stmAbortTransaction(tso -> trec); - tso -> trec = stmGetEnclosingTRec(tso -> trec); + StgTRecHeader *trec = tso -> trec; + StgTRecHeader *outer = stmGetEnclosingTRec(trec); + stmAbortTransaction(cap, trec); + tso -> trec = outer; break; default: