Change the representation of the MVar blocked queue
[ghc-hetmet.git] / rts / RaiseAsync.c
index d5a4918..bebbcd4 100644 (file)
@@ -31,6 +31,8 @@ static void raiseAsync (Capability *cap,
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
+static void removeFromMVarBlockedQueue (StgTSO *tso);
+
 static void blockedThrowTo (Capability *cap, 
                             StgTSO *target, MessageThrowTo *msg);
 
@@ -38,8 +40,6 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS,
                             Capability *target_cap USED_IF_THREADS, 
                             MessageThrowTo *msg USED_IF_THREADS);
 
-static void performBlockedException (Capability *cap, MessageThrowTo *msg);
-
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
 
@@ -126,7 +126,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
    Currently we send a message if the target belongs to another
    Capability, and it is
 
-     - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+     - NotBlocked, BlockedOnMsgThrowTo,
        BlockedOnCCall
 
      - or it is masking exceptions (TSO_BLOCKEX)
@@ -148,7 +148,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
                       unlockClosure(msg, &stg_MSG_THROWTO_info);
                       If it decides not to raise the exception after
                       all, it can revoke it safely with
-                      unlockClosure(msg, &stg_IND_info);
+                      unlockClosure(msg, &stg_MSG_NULL_info);
 
    -------------------------------------------------------------------------- */
 
@@ -223,67 +223,7 @@ check_target:
     
     switch (status) {
     case NotBlocked:
-    case BlockedOnMsgWakeup:
-       /* if status==NotBlocked, and target->cap == cap, then
-          we own this TSO and can raise the exception.
-          
-          How do we establish this condition?  Very carefully.
-
-          Let 
-              P = (status == NotBlocked)
-              Q = (tso->cap == cap)
-              
-          Now, if P & Q are true, then the TSO is locked and owned by
-          this capability.  No other OS thread can steal it.
-
-          If P==0 and Q==1: the TSO is blocked, but attached to this
-          capabilty, and it can be stolen by another capability.
-          
-          If P==1 and Q==0: the TSO is runnable on another
-          capability.  At any time, the TSO may change from runnable
-          to blocked and vice versa, while it remains owned by
-          another capability.
-
-          Suppose we test like this:
-
-             p = P
-             q = Q
-             if (p && q) ...
-
-           this is defeated by another capability stealing a blocked
-           TSO from us to wake it up (Schedule.c:unblockOne()).  The
-           other thread is doing
-
-             Q = 0
-             P = 1
-
-           assuming arbitrary reordering, we could see this
-           interleaving:
-
-             start: P==0 && Q==1 
-             P = 1
-             p = P
-             q = Q
-             Q = 0
-             if (p && q) ...
-              
-           so we need a memory barrier:
-
-             p = P
-             mb()
-             q = Q
-             if (p && q) ...
-
-           this avoids the problematic case.  There are other cases
-           to consider, but this is the tricky one.
-
-           Note that we must be sure that unblockOne() does the
-           writes in the correct order: Q before P.  The memory
-           barrier ensures that if we have seen the write to P, we
-           have also seen the write to Q.
-       */
     {
-       write_barrier();
         if ((target->flags & TSO_BLOCKEX) == 0) {
             // It's on our run queue and not blocking exceptions
             raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
@@ -326,8 +266,17 @@ check_target:
             }
         }
 
+        if (i == &stg_MSG_NULL_info) {
+            // we know there's a MSG_TRY_WAKEUP on the way, so we
+            // might as well just do it now.  The message will
+            // be a no-op when it arrives.
+            unlockClosure((StgClosure*)m, i);
+            tryWakeupThread(cap, target);
+            goto retry;
+        }
+
         if (i != &stg_MSG_THROWTO_info) {
-            // if it's an IND, this TSO has been woken up by another Cap
+            // MSG_NULL was handled above, so any other non-MSG_THROWTO
+            // info means the message was revoked by another Cap; retry
             unlockClosure((StgClosure*)m, i);
             goto retry;
         }
@@ -340,7 +289,7 @@ check_target:
         }
 
         // nobody else can wake up this TSO after we claim the message
-        unlockClosure((StgClosure*)m, &stg_IND_info);
+        unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
 
         raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
         return THROWTO_SUCCESS;
@@ -382,18 +331,26 @@ check_target:
            goto retry;
        }
 
+        if (target->_link == END_TSO_QUEUE) {
+            // the MVar operation has already completed.  There is a
+            // MSG_TRY_WAKEUP on the way, but we can just wake up the
+            // thread now anyway and ignore the message when it
+            // arrives.
+           unlockClosure((StgClosure *)mvar, info);
+            tryWakeupThread(cap, target);
+            goto retry;
+        }
+
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
             blockedThrowTo(cap,target,msg);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_BLOCKED;
        } else {
-           removeThreadFromMVarQueue(cap, mvar, target);
+            // revoke the MVar operation
+            removeFromMVarBlockedQueue(target);
            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-            if (info == &stg_MVAR_CLEAN_info) {
-                dirty_MVAR(&cap->r,(StgClosure*)mvar);
-            }
-           unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
+           unlockClosure((StgClosure *)mvar, info);
            return THROWTO_SUCCESS;
        }
     }
@@ -535,21 +492,21 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
         if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
         i = lockClosure((StgClosure*)msg);
         tso->blocked_exceptions = (MessageThrowTo*)msg->link;
-        if (i == &stg_IND_info) {
+        if (i == &stg_MSG_NULL_info) {
             unlockClosure((StgClosure*)msg,i);
             goto loop;
         }
 
-        performBlockedException(cap, msg);
-        unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
-        unlockClosure((StgClosure*)msg,&stg_IND_info);
+        throwToSingleThreaded(cap, msg->target, msg->exception);
+        unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info);
+        tryWakeupThread(cap, msg->source);
         return 1;
     }
     return 0;
 }
 
 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
-// blocked exceptions and let them try again.
+// blocked exceptions.
 
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
@@ -560,31 +517,16 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
     for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
          msg = (MessageThrowTo*)msg->link) {
         i = lockClosure((StgClosure *)msg);
-        if (i != &stg_IND_info) {
-            unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        if (i != &stg_MSG_NULL_info) {
+            unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info);
+            tryWakeupThread(cap, msg->source);
+        } else {
+            unlockClosure((StgClosure *)msg,i);
         }
-        unlockClosure((StgClosure *)msg,i);
     }
     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
 }    
 
-static void
-performBlockedException (Capability *cap, MessageThrowTo *msg)
-{
-    StgTSO *source;
-
-    source = msg->source;
-
-    ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
-    ASSERT(source->block_info.closure == (StgClosure *)msg);
-    ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
-    ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
-    // check ids not pointers, because the thread might be relocated
-
-    throwToSingleThreaded(cap, msg->target, msg->exception);
-    source->sp += 3;
-}
-
 /* -----------------------------------------------------------------------------
    Remove a thread from blocking queues.
 
@@ -596,11 +538,54 @@ performBlockedException (Capability *cap, MessageThrowTo *msg)
    -------------------------------------------------------------------------- */
 
 static void
+removeFromMVarBlockedQueue (StgTSO *tso)
+{
+    StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+    StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
+
+    if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+        // already removed from this MVar
+        return;
+    }
+
+    // Assume the MVar is locked. (not assertable; sometimes it isn't
+    // actually WHITEHOLE'd).
+
+    // We want to remove the MVAR_TSO_QUEUE object from the queue.  It
+    // isn't doubly-linked so we can't actually remove it; instead we
+    // just overwrite it with an IND if possible and let the GC short
+    // it out.  However, we have to be careful to maintain the deque
+    // structure:
+
+    if (mvar->head == q) {
+        mvar->head = q->link;
+        q->header.info = &stg_IND_info;
+        if (mvar->tail == q) {
+            mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+        }
+    }
+    else if (mvar->tail == q) {
+        // we can't replace it with an IND in this case, because then
+        // we lose the tail pointer when the GC shorts out the IND.
+        // So we use MSG_NULL as a kind of non-dupable indirection;
+        // these are ignored by takeMVar/putMVar.
+        q->header.info = &stg_MSG_NULL_info;
+    }
+    else {
+        q->header.info = &stg_IND_info;
+    }
+
+    // revoke the MVar operation
+    tso->_link = END_TSO_QUEUE;
+}
+
+static void
 removeFromQueues(Capability *cap, StgTSO *tso)
 {
   switch (tso->why_blocked) {
 
   case NotBlocked:
+  case ThreadMigrating:
       return;
 
   case BlockedOnSTM:
@@ -613,22 +598,13 @@ removeFromQueues(Capability *cap, StgTSO *tso)
     goto done;
 
   case BlockedOnMVar:
-      removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
-      // we aren't doing a write barrier here: the MVar is supposed to
-      // be already locked, so replacing the info pointer would unlock it.
+      removeFromMVarBlockedQueue(tso);
       goto done;
 
   case BlockedOnBlackHole:
       // nothing to do
       goto done;
 
-  case BlockedOnMsgWakeup:
-  {
-      // kill the message, atomically:
-      OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
-      break;
-  }
-
   case BlockedOnMsgThrowTo:
   {
       MessageThrowTo *m = tso->block_info.throwto;
@@ -638,7 +614,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
       // ASSERT(m->header.info == &stg_WHITEHOLE_info);
 
       // unlock and revoke it at the same time
-      unlockClosure((StgClosure*)m,&stg_IND_info);
+      unlockClosure((StgClosure*)m,&stg_MSG_NULL_info);
       break;
   }
 
@@ -667,7 +643,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
   }
 
  done:
-  unblockOne(cap, tso);
+  tso->why_blocked = NotBlocked;
+  appendToRunQueue(cap, tso);
 }
 
 /* -----------------------------------------------------------------------------
@@ -741,7 +718,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     ASSERT(tso->cap == cap);
 
     // wake it up
-    if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+    if (tso->why_blocked != NotBlocked) {
         tso->why_blocked = NotBlocked;
         appendToRunQueue(cap,tso);
     }