New implementation of BLACKHOLEs
[ghc-hetmet.git] / rts / RaiseAsync.c
index ca5e5ea..d5a4918 100644 (file)
@@ -18,6 +18,7 @@
 #include "STM.h"
 #include "sm/Sanity.h"
 #include "Profiling.h"
 #include "STM.h"
 #include "sm/Sanity.h"
 #include "Profiling.h"
+#include "Messages.h"
 #if defined(mingw32_HOST_OS)
 #include "win32/IOManager.h"
 #endif
 #if defined(mingw32_HOST_OS)
 #include "win32/IOManager.h"
 #endif
@@ -30,10 +31,14 @@ static void raiseAsync (Capability *cap,
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap, 
+                            StgTSO *target, MessageThrowTo *msg);
 
 
-static void performBlockedException (Capability *cap, 
-                                    StgTSO *source, StgTSO *target);
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+                            Capability *target_cap USED_IF_THREADS, 
+                            MessageThrowTo *msg USED_IF_THREADS);
+
+static void performBlockedException (Capability *cap, MessageThrowTo *msg);
 
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
 
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
@@ -62,6 +67,8 @@ void
 throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, 
                       rtsBool stop_at_atomically)
 {
 throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, 
                       rtsBool stop_at_atomically)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
@@ -76,6 +83,8 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
 void
 suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
 {
 void
 suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
@@ -96,98 +105,125 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
    may be blocked and could be woken up at any point by another CPU.
    We have some delicate synchronisation to do.
 
    may be blocked and could be woken up at any point by another CPU.
    We have some delicate synchronisation to do.
 
-   There is a completely safe fallback scheme: it is always possible
-   to just block the source TSO on the target TSO's blocked_exceptions
-   queue.  This queue is locked using lockTSO()/unlockTSO().  It is
-   checked at regular intervals: before and after running a thread
-   (schedule() and threadPaused() respectively), and just before GC
-   (scheduleDoGC()).  Activating a thread on this queue should be done
-   using maybePerformBlockedException(): this is done in the context
-   of the target thread, so the exception can be raised eagerly.
-
-   This fallback scheme works even if the target thread is complete or
-   killed: scheduleDoGC() will discover the blocked thread before the
-   target is GC'd.
-
-   Blocking the source thread on the target thread's blocked_exception
-   queue is also employed when the target thread is currently blocking
-   exceptions (ie. inside Control.Exception.block).
-
-   We could use the safe fallback scheme exclusively, but that
-   wouldn't be ideal: most calls to throwTo would block immediately,
-   possibly until the next GC, which might require the deadlock
-   detection mechanism to kick in.  So we try to provide promptness
-   wherever possible.
-
-   We can promptly deliver the exception if the target thread is:
-
-     - runnable, on the same Capability as the source thread (because
-       we own the run queue and therefore the target thread).
-   
-     - blocked, and we can obtain exclusive access to it.  Obtaining
-       exclusive access to the thread depends on how it is blocked.
-
-   We must also be careful to not trip over threadStackOverflow(),
-   which might be moving the TSO to enlarge its stack.
-   lockTSO()/unlockTSO() are used here too.
-
+   The underlying scheme when multiple Capabilities are in use is
+   message passing: when the target of a throwTo is on another
+   Capability, we send a message (a MessageThrowTo closure) to that
+   Capability.
+
+   If the throwTo needs to block because the target TSO is masking
+   exceptions (the TSO_BLOCKEX flag), then the message is placed on
+   the blocked_exceptions queue attached to the target TSO.  When the
+   target TSO enters the unmasked state again, it must check the
+   queue.  The blocked_exceptions queue is not locked; only the
+   Capability owning the TSO may modify it.
+
+   To make things simpler for throwTo, we always create the message
+   first before deciding what to do.  The message may get sent, or it
+   may get attached to a TSO's blocked_exceptions queue, or the
+   exception may get thrown immediately and the message dropped,
+   depending on the current state of the target.
+
+   Currently we send a message if the target belongs to another
+   Capability, and it is
+
+     - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+       BlockedOnCCall
+
+     - or it is masking exceptions (TSO_BLOCKEX)
+
+   Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+   BlockedOnBlackHole then we acquire ownership of the TSO by locking
+   its parent container (e.g. the MVar) and then raise the exception.
+   We might change these cases to be more message-passing-like in the
+   future.
+  
    Returns: 
 
    Returns: 
 
-   THROWTO_SUCCESS    exception was raised, ok to continue
+   NULL               exception was raised, ok to continue
 
 
-   THROWTO_BLOCKED    exception was not raised; block the source
-                      thread then call throwToReleaseTarget() when
-                     the source thread is properly tidied away.
+   MessageThrowTo *   exception was not raised; the source TSO
+                      should now put itself in the state 
+                      BlockedOnMsgThrowTo, and when it is ready
+                      it should unlock the mssage using
+                      unlockClosure(msg, &stg_MSG_THROWTO_info);
+                      If it decides not to raise the exception after
+                      all, it can revoke it safely with
+                      unlockClosure(msg, &stg_IND_info);
 
    -------------------------------------------------------------------------- */
 
 
    -------------------------------------------------------------------------- */
 
-nat
+MessageThrowTo *
 throwTo (Capability *cap,      // the Capability we hold 
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
 throwTo (Capability *cap,      // the Capability we hold 
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
-        StgClosure *exception, // the exception closure
-        /*[out]*/ void **out USED_IF_THREADS)
+        StgClosure *exception) // the exception closure
 {
 {
-    StgWord status;
+    MessageThrowTo *msg;
 
 
-    ASSERT(target != END_TSO_QUEUE);
-
-    // follow ThreadRelocated links in the target first
-    while (target->what_next == ThreadRelocated) {
-       target = target->_link;
-       // No, it might be a WHITEHOLE:
-       // ASSERT(get_itbl(target)->type == TSO);
-    }
+    msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+    // message starts locked; the caller has to unlock it when it is
+    // ready.
+    SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
+    msg->source      = source;
+    msg->target      = target;
+    msg->exception   = exception;
 
 
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
-                   (unsigned long)source->id, (unsigned long)target->id);
-    } else {
-        debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu",
-                   (unsigned long)target->id);
+    switch (throwToMsg(cap, msg))
+    {
+    case THROWTO_SUCCESS:
+        return NULL;
+    case THROWTO_BLOCKED:
+    default:
+        return msg;
     }
     }
+}
+    
 
 
-#ifdef DEBUG
-    traceThreadStatus(DEBUG_sched, target);
-#endif
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
+{
+    StgWord status;
+    StgTSO *target = msg->target;
+    Capability *target_cap;
 
     goto check_target;
 
     goto check_target;
+
 retry:
 retry:
+    write_barrier();
     debugTrace(DEBUG_sched, "throwTo: retrying...");
 
 check_target:
     ASSERT(target != END_TSO_QUEUE);
 
     debugTrace(DEBUG_sched, "throwTo: retrying...");
 
 check_target:
     ASSERT(target != END_TSO_QUEUE);
 
+    // follow ThreadRelocated links in the target first
+    target = deRefTSO(target);
+
     // Thread already dead?
     if (target->what_next == ThreadComplete 
        || target->what_next == ThreadKilled) {
        return THROWTO_SUCCESS;
     }
 
     // Thread already dead?
     if (target->what_next == ThreadComplete 
        || target->what_next == ThreadKilled) {
        return THROWTO_SUCCESS;
     }
 
+    debugTraceCap(DEBUG_sched, cap,
+                  "throwTo: from thread %lu to thread %lu",
+                  (unsigned long)msg->source->id, 
+                  (unsigned long)msg->target->id);
+
+#ifdef DEBUG
+    traceThreadStatus(DEBUG_sched, target);
+#endif
+
+    target_cap = target->cap;
+    if (target->cap != cap) {
+        throwToSendMsg(cap, target_cap, msg);
+        return THROWTO_BLOCKED;
+    }
+
     status = target->why_blocked;
     
     switch (status) {
     case NotBlocked:
     status = target->why_blocked;
     
     switch (status) {
     case NotBlocked:
+    case BlockedOnMsgWakeup:
        /* if status==NotBlocked, and target->cap == cap, then
           we own this TSO and can raise the exception.
           
        /* if status==NotBlocked, and target->cap == cap, then
           we own this TSO and can raise the exception.
           
@@ -247,41 +283,67 @@ check_target:
            have also seen the write to Q.
        */
     {
            have also seen the write to Q.
        */
     {
-       Capability *target_cap;
-
        write_barrier();
        write_barrier();
-       target_cap = target->cap;
-       if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
-           // It's on our run queue and not blocking exceptions
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           return THROWTO_SUCCESS;
-       } else {
-           // Otherwise, just block on the blocked_exceptions queue
-           // of the target thread.  The queue will get looked at
-           // soon enough: it is checked before and after running a
-           // thread, and during GC.
-           lockTSO(target);
-
-           // Avoid race with threadStackOverflow, which may have
-           // just moved this TSO.
-           if (target->what_next == ThreadRelocated) {
-               unlockTSO(target);
-               target = target->_link;
-               goto retry;
-           }
-            // check again for ThreadComplete and ThreadKilled.  This
-            // cooperates with scheduleHandleThreadFinished to ensure
-            // that we never miss any threads that are throwing an
-            // exception to a thread in the process of terminating.
-            if (target->what_next == ThreadComplete
-                || target->what_next == ThreadKilled) {
-               unlockTSO(target);
-                return THROWTO_SUCCESS;
+        if ((target->flags & TSO_BLOCKEX) == 0) {
+            // It's on our run queue and not blocking exceptions
+            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            return THROWTO_SUCCESS;
+        } else {
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
+        }
+    }
+
+    case BlockedOnMsgThrowTo:
+    {
+        const StgInfoTable *i;
+        MessageThrowTo *m;
+
+        m = target->block_info.throwto;
+
+        // target is local to this cap, but has sent a throwto
+        // message to another cap.
+        //
+        // The source message is locked.  We need to revoke the
+        // target's message so that we can raise the exception, so
+        // we attempt to lock it.
+
+        // There's a possibility of a deadlock if two threads are both
+        // trying to throwTo each other (or more generally, a cycle of
+        // threads).  To break the symmetry we compare the addresses
+        // of the MessageThrowTo objects, and the one for which m <
+        // msg gets to spin, while the other can only try to lock
+        // once, but must then back off and unlock both before trying
+        // again.
+        if (m < msg) {
+            i = lockClosure((StgClosure *)m);
+        } else {
+            i = tryLockClosure((StgClosure *)m);
+            if (i == NULL) {
+//            debugBelch("collision\n");
+                throwToSendMsg(cap, target->cap, msg);
+                return THROWTO_BLOCKED;
             }
             }
-           blockedThrowTo(cap,source,target);
-           *out = target;
-           return THROWTO_BLOCKED;
-       }
+        }
+
+        if (i != &stg_MSG_THROWTO_info) {
+            // if it's an IND, this TSO has been woken up by another Cap
+            unlockClosure((StgClosure*)m, i);
+            goto retry;
+        }
+
+       if ((target->flags & TSO_BLOCKEX) &&
+           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+            unlockClosure((StgClosure*)m, i);
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
+        }
+
+        // nobody else can wake up this TSO after we claim the message
+        unlockClosure((StgClosure*)m, &stg_IND_info);
+
+        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+        return THROWTO_SUCCESS;
     }
 
     case BlockedOnMVar:
     }
 
     case BlockedOnMVar:
@@ -322,108 +384,32 @@ check_target:
 
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
 
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockClosure((StgClosure *)target);
-           blockedThrowTo(cap,source,target);
+            blockedThrowTo(cap,target,msg);
            unlockClosure((StgClosure *)mvar, info);
            unlockClosure((StgClosure *)mvar, info);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
+           return THROWTO_BLOCKED;
        } else {
            removeThreadFromMVarQueue(cap, mvar, target);
        } else {
            removeThreadFromMVarQueue(cap, mvar, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           unlockClosure((StgClosure *)mvar, info);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            if (info == &stg_MVAR_CLEAN_info) {
+                dirty_MVAR(&cap->r,(StgClosure*)mvar);
+            }
+           unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
            return THROWTO_SUCCESS;
        }
     }
 
     case BlockedOnBlackHole:
     {
            return THROWTO_SUCCESS;
        }
     }
 
     case BlockedOnBlackHole:
     {
-       ACQUIRE_LOCK(&sched_mutex);
-       // double checking the status after the memory barrier:
-       if (target->why_blocked != BlockedOnBlackHole) {
-           RELEASE_LOCK(&sched_mutex);
-           goto retry;
-       }
-
-       if (target->flags & TSO_BLOCKEX) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
-           RELEASE_LOCK(&sched_mutex);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
-       } else {
-           removeThreadFromQueue(cap, &blackhole_queue, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           RELEASE_LOCK(&sched_mutex);
-           return THROWTO_SUCCESS;
-       }
+        // Revoke the message by replacing it with IND. We're not
+        // locking anything here, so we might still get a TRY_WAKEUP
+        // message from the owner of the blackhole some time in the
+        // future, but that doesn't matter.
+        ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+        OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+        return THROWTO_SUCCESS;
     }
 
     }
 
-    case BlockedOnException:
-    {
-       StgTSO *target2;
-       StgInfoTable *info;
-
-       /*
-         To obtain exclusive access to a BlockedOnException thread,
-         we must call lockClosure() on the TSO on which it is blocked.
-         Since the TSO might change underneath our feet, after we
-         call lockClosure() we must check that 
-          
-             (a) the closure we locked is actually a TSO
-            (b) the original thread is still  BlockedOnException,
-            (c) the original thread is still blocked on the TSO we locked
-            and (d) the target thread has not been relocated.
-
-         We synchronise with threadStackOverflow() (which relocates
-         threads) using lockClosure()/unlockClosure().
-       */
-       target2 = target->block_info.tso;
-
-       info = lockClosure((StgClosure *)target2);
-       if (info != &stg_TSO_info) {
-           unlockClosure((StgClosure *)target2, info);
-           goto retry;
-       }
-       if (target->what_next == ThreadRelocated) {
-           target = target->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target2->what_next == ThreadRelocated) {
-           target->block_info.tso = target2->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target->why_blocked != BlockedOnException
-           || target->block_info.tso != target2) {
-           unlockTSO(target2);
-           goto retry;
-       }
-       
-       /* 
-          Now we have exclusive rights to the target TSO...
-
-          If it is blocking exceptions, add the source TSO to its
-          blocked_exceptions queue.  Otherwise, raise the exception.
-       */
-       if ((target->flags & TSO_BLOCKEX) &&
-           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
-           unlockTSO(target2);
-           *out = target;
-           return THROWTO_BLOCKED;
-       } else {
-           removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           unlockTSO(target2);
-           return THROWTO_SUCCESS;
-       }
-    }  
-
     case BlockedOnSTM:
        lockTSO(target);
        // Unblocking BlockedOnSTM threads requires the TSO to be
     case BlockedOnSTM:
        lockTSO(target);
        // Unblocking BlockedOnSTM threads requires the TSO to be
@@ -434,30 +420,18 @@ check_target:
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
-           *out = target;
+            blockedThrowTo(cap,target,msg);
+           unlockTSO(target);
            return THROWTO_BLOCKED;
        } else {
            return THROWTO_BLOCKED;
        } else {
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unlockTSO(target);
            return THROWTO_SUCCESS;
        }
 
     case BlockedOnCCall:
     case BlockedOnCCall_NoUnblockExc:
            unlockTSO(target);
            return THROWTO_SUCCESS;
        }
 
     case BlockedOnCCall:
     case BlockedOnCCall_NoUnblockExc:
-       // I don't think it's possible to acquire ownership of a
-       // BlockedOnCCall thread.  We just assume that the target
-       // thread is blocking exceptions, and block on its
-       // blocked_exception queue.
-       lockTSO(target);
-       if (target->why_blocked != BlockedOnCCall &&
-           target->why_blocked != BlockedOnCCall_NoUnblockExc) {
-           unlockTSO(target);
-            goto retry;
-       }
-       blockedThrowTo(cap,source,target);
-       *out = target;
+       blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;
 
 #ifndef THREADEDED_RTS
        return THROWTO_BLOCKED;
 
 #ifndef THREADEDED_RTS
@@ -469,11 +443,11 @@ check_target:
 #endif
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
 #endif
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
+           blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            return THROWTO_SUCCESS;
        }
 #endif
            return THROWTO_SUCCESS;
        }
 #endif
@@ -484,33 +458,34 @@ check_target:
     barf("throwTo");
 }
 
     barf("throwTo");
 }
 
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
 static void
 static void
-blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+                Capability *target_cap USED_IF_THREADS, 
+                MessageThrowTo *msg USED_IF_THREADS)
+            
 {
 {
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
-        setTSOLink(cap, source, target->blocked_exceptions);
-        target->blocked_exceptions = source;
-        dirty_TSO(cap,target); // we modified the blocked_exceptions queue
-        
-        source->block_info.tso = target;
-        write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
-        source->why_blocked = BlockedOnException;
-    }
-}
+#ifdef THREADED_RTS
+    debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
 
 
+    sendMessage(cap, target_cap, (Message*)msg);
+#endif
+}
 
 
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue.  The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
 {
 {
-    unlockTSO((StgTSO *)tso);
+    debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+                  (unsigned long)target->id);
+
+    ASSERT(target->cap == cap);
+
+    msg->link = target->blocked_exceptions;
+    target->blocked_exceptions = msg;
+    dirty_TSO(cap,target); // we modified the blocked_exceptions queue
 }
 }
-#endif
 
 /* -----------------------------------------------------------------------------
    Waking up threads blocked in throwTo
 
 /* -----------------------------------------------------------------------------
    Waking up threads blocked in throwTo
@@ -532,10 +507,11 @@ throwToReleaseTarget (void *tso)
 int
 maybePerformBlockedException (Capability *cap, StgTSO *tso)
 {
 int
 maybePerformBlockedException (Capability *cap, StgTSO *tso)
 {
-    StgTSO *source;
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
     
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
     
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
-        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
             return 1;
         } else {
             awakenBlockedExceptionQueue(cap,tso);
             return 1;
         } else {
@@ -543,32 +519,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
         }
     }
 
         }
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE && 
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && 
         (tso->flags & TSO_BLOCKEX) != 0) {
         (tso->flags & TSO_BLOCKEX) != 0) {
-        debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+        debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
     }
 
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
 
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
 
-       // Lock the TSO, this gives us exclusive access to the queue
-       lockTSO(tso);
-
-       // Check the queue again; it might have changed before we
-       // locked it.
-       if (tso->blocked_exceptions == END_TSO_QUEUE) {
-           unlockTSO(tso);
-           return 0;
-       }
-
        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
-       source = tso->blocked_exceptions;
-       performBlockedException(cap, source, tso);
-       tso->blocked_exceptions = unblockOne_(cap, source, 
-                                             rtsFalse/*no migrate*/);
-       unlockTSO(tso);
+    loop:
+        msg = tso->blocked_exceptions;
+        if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+        i = lockClosure((StgClosure*)msg);
+        tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+        if (i == &stg_IND_info) {
+            unlockClosure((StgClosure*)msg,i);
+            goto loop;
+        }
+
+        performBlockedException(cap, msg);
+        unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        unlockClosure((StgClosure*)msg,&stg_IND_info);
         return 1;
     }
     return 0;
         return 1;
     }
     return 0;
@@ -580,25 +554,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
 {
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
 {
-    lockTSO(tso);
-    awakenBlockedQueue(cap, tso->blocked_exceptions);
-    tso->blocked_exceptions = END_TSO_QUEUE;
-    unlockTSO(tso);
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
+
+    for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+         msg = (MessageThrowTo*)msg->link) {
+        i = lockClosure((StgClosure *)msg);
+        if (i != &stg_IND_info) {
+            unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        }
+        unlockClosure((StgClosure *)msg,i);
+    }
+    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
 }    
 
 static void
 }    
 
 static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
+performBlockedException (Capability *cap, MessageThrowTo *msg)
 {
 {
-    StgClosure *exception;
+    StgTSO *source;
+
+    source = msg->source;
 
 
-    ASSERT(source->why_blocked == BlockedOnException);
-    ASSERT(source->block_info.tso->id == target->id);
+    ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
+    ASSERT(source->block_info.closure == (StgClosure *)msg);
     ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
     ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
-    ASSERT(((StgTSO *)source->sp[1])->id == target->id);
+    ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
     // check ids not pointers, because the thread might be relocated
 
     // check ids not pointers, because the thread might be relocated
 
-    exception = (StgClosure *)source->sp[2];
-    throwToSingleThreaded(cap, target, exception);
+    throwToSingleThreaded(cap, msg->target, msg->exception);
     source->sp += 3;
 }
 
     source->sp += 3;
 }
 
@@ -631,28 +614,33 @@ removeFromQueues(Capability *cap, StgTSO *tso)
 
   case BlockedOnMVar:
       removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
 
   case BlockedOnMVar:
       removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+      // we aren't doing a write barrier here: the MVar is supposed to
+      // be already locked, so replacing the info pointer would unlock it.
       goto done;
 
   case BlockedOnBlackHole:
       goto done;
 
   case BlockedOnBlackHole:
-      removeThreadFromQueue(cap, &blackhole_queue, tso);
+      // nothing to do
       goto done;
 
       goto done;
 
-  case BlockedOnException:
-    {
-      StgTSO *target  = tso->block_info.tso;
-
-      // NO: when called by threadPaused(), we probably have this
-      // TSO already locked (WHITEHOLEd) because we just placed
-      // ourselves on its queue.
-      // ASSERT(get_itbl(target)->type == TSO);
-
-      while (target->what_next == ThreadRelocated) {
-         target = target->_link;
-      }
-      
-      removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
-      goto done;
-    }
+  case BlockedOnMsgWakeup:
+  {
+      // kill the message, atomically:
+      OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
+      break;
+  }
+
+  case BlockedOnMsgThrowTo:
+  {
+      MessageThrowTo *m = tso->block_info.throwto;
+      // The message is locked by us, unless we got here via
+      // deleteAllThreads(), in which case we own all the
+      // capabilities.
+      // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+      // unlock and revoke it at the same time
+      unlockClosure((StgClosure*)m,&stg_IND_info);
+      break;
+  }
 
 #if !defined(THREADED_RTS)
   case BlockedOnRead:
 
 #if !defined(THREADED_RTS)
   case BlockedOnRead:
@@ -689,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
  * asynchronous exception in an existing thread.
  *
  * We first remove the thread from any queue on which it might be
  * asynchronous exception in an existing thread.
  *
  * We first remove the thread from any queue on which it might be
- * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked.  The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
  *
  * We strip the stack down to the innermost CATCH_FRAME, building
  * thunks in the heap for all the active computations, so they can 
  *
  * We strip the stack down to the innermost CATCH_FRAME, building
  * thunks in the heap for all the active computations, so they can 
@@ -728,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     StgClosure *updatee;
     nat i;
 
     StgClosure *updatee;
     nat i;
 
-    debugTrace(DEBUG_sched,
-              "raising exception in thread %ld.", (long)tso->id);
+    debugTraceCap(DEBUG_sched, cap,
+                  "raising exception in thread %ld.", (long)tso->id);
     
 #if defined(PROFILING)
     /* 
     
 #if defined(PROFILING)
     /* 
@@ -742,16 +731,26 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
         fprintCCS_stderr(tso->prof.CCCS);
     }
 #endif
         fprintCCS_stderr(tso->prof.CCCS);
     }
 #endif
+    // ASSUMES: the thread is not already complete or dead, or
+    // ThreadRelocated.  Upper layers should deal with that.
+    ASSERT(tso->what_next != ThreadComplete && 
+           tso->what_next != ThreadKilled && 
+           tso->what_next != ThreadRelocated);
+
+    // only if we own this TSO (except that deleteThread() calls this 
+    ASSERT(tso->cap == cap);
+
+    // wake it up
+    if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap,tso);
+    }        
 
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
     sp = tso->sp;
     
 
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
     sp = tso->sp;
     
-    // ASSUMES: the thread is not already complete or dead.  Upper
-    // layers should deal with that.
-    ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
-
     if (stop_here != NULL) {
         updatee = stop_here->updatee;
     } else {
     if (stop_here != NULL) {
         updatee = stop_here->updatee;
     } else {
@@ -834,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                 // Perform the update
                 // TODO: this may waste some work, if the thunk has
                 // already been updated by another thread.
                 // Perform the update
                 // TODO: this may waste some work, if the thunk has
                 // already been updated by another thread.
-                UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+                updateThunk(cap, tso, 
+                            ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
             }
 
            sp += sizeofW(StgUpdateFrame) - 1;
             }
 
            sp += sizeofW(StgUpdateFrame) - 1;
@@ -926,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                {
             StgTRecHeader *trec = tso -> trec;
             StgTRecHeader *outer = trec -> enclosing_trec;
                {
             StgTRecHeader *trec = tso -> trec;
             StgTRecHeader *outer = trec -> enclosing_trec;
-           debugTrace(DEBUG_stm, 
-                      "found atomically block delivering async exception");
+           debugTraceCap(DEBUG_stm, cap,
+                          "found atomically block delivering async exception");
             stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
             tso -> trec = outer;
             stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
             tso -> trec = outer;