[project @ 2004-11-18 09:56:07 by tharris]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 851a957..09c4602 100644 (file)
@@ -55,6 +55,7 @@
 #include "Signals.h"
 #include "Sanity.h"
 #include "Stats.h"
+#include "STM.h"
 #include "Timer.h"
 #include "Prelude.h"
 #include "ThreadLabels.h"
@@ -1326,6 +1327,54 @@ run_thread:
 #endif
 
     if (ready_to_gc) {
+      /* Kick any transactions which are invalid back to their atomically frames.
+       * When next scheduled they will try to commit, this commit will fail and
+       * they will retry. */
+      for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+        if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+          if (!stmValidateTransaction (t -> trec)) {
+            StgRetInfoTable *info;
+            StgPtr sp = t -> sp;
+
+            IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+            if (sp[0] == (W_)&stg_enter_info) {
+              sp++;
+            } else {
+              sp--;
+              sp[0] = (W_)&stg_dummy_ret_closure;
+            }
+
+            // Look up the stack for its atomically frame
+            StgPtr frame;
+            frame = sp + 1;
+            info = get_ret_itbl((StgClosure *)frame);
+              
+            while (info->i.type != ATOMICALLY_FRAME &&
+                   info->i.type != STOP_FRAME &&
+                   info->i.type != UPDATE_FRAME) {
+              if (info -> i.type == CATCH_RETRY_FRAME) {
+                IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame"));
+                stmAbortTransaction(t -> trec);
+                t -> trec = stmGetEnclosingTRec(t -> trec);
+              }
+              frame += stack_frame_sizeW((StgClosure *)frame);
+              info = get_ret_itbl((StgClosure *)frame);
+            }
+            
+            if (!info -> i.type == ATOMICALLY_FRAME) {
+              barf("Could not find ATOMICALLY_FRAME for unvalidatable thread");
+            }
+
+            // Cause the thread to enter its atomically frame again when
+            // scheduled -- this will attempt stmCommitTransaction or stmReWait
+            // which will fail triggering re-rexecution.
+            t->sp = frame;
+            t->what_next = ThreadRunGHC;
+          }
+        }
+      }
+
       /* everybody back, start the GC.
        * Could do it in this thread, or signal a condition var
        * to do it in another thread.  Either way, we need to
@@ -1734,6 +1783,8 @@ createThread(nat size)
                               - TSO_STRUCT_SIZEW;
   tso->sp           = (P_)&(tso->stack) + stack_size;
 
+  tso->trec = NO_TREC;
+
 #ifdef PROFILING
   tso->prof.CCCS = CCS_MAIN;
 #endif
@@ -2659,6 +2710,14 @@ unblockThread(StgTSO *tso)
   case NotBlocked:
     return;  /* not blocked */
 
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the thread
+    // is runnable and we leave it to the stack-walking code to abort the 
+    // transaction while unwinding the stack.  We should perhaps have a debugging
+    // test to make sure that this really happens and that the 'zombie' transaction
+    // does not get committed.
+    goto done;
+
   case BlockedOnMVar:
     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
     {
@@ -2792,6 +2851,14 @@ unblockThread(StgTSO *tso)
 
   switch (tso->why_blocked) {
 
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the thread
+    // is runnable and we leave it to the stack-walking code to abort the 
+    // transaction while unwinding the stack.  We should perhaps have a debugging
+    // test to make sure that this really happens and that the 'zombie' transaction
+    // does not get committed.
+    goto done;
+
   case BlockedOnMVar:
     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
     {
@@ -3024,6 +3091,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        // top of the stack applied to the exception.
        // 
        // 5. If it's a STOP_FRAME, then kill the thread.
+        // 
+        // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
+        // transaction
+       
        
        StgPtr frame;
        
@@ -3033,6 +3104,18 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        while (info->i.type != UPDATE_FRAME
               && (info->i.type != CATCH_FRAME || exception == NULL)
               && info->i.type != STOP_FRAME) {
+            if (info->i.type == ATOMICALLY_FRAME) {
+              // IF we find an ATOMICALLY_FRAME then we abort the
+              // current transaction and propagate the exception.  In
+              // this case (unlike ordinary exceptions) we do not care
+              // whether the transaction is valid or not because its
+              // possible validity cannot have caused the exception
+              // and will not be visible after the abort.
+              IF_DEBUG(stm,
+                       debugBelch("Found atomically block delivering async exception\n"));
+              stmAbortTransaction(tso -> trec);
+              tso -> trec = stmGetEnclosingTRec(tso -> trec);
+            }
            frame += stack_frame_sizeW((StgClosure *)frame);
            info = get_ret_itbl((StgClosure *)frame);
        }
@@ -3202,15 +3285,26 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
            UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
            p = next;
            continue;
+
+        case ATOMICALLY_FRAME:
+            IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
+            tso->sp = p;
+            return ATOMICALLY_FRAME;
            
        case CATCH_FRAME:
            tso->sp = p;
            return CATCH_FRAME;
+
+        case CATCH_STM_FRAME:
+            IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
+            tso->sp = p;
+            return CATCH_STM_FRAME;
            
        case STOP_FRAME:
            tso->sp = p;
            return STOP_FRAME;
 
+        case CATCH_RETRY_FRAME:
        default:
            p = next; 
            continue;
@@ -3218,6 +3312,55 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
     }
 }
 
+
+/* -----------------------------------------------------------------------------
+   findRetryFrameHelper
+
+   This function is called by the retry# primitive.  It traverses the stack
+   leaving tso->sp referring to the frame which should handle the retry.  
+
+   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
+   or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
+
+   We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
+   despite the similar implementation.
+
+   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
+   not be created within memory transactions.
+   -------------------------------------------------------------------------- */
+
+StgWord
+findRetryFrameHelper (StgTSO *tso)
+{
+  StgPtr           p, next;
+  StgRetInfoTable *info;
+
+  p = tso -> sp;
+  while (1) {
+    info = get_ret_itbl((StgClosure *)p);
+    next = p + stack_frame_sizeW((StgClosure *)p);
+    switch (info->i.type) {
+      
+    case ATOMICALLY_FRAME:
+      IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
+      tso->sp = p;
+      return ATOMICALLY_FRAME;
+      
+    case CATCH_RETRY_FRAME:
+      IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
+      tso->sp = p;
+      return CATCH_RETRY_FRAME;
+      
+    case CATCH_STM_FRAME:
+    default:
+      ASSERT(info->i.type != CATCH_FRAME);
+      ASSERT(info->i.type != STOP_FRAME);
+      p = next; 
+      continue;
+    }
+  }
+}
+   
 /* -----------------------------------------------------------------------------
    resurrectThreads is called after garbage collection on the list of
    threads found to be garbage.  Each of these threads will be woken
@@ -3248,6 +3391,9 @@ resurrectThreads( StgTSO *threads )
     case BlockedOnBlackHole:
       raiseAsync(tso,(StgClosure *)NonTermination_closure);
       break;
+    case BlockedOnSTM:
+      raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
+      break;
     case NotBlocked:
       /* This might happen if the thread was blocked on a black hole
        * belonging to a thread that we've just woken up (raiseAsync
@@ -3314,6 +3460,9 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnCCall_NoUnblockExc:
     debugBelch("is blocked on an external call (exceptions were already blocked)");
     break;
+  case BlockedOnSTM:
+    debugBelch("is blocked on an STM operation");
+    break;
   default:
     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
         tso->why_blocked, tso->id, tso);