Interruptible FFI calls with pthread_kill and CancelSynchronousIO. v4
[ghc-hetmet.git] / rts / RaiseAsync.c
index 15e6f8f..b94ccea 100644
@@ -18,6 +18,7 @@
 #include "STM.h"
 #include "sm/Sanity.h"
 #include "Profiling.h"
+#include "Messages.h"
 #if defined(mingw32_HOST_OS)
 #include "win32/IOManager.h"
 #endif
@@ -30,10 +31,14 @@ static void raiseAsync (Capability *cap,
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void removeFromMVarBlockedQueue (StgTSO *tso);
 
-static void performBlockedException (Capability *cap, 
-                                    StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap, 
+                            StgTSO *target, MessageThrowTo *msg);
+
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+                            Capability *target_cap USED_IF_THREADS, 
+                            MessageThrowTo *msg USED_IF_THREADS);
 
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
@@ -62,6 +67,8 @@ void
 throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, 
                       rtsBool stop_at_atomically)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
@@ -76,6 +83,8 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
 void
 suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
@@ -96,192 +105,194 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
    may be blocked and could be woken up at any point by another CPU.
    We have some delicate synchronisation to do.
 
-   There is a completely safe fallback scheme: it is always possible
-   to just block the source TSO on the target TSO's blocked_exceptions
-   queue.  This queue is locked using lockTSO()/unlockTSO().  It is
-   checked at regular intervals: before and after running a thread
-   (schedule() and threadPaused() respectively), and just before GC
-   (scheduleDoGC()).  Activating a thread on this queue should be done
-   using maybePerformBlockedException(): this is done in the context
-   of the target thread, so the exception can be raised eagerly.
-
-   This fallback scheme works even if the target thread is complete or
-   killed: scheduleDoGC() will discover the blocked thread before the
-   target is GC'd.
-
-   Blocking the source thread on the target thread's blocked_exception
-   queue is also employed when the target thread is currently blocking
-   exceptions (ie. inside Control.Exception.block).
-
-   We could use the safe fallback scheme exclusively, but that
-   wouldn't be ideal: most calls to throwTo would block immediately,
-   possibly until the next GC, which might require the deadlock
-   detection mechanism to kick in.  So we try to provide promptness
-   wherever possible.
-
-   We can promptly deliver the exception if the target thread is:
-
-     - runnable, on the same Capability as the source thread (because
-       we own the run queue and therefore the target thread).
-   
-     - blocked, and we can obtain exclusive access to it.  Obtaining
-       exclusive access to the thread depends on how it is blocked.
-
-   We must also be careful to not trip over threadStackOverflow(),
-   which might be moving the TSO to enlarge its stack.
-   lockTSO()/unlockTSO() are used here too.
-
+   The underlying scheme when multiple Capabilities are in use is
+   message passing: when the target of a throwTo is on another
+   Capability, we send a message (a MessageThrowTo closure) to that
+   Capability.
+
+   If the throwTo needs to block because the target TSO is masking
+   exceptions (the TSO_BLOCKEX flag), then the message is placed on
+   the blocked_exceptions queue attached to the target TSO.  When the
+   target TSO enters the unmasked state again, it must check the
+   queue.  The blocked_exceptions queue is not locked; only the
+   Capability owning the TSO may modify it.
+
+   To make things simpler for throwTo, we always create the message
+   first before deciding what to do.  The message may get sent, or it
+   may get attached to a TSO's blocked_exceptions queue, or the
+   exception may get thrown immediately and the message dropped,
+   depending on the current state of the target.
+
+   Currently we send a message if the target belongs to another
+   Capability, and it is
+
+     - NotBlocked, BlockedOnMsgThrowTo,
+       BlockedOnCCall_Interruptible
+
+     - or it is masking exceptions (TSO_BLOCKEX)
+
+   Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+   BlockedOnBlackHole then we acquire ownership of the TSO by locking
+   its parent container (e.g. the MVar) and then raise the exception.
+   We might change these cases to be more message-passing-like in the
+   future.
+  
    Returns: 
 
-   THROWTO_SUCCESS    exception was raised, ok to continue
+   NULL               exception was raised, ok to continue
 
-   THROWTO_BLOCKED    exception was not raised; block the source
-                      thread then call throwToReleaseTarget() when
-                     the source thread is properly tidied away.
+   MessageThrowTo *   exception was not raised; the source TSO
+                      should now put itself in the state 
+                      BlockedOnMsgThrowTo, and when it is ready
+                      it should unlock the message using
+                      unlockClosure(msg, &stg_MSG_THROWTO_info);
+                      If it decides not to raise the exception after
+                      all, it can revoke it safely with
+                      unlockClosure(msg, &stg_MSG_NULL_info);
 
    -------------------------------------------------------------------------- */
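
The comment above documents the caller's half of the new protocol, but the callers themselves (elsewhere in the RTS) are not part of this diff. A minimal sketch of how the return value is meant to be consumed, assuming a non-NULL source thread and using only the functions and info tables named above:

/* Sketch only, not part of the patch: a caller of the new throwTo(). */
static void
throwToCallerSketch (Capability *cap, StgTSO *source, StgTSO *target,
                     StgClosure *exception)
{
    MessageThrowTo *msg = throwTo(cap, source, target, exception);

    if (msg == NULL) {
        return;            // exception was raised immediately; carry on
    }

    // Not raised yet: park the source thread on the (still locked) message.
    source->why_blocked = BlockedOnMsgThrowTo;
    source->block_info.throwto = msg;

    // Once the source is properly tidied away, make the message live so
    // the target's Capability can act on it:
    unlockClosure((StgClosure *)msg, &stg_MSG_THROWTO_info);

    // Or, had we changed our mind before this point, revoke it instead:
    //     unlockClosure((StgClosure *)msg, &stg_MSG_NULL_info);
}
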
 
-nat
+MessageThrowTo *
 throwTo (Capability *cap,      // the Capability we hold 
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
-        StgClosure *exception, // the exception closure
-        /*[out]*/ void **out USED_IF_THREADS)
+        StgClosure *exception) // the exception closure
 {
-    StgWord status;
-
-    ASSERT(target != END_TSO_QUEUE);
+    MessageThrowTo *msg;
 
-    // follow ThreadRelocated links in the target first
-    while (target->what_next == ThreadRelocated) {
-       target = target->_link;
-       // No, it might be a WHITEHOLE:
-       // ASSERT(get_itbl(target)->type == TSO);
-    }
+    msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+    // message starts locked; the caller has to unlock it when it is
+    // ready.
+    SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
+    msg->source      = source;
+    msg->target      = target;
+    msg->exception   = exception;
 
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
-                   (unsigned long)source->id, (unsigned long)target->id);
-    } else {
-        debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu",
-                   (unsigned long)target->id);
+    switch (throwToMsg(cap, msg))
+    {
+    case THROWTO_SUCCESS:
+        return NULL;
+    case THROWTO_BLOCKED:
+    default:
+        return msg;
     }
+}
+    
 
-#ifdef DEBUG
-    traceThreadStatus(DEBUG_sched, target);
-#endif
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
+{
+    StgWord status;
+    StgTSO *target = msg->target;
+    Capability *target_cap;
 
     goto check_target;
 
     goto check_target;
+
 retry:
+    write_barrier();
     debugTrace(DEBUG_sched, "throwTo: retrying...");
 
 check_target:
     ASSERT(target != END_TSO_QUEUE);
 
     debugTrace(DEBUG_sched, "throwTo: retrying...");
 
 check_target:
     ASSERT(target != END_TSO_QUEUE);
 
+    // follow ThreadRelocated links in the target first
+    target = deRefTSO(target);
+
     // Thread already dead?
     if (target->what_next == ThreadComplete 
        || target->what_next == ThreadKilled) {
        return THROWTO_SUCCESS;
     }
 
     // Thread already dead?
     if (target->what_next == ThreadComplete 
        || target->what_next == ThreadKilled) {
        return THROWTO_SUCCESS;
     }
 
+    debugTraceCap(DEBUG_sched, cap,
+                  "throwTo: from thread %lu to thread %lu",
+                  (unsigned long)msg->source->id, 
+                  (unsigned long)msg->target->id);
+
+#ifdef DEBUG
+    traceThreadStatus(DEBUG_sched, target);
+#endif
+
+    target_cap = target->cap;
+    if (target->cap != cap) {
+        throwToSendMsg(cap, target_cap, msg);
+        return THROWTO_BLOCKED;
+    }
+
     status = target->why_blocked;
     
     switch (status) {
     case NotBlocked:
-       /* if status==NotBlocked, and target->cap == cap, then
-          we own this TSO and can raise the exception.
-          
-          How do we establish this condition?  Very carefully.
-
-          Let 
-              P = (status == NotBlocked)
-              Q = (tso->cap == cap)
-              
-          Now, if P & Q are true, then the TSO is locked and owned by
-          this capability.  No other OS thread can steal it.
-
-          If P==0 and Q==1: the TSO is blocked, but attached to this
-          capabilty, and it can be stolen by another capability.
-          
-          If P==1 and Q==0: the TSO is runnable on another
-          capability.  At any time, the TSO may change from runnable
-          to blocked and vice versa, while it remains owned by
-          another capability.
-
-          Suppose we test like this:
-
-             p = P
-             q = Q
-             if (p && q) ...
-
-           this is defeated by another capability stealing a blocked
-           TSO from us to wake it up (Schedule.c:unblockOne()).  The
-           other thread is doing
-
-             Q = 0
-             P = 1
-
-           assuming arbitrary reordering, we could see this
-           interleaving:
-
-             start: P==0 && Q==1 
-             P = 1
-             p = P
-             q = Q
-             Q = 0
-             if (p && q) ...
-              
-           so we need a memory barrier:
-
-             p = P
-             mb()
-             q = Q
-             if (p && q) ...
-
-           this avoids the problematic case.  There are other cases
-           to consider, but this is the tricky one.
-
-           Note that we must be sure that unblockOne() does the
-           writes in the correct order: Q before P.  The memory
-           barrier ensures that if we have seen the write to P, we
-           have also seen the write to Q.
-       */
     {
-       Capability *target_cap;
+        if ((target->flags & TSO_BLOCKEX) == 0) {
+            // It's on our run queue and not blocking exceptions
+            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            return THROWTO_SUCCESS;
+        } else {
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
+        }
+    }
 
-       write_barrier();
-       target_cap = target->cap;
-       if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
-           // It's on our run queue and not blocking exceptions
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           return THROWTO_SUCCESS;
-       } else {
-           // Otherwise, just block on the blocked_exceptions queue
-           // of the target thread.  The queue will get looked at
-           // soon enough: it is checked before and after running a
-           // thread, and during GC.
-           lockTSO(target);
-
-           // Avoid race with threadStackOverflow, which may have
-           // just moved this TSO.
-           if (target->what_next == ThreadRelocated) {
-               unlockTSO(target);
-               target = target->_link;
-               goto retry;
-           }
-            // check again for ThreadComplete and ThreadKilled.  This
-            // cooperates with scheduleHandleThreadFinished to ensure
-            // that we never miss any threads that are throwing an
-            // exception to a thread in the process of terminating.
-            if (target->what_next == ThreadComplete
-                || target->what_next == ThreadKilled) {
-               unlockTSO(target);
-                return THROWTO_SUCCESS;
+    case BlockedOnMsgThrowTo:
+    {
+        const StgInfoTable *i;
+        MessageThrowTo *m;
+
+        m = target->block_info.throwto;
+
+        // target is local to this cap, but has sent a throwto
+        // message to another cap.
+        //
+        // The source message is locked.  We need to revoke the
+        // target's message so that we can raise the exception, so
+        // we attempt to lock it.
+
+        // There's a possibility of a deadlock if two threads are both
+        // trying to throwTo each other (or more generally, a cycle of
+        // threads).  To break the symmetry we compare the addresses
+        // of the MessageThrowTo objects, and the one for which m <
+        // msg gets to spin, while the other can only try to lock
+        // once, but must then back off and unlock both before trying
+        // again.
+        if (m < msg) {
+            i = lockClosure((StgClosure *)m);
+        } else {
+            i = tryLockClosure((StgClosure *)m);
+            if (i == NULL) {
+//            debugBelch("collision\n");
+                throwToSendMsg(cap, target->cap, msg);
+                return THROWTO_BLOCKED;
             }
-           blockedThrowTo(cap,source,target);
-           *out = target;
-           return THROWTO_BLOCKED;
-       }
+        }
+
+        if (i == &stg_MSG_NULL_info) {
+            // we know there's a MSG_TRY_WAKEUP on the way, so we
+            // might as well just do it now.  The message will
+            // be a no-op when it arrives.
+            unlockClosure((StgClosure*)m, i);
+            tryWakeupThread_(cap, target);
+            goto retry;
+        }
+
+        if (i != &stg_MSG_THROWTO_info) {
+            // if it's a MSG_NULL, this TSO has been woken up by another Cap
+            unlockClosure((StgClosure*)m, i);
+            goto retry;
+        }
+
+       if ((target->flags & TSO_BLOCKEX) &&
+           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+            unlockClosure((StgClosure*)m, i);
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
+        }
+
+        // nobody else can wake up this TSO after we claim the message
+        unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
+
+        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+        return THROWTO_SUCCESS;
     }
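
The address comparison in the BlockedOnMsgThrowTo case above is a symmetry-breaking trick: when each of two parties holds one lock and wants the other's, ordering the second acquisition by address means at most one side ever waits, so the cycle that causes deadlock cannot form. A standalone illustration in plain C11 atomics (nothing here is RTS code; like the RTS, it assumes comparing unrelated pointers gives a consistent order):

#include <stdatomic.h>
#include <stdbool.h>

typedef atomic_flag SpinLock;

static void spin_lock   (SpinLock *l) { while (atomic_flag_test_and_set(l)) /* spin */ ; }
static bool try_lock    (SpinLock *l) { return !atomic_flag_test_and_set(l); }
static void spin_unlock (SpinLock *l) { atomic_flag_clear(l); }

// We already hold `ours` and also want `theirs`.  Returns true if both are
// now held; false means we released `ours` and must retry later (compare
// the throwToSendMsg() + THROWTO_BLOCKED path above).
static bool
acquire_second (SpinLock *ours, SpinLock *theirs)
{
    if (theirs < ours) {          // lower address: allowed to wait
        spin_lock(theirs);
        return true;
    }
    if (try_lock(theirs)) {       // higher address: one attempt only
        return true;
    }
    spin_unlock(ours);            // back off so the other side can progress
    return false;
}
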
 
     case BlockedOnMVar:
@@ -320,17 +331,25 @@ check_target:
            goto retry;
        }
 
+        if (target->_link == END_TSO_QUEUE) {
+            // the MVar operation has already completed.  There is a
+            // MSG_TRY_WAKEUP on the way, but we can just wake up the
+            // thread now anyway and ignore the message when it
+            // arrives.
+           unlockClosure((StgClosure *)mvar, info);
+            tryWakeupThread_(cap, target);
+            goto retry;
+        }
+
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockClosure((StgClosure *)target);
-           blockedThrowTo(cap,source,target);
+            blockedThrowTo(cap,target,msg);
            unlockClosure((StgClosure *)mvar, info);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
+           return THROWTO_BLOCKED;
        } else {
-           removeThreadFromMVarQueue(cap, mvar, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
+            // revoke the MVar operation
+            removeFromMVarBlockedQueue(target);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_SUCCESS;
        }
@@ -338,91 +357,21 @@ check_target:
 
     case BlockedOnBlackHole:
     {
-       ACQUIRE_LOCK(&sched_mutex);
-       // double checking the status after the memory barrier:
-       if (target->why_blocked != BlockedOnBlackHole) {
-           RELEASE_LOCK(&sched_mutex);
-           goto retry;
-       }
-
        if (target->flags & TSO_BLOCKEX) {
        if (target->flags & TSO_BLOCKEX) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
-           RELEASE_LOCK(&sched_mutex);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
-       } else {
-           removeThreadFromQueue(cap, &blackhole_queue, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           RELEASE_LOCK(&sched_mutex);
-           return THROWTO_SUCCESS;
-       }
-    }
-
-    case BlockedOnException:
-    {
-       StgTSO *target2;
-       StgInfoTable *info;
-
-       /*
-         To obtain exclusive access to a BlockedOnException thread,
-         we must call lockClosure() on the TSO on which it is blocked.
-         Since the TSO might change underneath our feet, after we
-         call lockClosure() we must check that 
-          
-             (a) the closure we locked is actually a TSO
-            (b) the original thread is still  BlockedOnException,
-            (c) the original thread is still blocked on the TSO we locked
-            and (d) the target thread has not been relocated.
-
-         We synchronise with threadStackOverflow() (which relocates
-         threads) using lockClosure()/unlockClosure().
-       */
-       target2 = target->block_info.tso;
-
-       info = lockClosure((StgClosure *)target2);
-       if (info != &stg_TSO_info) {
-           unlockClosure((StgClosure *)target2, info);
-           goto retry;
-       }
-       if (target->what_next == ThreadRelocated) {
-           target = target->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target2->what_next == ThreadRelocated) {
-           target->block_info.tso = target2->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target->why_blocked != BlockedOnException
-           || target->block_info.tso != target2) {
-           unlockTSO(target2);
-           goto retry;
-       }
-       
-       /* 
-          Now we have exclusive rights to the target TSO...
-
-          If it is blocking exceptions, add the source TSO to its
-          blocked_exceptions queue.  Otherwise, raise the exception.
-       */
-       if ((target->flags & TSO_BLOCKEX) &&
-           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
-           unlockTSO(target2);
-           *out = target;
+            // BlockedOnBlackHole is not interruptible.
+            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
-           removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           unlockTSO(target2);
-           return THROWTO_SUCCESS;
-       }
-    }  
+            // Revoke the message by replacing it with IND. We're not
+            // locking anything here, so we might still get a TRY_WAKEUP
+            // message from the owner of the blackhole some time in the
+            // future, but that doesn't matter.
+            ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+            OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            return THROWTO_SUCCESS;
+        }
+    }
 
     case BlockedOnSTM:
        lockTSO(target);
@@ -434,30 +383,39 @@ check_target:
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
-           *out = target;
+            blockedThrowTo(cap,target,msg);
+           unlockTSO(target);
            return THROWTO_BLOCKED;
        } else {
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unlockTSO(target);
            return THROWTO_SUCCESS;
        }
 
+    case BlockedOnCCall_Interruptible:
+#ifdef THREADED_RTS
+    {
+        Task *task = NULL;
+        // walk suspended_ccalls to find the correct worker thread
+        InCall *incall;
+        for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
+            if (incall->suspended_tso == target) {
+                task = incall->task;
+                break;
+            }
+        }
+        if (task != NULL) {
+            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            interruptWorkerTask(task);
+            return THROWTO_SUCCESS;
+        } else {
+            debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
+        }
+        // fall to next
+    }
+#endif
     case BlockedOnCCall:
-    case BlockedOnCCall_NoUnblockExc:
-       // I don't think it's possible to acquire ownership of a
-       // BlockedOnCCall thread.  We just assume that the target
-       // thread is blocking exceptions, and block on its
-       // blocked_exception queue.
-       lockTSO(target);
-       if (target->why_blocked != BlockedOnCCall &&
-           target->why_blocked != BlockedOnCCall_NoUnblockExc) {
-           unlockTSO(target);
-            goto retry;
-       }
-       blockedThrowTo(cap,source,target);
-       *out = target;
+       blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;
 
 #ifndef THREADEDED_RTS
@@ -469,11 +427,11 @@ check_target:
 #endif
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
+           blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            return THROWTO_SUCCESS;
        }
 #endif
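
The BlockedOnCCall_Interruptible case above hands the worker's Task to interruptWorkerTask(), which is defined outside this file. Going by the commit title, the intent is pthread_kill on POSIX and CancelSynchronousIO on Windows; a rough sketch of the POSIX half, where the choice of signal and the need for a no-op handler are assumptions rather than anything shown in this diff:

#if defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
#include <signal.h>
#include <pthread.h>

/* Assumed shape: deliver a signal whose handler is a no-op, so a blocking
 * system call in the worker returns with EINTR and the foreign call can
 * notice it has been interrupted.  SIGPIPE here is only a placeholder. */
static void
interruptOSThreadSketch (pthread_t worker)
{
    pthread_kill(worker, SIGPIPE);
}
#endif

On Windows the corresponding step would be CancelSynchronousIo() on the worker's thread handle, which makes a pending synchronous I/O call fail with ERROR_OPERATION_ABORTED.
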
@@ -484,33 +442,34 @@ check_target:
     barf("throwTo");
 }
 
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
 static void
 static void
-blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+                Capability *target_cap USED_IF_THREADS, 
+                MessageThrowTo *msg USED_IF_THREADS)
+            
 {
 {
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
-        setTSOLink(cap, source, target->blocked_exceptions);
-        target->blocked_exceptions = source;
-        dirty_TSO(cap,target); // we modified the blocked_exceptions queue
-        
-        source->block_info.tso = target;
-        write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
-        source->why_blocked = BlockedOnException;
-    }
-}
+#ifdef THREADED_RTS
+    debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
 
+    sendMessage(cap, target_cap, (Message*)msg);
+#endif
+}
 
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue.  The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
 {
-    unlockTSO((StgTSO *)tso);
+    debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+                  (unsigned long)target->id);
+
+    ASSERT(target->cap == cap);
+
+    msg->link = target->blocked_exceptions;
+    target->blocked_exceptions = msg;
+    dirty_TSO(cap,target); // we modified the blocked_exceptions queue
 }
-#endif
 
 /* -----------------------------------------------------------------------------
    Waking up threads blocked in throwTo
@@ -532,10 +491,11 @@ throwToReleaseTarget (void *tso)
 int
 maybePerformBlockedException (Capability *cap, StgTSO *tso)
 {
-    StgTSO *source;
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
     
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
-        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
             return 1;
         } else {
@@ -543,65 +503,57 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
         }
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE && 
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && 
         (tso->flags & TSO_BLOCKEX) != 0) {
-        debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+        debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
 
-       // Lock the TSO, this gives us exclusive access to the queue
-       lockTSO(tso);
-
-       // Check the queue again; it might have changed before we
-       // locked it.
-       if (tso->blocked_exceptions == END_TSO_QUEUE) {
-           unlockTSO(tso);
-           return 0;
-       }
-
        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
-       source = tso->blocked_exceptions;
-       performBlockedException(cap, source, tso);
-       tso->blocked_exceptions = unblockOne_(cap, source, 
-                                             rtsFalse/*no migrate*/);
-       unlockTSO(tso);
+    loop:
+        msg = tso->blocked_exceptions;
+        if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+        i = lockClosure((StgClosure*)msg);
+        tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+        if (i == &stg_MSG_NULL_info) {
+            unlockClosure((StgClosure*)msg,i);
+            goto loop;
+        }
+
+        throwToSingleThreaded(cap, msg->target, msg->exception);
+        unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info);
+        tryWakeupThread(cap, msg->source);
         return 1;
     }
     return 0;
 }
 
 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
-// blocked exceptions and let them try again.
+// blocked exceptions.
 
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
 {
-    lockTSO(tso);
-    awakenBlockedQueue(cap, tso->blocked_exceptions);
-    tso->blocked_exceptions = END_TSO_QUEUE;
-    unlockTSO(tso);
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
+
+    for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+         msg = (MessageThrowTo*)msg->link) {
+        i = lockClosure((StgClosure *)msg);
+        if (i != &stg_MSG_NULL_info) {
+            unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info);
+            tryWakeupThread(cap, msg->source);
+        } else {
+            unlockClosure((StgClosure *)msg,i);
+        }
+    }
+    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
 }    
 
-static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
-{
-    StgClosure *exception;
-
-    ASSERT(source->why_blocked == BlockedOnException);
-    ASSERT(source->block_info.tso->id == target->id);
-    ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
-    ASSERT(((StgTSO *)source->sp[1])->id == target->id);
-    // check ids not pointers, because the thread might be relocated
-
-    exception = (StgClosure *)source->sp[2];
-    throwToSingleThreaded(cap, target, exception);
-    source->sp += 3;
-}
-
 /* -----------------------------------------------------------------------------
    Remove a thread from blocking queues.
 
@@ -613,11 +565,54 @@ performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
    -------------------------------------------------------------------------- */
 
 static void
+removeFromMVarBlockedQueue (StgTSO *tso)
+{
+    StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+    StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
+
+    if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+        // already removed from this MVar
+        return;
+    }
+
+    // Assume the MVar is locked. (not assertable; sometimes it isn't
+    // actually WHITEHOLE'd).
+
+    // We want to remove the MVAR_TSO_QUEUE object from the queue.  It
+    // isn't doubly-linked so we can't actually remove it; instead we
+    // just overwrite it with an IND if possible and let the GC short
+    // it out.  However, we have to be careful to maintain the deque
+    // structure:
+
+    if (mvar->head == q) {
+        mvar->head = q->link;
+        q->header.info = &stg_IND_info;
+        if (mvar->tail == q) {
+            mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+        }
+    }
+    else if (mvar->tail == q) {
+        // we can't replace it with an IND in this case, because then
+        // we lose the tail pointer when the GC shorts out the IND.
+        // So we use MSG_NULL as a kind of non-dupable indirection;
+        // these are ignored by takeMVar/putMVar.
+        q->header.info = &stg_MSG_NULL_info;
+    }
+    else {
+        q->header.info = &stg_IND_info;
+    }
+
+    // revoke the MVar operation
+    tso->_link = END_TSO_QUEUE;
+}
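
removeFromMVarBlockedQueue() above deliberately leaves dead cells in the MVar's queue (headers overwritten with IND or MSG_NULL) rather than unlinking them. The consumers that pop the queue live in PrimOps.cmm and are not shown in this diff; as a rough C rendering of what "ignored by takeMVar/putMVar" means, under the assumption that the skipping happens at dequeue time:

/* Sketch only: skip revoked cells when popping the MVar queue.  For a cell
 * overwritten with stg_IND_info the link field doubles as the indirectee,
 * so following link works for both kinds of dead cell. */
static StgMVarTSOQueue *
skipRevokedCells (StgMVarTSOQueue *q)
{
    while (q != (StgMVarTSOQueue *)END_TSO_QUEUE
           && (q->header.info == &stg_IND_info
               || q->header.info == &stg_MSG_NULL_info)) {
        q = q->link;
    }
    return q;    // END_TSO_QUEUE or a live MVAR_TSO_QUEUE cell
}
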
+
+static void
 removeFromQueues(Capability *cap, StgTSO *tso)
 {
   switch (tso->why_blocked) {
 
   case NotBlocked:
+  case ThreadMigrating:
       return;
 
   case BlockedOnSTM:
@@ -630,29 +625,25 @@ removeFromQueues(Capability *cap, StgTSO *tso)
     goto done;
 
   case BlockedOnMVar:
-      removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+      removeFromMVarBlockedQueue(tso);
       goto done;
 
   case BlockedOnBlackHole:
-      removeThreadFromQueue(cap, &blackhole_queue, tso);
+      // nothing to do
       goto done;
 
-  case BlockedOnException:
-    {
-      StgTSO *target  = tso->block_info.tso;
-
-      // NO: when called by threadPaused(), we probably have this
-      // TSO already locked (WHITEHOLEd) because we just placed
-      // ourselves on its queue.
-      // ASSERT(get_itbl(target)->type == TSO);
-
-      while (target->what_next == ThreadRelocated) {
-         target = target->_link;
-      }
-      
-      removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
-      goto done;
-    }
+  case BlockedOnMsgThrowTo:
+  {
+      MessageThrowTo *m = tso->block_info.throwto;
+      // The message is locked by us, unless we got here via
+      // deleteAllThreads(), in which case we own all the
+      // capabilities.
+      // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+      // unlock and revoke it at the same time
+      unlockClosure((StgClosure*)m,&stg_MSG_NULL_info);
+      break;
+  }
 
 #if !defined(THREADED_RTS)
   case BlockedOnRead:
@@ -679,7 +670,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
   }
 
  done:
-  unblockOne(cap, tso);
+  tso->why_blocked = NotBlocked;
+  appendToRunQueue(cap, tso);
 }
 
 /* -----------------------------------------------------------------------------
@@ -689,7 +681,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
  * asynchronous exception in an existing thread.
  *
  * We first remove the thread from any queue on which it might be
- * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked.  The possible blockages are MVARs, BLOCKING_QUEUEs, and
+ * TSO blocked_exception queues.
  *
  * We strip the stack down to the innermost CATCH_FRAME, building
  * thunks in the heap for all the active computations, so they can 
@@ -728,8 +721,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     StgClosure *updatee;
     nat i;
 
-    debugTrace(DEBUG_sched,
-              "raising exception in thread %ld.", (long)tso->id);
+    debugTraceCap(DEBUG_sched, cap,
+                  "raising exception in thread %ld.", (long)tso->id);
     
 #if defined(PROFILING)
     /* 
@@ -742,16 +735,26 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
         fprintCCS_stderr(tso->prof.CCCS);
     }
 #endif
+    // ASSUMES: the thread is not already complete or dead, or
+    // ThreadRelocated.  Upper layers should deal with that.
+    ASSERT(tso->what_next != ThreadComplete && 
+           tso->what_next != ThreadKilled && 
+           tso->what_next != ThreadRelocated);
+
+    // only if we own this TSO (except that deleteThread() calls this 
+    ASSERT(tso->cap == cap);
+
+    // wake it up
+    if (tso->why_blocked != NotBlocked) {
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap,tso);
+    }        
 
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
     sp = tso->sp;
     
-    // ASSUMES: the thread is not already complete or dead.  Upper
-    // layers should deal with that.
-    ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
-
     if (stop_here != NULL) {
         updatee = stop_here->updatee;
     } else {
@@ -834,7 +837,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                 // Perform the update
                 // TODO: this may waste some work, if the thunk has
                 // already been updated by another thread.
-                UPD_IND(((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+                updateThunk(cap, tso, 
+                            ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
             }
 
            sp += sizeofW(StgUpdateFrame) - 1;
@@ -857,9 +861,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
            // top of the CATCH_FRAME ready to enter.
            //
        {
-#ifdef PROFILING
            StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
            StgThunk *raise;
            
            if (exception == NULL) break;
@@ -880,7 +882,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
             * a surprise exception before we get around to executing the
             * handler.
             */
-           tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
+            tso->flags |= TSO_BLOCKEX;
+            if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
+                tso->flags &= ~TSO_INTERRUPTIBLE;
+            } else {
+                tso->flags |= TSO_INTERRUPTIBLE;
+            }
 
            /* Put the newly-built THUNK on top of the stack, ready to execute
             * when the thread restarts.
@@ -926,8 +933,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                {
             StgTRecHeader *trec = tso -> trec;
             StgTRecHeader *outer = trec -> enclosing_trec;
-           debugTrace(DEBUG_stm, 
-                      "found atomically block delivering async exception");
+           debugTraceCap(DEBUG_stm, cap,
+                          "found atomically block delivering async exception");
             stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
             tso -> trec = outer;