Move a thread to the front of the run queue when another thread blocks on it
author     Simon Marlow <marlowsd@gmail.com>
           Mon, 29 Mar 2010 14:45:21 +0000 (14:45 +0000)
committer  Simon Marlow <marlowsd@gmail.com>
           Mon, 29 Mar 2010 14:45:21 +0000 (14:45 +0000)
This fixes #3838, and was made possible by the new BLACKHOLE
infrastructure.  To allow reordering of the run queue I had to make it
doubly-linked, which entails some extra trickiness with regard to
GC write barriers and suchlike.
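
To make the reordering concrete, here is a standalone model of the new
queue discipline (invented names, NULL in place of END_TSO_QUEUE, and
none of the GC write barriers that the real setTSOLink/setTSOPrev
perform; the actual code is in rts/Schedule.c and rts/Schedule.h below):

    #include <assert.h>
    #include <stdio.h>

    typedef struct Tso_ {
        int id;
        struct Tso_ *link;   /* models tso->_link           */
        struct Tso_ *prev;   /* models tso->block_info.prev */
    } Tso;

    typedef struct { Tso *hd, *tl; } Queue;   /* models a Capability */

    /* O(1) unlink from anywhere in the queue, as removeFromRunQueue: */
    static void removeFromQueue (Queue *q, Tso *t)
    {
        if (t->prev == NULL) { assert(q->hd == t); q->hd = t->link; }
        else                 { t->prev->link = t->link; }
        if (t->link == NULL) { assert(q->tl == t); q->tl = t->prev; }
        else                 { t->link->prev = t->prev; }
        t->link = t->prev = NULL;
    }

    /* Push on the front, as pushOnRunQueue: */
    static void pushOnQueue (Queue *q, Tso *t)
    {
        t->link = q->hd;
        t->prev = NULL;
        if (q->hd != NULL) { q->hd->prev = t; }
        q->hd = t;
        if (q->tl == NULL) { q->tl = t; }
    }

    int main (void)
    {
        Tso a = {1,0,0}, b = {2,0,0}, c = {3,0,0};
        Queue q = {0,0};
        pushOnQueue(&q,&c); pushOnQueue(&q,&b); pushOnQueue(&q,&a);
        /* a thread blocks on a BLACKHOLE owned by runnable 'c':
           boost 'c' to the front so it can finish and unblock us */
        removeFromQueue(&q,&c);
        pushOnQueue(&q,&c);
        for (Tso *t = q.hd; t; t = t->link) printf("%d ", t->id);
        printf("\n");   /* prints: 3 1 2 */
        return 0;
    }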

includes/rts/storage/TSO.h
rts/Messages.c
rts/Schedule.c
rts/Schedule.h
rts/sm/GCAux.c
rts/sm/Sanity.c
rts/sm/Sanity.h
rts/sm/Scav.c
rts/sm/Storage.c

diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
index e07be88..abe6215 100644
@@ -46,6 +46,7 @@ typedef struct {
 /* Reason for thread being blocked. See comment above struct StgTso_. */
 typedef union {
   StgClosure *closure;
+  StgTSO *prev; // a back-link when the TSO is on the run queue (NotBlocked)
   struct MessageBlackHole_ *bh;
   struct MessageThrowTo_ *throwto;
   struct MessageWakeup_  *wakeup;
@@ -163,6 +164,7 @@ typedef struct StgTSO_ {
 
 void dirty_TSO  (Capability *cap, StgTSO *tso);
 void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
+void setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target);
 
 // Apply to a TSO before looking at it if you are not sure whether it
 // might be ThreadRelocated or not (basically, that's most of the time
diff --git a/rts/Messages.c b/rts/Messages.c
index 6a7c64d..ae5d5d1 100644
@@ -244,7 +244,21 @@ loop:
         bq->link = owner->bq;
         owner->bq = bq;
         dirty_TSO(cap, owner); // we modified owner->bq
-        
+
+        // If the owner of the blackhole is currently runnable, then
+        // bump it to the front of the run queue.  This gives the
+        // blocked-on thread a little boost which should help unblock
+        // this thread, and may avoid a pile-up of other threads
+        // becoming blocked on the same BLACKHOLE (#3838).
+        //
+        // NB. we check to make sure that the owner is not the same as
+        // the current thread, since in that case it will not be on
+        // the run queue.
+        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+            removeFromRunQueue(cap, owner);
+            pushOnRunQueue(cap,owner);
+        }
+
         // point to the BLOCKING_QUEUE from the BLACKHOLE
         write_barrier(); // make the BQ visible
         ((StgInd*)bh)->indirectee = (StgClosure *)bq;
@@ -280,12 +294,18 @@ loop:
 
         if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
             bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
-            recordClosureMutated(cap,bq);
+            recordClosureMutated(cap,(StgClosure*)bq);
         }
 
         debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
                       (lnat)msg->tso->id, (lnat)owner->id);
 
+        // See above, #3838
+        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+            removeFromRunQueue(cap, owner);
+            pushOnRunQueue(cap,owner);
+        }
+
         return 1; // blocked
     }
     
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 72f6d44..d7d5741 100644
@@ -158,17 +158,6 @@ static void deleteAllThreads (Capability *cap);
 static void deleteThread_(Capability *cap, StgTSO *tso);
 #endif
 
-/* -----------------------------------------------------------------------------
- * Putting a thread on the run queue: different scheduling policies
- * -------------------------------------------------------------------------- */
-
-STATIC_INLINE void
-addToRunQueue( Capability *cap, StgTSO *t )
-{
-    // this does round-robin scheduling; good for concurrency
-    appendToRunQueue(cap,t);
-}
-
 /* ---------------------------------------------------------------------------
    Main scheduling loop.
 
@@ -568,6 +557,30 @@ run_thread:
   } /* end of while() */
 }
 
+/* -----------------------------------------------------------------------------
+ * Run queue operations
+ * -------------------------------------------------------------------------- */
+
+void
+removeFromRunQueue (Capability *cap, StgTSO *tso)
+{
+    if (tso->block_info.prev == END_TSO_QUEUE) {
+        ASSERT(cap->run_queue_hd == tso);
+        cap->run_queue_hd = tso->_link;
+    } else {
+        setTSOLink(cap, tso->block_info.prev, tso->_link);
+    }
+    if (tso->_link == END_TSO_QUEUE) {
+        ASSERT(cap->run_queue_tl == tso);
+        cap->run_queue_tl = tso->block_info.prev;
+    } else {
+        setTSOPrev(cap, tso->_link, tso->block_info.prev);
+    }
+    tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+
+    IF_DEBUG(sanity, checkRunQueue(cap));
+}
+
 /* ----------------------------------------------------------------------------
  * Setting up the scheduler loop
  * ------------------------------------------------------------------------- */
@@ -743,12 +756,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    || t->bound == task->incall // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    setTSOLink(cap, prev, t);
+                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else if (i == n_free_caps) {
                    pushed_to_all = rtsTrue;
                    i = 0;
                    // keep one for us
                    setTSOLink(cap, prev, t);
+                    setTSOPrev(cap, t, prev);
                    prev = t;
                } else {
                    appendToRunQueue(free_caps[i],t);
@@ -761,6 +776,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                }
            }
            cap->run_queue_tl = prev;
+
+            IF_DEBUG(sanity, checkRunQueue(cap));
        }
 
 #ifdef SPARK_PUSHING
@@ -1093,7 +1110,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
         // context switch flag, and we end up waiting for a GC.
         // See #1984, and concurrent/should_run/1984
         cap->context_switch = 0;
-        addToRunQueue(cap,t);
+        appendToRunQueue(cap,t);
     } else {
         pushOnRunQueue(cap,t);
     }
@@ -1162,7 +1179,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
 
-    addToRunQueue(cap,t);
+    appendToRunQueue(cap,t);
 
     return rtsFalse;
 }
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 0db2b1e..1e786ce 100644
@@ -118,8 +118,10 @@ appendToRunQueue (Capability *cap, StgTSO *tso)
     ASSERT(tso->_link == END_TSO_QUEUE);
     if (cap->run_queue_hd == END_TSO_QUEUE) {
        cap->run_queue_hd = tso;
+        tso->block_info.prev = END_TSO_QUEUE;
     } else {
        setTSOLink(cap, cap->run_queue_tl, tso);
+        setTSOPrev(cap, tso, cap->run_queue_tl);
     }
     cap->run_queue_tl = tso;
     traceEventThreadRunnable (cap, tso);
@@ -135,6 +137,10 @@ EXTERN_INLINE void
 pushOnRunQueue (Capability *cap, StgTSO *tso)
 {
     setTSOLink(cap, tso, cap->run_queue_hd);
+    tso->block_info.prev = END_TSO_QUEUE;
+    if (cap->run_queue_hd != END_TSO_QUEUE) {
+        setTSOPrev(cap, cap->run_queue_hd, tso);
+    }
     cap->run_queue_hd = tso;
     if (cap->run_queue_tl == END_TSO_QUEUE) {
        cap->run_queue_tl = tso;
@@ -149,6 +155,9 @@ popRunQueue (Capability *cap)
     StgTSO *t = cap->run_queue_hd;
     ASSERT(t != END_TSO_QUEUE);
     cap->run_queue_hd = t->_link;
+    if (cap->run_queue_hd != END_TSO_QUEUE) {
+        cap->run_queue_hd->block_info.prev = END_TSO_QUEUE;
+    }
     t->_link = END_TSO_QUEUE; // no write barrier req'd
     if (cap->run_queue_hd == END_TSO_QUEUE) {
        cap->run_queue_tl = END_TSO_QUEUE;
@@ -156,6 +165,8 @@ popRunQueue (Capability *cap)
     return t;
 }
 
+extern void removeFromRunQueue (Capability *cap, StgTSO *tso);
+
 /* Add a thread to the end of the blocked queue.
  */
 #if !defined(THREADED_RTS)
diff --git a/rts/sm/GCAux.c b/rts/sm/GCAux.c
index 3962bf0..0fb8e1f 100644
@@ -119,7 +119,8 @@ revertCAFs( void )
 {
     StgIndStatic *c;
 
-    for (c = (StgIndStatic *)revertible_caf_list; c != NULL; 
+    for (c = (StgIndStatic *)revertible_caf_list; 
+         c != (StgIndStatic *)END_OF_STATIC_LIST; 
         c = (StgIndStatic *)c->static_link) 
     {
        SET_INFO(c, c->saved_info);
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index 1423077..2069711 100644
@@ -331,7 +331,8 @@ checkClosure( StgClosure* p )
         ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh));
 
         ASSERT(get_itbl(bq->owner)->type == TSO);
-        ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO);
+        ASSERT(bq->queue == (MessageBlackHole*)END_TSO_QUEUE 
+               || get_itbl(bq->queue)->type == TSO);
         ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE || 
                get_itbl(bq->link)->type == IND ||
                get_itbl(bq->link)->type == BLOCKING_QUEUE);
@@ -745,6 +746,18 @@ findMemoryLeak (void)
   reportUnmarkedBlocks();
 }
 
+void
+checkRunQueue(Capability *cap)
+{
+    StgTSO *prev, *tso;
+    prev = END_TSO_QUEUE;
+    for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE; 
+         prev = tso, tso = tso->_link) {
+        ASSERT(prev == END_TSO_QUEUE || prev->_link == tso);
+        ASSERT(tso->block_info.prev == prev);
+    }
+    ASSERT(cap->run_queue_tl == prev);
+}
 
 /* -----------------------------------------------------------------------------
    Memory leak detection
diff --git a/rts/sm/Sanity.h b/rts/sm/Sanity.h
index 38a7289..5c963b4 100644
@@ -36,6 +36,8 @@ StgOffset checkClosure  ( StgClosure* p );
 void checkMutableList   ( bdescr *bd, nat gen );
 void checkMutableLists  ( rtsBool checkTSOs );
 
+void checkRunQueue      (Capability *cap);
+
 void memInventory (rtsBool show);
 
 void checkBQ (StgTSO *bqe, StgClosure *closure);
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 75c186c..e6234f6 100644
@@ -69,10 +69,24 @@ scavengeTSO (StgTSO *tso)
     saved_eager = gct->eager_promotion;
     gct->eager_promotion = rtsFalse;
 
+
+    evacuate((StgClosure **)&tso->blocked_exceptions);
+    evacuate((StgClosure **)&tso->bq);
+    
+    // scavenge the current transaction record
+    evacuate((StgClosure **)&tso->trec);
+    
+    // scavenge this thread's stack 
+    scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
+
+    tso->dirty = gct->failed_to_evac;
+
+    evacuate((StgClosure **)&tso->_link);
     if (   tso->why_blocked == BlockedOnMVar
        || tso->why_blocked == BlockedOnBlackHole
        || tso->why_blocked == BlockedOnMsgWakeup
        || tso->why_blocked == BlockedOnMsgThrowTo
+        || tso->why_blocked == NotBlocked
        ) {
        evacuate(&tso->block_info.closure);
     }
@@ -86,26 +100,10 @@ scavengeTSO (StgTSO *tso)
     }
 #endif
 
-    evacuate((StgClosure **)&tso->blocked_exceptions);
-    evacuate((StgClosure **)&tso->bq);
-    
-    // scavange current transaction record
-    evacuate((StgClosure **)&tso->trec);
-    
-    // scavenge this thread's stack 
-    scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
-
-    if (gct->failed_to_evac) {
-        tso->dirty = 1;
-        evacuate((StgClosure **)&tso->_link);
+    if (tso->dirty == 0 && gct->failed_to_evac) {
+        tso->flags |= TSO_LINK_DIRTY;
     } else {
-        tso->dirty = 0;
-        evacuate((StgClosure **)&tso->_link);
-        if (gct->failed_to_evac) {
-            tso->flags |= TSO_LINK_DIRTY;
-        } else {
-            tso->flags &= ~TSO_LINK_DIRTY;
-        }
+        tso->flags &= ~TSO_LINK_DIRTY;
     }
 
     gct->eager_promotion = saved_eager;
@@ -1407,6 +1405,14 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
                     // ASSERT(tso->flags & TSO_LINK_DIRTY);
 
                     evacuate((StgClosure **)&tso->_link);
+                    if (   tso->why_blocked == BlockedOnMVar
+                        || tso->why_blocked == BlockedOnBlackHole
+                        || tso->why_blocked == BlockedOnMsgWakeup
+                        || tso->why_blocked == BlockedOnMsgThrowTo
+                        || tso->why_blocked == NotBlocked
+                        ) {
+                        evacuate((StgClosure **)&tso->block_info.prev);
+                    }
                     if (gct->failed_to_evac) {
                         recordMutableGen_GC((StgClosure *)p,gen->no);
                         gct->failed_to_evac = rtsFalse;
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index 3b9775e..c2a1911 100644
@@ -721,6 +721,16 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
 }
 
 void
+setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target)
+{
+    if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
+        tso->flags |= TSO_LINK_DIRTY;
+        recordClosureMutated(cap,(StgClosure*)tso);
+    }
+    tso->block_info.prev = target;
+}
+
+void
 dirty_TSO (Capability *cap, StgTSO *tso)
 {
     if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
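
A note on the GC side of this, restating the TSO.h and Scav.c changes
above rather than adding anything new: block_info is a union, so the
new prev field shares its word with the existing closure field.  While
a thread is NotBlocked that word now holds a live heap pointer, which
is why NotBlocked joins the why_blocked states whose block_info gets
evacuated in scavengeTSO and scavenge_mutable_list.  Schematically
(simplified view of the union in includes/rts/storage/TSO.h; only the
two members relevant here are shown):

    typedef union {
        StgClosure *closure;  /* what a blocked thread is waiting on */
        StgTSO     *prev;     /* back-link while on the run queue    */
        /* ... message pointers, fd, target, ... */
    } StgTSOBlockInfo;

Evacuating tso->block_info.closure therefore also keeps prev alive,
since both names denote the same word.  Likewise, every store to prev
goes through setTSOPrev above, so a clean old-generation TSO gets put
back on the mutable list when it acquires a pointer to a younger one:
that is the write-barrier trickiness mentioned in the summary.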