From 2726a2f10256710cc6ed80b1098cb32e121e1be7 Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Mon, 29 Mar 2010 14:45:21 +0000 Subject: [PATCH] Move a thread to the front of the run queue when another thread blocks on it This fixes #3838, and was made possible by the new BLACKHOLE infrastructure. To allow reording of the run queue I had to make it doubly-linked, which entails some extra trickiness with regard to GC write barriers and suchlike. --- includes/rts/storage/TSO.h | 2 ++ rts/Messages.c | 24 ++++++++++++++++++++++-- rts/Schedule.c | 43 ++++++++++++++++++++++++++++++------------- rts/Schedule.h | 9 +++++++++ rts/sm/GCAux.c | 3 ++- rts/sm/Sanity.c | 15 ++++++++++++++- rts/sm/Sanity.h | 2 ++ rts/sm/Scav.c | 44 +++++++++++++++++++++++++------------------- rts/sm/Storage.c | 10 ++++++++++ 9 files changed, 116 insertions(+), 36 deletions(-) diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h index e07be88..abe6215 100644 --- a/includes/rts/storage/TSO.h +++ b/includes/rts/storage/TSO.h @@ -46,6 +46,7 @@ typedef struct { /* Reason for thread being blocked. See comment above struct StgTso_. */ typedef union { StgClosure *closure; + StgTSO *prev; // a back-link when the TSO is on the run queue (NotBlocked) struct MessageBlackHole_ *bh; struct MessageThrowTo_ *throwto; struct MessageWakeup_ *wakeup; @@ -163,6 +164,7 @@ typedef struct StgTSO_ { void dirty_TSO (Capability *cap, StgTSO *tso); void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target); +void setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target); // Apply to a TSO before looking at it if you are not sure whether it // might be ThreadRelocated or not (basically, that's most of the time diff --git a/rts/Messages.c b/rts/Messages.c index 6a7c64d..ae5d5d1 100644 --- a/rts/Messages.c +++ b/rts/Messages.c @@ -244,7 +244,21 @@ loop: bq->link = owner->bq; owner->bq = bq; dirty_TSO(cap, owner); // we modified owner->bq - + + // If the owner of the blackhole is currently runnable, then + // bump it to the front of the run queue. This gives the + // blocked-on thread a little boost which should help unblock + // this thread, and may avoid a pile-up of other threads + // becoming blocked on the same BLACKHOLE (#3838). + // + // NB. we check to make sure that the owner is not the same as + // the current thread, since in that case it will not be on + // the run queue. + if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) { + removeFromRunQueue(cap, owner); + pushOnRunQueue(cap,owner); + } + // point to the BLOCKING_QUEUE from the BLACKHOLE write_barrier(); // make the BQ visible ((StgInd*)bh)->indirectee = (StgClosure *)bq; @@ -280,12 +294,18 @@ loop: if (info == &stg_BLOCKING_QUEUE_CLEAN_info) { bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; - recordClosureMutated(cap,bq); + recordClosureMutated(cap,(StgClosure*)bq); } debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", (lnat)msg->tso->id, (lnat)owner->id); + // See above, #3838 + if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) { + removeFromRunQueue(cap, owner); + pushOnRunQueue(cap,owner); + } + return 1; // blocked } diff --git a/rts/Schedule.c b/rts/Schedule.c index 72f6d44..d7d5741 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -158,17 +158,6 @@ static void deleteAllThreads (Capability *cap); static void deleteThread_(Capability *cap, StgTSO *tso); #endif -/* ----------------------------------------------------------------------------- - * Putting a thread on the run queue: different scheduling policies - * -------------------------------------------------------------------------- */ - -STATIC_INLINE void -addToRunQueue( Capability *cap, StgTSO *t ) -{ - // this does round-robin scheduling; good for concurrency - appendToRunQueue(cap,t); -} - /* --------------------------------------------------------------------------- Main scheduling loop. @@ -568,6 +557,30 @@ run_thread: } /* end of while() */ } +/* ----------------------------------------------------------------------------- + * Run queue operations + * -------------------------------------------------------------------------- */ + +void +removeFromRunQueue (Capability *cap, StgTSO *tso) +{ + if (tso->block_info.prev == END_TSO_QUEUE) { + ASSERT(cap->run_queue_hd == tso); + cap->run_queue_hd = tso->_link; + } else { + setTSOLink(cap, tso->block_info.prev, tso->_link); + } + if (tso->_link == END_TSO_QUEUE) { + ASSERT(cap->run_queue_tl == tso); + cap->run_queue_tl = tso->block_info.prev; + } else { + setTSOPrev(cap, tso->_link, tso->block_info.prev); + } + tso->_link = tso->block_info.prev = END_TSO_QUEUE; + + IF_DEBUG(sanity, checkRunQueue(cap)); +} + /* ---------------------------------------------------------------------------- * Setting up the scheduler loop * ------------------------------------------------------------------------- */ @@ -743,12 +756,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS, || t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread setTSOLink(cap, prev, t); + setTSOPrev(cap, t, prev); prev = t; } else if (i == n_free_caps) { pushed_to_all = rtsTrue; i = 0; // keep one for us setTSOLink(cap, prev, t); + setTSOPrev(cap, t, prev); prev = t; } else { appendToRunQueue(free_caps[i],t); @@ -761,6 +776,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } } cap->run_queue_tl = prev; + + IF_DEBUG(sanity, checkRunQueue(cap)); } #ifdef SPARK_PUSHING @@ -1093,7 +1110,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) // context switch flag, and we end up waiting for a GC. // See #1984, and concurrent/should_run/1984 cap->context_switch = 0; - addToRunQueue(cap,t); + appendToRunQueue(cap,t); } else { pushOnRunQueue(cap,t); } @@ -1162,7 +1179,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); - addToRunQueue(cap,t); + appendToRunQueue(cap,t); return rtsFalse; } diff --git a/rts/Schedule.h b/rts/Schedule.h index 0db2b1e..1e786ce 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -118,8 +118,10 @@ appendToRunQueue (Capability *cap, StgTSO *tso) ASSERT(tso->_link == END_TSO_QUEUE); if (cap->run_queue_hd == END_TSO_QUEUE) { cap->run_queue_hd = tso; + tso->block_info.prev = END_TSO_QUEUE; } else { setTSOLink(cap, cap->run_queue_tl, tso); + setTSOPrev(cap, tso, cap->run_queue_tl); } cap->run_queue_tl = tso; traceEventThreadRunnable (cap, tso); @@ -135,6 +137,10 @@ EXTERN_INLINE void pushOnRunQueue (Capability *cap, StgTSO *tso) { setTSOLink(cap, tso, cap->run_queue_hd); + tso->block_info.prev = END_TSO_QUEUE; + if (cap->run_queue_hd != END_TSO_QUEUE) { + setTSOPrev(cap, cap->run_queue_hd, tso); + } cap->run_queue_hd = tso; if (cap->run_queue_tl == END_TSO_QUEUE) { cap->run_queue_tl = tso; @@ -149,6 +155,7 @@ popRunQueue (Capability *cap) StgTSO *t = cap->run_queue_hd; ASSERT(t != END_TSO_QUEUE); cap->run_queue_hd = t->_link; + cap->run_queue_hd->block_info.prev = END_TSO_QUEUE; t->_link = END_TSO_QUEUE; // no write barrier req'd if (cap->run_queue_hd == END_TSO_QUEUE) { cap->run_queue_tl = END_TSO_QUEUE; @@ -156,6 +163,8 @@ popRunQueue (Capability *cap) return t; } +extern void removeFromRunQueue (Capability *cap, StgTSO *tso); + /* Add a thread to the end of the blocked queue. */ #if !defined(THREADED_RTS) diff --git a/rts/sm/GCAux.c b/rts/sm/GCAux.c index 3962bf0..0fb8e1f 100644 --- a/rts/sm/GCAux.c +++ b/rts/sm/GCAux.c @@ -119,7 +119,8 @@ revertCAFs( void ) { StgIndStatic *c; - for (c = (StgIndStatic *)revertible_caf_list; c != NULL; + for (c = (StgIndStatic *)revertible_caf_list; + c != (StgIndStatic *)END_OF_STATIC_LIST; c = (StgIndStatic *)c->static_link) { SET_INFO(c, c->saved_info); diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index 1423077..2069711 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -331,7 +331,8 @@ checkClosure( StgClosure* p ) ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh)); ASSERT(get_itbl(bq->owner)->type == TSO); - ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO); + ASSERT(bq->queue == (MessageBlackHole*)END_TSO_QUEUE + || get_itbl(bq->queue)->type == TSO); ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE || get_itbl(bq->link)->type == IND || get_itbl(bq->link)->type == BLOCKING_QUEUE); @@ -745,6 +746,18 @@ findMemoryLeak (void) reportUnmarkedBlocks(); } +void +checkRunQueue(Capability *cap) +{ + StgTSO *prev, *tso; + prev = END_TSO_QUEUE; + for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE; + prev = tso, tso = tso->_link) { + ASSERT(prev == END_TSO_QUEUE || prev->_link == tso); + ASSERT(tso->block_info.prev == prev); + } + ASSERT(cap->run_queue_tl == prev); +} /* ----------------------------------------------------------------------------- Memory leak detection diff --git a/rts/sm/Sanity.h b/rts/sm/Sanity.h index 38a7289..5c963b4 100644 --- a/rts/sm/Sanity.h +++ b/rts/sm/Sanity.h @@ -36,6 +36,8 @@ StgOffset checkClosure ( StgClosure* p ); void checkMutableList ( bdescr *bd, nat gen ); void checkMutableLists ( rtsBool checkTSOs ); +void checkRunQueue (Capability *cap); + void memInventory (rtsBool show); void checkBQ (StgTSO *bqe, StgClosure *closure); diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 75c186c..e6234f6 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -69,10 +69,24 @@ scavengeTSO (StgTSO *tso) saved_eager = gct->eager_promotion; gct->eager_promotion = rtsFalse; + + evacuate((StgClosure **)&tso->blocked_exceptions); + evacuate((StgClosure **)&tso->bq); + + // scavange current transaction record + evacuate((StgClosure **)&tso->trec); + + // scavenge this thread's stack + scavenge_stack(tso->sp, &(tso->stack[tso->stack_size])); + + tso->dirty = gct->failed_to_evac; + + evacuate((StgClosure **)&tso->_link); if ( tso->why_blocked == BlockedOnMVar || tso->why_blocked == BlockedOnBlackHole || tso->why_blocked == BlockedOnMsgWakeup || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == NotBlocked ) { evacuate(&tso->block_info.closure); } @@ -86,26 +100,10 @@ scavengeTSO (StgTSO *tso) } #endif - evacuate((StgClosure **)&tso->blocked_exceptions); - evacuate((StgClosure **)&tso->bq); - - // scavange current transaction record - evacuate((StgClosure **)&tso->trec); - - // scavenge this thread's stack - scavenge_stack(tso->sp, &(tso->stack[tso->stack_size])); - - if (gct->failed_to_evac) { - tso->dirty = 1; - evacuate((StgClosure **)&tso->_link); + if (tso->dirty == 0 && gct->failed_to_evac) { + tso->flags |= TSO_LINK_DIRTY; } else { - tso->dirty = 0; - evacuate((StgClosure **)&tso->_link); - if (gct->failed_to_evac) { - tso->flags |= TSO_LINK_DIRTY; - } else { - tso->flags &= ~TSO_LINK_DIRTY; - } + tso->flags &= ~TSO_LINK_DIRTY; } gct->eager_promotion = saved_eager; @@ -1407,6 +1405,14 @@ scavenge_mutable_list(bdescr *bd, generation *gen) // ASSERT(tso->flags & TSO_LINK_DIRTY); evacuate((StgClosure **)&tso->_link); + if ( tso->why_blocked == BlockedOnMVar + || tso->why_blocked == BlockedOnBlackHole + || tso->why_blocked == BlockedOnMsgWakeup + || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == NotBlocked + ) { + evacuate((StgClosure **)&tso->block_info.prev); + } if (gct->failed_to_evac) { recordMutableGen_GC((StgClosure *)p,gen->no); gct->failed_to_evac = rtsFalse; diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c index 3b9775e..c2a1911 100644 --- a/rts/sm/Storage.c +++ b/rts/sm/Storage.c @@ -721,6 +721,16 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target) } void +setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target) +{ + if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) { + tso->flags |= TSO_LINK_DIRTY; + recordClosureMutated(cap,(StgClosure*)tso); + } + tso->block_info.prev = target; +} + +void dirty_TSO (Capability *cap, StgTSO *tso) { if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) { -- 1.7.10.4