New implementation of BLACKHOLEs
[ghc-hetmet.git] / rts / RaiseAsync.c
index d54f823..d5a4918 100644 (file)
@@ -18,6 +18,7 @@
 #include "STM.h"
 #include "sm/Sanity.h"
 #include "Profiling.h"
+#include "Messages.h"
 #if defined(mingw32_HOST_OS)
 #include "win32/IOManager.h"
 #endif
@@ -66,6 +67,8 @@ void
 throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, 
                       rtsBool stop_at_atomically)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
@@ -80,6 +83,8 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
 void
 suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
@@ -158,7 +163,7 @@ throwTo (Capability *cap,   // the Capability we hold
     msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
     // message starts locked; the caller has to unlock it when it is
     // ready.
-    msg->header.info = &stg_WHITEHOLE_info;
+    SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
     msg->source      = source;
     msg->target      = target;
     msg->exception   = exception;
@@ -179,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
 {
     StgWord status;
     StgTSO *target = msg->target;
+    Capability *target_cap;
 
+    goto check_target;
+
+retry:
+    write_barrier();
+    debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
     ASSERT(target != END_TSO_QUEUE);
 
     // follow ThreadRelocated links in the target first
-    while (target->what_next == ThreadRelocated) {
-       target = target->_link;
-       // No, it might be a WHITEHOLE:
-       // ASSERT(get_itbl(target)->type == TSO);
+    target = deRefTSO(target);
+
+    // Thread already dead?
+    if (target->what_next == ThreadComplete 
+       || target->what_next == ThreadKilled) {
+       return THROWTO_SUCCESS;
     }
 
     debugTraceCap(DEBUG_sched, cap,
@@ -198,18 +213,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
     traceThreadStatus(DEBUG_sched, target);
 #endif
 
-    goto check_target;
-retry:
-    write_barrier();
-    debugTrace(DEBUG_sched, "throwTo: retrying...");
-
-check_target:
-    ASSERT(target != END_TSO_QUEUE);
-
-    // Thread already dead?
-    if (target->what_next == ThreadComplete 
-       || target->what_next == ThreadKilled) {
-       return THROWTO_SUCCESS;
+    target_cap = target->cap;
+    if (target->cap != cap) {
+        throwToSendMsg(cap, target_cap, msg);
+        return THROWTO_BLOCKED;
     }
 
     status = target->why_blocked;
@@ -276,28 +283,19 @@ check_target:
            have also seen the write to Q.
        */
     {
-       Capability *target_cap;
-
        write_barrier();
-       target_cap = target->cap;
-       if (target_cap != cap) {
-            throwToSendMsg(cap, target_cap, msg);
-            return THROWTO_BLOCKED;
+        if ((target->flags & TSO_BLOCKEX) == 0) {
+            // It's on our run queue and not blocking exceptions
+            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            return THROWTO_SUCCESS;
         } else {
-            if ((target->flags & TSO_BLOCKEX) == 0) {
-                // It's on our run queue and not blocking exceptions
-                raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-                return THROWTO_SUCCESS;
-            } else {
-                blockedThrowTo(cap,target,msg);
-                return THROWTO_BLOCKED;
-            }
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
         }
     }
 
     case BlockedOnMsgThrowTo:
     {
-        Capability *target_cap;
         const StgInfoTable *i;
         MessageThrowTo *m;
 
@@ -334,13 +332,6 @@ check_target:
             goto retry;
         }
 
-        target_cap = target->cap;
-        if (target_cap != cap) {
-            unlockClosure((StgClosure*)m, i);
-            throwToSendMsg(cap, target_cap, msg);
-            return THROWTO_BLOCKED;
-        }
-
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
             unlockClosure((StgClosure*)m, i);
@@ -352,7 +343,6 @@ check_target:
         unlockClosure((StgClosure*)m, &stg_IND_info);
 
         raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-        unblockOne(cap, target);
         return THROWTO_SUCCESS;
     }
 
@@ -394,48 +384,30 @@ check_target:
 
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-            Capability *target_cap = target->cap;
-            if (target->cap != cap) {
-                throwToSendMsg(cap,target_cap,msg);
-            } else {
-                blockedThrowTo(cap,target,msg);
-            }
+            blockedThrowTo(cap,target,msg);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_BLOCKED;
        } else {
            removeThreadFromMVarQueue(cap, mvar, target);
            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           unlockClosure((StgClosure *)mvar, info);
+            if (info == &stg_MVAR_CLEAN_info) {
+                dirty_MVAR(&cap->r,(StgClosure*)mvar);
+            }
+           unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
            return THROWTO_SUCCESS;
        }
     }
 
     case BlockedOnBlackHole:
     {
-       ACQUIRE_LOCK(&sched_mutex);
-       // double checking the status after the memory barrier:
-       if (target->why_blocked != BlockedOnBlackHole) {
-           RELEASE_LOCK(&sched_mutex);
-           goto retry;
-       }
-
-       if (target->flags & TSO_BLOCKEX) {
-            Capability *target_cap = target->cap;
-            if (target->cap != cap) {
-                throwToSendMsg(cap,target_cap,msg);
-            } else {
-                blockedThrowTo(cap,target,msg);
-            }
-           RELEASE_LOCK(&sched_mutex);
-           return THROWTO_BLOCKED; // caller releases lock
-       } else {
-           removeThreadFromQueue(cap, &blackhole_queue, target);
-           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           RELEASE_LOCK(&sched_mutex);
-           return THROWTO_SUCCESS;
-       }
+        // Revoke the message by replacing it with IND. We're not
+        // locking anything here, so we might still get a TRY_WAKEUP
+        // message from the owner of the blackhole some time in the
+        // future, but that doesn't matter.
+        ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+        OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+        return THROWTO_SUCCESS;
     }
 
     case BlockedOnSTM:
@@ -448,35 +420,19 @@ check_target:
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-            Capability *target_cap = target->cap;
-            if (target->cap != cap) {
-                throwToSendMsg(cap,target_cap,msg);
-            } else {
-                blockedThrowTo(cap,target,msg);
-            }
+            blockedThrowTo(cap,target,msg);
            unlockTSO(target);
            return THROWTO_BLOCKED;
        } else {
            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-           unblockOne(cap, target);
            unlockTSO(target);
            return THROWTO_SUCCESS;
        }
 
     case BlockedOnCCall:
     case BlockedOnCCall_NoUnblockExc:
-    {
-        Capability *target_cap;
-
-        target_cap = target->cap;
-        if (target_cap != cap) {
-            throwToSendMsg(cap, target_cap, msg);
-            return THROWTO_BLOCKED;
-        }
-
        blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;
-    }
 
 #ifndef THREADEDED_RTS
     case BlockedOnRead:
@@ -509,9 +465,9 @@ throwToSendMsg (Capability *cap STG_UNUSED,
             
 {
 #ifdef THREADED_RTS
-    debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+    debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
 
-    sendMessage(target_cap, (Message*)msg);
+    sendMessage(cap, target_cap, (Message*)msg);
 #endif
 }
 
@@ -526,7 +482,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
 
     ASSERT(target->cap == cap);
 
-    msg->link = (Message*)target->blocked_exceptions;
+    msg->link = target->blocked_exceptions;
     target->blocked_exceptions = msg;
     dirty_TSO(cap,target); // we modified the blocked_exceptions queue
 }
@@ -565,7 +521,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
 
     if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && 
         (tso->flags & TSO_BLOCKEX) != 0) {
-        debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+        debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
     }
 
     if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
@@ -658,16 +614,18 @@ removeFromQueues(Capability *cap, StgTSO *tso)
 
   case BlockedOnMVar:
       removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+      // we aren't doing a write barrier here: the MVar is supposed to
+      // be already locked, so replacing the info pointer would unlock it.
       goto done;
 
   case BlockedOnBlackHole:
-      removeThreadFromQueue(cap, &blackhole_queue, tso);
+      // nothing to do
       goto done;
 
   case BlockedOnMsgWakeup:
   {
       // kill the message, atomically:
-      tso->block_info.wakeup->header.info = &stg_IND_info;
+      OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
       break;
   }
 
@@ -719,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
  * asynchronous exception in an existing thread.
  *
  * We first remove the thread from any queue on which it might be
- * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked.  The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
  *
  * We strip the stack down to the innermost CATCH_FRAME, building
  * thunks in the heap for all the active computations, so they can 
@@ -758,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     StgClosure *updatee;
     nat i;
 
-    debugTrace(DEBUG_sched,
-              "raising exception in thread %ld.", (long)tso->id);
+    debugTraceCap(DEBUG_sched, cap,
+                  "raising exception in thread %ld.", (long)tso->id);
     
 #if defined(PROFILING)
     /* 
@@ -772,20 +731,26 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
         fprintCCS_stderr(tso->prof.CCCS);
     }
 #endif
+    // ASSUMES: the thread is not already complete or dead, or
+    // ThreadRelocated.  Upper layers should deal with that.
+    ASSERT(tso->what_next != ThreadComplete && 
+           tso->what_next != ThreadKilled && 
+           tso->what_next != ThreadRelocated);
 
-    while (tso->what_next == ThreadRelocated) {
-        tso = tso->_link;
-    }
+    // only if we own this TSO (except that deleteThread() calls this 
+    ASSERT(tso->cap == cap);
+
+    // wake it up
+    if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap,tso);
+    }        
 
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
     sp = tso->sp;
     
-    // ASSUMES: the thread is not already complete or dead.  Upper
-    // layers should deal with that.
-    ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
-
     if (stop_here != NULL) {
         updatee = stop_here->updatee;
     } else {
@@ -868,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                 // Perform the update
                 // TODO: this may waste some work, if the thunk has
                 // already been updated by another thread.
-                UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+                updateThunk(cap, tso, 
+                            ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
             }
 
            sp += sizeofW(StgUpdateFrame) - 1;
@@ -960,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                {
             StgTRecHeader *trec = tso -> trec;
             StgTRecHeader *outer = trec -> enclosing_trec;
-           debugTrace(DEBUG_stm, 
-                      "found atomically block delivering async exception");
+           debugTraceCap(DEBUG_stm, cap,
+                          "found atomically block delivering async exception");
             stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
             tso -> trec = outer;