[project @ 2005-01-12 12:36:28 by simonmar]
authorsimonmar <unknown>
Wed, 12 Jan 2005 12:36:30 +0000 (12:36 +0000)
committersimonmar <unknown>
Wed, 12 Jan 2005 12:36:30 +0000 (12:36 +0000)
Numerous bug fixes to the STM code, mostly from a debugging session
with Tim Harris.  The test that flushed out all the bugs will shortly
be added to the test suite.

ghc/includes/STM.h
ghc/rts/PrimOps.cmm
ghc/rts/STM.c
ghc/rts/Schedule.c

index fc3f29a..bafc523 100644 (file)
@@ -96,6 +96,15 @@ extern StgTRecHeader *stmStartTransaction(StgTRecHeader *outer);
 
 extern void stmAbortTransaction(StgTRecHeader *trec);
 
+// Ensure that a subsequent commit / validation will fail.  We use this 
+// in our current handling of transactions that may have become invalid
+// and started looping.  We strip their stack back to the ATOMICALLY_FRAME,
+// and, when the thread is next scheduled, discover it to be invalid and
+// re-execute it.  However, we need to force the transaction to stay invalid
+// in case other threads' updates make it valid in the mean time.
+
+extern void stmCondemnTransaction(StgTRecHeader *trec);
+
 // Return the trec within which the specified trec was created (not
 // valid if trec==NO_TREC).
 
@@ -164,7 +173,7 @@ extern StgBool stmWait(StgTSO *tso, StgTRecHeader *trec);
 // and leave it as unblocked.  It is an error to call stmReWait if the
 // thread is not waiting.
 
-extern StgBool stmReWait(StgTRecHeader *trec);
+extern StgBool stmReWait(StgTSO *tso);
 
 // Merge the accesses made so far in the second trec into the first trec.
 // Note that the resulting trec is only intended to be used in wait operations.
index 26b3ce6..e50b17f 100644 (file)
@@ -996,7 +996,7 @@ CATCH_RETRY_FRAME_ENTRY_TEMPLATE(,%ENTRY_CODE(Sp(SP_OFF)))
       trec = StgTSO_trec(CurrentTSO);                                                    \
       if (StgAtomicallyFrame_waiting(frame)) {                                           \
         /* The TSO is currently waiting: should we stop waiting? */                      \
-        valid = foreign "C" stmReWait(trec "ptr");                                       \
+        valid = foreign "C" stmReWait(CurrentTSO "ptr");                                 \
         if (valid) {                                                                     \
           /* Previous attempt is still valid: no point trying again yet */               \
           IF_NOT_REG_R1(Sp_adj(-1); Sp(0) = rval;)                                       \
@@ -1240,9 +1240,11 @@ retry_pop_stack:
       other_trec = StgCatchRetryFrame_first_code_trec(frame);
       r = foreign "C" stmMergeForWaiting(trec "ptr", other_trec "ptr");
       if (r) {
+        r = foreign "C" stmCommitTransaction(trec "ptr");
+      }
+      if (r) {
         // Merge between siblings succeeded: commit it back to enclosing transaction
         // and then propagate the retry
-        r = foreign "C" stmCommitTransaction(trec "ptr");
         StgTSO_trec(CurrentTSO) = outer;
         Sp = Sp + SIZEOF_StgCatchRetryFrame;
         goto retry_pop_stack;
index f56bd1f..8edcb8c 100644 (file)
@@ -115,6 +115,7 @@ static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
 static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
 
 static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
+#if 0
   if (shake()) {
     TRACE("Shake: not re-using wait queue %p\n", q);
     return;
@@ -122,9 +123,11 @@ static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
 
   q -> next_queue_entry = cached_tvar_wait_queues;
   cached_tvar_wait_queues = q;
+#endif
 }
 
 static void recycle_closures_from_trec (StgTRecHeader *t) {
+#if 0
   if (shake()) {
     TRACE("Shake: not re-using closures from %p\n", t);
     return;
@@ -140,6 +143,7 @@ static void recycle_closures_from_trec (StgTRecHeader *t) {
     c -> prev_chunk = cached_trec_chunks;
     cached_trec_chunks = c;
   }
+#endif
 }
 
 /*......................................................................*/
@@ -278,7 +282,8 @@ static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
 static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> enclosing_trec == NO_TREC);
-  ASSERT(trec -> state == TREC_WAITING);
+  ASSERT(trec -> state == TREC_WAITING ||
+         trec -> state == TREC_MUST_ABORT);
   TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
   FOR_EACH_ENTRY(trec, e, {
     StgTVar *s;
@@ -509,6 +514,27 @@ void stmAbortTransaction(StgTRecHeader *trec) {
 
 /*......................................................................*/
 
+void stmCondemnTransaction(StgTRecHeader *trec) {
+  TRACE("stmCondemnTransaction trec=%p\n", trec);
+  ASSERT (trec != NO_TREC);
+  ASSERT ((trec -> state == TREC_ACTIVE) || 
+          (trec -> state == TREC_MUST_ABORT) ||
+          (trec -> state == TREC_WAITING) ||
+          (trec -> state == TREC_CANNOT_COMMIT));
+
+  if (trec -> state == TREC_WAITING) {
+    ASSERT (trec -> enclosing_trec == NO_TREC);
+    TRACE("stmCondemnTransaction condemning waiting transaction\n");
+    stop_tsos_waiting_on_trec(trec);
+  } 
+
+  trec -> state = TREC_MUST_ABORT;
+
+  TRACE("stmCondemnTransaction trec=%p done\n", trec);
+}
+
+/*......................................................................*/
+
 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
   StgTRecHeader *outer;
   TRACE("stmGetEnclosingTRec trec=%p\n", trec);
@@ -671,12 +697,15 @@ StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
 
 /*......................................................................*/
 
-StgBool stmReWait(StgTRecHeader *trec) {
+StgBool stmReWait(StgTSO *tso) {
   int result;
+  StgTRecHeader *trec = tso->trec;
+
   TRACE("stmReWait trec=%p\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT (trec -> enclosing_trec == NO_TREC);
-  ASSERT (trec -> state == TREC_WAITING);
+  ASSERT ((trec -> state == TREC_WAITING) || 
+          (trec -> state == TREC_MUST_ABORT));
 
   lock_stm();
   result = transaction_is_valid(trec);
@@ -685,13 +714,17 @@ StgBool stmReWait(StgTRecHeader *trec) {
     // The transaction remains valid -- do nothing because it is already on
     // the wait queues
     ASSERT (trec -> state == TREC_WAITING);
+    park_tso(tso);
   } else {
     // The transcation has become invalid.  We can now remove it from the wait
     // queues.
-    stop_tsos_waiting_on_trec (trec);
+    if (trec -> state != TREC_MUST_ABORT) {
+         stop_tsos_waiting_on_trec (trec);
+
+         // Outcome now reflected by status field; no need for log
+         recycle_closures_from_trec(trec);
+    }
 
-    // Outcome now reflected by status field; no need for log
-    recycle_closures_from_trec(trec);
   }
   unlock_stm();
 
index ab82e9e..95e9ba4 100644 (file)
@@ -221,6 +221,8 @@ static void     schedule          ( StgMainThread *mainThread, Capability *initi
 static void     detectBlackHoles  ( void );
 #endif
 
+static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
+
 #if defined(RTS_SUPPORTS_THREADS)
 /* ToDo: carefully document the invariants that go together
  *       with these synchronisation objects.
@@ -1200,6 +1202,7 @@ run_thread:
        * previously, or it's blocked on an MVar or Blackhole, in which
        * case it'll be on the relevant queue already.
        */
+      ASSERT(t->why_blocked != NotBlocked);
       IF_DEBUG(scheduler,
               debugBelch("--<< thread %d (%s) stopped: ", 
                       t->id, whatNext_strs[t->what_next]);
@@ -1333,44 +1336,14 @@ run_thread:
       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
         if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
           if (!stmValidateTransaction (t -> trec)) {
-            StgRetInfoTable *info;
-            StgPtr sp = t -> sp;
-
             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
 
-            if (sp[0] == (W_)&stg_enter_info) {
-              sp++;
-            } else {
-              sp--;
-              sp[0] = (W_)&stg_dummy_ret_closure;
-            }
-
-            // Look up the stack for its atomically frame
-            StgPtr frame;
-            frame = sp + 1;
-            info = get_ret_itbl((StgClosure *)frame);
-              
-            while (info->i.type != ATOMICALLY_FRAME &&
-                   info->i.type != STOP_FRAME &&
-                   info->i.type != UPDATE_FRAME) {
-              if (info -> i.type == CATCH_RETRY_FRAME) {
-                IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame"));
-                stmAbortTransaction(t -> trec);
-                t -> trec = stmGetEnclosingTRec(t -> trec);
-              }
-              frame += stack_frame_sizeW((StgClosure *)frame);
-              info = get_ret_itbl((StgClosure *)frame);
-            }
+           // strip the stack back to the ATOMICALLY_FRAME, aborting
+           // the (nested) transaction, and saving the stack of any
+           // partially-evaluated thunks on the heap.
+           raiseAsync_(t, NULL, rtsTrue);
             
-            if (!info -> i.type == ATOMICALLY_FRAME) {
-              barf("Could not find ATOMICALLY_FRAME for unvalidatable thread");
-            }
-
-            // Cause the thread to enter its atomically frame again when
-            // scheduled -- this will attempt stmCommitTransaction or stmReWait
-            // which will fail triggering re-rexecution.
-            t->sp = frame;
-            t->what_next = ThreadRunGHC;
+           ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
           }
         }
       }
@@ -3051,6 +3024,12 @@ raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
 void
 raiseAsync(StgTSO *tso, StgClosure *exception)
 {
+    raiseAsync_(tso, exception, rtsFalse);
+}
+
+static void
+raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+{
     StgRetInfoTable *info;
     StgPtr sp;
   
@@ -3106,8 +3085,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        
        while (info->i.type != UPDATE_FRAME
               && (info->i.type != CATCH_FRAME || exception == NULL)
-              && info->i.type != STOP_FRAME) {
-            if (info->i.type == ATOMICALLY_FRAME) {
+              && info->i.type != STOP_FRAME
+              && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
+       {
+            if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
               // IF we find an ATOMICALLY_FRAME then we abort the
               // current transaction and propagate the exception.  In
               // this case (unlike ordinary exceptions) we do not care
@@ -3125,6 +3106,14 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        
        switch (info->i.type) {
            
+       case ATOMICALLY_FRAME:
+           ASSERT(stop_at_atomically);
+           ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+           stmCondemnTransaction(tso -> trec);
+           tso->sp = frame;
+           tso->what_next = ThreadRunGHC;
+           return;
+
        case CATCH_FRAME:
            // If we find a CATCH_FRAME, and we've got an exception to raise,
            // then build the THUNK raise(exception), and leave it on