Change the representation of the MVar blocked queue
[ghc-hetmet.git] / rts / Messages.c
index 2b40f76..5a1e5bd 100644 (file)
@@ -28,8 +28,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
 #ifdef DEBUG    
     {
         const StgInfoTable *i = msg->header.info;
-        if (i != &stg_MSG_WAKEUP_info &&
-            i != &stg_MSG_THROWTO_info &&
+        if (i != &stg_MSG_THROWTO_info &&
             i != &stg_MSG_BLACKHOLE_info &&
             i != &stg_MSG_TRY_WAKEUP_info &&
             i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
@@ -71,21 +70,7 @@ executeMessage (Capability *cap, Message *m)
 loop:
     write_barrier(); // allow m->header to be modified by another thread
     i = m->header.info;
-    if (i == &stg_MSG_WAKEUP_info)
-    {
-        // the plan is to eventually get rid of these and use
-        // TRY_WAKEUP instead.
-        MessageWakeup *w = (MessageWakeup *)m;
-        StgTSO *tso = w->tso;
-        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
-                      (lnat)tso->id);
-        ASSERT(tso->cap == cap);
-        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
-        ASSERT(tso->block_info.closure == (StgClosure *)m);
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap, tso);
-    }
-    else if (i == &stg_MSG_TRY_WAKEUP_info)
+    if (i == &stg_MSG_TRY_WAKEUP_info)
     {
         StgTSO *tso = ((MessageWakeup *)m)->tso;
         debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld", 
@@ -114,11 +99,9 @@ loop:
 
         switch (r) {
         case THROWTO_SUCCESS:
-            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
-            t->source->sp += 3;
-            unblockOne(cap, t->source);
             // this message is done
-            unlockClosure((StgClosure*)m, &stg_IND_info);
+            unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
+            tryWakeupThread(cap, t->source);
             break;
         case THROWTO_BLOCKED:
             // unlock the message
@@ -137,7 +120,7 @@ loop:
         }
         return;
     }
-    else if (i == &stg_IND_info)
+    else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
     {
         // message was revoked
         return;
@@ -246,7 +229,21 @@ loop:
         bq->link = owner->bq;
         owner->bq = bq;
         dirty_TSO(cap, owner); // we modified owner->bq
-        
+
+        // If the owner of the blackhole is currently runnable, then
+        // bump it to the front of the run queue.  This gives the
+        // blocked-on thread a little boost which should help unblock
+        // this thread, and may avoid a pile-up of other threads
+        // becoming blocked on the same BLACKHOLE (#3838).
+        //
+        // NB. we check to make sure that the owner is not the same as
+        // the current thread, since in that case it will not be on
+        // the run queue.
+        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+            removeFromRunQueue(cap, owner);
+            pushOnRunQueue(cap,owner);
+        }
+
         // point to the BLOCKING_QUEUE from the BLACKHOLE
         write_barrier(); // make the BQ visible
         ((StgInd*)bh)->indirectee = (StgClosure *)bq;
@@ -282,12 +279,18 @@ loop:
 
         if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
             bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
-            recordClosureMutated(cap,bq);
+            recordClosureMutated(cap,(StgClosure*)bq);
         }
 
         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
                       (lnat)msg->tso->id, (lnat)owner->id);
 
+        // See above, #3838
+        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+            removeFromRunQueue(cap, owner);
+            pushOnRunQueue(cap,owner);
+        }
+
         return 1; // blocked
     }