From: simonmar
Date: Wed, 12 Jan 2005 12:36:30 +0000 (+0000)
Subject: [project @ 2005-01-12 12:36:28 by simonmar]
X-Git-Tag: Initial_conversion_from_CVS_complete~1257
X-Git-Url: http://git.megacz.com/?a=commitdiff_plain;h=96757f6a4ec72dc609468d3da442db38a73df23e;p=ghc-hetmet.git

[project @ 2005-01-12 12:36:28 by simonmar]

Numerous bug fixes to the STM code, mostly from a debugging session
with Tim Harris.  The test that flushed out all the bugs will shortly
be added to the test suite.
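For background, the failure mode these fixes address: a transaction can
read an inconsistent snapshot (another thread commits between its reads)
and then loop, so the scheduler must periodically validate the read sets
of running transactions.  The toy program below is not GHC code -- it is
a minimal sketch of version-stamp validation, with all names (tvar_t,
trec_entry_t, transaction_is_valid) invented for illustration:

/* Toy model of read-set validation.  Each TVar carries a version stamp;
 * a transaction records the version it saw, and is valid only if every
 * TVar it read is still at that version. */
#include <stdio.h>
#include <stdbool.h>

typedef struct { int value; int version; } tvar_t;
typedef struct { tvar_t *tvar; int seen_version; } trec_entry_t;

static bool transaction_is_valid(trec_entry_t *read_set, int n) {
    for (int i = 0; i < n; i++)
        if (read_set[i].tvar->version != read_set[i].seen_version)
            return false;            /* someone committed over our read */
    return true;
}

int main(void) {
    tvar_t x = { 0, 0 }, y = { 0, 0 };   /* invariant: x.value + y.value == 0 */

    /* Reader starts a transaction and reads x ... */
    trec_entry_t read_set[2] = { { &x, x.version }, { &y, 0 } };
    int seen_x = x.value;

    /* ... a writer commits "x += 1; y -= 1" in between ... */
    x.value += 1; x.version++;
    y.value -= 1; y.version++;

    /* ... then the reader reads y: the snapshot is inconsistent. */
    read_set[1].seen_version = y.version;
    int seen_y = y.value;

    printf("snapshot: x=%d y=%d (sum %d: invariant broken)\n",
           seen_x, seen_y, seen_x + seen_y);
    printf("valid? %s\n", transaction_is_valid(read_set, 2) ? "yes" : "no");
    return 0;
}

A transaction stuck looping on such a snapshot is exactly what the new
stmCondemnTransaction / raiseAsync_ machinery below cleans up.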
---

diff --git a/ghc/includes/STM.h b/ghc/includes/STM.h
index fc3f29a..bafc523 100644
--- a/ghc/includes/STM.h
+++ b/ghc/includes/STM.h
@@ -96,6 +96,15 @@ extern StgTRecHeader *stmStartTransaction(StgTRecHeader *outer);
 
 extern void stmAbortTransaction(StgTRecHeader *trec);
 
+// Ensure that a subsequent commit / validation will fail.  We use this
+// in our current handling of transactions that may have become invalid
+// and started looping.  We strip their stack back to the ATOMICALLY_FRAME,
+// and, when the thread is next scheduled, discover it to be invalid and
+// re-execute it.  However, we need to force the transaction to stay invalid
+// in case other threads' updates make it valid in the mean time.
+
+extern void stmCondemnTransaction(StgTRecHeader *trec);
+
 // Return the trec within which the specified trec was created (not
 // valid if trec==NO_TREC).
 
@@ -164,7 +173,7 @@ extern StgBool stmWait(StgTSO *tso, StgTRecHeader *trec);
 // and leave it as unblocked.  It is an error to call stmReWait if the
 // thread is not waiting.
 
-extern StgBool stmReWait(StgTRecHeader *trec);
+extern StgBool stmReWait(StgTSO *tso);
 
 // Merge the accesses made so far in the second trec into the first trec.
 // Note that the resulting trec is only intended to be used in wait operations.
diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm
index 26b3ce6..e50b17f 100644
--- a/ghc/rts/PrimOps.cmm
+++ b/ghc/rts/PrimOps.cmm
@@ -996,7 +996,7 @@ CATCH_RETRY_FRAME_ENTRY_TEMPLATE(,%ENTRY_CODE(Sp(SP_OFF)))
   trec = StgTSO_trec(CurrentTSO);                                       \
   if (StgAtomicallyFrame_waiting(frame)) {                              \
     /* The TSO is currently waiting: should we stop waiting? */         \
-    valid = foreign "C" stmReWait(trec "ptr");                          \
+    valid = foreign "C" stmReWait(CurrentTSO "ptr");                    \
     if (valid) {                                                        \
       /* Previous attempt is still valid: no point trying again yet */  \
       IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;)                          \
@@ -1240,9 +1240,11 @@ retry_pop_stack:
     other_trec = StgCatchRetryFrame_first_code_trec(frame);
     r = foreign "C" stmMergeForWaiting(trec "ptr", other_trec "ptr");
     if (r) {
+      r = foreign "C" stmCommitTransaction(trec "ptr");
+    }
+    if (r) {
       // Merge between siblings succeeded: commit it back to enclosing transaction
       // and then propagate the retry
-      r = foreign "C" stmCommitTransaction(trec "ptr");
      StgTSO_trec(CurrentTSO) = outer;
      Sp = Sp + SIZEOF_StgCatchRetryFrame;
      goto retry_pop_stack;
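The PrimOps.cmm hunk above fixes a sequencing bug: the old code decided
to propagate the retry based only on the result of stmMergeForWaiting,
then overwrote r with the result of stmCommitTransaction and carried on
regardless, so a failed commit went unnoticed.  Splitting the test means
the retry is propagated only when both steps succeed.  A minimal sketch
of the corrected control flow, with merge_for_waiting and
commit_transaction as invented stand-ins for the two foreign calls:

/* Toy sketch (not GHC code) of the two-step test. */
#include <stdbool.h>
#include <stdio.h>

static bool merge_for_waiting(void)  { return true;  }  /* pretend merge works */
static bool commit_transaction(void) { return false; }  /* ...but commit fails */

int main(void) {
    bool r = merge_for_waiting();
    if (r) {
        r = commit_transaction();   /* re-test r: the commit may fail too */
    }
    if (r) {
        puts("propagate retry to enclosing transaction");
    } else {
        puts("commit or merge failed: do not propagate"); /* taken here */
    }
    return 0;
}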
diff --git a/ghc/rts/STM.c b/ghc/rts/STM.c
index f56bd1f..8edcb8c 100644
--- a/ghc/rts/STM.c
+++ b/ghc/rts/STM.c
@@ -115,6 +115,7 @@ static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
 static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
 
 static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
+#if 0
   if (shake()) {
     TRACE("Shake: not re-using wait queue %p\n", q);
     return;
@@ -122,9 +123,11 @@ static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
 
   q -> next_queue_entry = cached_tvar_wait_queues;
   cached_tvar_wait_queues = q;
+#endif
 }
 
 static void recycle_closures_from_trec (StgTRecHeader *t) {
+#if 0
   if (shake()) {
     TRACE("Shake: not re-using closures from %p\n", t);
     return;
@@ -140,6 +143,7 @@ static void recycle_closures_from_trec (StgTRecHeader *t) {
     c -> prev_chunk = cached_trec_chunks;
     cached_trec_chunks = c;
   }
+#endif
 }
 
 /*......................................................................*/
@@ -278,7 +282,8 @@ static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
 static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> enclosing_trec == NO_TREC);
-  ASSERT(trec -> state == TREC_WAITING);
+  ASSERT(trec -> state == TREC_WAITING ||
+         trec -> state == TREC_MUST_ABORT);
   TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
   FOR_EACH_ENTRY(trec, e, {
     StgTVar *s;
@@ -509,6 +514,27 @@ void stmAbortTransaction(StgTRecHeader *trec) {
 
 /*......................................................................*/
 
+void stmCondemnTransaction(StgTRecHeader *trec) {
+  TRACE("stmCondemnTransaction trec=%p\n", trec);
+  ASSERT (trec != NO_TREC);
+  ASSERT ((trec -> state == TREC_ACTIVE) ||
+          (trec -> state == TREC_MUST_ABORT) ||
+          (trec -> state == TREC_WAITING) ||
+          (trec -> state == TREC_CANNOT_COMMIT));
+
+  if (trec -> state == TREC_WAITING) {
+    ASSERT (trec -> enclosing_trec == NO_TREC);
+    TRACE("stmCondemnTransaction condemning waiting transaction\n");
+    stop_tsos_waiting_on_trec(trec);
+  }
+
+  trec -> state = TREC_MUST_ABORT;
+
+  TRACE("stmCondemnTransaction trec=%p done\n", trec);
+}
+
+/*......................................................................*/
+
 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
   StgTRecHeader *outer;
   TRACE("stmGetEnclosingTRec trec=%p\n", trec);
@@ -671,12 +697,15 @@
-StgBool stmReWait(StgTRecHeader *trec) {
+StgBool stmReWait(StgTSO *tso) {
   int result;
+  StgTRecHeader *trec = tso->trec;
+
   TRACE("stmReWait trec=%p\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> enclosing_trec == NO_TREC);
-  ASSERT (trec -> state == TREC_WAITING);
+  ASSERT ((trec -> state == TREC_WAITING) ||
+          (trec -> state == TREC_MUST_ABORT));
 
   lock_stm();
   result = transaction_is_valid(trec);
@@ -685,13 +714,17 @@
     // The transaction remains valid -- do nothing because it is already on
     // the wait queues
     ASSERT (trec -> state == TREC_WAITING);
+    park_tso(tso);
   } else {
     // The transaction has become invalid.  We can now remove it from the wait
     // queues.
-    stop_tsos_waiting_on_trec (trec);
+    if (trec -> state != TREC_MUST_ABORT) {
+      stop_tsos_waiting_on_trec (trec);
+
+      // Outcome now reflected by status field; no need for log
+      recycle_closures_from_trec(trec);
+    }
-    // Outcome now reflected by status field; no need for log
-    recycle_closures_from_trec(trec);
   }
 
   unlock_stm();
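The intent of stmCondemnTransaction is easiest to see in a toy model
rather than the real TRec machinery (trec_t, validate, and read_set_ok
below are all invented for illustration): once a trec is marked
TREC_MUST_ABORT, validation keeps failing even if other threads' commits
would otherwise make its read set look consistent again, so the looping
thread is guaranteed to be restarted.

/* Toy model -- not GHC code -- of the TREC_MUST_ABORT state. */
#include <stdbool.h>
#include <stdio.h>

typedef enum { TREC_ACTIVE, TREC_MUST_ABORT } trec_state_t;
typedef struct { trec_state_t state; } trec_t;

/* Analogue of stmCondemnTransaction: force the trec to stay invalid. */
static void condemn(trec_t *t) { t->state = TREC_MUST_ABORT; }

/* Validation consults the state flag before (conceptually) comparing
 * the read set against the current TVar contents. */
static bool validate(trec_t *t, bool read_set_ok) {
    if (t->state == TREC_MUST_ABORT) return false;
    return read_set_ok;
}

int main(void) {
    trec_t t = { TREC_ACTIVE };
    printf("active, read set consistent:    %d\n", validate(&t, true)); /* 1 */
    condemn(&t);
    printf("condemned, read set consistent: %d\n", validate(&t, true)); /* 0 */
    return 0;
}

This is also why stmReWait now takes the TSO rather than the trec: the
validity re-check and the decision to park the thread happen together
under the STM lock, and a condemned trec is never treated as valid.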
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index ab82e9e..95e9ba4 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -221,6 +221,8 @@ static void schedule ( StgMainThread *mainThread, Capability *initi
 static void detectBlackHoles ( void );
 #endif
 
+static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
+
 #if defined(RTS_SUPPORTS_THREADS)
 /* ToDo: carefully document the invariants that go together
  * with these synchronisation objects.
@@ -1200,6 +1202,7 @@ run_thread:
      * previously, or it's blocked on an MVar or Blackhole, in which
      * case it'll be on the relevant queue already.
      */
+    ASSERT(t->why_blocked != NotBlocked);
     IF_DEBUG(scheduler,
              debugBelch("--<< thread %d (%s) stopped: ",
                         t->id, whatNext_strs[t->what_next]);
@@ -1333,44 +1336,14 @@ run_thread:
     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
       if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
         if (!stmValidateTransaction (t -> trec)) {
-          StgRetInfoTable *info;
-          StgPtr sp = t -> sp;
-
           IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
-          if (sp[0] == (W_)&stg_enter_info) {
-            sp++;
-          } else {
-            sp--;
-            sp[0] = (W_)&stg_dummy_ret_closure;
-          }
-
-          // Look up the stack for its atomically frame
-          StgPtr frame;
-          frame = sp + 1;
-          info = get_ret_itbl((StgClosure *)frame);
-
-          while (info->i.type != ATOMICALLY_FRAME &&
-                 info->i.type != STOP_FRAME &&
-                 info->i.type != UPDATE_FRAME) {
-            if (info -> i.type == CATCH_RETRY_FRAME) {
-              IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame"));
-              stmAbortTransaction(t -> trec);
-              t -> trec = stmGetEnclosingTRec(t -> trec);
-            }
-            frame += stack_frame_sizeW((StgClosure *)frame);
-            info = get_ret_itbl((StgClosure *)frame);
-          }
+          // strip the stack back to the ATOMICALLY_FRAME, aborting
+          // the (nested) transaction, and saving the stack of any
+          // partially-evaluated thunks on the heap.
+          raiseAsync_(t, NULL, rtsTrue);
 
-          if (!info -> i.type == ATOMICALLY_FRAME) {
-            barf("Could not find ATOMICALLY_FRAME for unvalidatable thread");
-          }
-
-          // Cause the thread to enter its atomically frame again when
-          // scheduled -- this will attempt stmCommitTransaction or stmReWait
-          // which will fail, triggering re-execution.
-          t->sp = frame;
-          t->what_next = ThreadRunGHC;
+          ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
         }
       }
     }
@@ -3051,6 +3024,12 @@ raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
 void
 raiseAsync(StgTSO *tso, StgClosure *exception)
 {
+    raiseAsync_(tso, exception, rtsFalse);
+}
+
+static void
+raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+{
     StgRetInfoTable *info;
     StgPtr sp;
@@ -3106,8 +3085,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
     while (info->i.type != UPDATE_FRAME
            && (info->i.type != CATCH_FRAME || exception == NULL)
-           && info->i.type != STOP_FRAME) {
-        if (info->i.type == ATOMICALLY_FRAME) {
+           && info->i.type != STOP_FRAME
+           && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
+    {
+        if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
             // If we find an ATOMICALLY_FRAME then we abort the
             // current transaction and propagate the exception.  In
             // this case (unlike ordinary exceptions) we do not care
@@ -3125,6 +3106,14 @@
 
     switch (info->i.type) {
 
+    case ATOMICALLY_FRAME:
+        ASSERT(stop_at_atomically);
+        ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+        stmCondemnTransaction(tso -> trec);
+        tso->sp = frame;
+        tso->what_next = ThreadRunGHC;
+        return;
+
     case CATCH_FRAME:
         // If we find a CATCH_FRAME, and we've got an exception to raise,
         // then build the THUNK raise(exception), and leave it on
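The Schedule.c change replaces the hand-rolled stack walk with
raiseAsync_ and a stop_at_atomically flag: the stack is stripped back to
the ATOMICALLY_FRAME, aborting any nested catch-retry transactions along
the way, and the condemned thread then re-enters its atomically frame
when next scheduled.  A toy model of that walk (frame_type_t and the
stack array are invented; the real code walks StgClosure frames via
get_ret_itbl and stack_frame_sizeW):

/* Toy model -- not GHC code -- of stripping a stack to ATOMICALLY_FRAME. */
#include <stdio.h>

typedef enum { RET_FRAME, CATCH_RETRY_FRAME, ATOMICALLY_FRAME, STOP_FRAME }
    frame_type_t;

int main(void) {
    /* A made-up stack, innermost frame first. */
    frame_type_t stack[] = { RET_FRAME, CATCH_RETRY_FRAME, RET_FRAME,
                             ATOMICALLY_FRAME, STOP_FRAME };
    int sp = 0;
    while (stack[sp] != ATOMICALLY_FRAME && stack[sp] != STOP_FRAME) {
        if (stack[sp] == CATCH_RETRY_FRAME)
            printf("aborting nested transaction at frame %d\n", sp);
        sp++;                       /* discard the frame */
    }
    if (stack[sp] == ATOMICALLY_FRAME)
        printf("stopped at ATOMICALLY_FRAME (sp=%d); thread will re-run it\n",
               sp);
    return 0;
}

Sharing this walk with the asynchronous-exception path is the design
point: unlike the old scheduler code, raiseAsync_ also saves the stacks
of partially-evaluated thunks to the heap as it unwinds.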