X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FSTM.c;h=f98d201b07696a37d8e72e1ca27c206634db1804;hp=d840f4ebfe3f1a03b6f561c9405ed5ada61e8760;hb=5d52d9b64c21dcf77849866584744722f8121389;hpb=2246c514eade324d70058ba3135dc0c51ee9353b

diff --git a/rts/STM.c b/rts/STM.c
index d840f4e..f98d201 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -74,25 +74,24 @@
  * (d) release the locks on the TVars, writing updates to them in the case of a
  *     commit, (e) unlock the STM.
  *
- * Queues of waiting threads hang off the first_watch_queue_entry field of each
- * TVar.  This may only be manipulated when holding that TVar's lock.  In
- * particular, when a thread is putting itself to sleep, it mustn't release
- * the TVar's lock until it has added itself to the wait queue and marked its
- * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
+ * Queues of waiting threads hang off the first_watch_queue_entry
+ * field of each TVar.  This may only be manipulated when holding that
+ * TVar's lock.  In particular, when a thread is putting itself to
+ * sleep, it mustn't release the TVar's lock until it has added itself
+ * to the wait queue and marked its TSO as BlockedOnSTM -- this makes
+ * sure that other threads will know to wake it.
  *
  * ---------------------------------------------------------------------------*/

 #include "PosixSource.h"
 #include "Rts.h"
-#include "RtsFlags.h"
+
 #include "RtsUtils.h"
 #include "Schedule.h"
-#include "SMP.h"
 #include "STM.h"
-#include "Storage.h"
 #include "Trace.h"
+#include "Threads.h"

-#include
 #include

 #define TRUE 1
@@ -306,7 +305,7 @@ static StgClosure *lock_tvar(StgTRecHeader *trec,
   do {
     do {
       result = s -> current_value;
-    } while (GET_INFO(result) == &stg_TREC_HEADER_info);
+    } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
   } while (cas((void *)&(s -> current_value),
                (StgWord)result, (StgWord)trec) != (StgWord)result);
   return result;
@@ -353,8 +352,7 @@ static StgBool watcher_is_tso(StgTVarWatchQueue *q) {

 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
   StgClosure *c = q -> closure;
-  StgInfoTable *info = get_itbl(c);
-  return (info -> type) == ATOMIC_INVARIANT;
+  return (c->header.info == &stg_ATOMIC_INVARIANT_info);
 }

 /*......................................................................*/
@@ -379,7 +377,7 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
     lockTSO(tso);
     if (tso -> why_blocked == BlockedOnSTM) {
       TRACE("unpark_tso on tso=%p", tso);
-      unblockOne(cap,tso);
+      tryWakeupThread(cap,tso);
     } else {
       TRACE("spurious unpark_tso on tso=%p", tso);
     }
@@ -388,10 +386,18 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {

 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
   StgTVarWatchQueue *q;
+  StgTVarWatchQueue *trail;
   TRACE("unpark_waiters_on tvar=%p", s);
-  for (q = s -> first_watch_queue_entry;
-       q != END_STM_WATCH_QUEUE;
+  // unblock TSOs in reverse order, to be a bit fairer (#2319)
+  for (q = s -> first_watch_queue_entry, trail = q;
+       q != END_STM_WATCH_QUEUE;
        q = q -> next_queue_entry) {
+    trail = q;
+  }
+  q = trail;
+  for (;
+       q != END_STM_WATCH_QUEUE;
+       q = q -> prev_queue_entry) {
     if (watcher_is_tso(q)) {
       unpark_tso(cap, (StgTSO *)(q -> closure));
     }
   }
@@ -405,7 +411,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
 static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
                                                              StgAtomicInvariant *invariant) {
   StgInvariantCheckQueue *result;
-  result = (StgInvariantCheckQueue *)allocateLocal(cap, sizeofW(StgInvariantCheckQueue));
+  result = (StgInvariantCheckQueue *)allocate(cap, sizeofW(StgInvariantCheckQueue));
   SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
   result -> invariant = invariant;
   result -> my_execution = NO_TREC;
@@ -415,7 +421,7 @@ static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
                                                    StgClosure *closure) {
   StgTVarWatchQueue *result;
-  result = (StgTVarWatchQueue *)allocateLocal(cap, sizeofW(StgTVarWatchQueue));
+  result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
   SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
   result -> closure = closure;
   return result;
 }
@@ -423,7 +429,7 @@ static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
   StgTRecChunk *result;
-  result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
+  result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
   result -> prev_chunk = END_STM_CHUNK_LIST;
   result -> next_entry_idx = 0;
@@ -433,7 +439,7 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap,
                                           StgTRecHeader *enclosing_trec) {
   StgTRecHeader *result;
-  result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
+  result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);

   result -> enclosing_trec = enclosing_trec;
@@ -596,8 +602,9 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
     StgTVarWatchQueue *pq;
     StgTVarWatchQueue *nq;
     StgTVarWatchQueue *q;
+    StgClosure *saw;
     s = e -> tvar;
-    StgClosure *saw = lock_tvar(trec, s);
+    saw = lock_tvar(trec, s);
     q = (StgTVarWatchQueue *) (e -> new_value);
     TRACE("%p : removing tso=%p from watch queue for tvar=%p", trec,
@@ -943,6 +950,7 @@ StgTRecHeader *stmStartTransaction(Capability *cap,

 void stmAbortTransaction(Capability *cap,
                          StgTRecHeader *trec) {
+  StgTRecHeader *et;
   TRACE("%p : stmAbortTransaction", trec);
   ASSERT (trec != NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) ||
@@ -951,7 +959,7 @@ void stmAbortTransaction(Capability *cap,

   lock_stm(trec);

-  StgTRecHeader *et = trec -> enclosing_trec;
+  et = trec -> enclosing_trec;
   if (et == NO_TREC) {
     // We're a top-level transaction: remove any watch queue entries that
     // we may have.
@@ -1017,16 +1025,6 @@ void stmCondemnTransaction(Capability *cap,

 /*......................................................................*/

-StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
-  StgTRecHeader *outer;
-  TRACE("%p : stmGetEnclosingTRec", trec);
-  outer = trec -> enclosing_trec;
-  TRACE("%p : stmGetEnclosingTRec()=%p", trec, outer);
-  return outer;
-}
-
-/*......................................................................*/
-
 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
   StgTRecHeader *t;
   StgBool result;
@@ -1165,18 +1163,18 @@ static void connect_invariant_to_trec(Capability *cap,
 void stmAddInvariantToCheck(Capability *cap,
                             StgTRecHeader *trec,
                             StgClosure *code) {
+  StgAtomicInvariant *invariant;
+  StgInvariantCheckQueue *q;
   TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> state == TREC_ACTIVE ||
          trec -> state == TREC_CONDEMNED);

-  StgAtomicInvariant *invariant;
-  StgInvariantCheckQueue *q;

   // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
   //    to signal that this is a new invariant in the current atomic block

-  invariant = (StgAtomicInvariant *) allocateLocal(cap, sizeofW(StgAtomicInvariant));
+  invariant = (StgAtomicInvariant *) allocate(cap, sizeofW(StgAtomicInvariant));
   TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
   SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
   invariant -> code = code;
@@ -1200,6 +1198,7 @@ void stmAddInvariantToCheck(Capability *cap,
  */

 StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
+  StgTRecChunk *c;
   TRACE("%p : stmGetInvariantsToCheck, head was %p",
         trec,
         trec -> invariants_to_check);
@@ -1211,7 +1210,7 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *
   ASSERT(trec -> enclosing_trec == NO_TREC);

   lock_stm(trec);
-  StgTRecChunk *c = trec -> current_chunk;
+  c = trec -> current_chunk;
   while (c != END_STM_CHUNK_LIST) {
     unsigned int i;
     for (i = 0; i < c -> next_entry_idx; i ++) {
@@ -1223,15 +1222,15 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *

       // Pick up any invariants on the TVar being updated
       // by entry "e"

-      TRACE("%p : checking for invariants on %p", trec, s);
       StgTVarWatchQueue *q;
+      TRACE("%p : checking for invariants on %p", trec, s);
       for (q = s -> first_watch_queue_entry;
            q != END_STM_WATCH_QUEUE;
            q = q -> next_queue_entry) {
         if (watcher_is_invariant(q)) {
-          TRACE("%p : Touching invariant %p", trec, q -> closure);
           StgBool found = FALSE;
           StgInvariantCheckQueue *q2;
+          TRACE("%p : Touching invariant %p", trec, q -> closure);
           for (q2 = trec -> invariants_to_check;
                q2 != END_INVARIANT_CHECK_QUEUE;
                q2 = q2 -> next_queue_entry) {
@@ -1243,8 +1242,8 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *
           }

           if (!found) {
-            TRACE("%p : Not already found %p", trec, q -> closure);
             StgInvariantCheckQueue *q3;
+            TRACE("%p : Not already found %p", trec, q -> closure);
             q3 = alloc_stg_invariant_check_queue(cap,
                                                  (StgAtomicInvariant*) q -> closure);
             q3 -> next_queue_entry = trec -> invariants_to_check;
@@ -1273,6 +1272,8 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *
 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   int result;
   StgInt64 max_commits_at_start = max_commits;
+  StgBool touched_invariants;
+  StgBool use_read_phase;

   TRACE("%p : stmCommitTransaction()", trec);
   ASSERT (trec != NO_TREC);
@@ -1286,7 +1287,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {

   // touched_invariants is true if we've written to a TVar with invariants
   // attached to it, or if we're trying to add a new invariant to the system.
-  StgBool touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
+  touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);

   // If we have touched invariants then (i) lock the invariant, and (ii) add
   // the invariant's read set to our own.  Step (i) is needed to serialize
@@ -1298,18 +1299,20 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   //   invariant from both tvars).

   if (touched_invariants) {
-    TRACE("%p : locking invariants", trec);
     StgInvariantCheckQueue *q = trec -> invariants_to_check;
+    TRACE("%p : locking invariants", trec);
     while (q != END_INVARIANT_CHECK_QUEUE) {
+      StgTRecHeader *inv_old_trec;
+      StgAtomicInvariant *inv;
       TRACE("%p : locking invariant %p", trec, q -> invariant);
-      StgAtomicInvariant *inv = q -> invariant;
+      inv = q -> invariant;
       if (!lock_inv(inv)) {
         TRACE("%p : failed to lock %p", trec, inv);
         trec -> state = TREC_CONDEMNED;
         break;
       }

-      StgTRecHeader *inv_old_trec = inv -> last_execution;
+      inv_old_trec = inv -> last_execution;
       if (inv_old_trec != NO_TREC) {
         StgTRecChunk *c = inv_old_trec -> current_chunk;
         while (c != END_STM_CHUNK_LIST) {
@@ -1336,7 +1339,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   // invariants and TVars are managed by the TVar watch queues which are
   // protected by the TVar's locks.

-  StgBool use_read_phase = ((config_use_read_phase) && (!touched_invariants));
+  use_read_phase = ((config_use_read_phase) && (!touched_invariants));

   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
   if (result) {
@@ -1344,12 +1347,13 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
     ASSERT (trec -> state == TREC_ACTIVE);

     if (use_read_phase) {
+      StgInt64 max_commits_at_end;
+      StgInt64 max_concurrent_commits;
       TRACE("%p : doing read check", trec);
       result = check_read_only(trec);
       TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");

-      StgInt64 max_commits_at_end = max_commits;
-      StgInt64 max_concurrent_commits;
+      max_commits_at_end = max_commits;
       max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
                                 (n_capabilities * TOKEN_BATCH_SIZE));
       if (((max_concurrent_commits >> 32) > 0) || shake()) {
@@ -1557,7 +1561,7 @@ static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *t
   result = tvar -> current_value;

 #if defined(STM_FG_LOCKS)
-  while (GET_INFO(result) == &stg_TREC_HEADER_info) {
+  while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
     TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
     result = tvar -> current_value;
   }
@@ -1572,7 +1576,7 @@ static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *t
 StgClosure *stmReadTVar(Capability *cap,
                         StgTRecHeader *trec,
                         StgTVar *tvar) {
-  StgTRecHeader *entry_in;
+  StgTRecHeader *entry_in = NULL;
   StgClosure *result = NULL;
   TRecEntry *entry = NULL;
   TRACE("%p : stmReadTVar(%p)", trec, tvar);
@@ -1615,7 +1619,7 @@ void stmWriteTVar(Capability *cap,
                   StgTVar *tvar,
                   StgClosure *new_value) {
-  StgTRecHeader *entry_in;
+  StgTRecHeader *entry_in = NULL;
   TRecEntry *entry = NULL;
   TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
   ASSERT (trec != NO_TREC);
@@ -1652,7 +1656,7 @@ void stmWriteTVar(Capability *cap,
 StgTVar *stmNewTVar(Capability *cap,
                     StgClosure *new_value) {
   StgTVar *result;
-  result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
+  result = (StgTVar *)allocate(cap, sizeofW(StgTVar));
   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
   result -> current_value = new_value;
   result -> first_watch_queue_entry = END_STM_WATCH_QUEUE;
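
A note on the fairness change above: the patched unpark_waiters_on first walks the watch queue forward along next_queue_entry to find the last entry ("trail"), then wakes waiters by walking back along prev_queue_entry, as the comment referencing #2319 says. The following is a minimal standalone C sketch of that traversal pattern only; the struct node type, the fields waiter_id/next/prev, and wake_all_reversed are hypothetical stand-ins for the RTS's StgTVarWatchQueue and unpark_tso, not the actual RTS definitions.

#include <stdio.h>

/* Hypothetical stand-in for StgTVarWatchQueue: a doubly linked queue of waiters. */
struct node {
    int waiter_id;
    struct node *next;   /* plays the role of next_queue_entry */
    struct node *prev;   /* plays the role of prev_queue_entry */
};

/* Walk forward to the tail, then wake entries in reverse order,
 * mirroring the two loops in the patched unpark_waiters_on. */
static void wake_all_reversed(struct node *head) {
    struct node *q, *trail;
    for (q = head, trail = q; q != NULL; q = q->next) {
        trail = q;                                    /* remember the last entry seen */
    }
    for (q = trail; q != NULL; q = q->prev) {
        printf("waking waiter %d\n", q->waiter_id);   /* stands in for unpark_tso */
    }
}

int main(void) {
    /* Build a three-entry queue: n1 <-> n2 <-> n3, with n1 at the head. */
    struct node n1 = {1, NULL, NULL}, n2 = {2, NULL, NULL}, n3 = {3, NULL, NULL};
    n1.next = &n2; n2.prev = &n1;
    n2.next = &n3; n3.prev = &n2;
    wake_all_reversed(&n1);   /* wakes 3, then 2, then 1 */
    return 0;
}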