New implementation of BLACKHOLEs
[ghc-hetmet.git] / rts / PrimOps.cmm
index c146454..5c575f6 100644 (file)
@@ -35,6 +35,9 @@ import base_ControlziExceptionziBase_nestedAtomically_closure;
 import EnterCriticalSection;
 import LeaveCriticalSection;
 import ghczmprim_GHCziBool_False_closure;
+#if !defined(mingw32_HOST_OS)
+import sm_mutex;
+#endif
 
 /*-----------------------------------------------------------------------------
   Array Primitives
@@ -541,9 +544,9 @@ stg_forkzh
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) | 
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
 
@@ -571,9 +574,9 @@ stg_forkOnzh
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) | 
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
 
@@ -1201,11 +1204,7 @@ stg_takeMVarzh
          StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
       }
 
-#if defined(THREADED_RTS)
       unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-      SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
       RET_P(val);
   } 
   else
@@ -1213,11 +1212,7 @@ stg_takeMVarzh
       /* No further putMVars, MVar is now empty */
       StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
  
-#if defined(THREADED_RTS)
       unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-      SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 
       RET_P(val);
   }
@@ -1276,21 +1271,13 @@ stg_tryTakeMVarzh
        if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
            StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
        }
-#if defined(THREADED_RTS)
         unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-        SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     else 
     {
        /* No further putMVars, MVar is now empty */
        StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-       SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     
     RET_NP(1, val);
@@ -1357,11 +1344,7 @@ stg_putMVarzh
            StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
        }
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-        SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
        jump %ENTRY_CODE(Sp(0));
     }
     else
@@ -1369,11 +1352,7 @@ stg_putMVarzh
        /* No further takes, the MVar is now full. */
        StgMVar_value(mvar) = val;
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-       SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
        jump %ENTRY_CODE(Sp(0));
     }
     
@@ -1426,22 +1405,14 @@ stg_tryPutMVarzh
            StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
        }
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-        SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     else
     {
        /* No further takes, the MVar is now full. */
        StgMVar_value(mvar) = R2;
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-       SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     
     RET_N(1);
@@ -1844,12 +1815,71 @@ stg_asyncDoProczh
 }
 #endif
 
-// noDuplicate# tries to ensure that none of the thunks under
-// evaluation by the current thread are also under evaluation by
-// another thread.  It relies on *both* threads doing noDuplicate#;
-// the second one will get blocked if they are duplicating some work.
+/* -----------------------------------------------------------------------------
+ * noDuplicate#
+ *
+ * noDuplicate# tries to ensure that none of the thunks under
+ * evaluation by the current thread are also under evaluation by
+ * another thread.  It relies on *both* threads doing noDuplicate#;
+ * the second one will get blocked if they are duplicating some work.
+ *
+ * The idea is that noDuplicate# is used within unsafePerformIO to
+ * ensure that the IO operation is performed at most once.
+ * noDuplicate# calls threadPaused which acquires an exclusive lock on
+ * all the thunks currently under evaluation by the current thread.
+ *
+ * Consider the following scenario.  There is a thunk A, whose
+ * evaluation requires evaluating thunk B, where thunk B is an
+ * unsafePerformIO.  Two threads, 1 and 2, bother enter A.  Thread 2
+ * is pre-empted before it enters B, and claims A by blackholing it
+ * (in threadPaused).  Thread 1 now enters B, and calls noDuplicate#.
+ *
+ *      thread 1                      thread 2
+ *   +-----------+                 +---------------+
+ *   |    -------+-----> A <-------+-------        |
+ *   |  update   |   BLACKHOLE     | marked_update |
+ *   +-----------+                 +---------------+
+ *   |           |                 |               | 
+ *        ...                             ...
+ *   |           |                 +---------------+
+ *   +-----------+
+ *   |     ------+-----> B
+ *   |  update   |   BLACKHOLE
+ *   +-----------+
+ *
+ * At this point: A is a blackhole, owned by thread 2.  noDuplicate#
+ * calls threadPaused, which walks up the stack and
+ *  - claims B on behalf of thread 1
+ *  - then it reaches the update frame for A, which it sees is already
+ *    a BLACKHOLE and is therefore owned by another thread.  Since
+ *    thread 1 is duplicating work, the computation up to the update
+ *    frame for A is suspended, including thunk B.
+ *  - thunk B, which is an unsafePerformIO, has now been reverted to
+ *    an AP_STACK which could be duplicated - BAD!
+ *  - The solution is as follows: before calling threadPaused, we
+ *    leave a frame on the stack (stg_noDuplicate_info) that will call
+ *    noDuplicate# again if the current computation is suspended and
+ *    restarted.
+ *
+ * See the test program in concurrent/prog003 for a way to demonstrate
+ * this.  It needs to be run with +RTS -N3 or greater, and the bug
+ * only manifests occasionally (once very 10 runs or so).
+ * -------------------------------------------------------------------------- */
+
+INFO_TABLE_RET(stg_noDuplicate, RET_SMALL)
+{
+    Sp_adj(1);
+    jump stg_noDuplicatezh;
+}
+
 stg_noDuplicatezh
 {
+    STK_CHK_GEN( WDS(1), NO_PTRS, stg_noDuplicatezh );
+    // leave noDuplicate frame in case the current
+    // computation is suspended and restarted (see above).
+    Sp_adj(-1);
+    Sp(0) = stg_noDuplicate_info;
+
     SAVE_THREAD_STATE();
     ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
     foreign "C" threadPaused (MyCapability() "ptr", CurrentTSO "ptr") [];
@@ -1859,10 +1889,18 @@ stg_noDuplicatezh
     } else {
         LOAD_THREAD_STATE();
         ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
+        // remove the stg_noDuplicate frame if it is still there.
+        if (Sp(0) == stg_noDuplicate_info) {
+            Sp_adj(1);
+        }
         jump %ENTRY_CODE(Sp(0));
     }
 }
 
+/* -----------------------------------------------------------------------------
+   Misc. primitives
+   -------------------------------------------------------------------------- */
+
 stg_getApStackValzh
 {
    W_ ap_stack, offset, val, ok;
@@ -1881,10 +1919,6 @@ stg_getApStackValzh
    RET_NP(ok,val);
 }
 
-/* -----------------------------------------------------------------------------
-   Misc. primitives
-   -------------------------------------------------------------------------- */
-
 // Write the cost center stack of the first argument on stderr; return
 // the second.  Possibly only makes sense for already evaluated
 // things?