[project @ 2005-01-28 12:55:17 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 04e70da..ecd47aa 100644 (file)
@@ -55,6 +55,7 @@
 #include "Signals.h"
 #include "Sanity.h"
 #include "Stats.h"
+#include "STM.h"
 #include "Timer.h"
 #include "Prelude.h"
 #include "ThreadLabels.h"
@@ -162,7 +163,7 @@ static StgTSO *threadStackOverflow(StgTSO *tso);
 */
 
 /* flag set by signal handler to precipitate a context switch */
-nat context_switch = 0;
+int context_switch = 0;
 
 /* if this flag is set as well, give up execution */
 rtsBool interrupted = rtsFalse;
@@ -220,6 +221,8 @@ static void     schedule          ( StgMainThread *mainThread, Capability *initi
 static void     detectBlackHoles  ( void );
 #endif
 
+static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
+
 #if defined(RTS_SUPPORTS_THREADS)
 /* ToDo: carefully document the invariants that go together
  *       with these synchronisation objects.
@@ -539,7 +542,10 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
 #endif
 
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
+    /* win32: might be back here due to awaitEvent() being abandoned
+     * as a result of a console event having been delivered.
+     */
     if ( EMPTY_RUN_QUEUE() ) {
        continue; // nothing to do
     }
@@ -1199,6 +1205,7 @@ run_thread:
        * previously, or it's blocked on an MVar or Blackhole, in which
        * case it'll be on the relevant queue already.
        */
+      ASSERT(t->why_blocked != NotBlocked);
       IF_DEBUG(scheduler,
               debugBelch("--<< thread %d (%s) stopped: ", 
                       t->id, whatNext_strs[t->what_next]);
@@ -1326,6 +1333,26 @@ run_thread:
 #endif
 
     if (ready_to_gc) {
+      /* Kick any transactions which are invalid back to their atomically frames.
+       * When next scheduled they will try to commit, this commit will fail and
+       * they will retry. */
+      for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+        if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+          if (!stmValidateTransaction (t -> trec)) {
+            IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+           // strip the stack back to the ATOMICALLY_FRAME, aborting
+           // the (nested) transaction, and saving the stack of any
+           // partially-evaluated thunks on the heap.
+           raiseAsync_(t, NULL, rtsTrue);
+            
+#ifdef REG_R1
+           ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+#endif
+          }
+        }
+      }
+
       /* everybody back, start the GC.
        * Could do it in this thread, or signal a condition var
        * to do it in another thread.  Either way, we need to
@@ -1397,7 +1424,7 @@ isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
  * Singleton fork(). Do not copy any running threads.
  * ------------------------------------------------------------------------- */
 
-#ifndef mingw32_TARGET_OS
+#ifndef mingw32_HOST_OS
 #define FORKPROCESS_PRIMOP_SUPPORTED
 #endif
 
@@ -1734,6 +1761,8 @@ createThread(nat size)
                               - TSO_STRUCT_SIZEW;
   tso->sp           = (P_)&(tso->stack) + stack_size;
 
+  tso->trec = NO_TREC;
+
 #ifdef PROFILING
   tso->prof.CCCS = CCS_MAIN;
 #endif
@@ -2659,6 +2688,14 @@ unblockThread(StgTSO *tso)
   case NotBlocked:
     return;  /* not blocked */
 
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the thread
+    // is runnable and we leave it to the stack-walking code to abort the 
+    // transaction while unwinding the stack.  We should perhaps have a debugging
+    // test to make sure that this really happens and that the 'zombie' transaction
+    // does not get committed.
+    goto done;
+
   case BlockedOnMVar:
     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
     {
@@ -2725,7 +2762,7 @@ unblockThread(StgTSO *tso)
 
   case BlockedOnRead:
   case BlockedOnWrite:
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
   case BlockedOnDoProc:
 #endif
     {
@@ -2792,6 +2829,14 @@ unblockThread(StgTSO *tso)
 
   switch (tso->why_blocked) {
 
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the thread
+    // is runnable and we leave it to the stack-walking code to abort the 
+    // transaction while unwinding the stack.  We should perhaps have a debugging
+    // test to make sure that this really happens and that the 'zombie' transaction
+    // does not get committed.
+    goto done;
+
   case BlockedOnMVar:
     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
     {
@@ -2855,7 +2900,7 @@ unblockThread(StgTSO *tso)
 
   case BlockedOnRead:
   case BlockedOnWrite:
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
   case BlockedOnDoProc:
 #endif
     {
@@ -2946,7 +2991,10 @@ unblockThread(StgTSO *tso)
 void 
 deleteThread(StgTSO *tso)
 {
-  raiseAsync(tso,NULL);
+  if (tso->why_blocked != BlockedOnCCall &&
+      tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+      raiseAsync(tso,NULL);
+  }
 }
 
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
@@ -2981,6 +3029,12 @@ raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
 void
 raiseAsync(StgTSO *tso, StgClosure *exception)
 {
+    raiseAsync_(tso, exception, rtsFalse);
+}
+
+static void
+raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
+{
     StgRetInfoTable *info;
     StgPtr sp;
   
@@ -3024,6 +3078,10 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        // top of the stack applied to the exception.
        // 
        // 5. If it's a STOP_FRAME, then kill the thread.
+        // 
+        // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
+        // transaction
+       
        
        StgPtr frame;
        
@@ -3032,13 +3090,45 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
        
        while (info->i.type != UPDATE_FRAME
               && (info->i.type != CATCH_FRAME || exception == NULL)
-              && info->i.type != STOP_FRAME) {
+              && info->i.type != STOP_FRAME
+              && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
+       {
+            if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
+              // IF we find an ATOMICALLY_FRAME then we abort the
+              // current transaction and propagate the exception.  In
+              // this case (unlike ordinary exceptions) we do not care
+              // whether the transaction is valid or not because its
+              // possible validity cannot have caused the exception
+              // and will not be visible after the abort.
+              IF_DEBUG(stm,
+                       debugBelch("Found atomically block delivering async exception\n"));
+              stmAbortTransaction(tso -> trec);
+              tso -> trec = stmGetEnclosingTRec(tso -> trec);
+            }
            frame += stack_frame_sizeW((StgClosure *)frame);
            info = get_ret_itbl((StgClosure *)frame);
        }
        
        switch (info->i.type) {
            
+       case ATOMICALLY_FRAME:
+           ASSERT(stop_at_atomically);
+           ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+           stmCondemnTransaction(tso -> trec);
+#ifdef REG_R1
+           tso->sp = frame;
+#else
+           // R1 is not a register: the return convention for IO in
+           // this case puts the return value on the stack, so we
+           // need to set up the stack to return to the atomically
+           // frame properly...
+           tso->sp = frame - 2;
+           tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+           tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+           tso->what_next = ThreadRunGHC;
+           return;
+
        case CATCH_FRAME:
            // If we find a CATCH_FRAME, and we've got an exception to raise,
            // then build the THUNK raise(exception), and leave it on
@@ -3202,15 +3292,26 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
            UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
            p = next;
            continue;
+
+        case ATOMICALLY_FRAME:
+            IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
+            tso->sp = p;
+            return ATOMICALLY_FRAME;
            
        case CATCH_FRAME:
            tso->sp = p;
            return CATCH_FRAME;
+
+        case CATCH_STM_FRAME:
+            IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
+            tso->sp = p;
+            return CATCH_STM_FRAME;
            
        case STOP_FRAME:
            tso->sp = p;
            return STOP_FRAME;
 
+        case CATCH_RETRY_FRAME:
        default:
            p = next; 
            continue;
@@ -3218,6 +3319,55 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
     }
 }
 
+
+/* -----------------------------------------------------------------------------
+   findRetryFrameHelper
+
+   This function is called by the retry# primitive.  It traverses the stack
+   leaving tso->sp referring to the frame which should handle the retry.  
+
+   This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
+   or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
+
+   We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
+   despite the similar implementation.
+
+   We should not expect to see CATCH_FRAME or STOP_FRAME because those should
+   not be created within memory transactions.
+   -------------------------------------------------------------------------- */
+
+StgWord
+findRetryFrameHelper (StgTSO *tso)
+{
+  StgPtr           p, next;
+  StgRetInfoTable *info;
+
+  p = tso -> sp;
+  while (1) {
+    info = get_ret_itbl((StgClosure *)p);
+    next = p + stack_frame_sizeW((StgClosure *)p);
+    switch (info->i.type) {
+      
+    case ATOMICALLY_FRAME:
+      IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
+      tso->sp = p;
+      return ATOMICALLY_FRAME;
+      
+    case CATCH_RETRY_FRAME:
+      IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
+      tso->sp = p;
+      return CATCH_RETRY_FRAME;
+      
+    case CATCH_STM_FRAME:
+    default:
+      ASSERT(info->i.type != CATCH_FRAME);
+      ASSERT(info->i.type != STOP_FRAME);
+      p = next; 
+      continue;
+    }
+  }
+}
+
 /* -----------------------------------------------------------------------------
    resurrectThreads is called after garbage collection on the list of
    threads found to be garbage.  Each of these threads will be woken
@@ -3248,6 +3398,9 @@ resurrectThreads( StgTSO *threads )
     case BlockedOnBlackHole:
       raiseAsync(tso,(StgClosure *)NonTermination_closure);
       break;
+    case BlockedOnSTM:
+      raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
+      break;
     case NotBlocked:
       /* This might happen if the thread was blocked on a black hole
        * belonging to a thread that we've just woken up (raiseAsync
@@ -3277,7 +3430,7 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnWrite:
     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
     break;
-#if defined(mingw32_TARGET_OS)
+#if defined(mingw32_HOST_OS)
     case BlockedOnDoProc:
     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
     break;
@@ -3314,6 +3467,9 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnCCall_NoUnblockExc:
     debugBelch("is blocked on an external call (exceptions were already blocked)");
     break;
+  case BlockedOnSTM:
+    debugBelch("is blocked on an STM operation");
+    break;
   default:
     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
         tso->why_blocked, tso->id, tso);