X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=ad830cf32206a96ad5ded2b9ce134fe2ef2b9c28;hb=2a4cd5365060c75d474af1532cd3ebb8ddc94996;hp=d5a4918f34f6dcb56d112a7a2916893f177fd535;hpb=5d52d9b64c21dcf77849866584744722f8121389;p=ghc-hetmet.git diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index d5a4918..ad830cf 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -31,6 +31,8 @@ static void raiseAsync (Capability *cap, static void removeFromQueues(Capability *cap, StgTSO *tso); +static void removeFromMVarBlockedQueue (StgTSO *tso); + static void blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg); @@ -38,8 +40,6 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS, Capability *target_cap USED_IF_THREADS, MessageThrowTo *msg USED_IF_THREADS); -static void performBlockedException (Capability *cap, MessageThrowTo *msg); - /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -126,7 +126,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) Currently we send a message if the target belongs to another Capability, and it is - - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo, + - NotBlocked, BlockedOnMsgThrowTo, BlockedOnCCall - or it is masking exceptions (TSO_BLOCKEX) @@ -148,7 +148,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) unlockClosure(msg, &stg_MSG_THROWTO_info); If it decides not to raise the exception after all, it can revoke it safely with - unlockClosure(msg, &stg_IND_info); + unlockClosure(msg, &stg_MSG_NULL_info); -------------------------------------------------------------------------- */ @@ -223,67 +223,7 @@ check_target: switch (status) { case NotBlocked: - case BlockedOnMsgWakeup: - /* if status==NotBlocked, and target->cap == cap, then - we own this TSO and can raise the exception. - - How do we establish this condition? Very carefully. - - Let - P = (status == NotBlocked) - Q = (tso->cap == cap) - - Now, if P & Q are true, then the TSO is locked and owned by - this capability. No other OS thread can steal it. - - If P==0 and Q==1: the TSO is blocked, but attached to this - capabilty, and it can be stolen by another capability. - - If P==1 and Q==0: the TSO is runnable on another - capability. At any time, the TSO may change from runnable - to blocked and vice versa, while it remains owned by - another capability. - - Suppose we test like this: - - p = P - q = Q - if (p && q) ... - - this is defeated by another capability stealing a blocked - TSO from us to wake it up (Schedule.c:unblockOne()). The - other thread is doing - - Q = 0 - P = 1 - - assuming arbitrary reordering, we could see this - interleaving: - - start: P==0 && Q==1 - P = 1 - p = P - q = Q - Q = 0 - if (p && q) ... - - so we need a memory barrier: - - p = P - mb() - q = Q - if (p && q) ... - - this avoids the problematic case. There are other cases - to consider, but this is the tricky one. - - Note that we must be sure that unblockOne() does the - writes in the correct order: Q before P. The memory - barrier ensures that if we have seen the write to P, we - have also seen the write to Q. - */ { - write_barrier(); if ((target->flags & TSO_BLOCKEX) == 0) { // It's on our run queue and not blocking exceptions raiseAsync(cap, target, msg->exception, rtsFalse, NULL); @@ -326,8 +266,17 @@ check_target: } } + if (i == &stg_MSG_NULL_info) { + // we know there's a MSG_TRY_WAKEUP on the way, so we + // might as well just do it now. The message will + // be a no-op when it arrives. + unlockClosure((StgClosure*)m, i); + tryWakeupThread_(cap, target); + goto retry; + } + if (i != &stg_MSG_THROWTO_info) { - // if it's an IND, this TSO has been woken up by another Cap + // if it's a MSG_NULL, this TSO has been woken up by another Cap unlockClosure((StgClosure*)m, i); goto retry; } @@ -340,7 +289,7 @@ check_target: } // nobody else can wake up this TSO after we claim the message - unlockClosure((StgClosure*)m, &stg_IND_info); + unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; @@ -382,32 +331,46 @@ check_target: goto retry; } + if (target->_link == END_TSO_QUEUE) { + // the MVar operation has already completed. There is a + // MSG_TRY_WAKEUP on the way, but we can just wake up the + // thread now anyway and ignore the message when it + // arrives. + unlockClosure((StgClosure *)mvar, info); + tryWakeupThread_(cap, target); + goto retry; + } + if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); return THROWTO_BLOCKED; } else { - removeThreadFromMVarQueue(cap, mvar, target); + // revoke the MVar operation + removeFromMVarBlockedQueue(target); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - if (info == &stg_MVAR_CLEAN_info) { - dirty_MVAR(&cap->r,(StgClosure*)mvar); - } - unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info); + unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; } } case BlockedOnBlackHole: { - // Revoke the message by replacing it with IND. We're not - // locking anything here, so we might still get a TRY_WAKEUP - // message from the owner of the blackhole some time in the - // future, but that doesn't matter. - ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); - OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - return THROWTO_SUCCESS; + if (target->flags & TSO_BLOCKEX) { + // BlockedOnBlackHole is not interruptible. + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; + } else { + // Revoke the message by replacing it with IND. We're not + // locking anything here, so we might still get a TRY_WAKEUP + // message from the owner of the blackhole some time in the + // future, but that doesn't matter. + ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); + OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; + } } case BlockedOnSTM: @@ -535,21 +498,21 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; i = lockClosure((StgClosure*)msg); tso->blocked_exceptions = (MessageThrowTo*)msg->link; - if (i == &stg_IND_info) { + if (i == &stg_MSG_NULL_info) { unlockClosure((StgClosure*)msg,i); goto loop; } - performBlockedException(cap, msg); - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); - unlockClosure((StgClosure*)msg,&stg_IND_info); + throwToSingleThreaded(cap, msg->target, msg->exception); + unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); return 1; } return 0; } // awakenBlockedExceptionQueue(): Just wake up the whole queue of -// blocked exceptions and let them try again. +// blocked exceptions. void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) @@ -560,31 +523,16 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; msg = (MessageThrowTo*)msg->link) { i = lockClosure((StgClosure *)msg); - if (i != &stg_IND_info) { - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + if (i != &stg_MSG_NULL_info) { + unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); + } else { + unlockClosure((StgClosure *)msg,i); } - unlockClosure((StgClosure *)msg,i); } tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } -static void -performBlockedException (Capability *cap, MessageThrowTo *msg) -{ - StgTSO *source; - - source = msg->source; - - ASSERT(source->why_blocked == BlockedOnMsgThrowTo); - ASSERT(source->block_info.closure == (StgClosure *)msg); - ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); - // check ids not pointers, because the thread might be relocated - - throwToSingleThreaded(cap, msg->target, msg->exception); - source->sp += 3; -} - /* ----------------------------------------------------------------------------- Remove a thread from blocking queues. @@ -596,11 +544,54 @@ performBlockedException (Capability *cap, MessageThrowTo *msg) -------------------------------------------------------------------------- */ static void +removeFromMVarBlockedQueue (StgTSO *tso) +{ + StgMVar *mvar = (StgMVar*)tso->block_info.closure; + StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link; + + if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) { + // already removed from this MVar + return; + } + + // Assume the MVar is locked. (not assertable; sometimes it isn't + // actually WHITEHOLE'd). + + // We want to remove the MVAR_TSO_QUEUE object from the queue. It + // isn't doubly-linked so we can't actually remove it; instead we + // just overwrite it with an IND if possible and let the GC short + // it out. However, we have to be careful to maintain the deque + // structure: + + if (mvar->head == q) { + mvar->head = q->link; + q->header.info = &stg_IND_info; + if (mvar->tail == q) { + mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE; + } + } + else if (mvar->tail == q) { + // we can't replace it with an IND in this case, because then + // we lose the tail pointer when the GC shorts out the IND. + // So we use MSG_NULL as a kind of non-dupable indirection; + // these are ignored by takeMVar/putMVar. + q->header.info = &stg_MSG_NULL_info; + } + else { + q->header.info = &stg_IND_info; + } + + // revoke the MVar operation + tso->_link = END_TSO_QUEUE; +} + +static void removeFromQueues(Capability *cap, StgTSO *tso) { switch (tso->why_blocked) { case NotBlocked: + case ThreadMigrating: return; case BlockedOnSTM: @@ -613,22 +604,13 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: - removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); - // we aren't doing a write barrier here: the MVar is supposed to - // be already locked, so replacing the info pointer would unlock it. + removeFromMVarBlockedQueue(tso); goto done; case BlockedOnBlackHole: // nothing to do goto done; - case BlockedOnMsgWakeup: - { - // kill the message, atomically: - OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info); - break; - } - case BlockedOnMsgThrowTo: { MessageThrowTo *m = tso->block_info.throwto; @@ -638,7 +620,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) // ASSERT(m->header.info == &stg_WHITEHOLE_info); // unlock and revoke it at the same time - unlockClosure((StgClosure*)m,&stg_IND_info); + unlockClosure((StgClosure*)m,&stg_MSG_NULL_info); break; } @@ -667,7 +649,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - unblockOne(cap, tso); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); } /* ----------------------------------------------------------------------------- @@ -741,7 +724,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, ASSERT(tso->cap == cap); // wake it up - if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) { + if (tso->why_blocked != NotBlocked) { tso->why_blocked = NotBlocked; appendToRunQueue(cap,tso); } @@ -857,9 +840,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // top of the CATCH_FRAME ready to enter. // { -#ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)frame; -#endif StgThunk *raise; if (exception == NULL) break; @@ -880,7 +861,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, * a surprise exception before we get around to executing the * handler. */ - tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE; + tso->flags |= TSO_BLOCKEX; + if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) { + tso->flags &= ~TSO_INTERRUPTIBLE; + } else { + tso->flags |= TSO_INTERRUPTIBLE; + } /* Put the newly-built THUNK on top of the stack, ready to execute * when the thread restarts.