change throwTo to use tryWakeupThread rather than unblockOne
authorSimon Marlow <marlowsd@gmail.com>
Mon, 29 Mar 2010 14:46:13 +0000 (14:46 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Mon, 29 Mar 2010 14:46:13 +0000 (14:46 +0000)
includes/stg/MiscClosures.h
rts/Messages.c
rts/RaiseAsync.c
rts/StgMiscClosures.cmm
rts/Threads.c

index 9834c4b..4e5667b 100644 (file)
@@ -120,6 +120,7 @@ RTS_INFO(stg_MSG_WAKEUP_info);
 RTS_INFO(stg_MSG_TRY_WAKEUP_info);
 RTS_INFO(stg_MSG_THROWTO_info);
 RTS_INFO(stg_MSG_BLACKHOLE_info);
+RTS_INFO(stg_MSG_NULL_info);
 RTS_INFO(stg_MUT_CONS_info);
 RTS_INFO(stg_catch_info);
 RTS_INFO(stg_PAP_info);
@@ -171,6 +172,7 @@ RTS_ENTRY(stg_MSG_WAKEUP_entry);
 RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry);
 RTS_ENTRY(stg_MSG_THROWTO_entry);
 RTS_ENTRY(stg_MSG_BLACKHOLE_entry);
+RTS_ENTRY(stg_MSG_NULL_entry);
 RTS_ENTRY(stg_MUT_CONS_entry);
 RTS_ENTRY(stg_catch_entry);
 RTS_ENTRY(stg_PAP_entry);
index 2b40f76..6a7c64d 100644 (file)
@@ -114,11 +114,9 @@ loop:
 
         switch (r) {
         case THROWTO_SUCCESS:
-            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
-            t->source->sp += 3;
-            unblockOne(cap, t->source);
             // this message is done
-            unlockClosure((StgClosure*)m, &stg_IND_info);
+            unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
+            tryWakeupThread(cap, t->source);
             break;
         case THROWTO_BLOCKED:
             // unlock the message
@@ -137,7 +135,7 @@ loop:
         }
         return;
     }
-    else if (i == &stg_IND_info)
+    else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
     {
         // message was revoked
         return;
index d5a4918..f974f8c 100644 (file)
@@ -38,8 +38,6 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS,
                             Capability *target_cap USED_IF_THREADS, 
                             MessageThrowTo *msg USED_IF_THREADS);
 
-static void performBlockedException (Capability *cap, MessageThrowTo *msg);
-
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
 
@@ -148,7 +146,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
                       unlockClosure(msg, &stg_MSG_THROWTO_info);
                       If it decides not to raise the exception after
                       all, it can revoke it safely with
-                      unlockClosure(msg, &stg_IND_info);
+                      unlockClosure(msg, &stg_MSG_NULL_info);
 
    -------------------------------------------------------------------------- */
 
@@ -326,8 +324,17 @@ check_target:
             }
         }
 
+        if (i == &stg_MSG_NULL_info) {
+            // we know there's a MSG_TRY_WAKEUP on the way, so we
+            // might as well just do it now.  The message will
+            // be a no-op when it arrives.
+            unlockClosure((StgClosure*)m, i);
+            tryWakeupThread(cap, target);
+            goto retry;
+        }
+
         if (i != &stg_MSG_THROWTO_info) {
-            // if it's an IND, this TSO has been woken up by another Cap
+            // if it's a MSG_NULL, this TSO has been woken up by another Cap
             unlockClosure((StgClosure*)m, i);
             goto retry;
         }
@@ -340,7 +347,7 @@ check_target:
         }
 
         // nobody else can wake up this TSO after we claim the message
-        unlockClosure((StgClosure*)m, &stg_IND_info);
+        unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
 
         raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
         return THROWTO_SUCCESS;
@@ -535,21 +542,21 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
         if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
         i = lockClosure((StgClosure*)msg);
         tso->blocked_exceptions = (MessageThrowTo*)msg->link;
-        if (i == &stg_IND_info) {
+        if (i == &stg_MSG_NULL_info) {
             unlockClosure((StgClosure*)msg,i);
             goto loop;
         }
 
-        performBlockedException(cap, msg);
-        unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
-        unlockClosure((StgClosure*)msg,&stg_IND_info);
+        throwToSingleThreaded(cap, msg->target, msg->exception);
+        unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info);
+        tryWakeupThread(cap, msg->source);
         return 1;
     }
     return 0;
 }
 
 // awakenBlockedExceptionQueue(): Just wake up the whole queue of
-// blocked exceptions and let them try again.
+// blocked exceptions.
 
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
@@ -560,31 +567,16 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
     for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
          msg = (MessageThrowTo*)msg->link) {
         i = lockClosure((StgClosure *)msg);
-        if (i != &stg_IND_info) {
-            unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        if (i != &stg_MSG_NULL_info) {
+            unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info);
+            tryWakeupThread(cap, msg->source);
+        } else {
+            unlockClosure((StgClosure *)msg,i);
         }
-        unlockClosure((StgClosure *)msg,i);
     }
     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
 }    
 
-static void
-performBlockedException (Capability *cap, MessageThrowTo *msg)
-{
-    StgTSO *source;
-
-    source = msg->source;
-
-    ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
-    ASSERT(source->block_info.closure == (StgClosure *)msg);
-    ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
-    ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
-    // check ids not pointers, because the thread might be relocated
-
-    throwToSingleThreaded(cap, msg->target, msg->exception);
-    source->sp += 3;
-}
-
 /* -----------------------------------------------------------------------------
    Remove a thread from blocking queues.
 
@@ -638,7 +630,7 @@ removeFromQueues(Capability *cap, StgTSO *tso)
       // ASSERT(m->header.info == &stg_WHITEHOLE_info);
 
       // unlock and revoke it at the same time
-      unlockClosure((StgClosure*)m,&stg_IND_info);
+      unlockClosure((StgClosure*)m,&stg_MSG_NULL_info);
       break;
   }
 
index 830bde5..7e1e6a7 100644 (file)
@@ -503,6 +503,10 @@ INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
 INFO_TABLE_CONSTR(stg_MSG_BLACKHOLE,3,0,0,PRIM,"MSG_BLACKHOLE","MSG_BLACKHOLE")
 { foreign "C" barf("MSG_BLACKHOLE object entered!") never returns; }
 
+// used to overwrite a MSG_THROWTO when the message has been used/revoked
+INFO_TABLE_CONSTR(stg_MSG_NULL,1,0,0,PRIM,"MSG_NULL","MSG_NULL")
+{ foreign "C" barf("MSG_NULL object entered!") never returns; }
+
 /* ----------------------------------------------------------------------------
    END_TSO_QUEUE
 
index 0c3e591..05a13c7 100644 (file)
@@ -290,12 +290,31 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
         SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
         msg->tso = tso;
         sendMessage(cap, tso->cap, (Message*)msg);
+        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
+                      (lnat)tso->id, tso->cap->no);
         return;
     }
 #endif
 
     switch (tso->why_blocked)
     {
+    case BlockedOnMsgThrowTo:
+    {
+        const StgInfoTable *i;
+        
+        i = lockClosure(tso->block_info.closure);
+        unlockClosure(tso->block_info.closure, i);
+        if (i != &stg_MSG_NULL_info) {
+            debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
+                          (lnat)tso->id, tso->block_info.throwto->header.info);
+            break; // still blocked
+        }
+
+        // remove the block frame from the stack
+        ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
+        tso->sp += 3;
+        // fall through...
+    }
     case BlockedOnBlackHole:
     case BlockedOnSTM:
     {