/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
+ StgTSO *prev; // a back-link when the TSO is on the run queue (NotBlocked)
struct MessageBlackHole_ *bh;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
void dirty_TSO (Capability *cap, StgTSO *tso);
void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
+void setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target);
// Apply to a TSO before looking at it if you are not sure whether it
// might be ThreadRelocated or not (basically, that's most of the time
bq->link = owner->bq;
owner->bq = bq;
dirty_TSO(cap, owner); // we modified owner->bq
-
+
+ // If the owner of the blackhole is currently runnable, then
+ // bump it to the front of the run queue. This gives the
+ // blocked-on thread a little boost which should help unblock
+ // this thread, and may avoid a pile-up of other threads
+ // becoming blocked on the same BLACKHOLE (#3838).
+ //
+ // NB. we check to make sure that the owner is not the same as
+ // the current thread, since in that case it will not be on
+ // the run queue.
+ if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+ removeFromRunQueue(cap, owner);
+ pushOnRunQueue(cap,owner);
+ }
+
// point to the BLOCKING_QUEUE from the BLACKHOLE
write_barrier(); // make the BQ visible
((StgInd*)bh)->indirectee = (StgClosure *)bq;
if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
- recordClosureMutated(cap,bq);
+ recordClosureMutated(cap,(StgClosure*)bq);
}
debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
(lnat)msg->tso->id, (lnat)owner->id);
+ // See above, #3838
+ if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+ removeFromRunQueue(cap, owner);
+ pushOnRunQueue(cap,owner);
+ }
+
return 1; // blocked
}
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
-/* -----------------------------------------------------------------------------
- * Putting a thread on the run queue: different scheduling policies
- * -------------------------------------------------------------------------- */
-
-STATIC_INLINE void
-addToRunQueue( Capability *cap, StgTSO *t )
-{
- // this does round-robin scheduling; good for concurrency
- appendToRunQueue(cap,t);
-}
-
/* ---------------------------------------------------------------------------
Main scheduling loop.
} /* end of while() */
}
+/* -----------------------------------------------------------------------------
+ * Run queue operations
+ * -------------------------------------------------------------------------- */
+
+/* Unlink 'tso' from this capability's run queue in O(1).
+ * Relies on the back-link kept in tso->block_info.prev, which is valid
+ * only while the TSO is NotBlocked and sitting on the run queue.
+ * On exit both of tso's queue links are reset to END_TSO_QUEUE.
+ */
+void
+removeFromRunQueue (Capability *cap, StgTSO *tso)
+{
+ // No predecessor: tso must be the queue head.
+ if (tso->block_info.prev == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_hd == tso);
+ cap->run_queue_hd = tso->_link;
+ } else {
+ // Splice tso out of the forward chain. Use setTSOLink (not a bare
+ // store) so the mutation is recorded for the generational GC.
+ setTSOLink(cap, tso->block_info.prev, tso->_link);
+ }
+ // No successor: tso must be the queue tail.
+ if (tso->_link == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_tl == tso);
+ cap->run_queue_tl = tso->block_info.prev;
+ } else {
+ // Fix up the successor's back-link to skip tso.
+ setTSOPrev(cap, tso->_link, tso->block_info.prev);
+ }
+ tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
+}
+
/* ----------------------------------------------------------------------------
* Setting up the scheduler loop
* ------------------------------------------------------------------------- */
|| t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else if (i == n_free_caps) {
pushed_to_all = rtsTrue;
i = 0;
// keep one for us
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
}
}
cap->run_queue_tl = prev;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
}
#ifdef SPARK_PUSHING
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
cap->context_switch = 0;
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
}
//debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
checkTSO(t));
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
return rtsFalse;
}
ASSERT(tso->_link == END_TSO_QUEUE);
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_hd = tso;
+ tso->block_info.prev = END_TSO_QUEUE;
} else {
setTSOLink(cap, cap->run_queue_tl, tso);
+ setTSOPrev(cap, tso, cap->run_queue_tl);
}
cap->run_queue_tl = tso;
traceEventThreadRunnable (cap, tso);
pushOnRunQueue (Capability *cap, StgTSO *tso)
{
setTSOLink(cap, tso, cap->run_queue_hd);
+ tso->block_info.prev = END_TSO_QUEUE;
+ if (cap->run_queue_hd != END_TSO_QUEUE) {
+ setTSOPrev(cap, cap->run_queue_hd, tso);
+ }
cap->run_queue_hd = tso;
if (cap->run_queue_tl == END_TSO_QUEUE) {
cap->run_queue_tl = tso;
StgTSO *t = cap->run_queue_hd;
ASSERT(t != END_TSO_QUEUE);
cap->run_queue_hd = t->_link;
+ cap->run_queue_hd->block_info.prev = END_TSO_QUEUE;
t->_link = END_TSO_QUEUE; // no write barrier req'd
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_tl = END_TSO_QUEUE;
return t;
}
+extern void removeFromRunQueue (Capability *cap, StgTSO *tso);
+
/* Add a thread to the end of the blocked queue.
*/
#if !defined(THREADED_RTS)
{
StgIndStatic *c;
- for (c = (StgIndStatic *)revertible_caf_list; c != NULL;
+ for (c = (StgIndStatic *)revertible_caf_list;
+ c != (StgIndStatic *)END_OF_STATIC_LIST;
c = (StgIndStatic *)c->static_link)
{
SET_INFO(c, c->saved_info);
ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh));
ASSERT(get_itbl(bq->owner)->type == TSO);
- ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO);
+ ASSERT(bq->queue == (MessageBlackHole*)END_TSO_QUEUE
+ || get_itbl(bq->queue)->type == TSO);
ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE ||
get_itbl(bq->link)->type == IND ||
get_itbl(bq->link)->type == BLOCKING_QUEUE);
reportUnmarkedBlocks();
}
+/* Sanity check (DEBUG only): walk the run queue and verify that the
+ * forward (_link) and backward (block_info.prev) chains are mutually
+ * consistent, and that run_queue_tl points at the last element.
+ */
+void
+checkRunQueue(Capability *cap)
+{
+ StgTSO *prev, *tso;
+ prev = END_TSO_QUEUE;
+ for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE;
+ prev = tso, tso = tso->_link) {
+ // Each node's back-link must name the node we just came from.
+ ASSERT(prev == END_TSO_QUEUE || prev->_link == tso);
+ ASSERT(tso->block_info.prev == prev);
+ }
+ ASSERT(cap->run_queue_tl == prev);
+}
/* -----------------------------------------------------------------------------
Memory leak detection
void checkMutableList ( bdescr *bd, nat gen );
void checkMutableLists ( rtsBool checkTSOs );
+void checkRunQueue (Capability *cap);
+
void memInventory (rtsBool show);
void checkBQ (StgTSO *bqe, StgClosure *closure);
saved_eager = gct->eager_promotion;
gct->eager_promotion = rtsFalse;
+
+ evacuate((StgClosure **)&tso->blocked_exceptions);
+ evacuate((StgClosure **)&tso->bq);
+
+ // scavenge current transaction record
+ evacuate((StgClosure **)&tso->trec);
+
+ // scavenge this thread's stack
+ scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
+
+ tso->dirty = gct->failed_to_evac;
+
+ evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == NotBlocked
) {
evacuate(&tso->block_info.closure);
}
}
#endif
- evacuate((StgClosure **)&tso->blocked_exceptions);
- evacuate((StgClosure **)&tso->bq);
-
- // scavange current transaction record
- evacuate((StgClosure **)&tso->trec);
-
- // scavenge this thread's stack
- scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
-
- if (gct->failed_to_evac) {
- tso->dirty = 1;
- evacuate((StgClosure **)&tso->_link);
+ if (tso->dirty == 0 && gct->failed_to_evac) {
+ tso->flags |= TSO_LINK_DIRTY;
} else {
- tso->dirty = 0;
- evacuate((StgClosure **)&tso->_link);
- if (gct->failed_to_evac) {
- tso->flags |= TSO_LINK_DIRTY;
- } else {
- tso->flags &= ~TSO_LINK_DIRTY;
- }
+ tso->flags &= ~TSO_LINK_DIRTY;
}
gct->eager_promotion = saved_eager;
// ASSERT(tso->flags & TSO_LINK_DIRTY);
evacuate((StgClosure **)&tso->_link);
+ if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnBlackHole
+ || tso->why_blocked == BlockedOnMsgWakeup
+ || tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == NotBlocked
+ ) {
+ evacuate((StgClosure **)&tso->block_info.prev);
+ }
if (gct->failed_to_evac) {
recordMutableGen_GC((StgClosure *)p,gen->no);
gct->failed_to_evac = rtsFalse;
}
+/* Set tso->block_info.prev (the run-queue back-link) to 'target',
+ * recording the TSO as mutated the first time it is dirtied in this
+ * GC cycle — mirrors setTSOLink, which maintains the forward link.
+ */
void
+setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target)
+{
+ if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
+ tso->flags |= TSO_LINK_DIRTY;
+ recordClosureMutated(cap,(StgClosure*)tso);
+ }
+ tso->block_info.prev = target;
+}
+
+void
dirty_TSO (Capability *cap, StgTSO *tso)
{
if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {