#ifdef DEBUG
{
const StgInfoTable *i = msg->header.info;
- if (i != &stg_MSG_WAKEUP_info &&
- i != &stg_MSG_THROWTO_info &&
+ if (i != &stg_MSG_THROWTO_info &&
i != &stg_MSG_BLACKHOLE_info &&
i != &stg_MSG_TRY_WAKEUP_info &&
i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
loop:
write_barrier(); // allow m->header to be modified by another thread
i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- // the plan is to eventually get rid of these and use
- // TRY_WAKEUP instead.
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_TRY_WAKEUP_info)
+ if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
switch (r) {
case THROWTO_SUCCESS:
- ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
- t->source->sp += 3;
- unblockOne(cap, t->source);
// this message is done
- unlockClosure((StgClosure*)m, &stg_IND_info);
+ unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
+ tryWakeupThread(cap, t->source);
break;
case THROWTO_BLOCKED:
// unlock the message
}
return;
}
- else if (i == &stg_IND_info)
+ else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
{
// message was revoked
return;
const StgInfoTable *info;
StgClosure *p;
StgBlockingQueue *bq;
- StgClosure *bh = msg->bh;
+ StgClosure *bh = UNTAG_CLOSURE(msg->bh);
StgTSO *owner;
debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
return 0;
}
- // we know at this point that the closure
+ // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
+ // or a value.
loop:
- p = ((StgInd*)bh)->indirectee;
+ // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
+ // and turns this into an infinite loop.
+ p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
info = p->header.info;
if (info == &stg_IND_info)
bq->link = owner->bq;
owner->bq = bq;
dirty_TSO(cap, owner); // we modified owner->bq
-
+
+ // If the owner of the blackhole is currently runnable, then
+ // bump it to the front of the run queue. This gives the
+ // blocked-on thread a little boost which should help unblock
+ // this thread, and may avoid a pile-up of other threads
+ // becoming blocked on the same BLACKHOLE (#3838).
+ //
+ // NB. we check to make sure that the owner is not the same as
+ // the current thread, since in that case it will not be on
+ // the run queue.
+ if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+ removeFromRunQueue(cap, owner);
+ pushOnRunQueue(cap,owner);
+ }
+
// point to the BLOCKING_QUEUE from the BLACKHOLE
write_barrier(); // make the BQ visible
((StgInd*)bh)->indirectee = (StgClosure *)bq;
if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
- recordClosureMutated(cap,bq);
+ recordClosureMutated(cap,(StgClosure*)bq);
}
debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
(lnat)msg->tso->id, (lnat)owner->id);
+ // See above, #3838
+ if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+ removeFromRunQueue(cap, owner);
+ pushOnRunQueue(cap,owner);
+ }
+
return 1; // blocked
}