X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FMessages.c;h=5dec6c69271dc03bfb70562818b601127ba6e17b;hb=26f4bfc82f2b2359259578e9c54d476fc2de650f;hp=ae5d5d1abc879c5cf5f76859bf3f2cd442ade6ea;hpb=2726a2f10256710cc6ed80b1098cb32e121e1be7;p=ghc-hetmet.git diff --git a/rts/Messages.c b/rts/Messages.c index ae5d5d1..5dec6c6 100644 --- a/rts/Messages.c +++ b/rts/Messages.c @@ -28,8 +28,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg) #ifdef DEBUG { const StgInfoTable *i = msg->header.info; - if (i != &stg_MSG_WAKEUP_info && - i != &stg_MSG_THROWTO_info && + if (i != &stg_MSG_THROWTO_info && i != &stg_MSG_BLACKHOLE_info && i != &stg_MSG_TRY_WAKEUP_info && i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked @@ -71,21 +70,7 @@ executeMessage (Capability *cap, Message *m) loop: write_barrier(); // allow m->header to be modified by another thread i = m->header.info; - if (i == &stg_MSG_WAKEUP_info) - { - // the plan is to eventually get rid of these and use - // TRY_WAKEUP instead. - MessageWakeup *w = (MessageWakeup *)m; - StgTSO *tso = w->tso; - debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", - (lnat)tso->id); - ASSERT(tso->cap == cap); - ASSERT(tso->why_blocked == BlockedOnMsgWakeup); - ASSERT(tso->block_info.closure == (StgClosure *)m); - tso->why_blocked = NotBlocked; - appendToRunQueue(cap, tso); - } - else if (i == &stg_MSG_TRY_WAKEUP_info) + if (i == &stg_MSG_TRY_WAKEUP_info) { StgTSO *tso = ((MessageWakeup *)m)->tso; debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld", @@ -113,11 +98,13 @@ loop: r = throwToMsg(cap, t); switch (r) { - case THROWTO_SUCCESS: + case THROWTO_SUCCESS: { // this message is done - unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); - tryWakeupThread(cap, t->source); + StgTSO *source = t->source; + doneWithMsgThrowTo(t); + tryWakeupThread(cap, source); break; + } case THROWTO_BLOCKED: // unlock the message unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); @@ -176,7 +163,7 @@ nat messageBlackHole(Capability *cap, MessageBlackHole *msg) const StgInfoTable *info; StgClosure *p; StgBlockingQueue *bq; - StgClosure *bh = msg->bh; + StgClosure *bh = UNTAG_CLOSURE(msg->bh); StgTSO *owner; debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p", @@ -190,6 +177,7 @@ nat messageBlackHole(Capability *cap, MessageBlackHole *msg) // all. if (info != &stg_BLACKHOLE_info && info != &stg_CAF_BLACKHOLE_info && + info != &__stg_EAGER_BLACKHOLE_info && info != &stg_WHITEHOLE_info) { // if it is a WHITEHOLE, then a thread is in the process of // trying to BLACKHOLE it. But we know that it was once a @@ -198,9 +186,12 @@ nat messageBlackHole(Capability *cap, MessageBlackHole *msg) return 0; } - // we know at this point that the closure + // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND, + // or a value. loop: - p = ((StgInd*)bh)->indirectee; + // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load + // and turns this into an infinite loop. + p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee)); info = p->header.info; if (info == &stg_IND_info) @@ -214,7 +205,7 @@ loop: else if (info == &stg_TSO_info) { - owner = deRefTSO((StgTSO *)p); + owner = (StgTSO*)p; #ifdef THREADED_RTS if (owner->cap != cap) { @@ -276,7 +267,7 @@ loop: ASSERT(bq->bh == bh); - owner = deRefTSO(bq->owner); + owner = bq->owner; ASSERT(owner != END_TSO_QUEUE); @@ -312,3 +303,46 @@ loop: return 0; // not blocked } +// A shorter version of messageBlackHole(), that just returns the +// owner (or NULL if the owner cannot be found, because the blackhole +// has been updated in the meantime). + +StgTSO * blackHoleOwner (StgClosure *bh) +{ + const StgInfoTable *info; + StgClosure *p; + + info = bh->header.info; + + if (info != &stg_BLACKHOLE_info && + info != &stg_CAF_BLACKHOLE_info && + info != &__stg_EAGER_BLACKHOLE_info && + info != &stg_WHITEHOLE_info) { + return NULL; + } + + // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND, + // or a value. +loop: + // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load + // and turns this into an infinite loop. + p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee)); + info = p->header.info; + + if (info == &stg_IND_info) goto loop; + + else if (info == &stg_TSO_info) + { + return (StgTSO*)p; + } + else if (info == &stg_BLOCKING_QUEUE_CLEAN_info || + info == &stg_BLOCKING_QUEUE_DIRTY_info) + { + StgBlockingQueue *bq = (StgBlockingQueue *)p; + return bq->owner; + } + + return NULL; // not blocked +} + +