extern void stmAbortTransaction(StgTRecHeader *trec);
+// Ensure that a subsequent commit / validation will fail. We use this
+// in our current handling of transactions that may have become invalid
+// and started looping. We strip their stack back to the ATOMICALLY_FRAME,
+// and, when the thread is next scheduled, discover it to be invalid and
+// re-execute it. However, we need to force the transaction to stay invalid
+// in case other threads' updates make it valid again in the meantime.
+
+extern void stmCondemnTransaction(StgTRecHeader *trec);
+
// Return the trec within which the specified trec was created (not
// valid if trec==NO_TREC).
// and leave it as unblocked. It is an error to call stmReWait if the
// thread is not waiting.
-extern StgBool stmReWait(StgTRecHeader *trec);
+extern StgBool stmReWait(StgTSO *tso);
// Merge the accesses made so far in the second trec into the first trec.
// Note that the resulting trec is only intended to be used in wait operations.
trec = StgTSO_trec(CurrentTSO); \
if (StgAtomicallyFrame_waiting(frame)) { \
/* The TSO is currently waiting: should we stop waiting? */ \
- valid = foreign "C" stmReWait(trec "ptr"); \
+ valid = foreign "C" stmReWait(CurrentTSO "ptr"); \
if (valid) { \
/* Previous attempt is still valid: no point trying again yet */ \
IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;) \
other_trec = StgCatchRetryFrame_first_code_trec(frame);
r = foreign "C" stmMergeForWaiting(trec "ptr", other_trec "ptr");
if (r) {
+ r = foreign "C" stmCommitTransaction(trec "ptr");
+ }
+ if (r) {
// Merge between siblings succeeded: commit it back to enclosing transaction
// and then propagate the retry
- r = foreign "C" stmCommitTransaction(trec "ptr");
StgTSO_trec(CurrentTSO) = outer;
Sp = Sp + SIZEOF_StgCatchRetryFrame;
goto retry_pop_stack;
static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
+#if 0
if (shake()) {
TRACE("Shake: not re-using wait queue %p\n", q);
return;
q -> next_queue_entry = cached_tvar_wait_queues;
cached_tvar_wait_queues = q;
+#endif
}
static void recycle_closures_from_trec (StgTRecHeader *t) {
+#if 0
if (shake()) {
TRACE("Shake: not re-using closures from %p\n", t);
return;
c -> prev_chunk = cached_trec_chunks;
cached_trec_chunks = c;
}
+#endif
}
/*......................................................................*/
static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
ASSERT(trec != NO_TREC);
ASSERT(trec -> enclosing_trec == NO_TREC);
- ASSERT(trec -> state == TREC_WAITING);
+ ASSERT(trec -> state == TREC_WAITING ||
+ trec -> state == TREC_MUST_ABORT);
TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
FOR_EACH_ENTRY(trec, e, {
StgTVar *s;
/*......................................................................*/
+// Force the transaction to a state in which any subsequent validation,
+// commit, or re-wait is guaranteed to fail, so that the thread will
+// re-execute the transaction when next scheduled.  Used when a running
+// transaction has been detected to be invalid (and possibly looping):
+// other threads' updates must not be allowed to make it valid again.
+void stmCondemnTransaction(StgTRecHeader *trec) {
+  TRACE("stmCondemnTransaction trec=%p\n", trec);
+  ASSERT (trec != NO_TREC);
+  ASSERT ((trec -> state == TREC_ACTIVE) ||
+          (trec -> state == TREC_MUST_ABORT) ||
+          (trec -> state == TREC_WAITING) ||
+          (trec -> state == TREC_CANNOT_COMMIT));
+
+  if (trec -> state == TREC_WAITING) {
+    // A waiting trec is always top-level (no enclosing trec); its TSOs
+    // must be removed from the TVars' wait queues before the state flips,
+    // since stop_tsos_waiting_on_trec inspects the trec's entries.
+    ASSERT (trec -> enclosing_trec == NO_TREC);
+    TRACE("stmCondemnTransaction condemning waiting transaction\n");
+    stop_tsos_waiting_on_trec(trec);
+  }
+
+  // TREC_MUST_ABORT is the "condemned" state: commit/re-wait paths assert
+  // or fail on it (see stmReWait's MUST_ABORT handling).
+  trec -> state = TREC_MUST_ABORT;
+
+  TRACE("stmCondemnTransaction trec=%p done\n", trec);
+}
+
+/*......................................................................*/
+
StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
StgTRecHeader *outer;
TRACE("stmGetEnclosingTRec trec=%p\n", trec);
/*......................................................................*/
-StgBool stmReWait(StgTRecHeader *trec) {
+StgBool stmReWait(StgTSO *tso) {
int result;
+ StgTRecHeader *trec = tso->trec;
+
TRACE("stmReWait trec=%p\n", trec);
ASSERT (trec != NO_TREC);
ASSERT (trec -> enclosing_trec == NO_TREC);
- ASSERT (trec -> state == TREC_WAITING);
+ ASSERT ((trec -> state == TREC_WAITING) ||
+ (trec -> state == TREC_MUST_ABORT));
lock_stm();
result = transaction_is_valid(trec);
// The transaction remains valid -- do nothing because it is already on
// the wait queues
ASSERT (trec -> state == TREC_WAITING);
+ park_tso(tso);
} else {
// The transcation has become invalid. We can now remove it from the wait
// queues.
- stop_tsos_waiting_on_trec (trec);
+ if (trec -> state != TREC_MUST_ABORT) {
+ stop_tsos_waiting_on_trec (trec);
+
+ // Outcome now reflected by status field; no need for log
+ recycle_closures_from_trec(trec);
+ }
- // Outcome now reflected by status field; no need for log
- recycle_closures_from_trec(trec);
}
unlock_stm();
static void detectBlackHoles ( void );
#endif
+static void raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
+
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
* with these synchronisation objects.
* previously, or it's blocked on an MVar or Blackhole, in which
* case it'll be on the relevant queue already.
*/
+ ASSERT(t->why_blocked != NotBlocked);
IF_DEBUG(scheduler,
debugBelch("--<< thread %d (%s) stopped: ",
t->id, whatNext_strs[t->what_next]);
for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
if (!stmValidateTransaction (t -> trec)) {
- StgRetInfoTable *info;
- StgPtr sp = t -> sp;
-
IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
- if (sp[0] == (W_)&stg_enter_info) {
- sp++;
- } else {
- sp--;
- sp[0] = (W_)&stg_dummy_ret_closure;
- }
-
- // Look up the stack for its atomically frame
- StgPtr frame;
- frame = sp + 1;
- info = get_ret_itbl((StgClosure *)frame);
-
- while (info->i.type != ATOMICALLY_FRAME &&
- info->i.type != STOP_FRAME &&
- info->i.type != UPDATE_FRAME) {
- if (info -> i.type == CATCH_RETRY_FRAME) {
- IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame"));
- stmAbortTransaction(t -> trec);
- t -> trec = stmGetEnclosingTRec(t -> trec);
- }
- frame += stack_frame_sizeW((StgClosure *)frame);
- info = get_ret_itbl((StgClosure *)frame);
- }
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
- if (!info -> i.type == ATOMICALLY_FRAME) {
- barf("Could not find ATOMICALLY_FRAME for unvalidatable thread");
- }
-
- // Cause the thread to enter its atomically frame again when
- // scheduled -- this will attempt stmCommitTransaction or stmReWait
- // which will fail triggering re-rexecution.
- t->sp = frame;
- t->what_next = ThreadRunGHC;
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
}
}
void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
+  // Public entry point: delegate to raiseAsync_ with
+  // stop_at_atomically == rtsFalse, i.e. unwind past any
+  // ATOMICALLY_FRAME rather than stopping at it.
+  raiseAsync_(tso, exception, rtsFalse);
+}
+
+static void
+raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+{
StgRetInfoTable *info;
StgPtr sp;
while (info->i.type != UPDATE_FRAME
&& (info->i.type != CATCH_FRAME || exception == NULL)
- && info->i.type != STOP_FRAME) {
- if (info->i.type == ATOMICALLY_FRAME) {
+ && info->i.type != STOP_FRAME
+ && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
+ {
+ if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
// IF we find an ATOMICALLY_FRAME then we abort the
// current transaction and propagate the exception. In
// this case (unlike ordinary exceptions) we do not care
switch (info->i.type) {
+ case ATOMICALLY_FRAME:
+ ASSERT(stop_at_atomically);
+ ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ stmCondemnTransaction(tso -> trec);
+ tso->sp = frame;
+ tso->what_next = ThreadRunGHC;
+ return;
+
case CATCH_FRAME:
// If we find a CATCH_FRAME, and we've got an exception to raise,
// then build the THUNK raise(exception), and leave it on