X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=b94ccea283ce12fc5b1cfe8cd2a6164452b55a96;hp=d54f823d6e506fa356dcce134612e05128000249;hb=83d563cb9ede0ba792836e529b1e2929db926355;hpb=7408b39235bccdcde48df2a73337ff976fbc09b7 diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index d54f823..b94ccea 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -18,6 +18,7 @@ #include "STM.h" #include "sm/Sanity.h" #include "Profiling.h" +#include "Messages.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" #endif @@ -30,6 +31,8 @@ static void raiseAsync (Capability *cap, static void removeFromQueues(Capability *cap, StgTSO *tso); +static void removeFromMVarBlockedQueue (StgTSO *tso); + static void blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg); @@ -37,8 +40,6 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS, Capability *target_cap USED_IF_THREADS, MessageThrowTo *msg USED_IF_THREADS); -static void performBlockedException (Capability *cap, MessageThrowTo *msg); - /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -66,6 +67,8 @@ void throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; @@ -80,6 +83,8 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, void suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; @@ -121,8 +126,8 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) Currently we send a message if the target belongs to another Capability, and it is - - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo, - BlockedOnCCall + - NotBlocked, BlockedOnMsgThrowTo, + BlockedOnCCall_Interruptible - or it is masking exceptions (TSO_BLOCKEX) @@ -143,7 +148,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) unlockClosure(msg, &stg_MSG_THROWTO_info); If it decides not to raise the exception after all, it can revoke it safely with - unlockClosure(msg, &stg_IND_info); + unlockClosure(msg, &stg_MSG_NULL_info); -------------------------------------------------------------------------- */ @@ -158,7 +163,7 @@ throwTo (Capability *cap, // the Capability we hold msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); // message starts locked; the caller has to unlock it when it is // ready. - msg->header.info = &stg_WHITEHOLE_info; + SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM); msg->source = source; msg->target = target; msg->exception = exception; @@ -179,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; StgTSO *target = msg->target; + Capability *target_cap; + + goto check_target; + +retry: + write_barrier(); + debugTrace(DEBUG_sched, "throwTo: retrying..."); +check_target: ASSERT(target != END_TSO_QUEUE); // follow ThreadRelocated links in the target first - while (target->what_next == ThreadRelocated) { - target = target->_link; - // No, it might be a WHITEHOLE: - // ASSERT(get_itbl(target)->type == TSO); + target = deRefTSO(target); + + // Thread already dead? + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + return THROWTO_SUCCESS; } debugTraceCap(DEBUG_sched, cap, @@ -198,106 +213,29 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) traceThreadStatus(DEBUG_sched, target); #endif - goto check_target; -retry: - write_barrier(); - debugTrace(DEBUG_sched, "throwTo: retrying..."); - -check_target: - ASSERT(target != END_TSO_QUEUE); - - // Thread already dead? - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - return THROWTO_SUCCESS; + target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; } status = target->why_blocked; switch (status) { case NotBlocked: - case BlockedOnMsgWakeup: - /* if status==NotBlocked, and target->cap == cap, then - we own this TSO and can raise the exception. - - How do we establish this condition? Very carefully. - - Let - P = (status == NotBlocked) - Q = (tso->cap == cap) - - Now, if P & Q are true, then the TSO is locked and owned by - this capability. No other OS thread can steal it. - - If P==0 and Q==1: the TSO is blocked, but attached to this - capabilty, and it can be stolen by another capability. - - If P==1 and Q==0: the TSO is runnable on another - capability. At any time, the TSO may change from runnable - to blocked and vice versa, while it remains owned by - another capability. - - Suppose we test like this: - - p = P - q = Q - if (p && q) ... - - this is defeated by another capability stealing a blocked - TSO from us to wake it up (Schedule.c:unblockOne()). The - other thread is doing - - Q = 0 - P = 1 - - assuming arbitrary reordering, we could see this - interleaving: - - start: P==0 && Q==1 - P = 1 - p = P - q = Q - Q = 0 - if (p && q) ... - - so we need a memory barrier: - - p = P - mb() - q = Q - if (p && q) ... - - this avoids the problematic case. There are other cases - to consider, but this is the tricky one. - - Note that we must be sure that unblockOne() does the - writes in the correct order: Q before P. The memory - barrier ensures that if we have seen the write to P, we - have also seen the write to Q. - */ { - Capability *target_cap; - - write_barrier(); - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } else { - if ((target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - blockedThrowTo(cap,target,msg); - return THROWTO_BLOCKED; - } + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } } case BlockedOnMsgThrowTo: { - Capability *target_cap; const StgInfoTable *i; MessageThrowTo *m; @@ -328,17 +266,19 @@ check_target: } } - if (i != &stg_MSG_THROWTO_info) { - // if it's an IND, this TSO has been woken up by another Cap + if (i == &stg_MSG_NULL_info) { + // we know there's a MSG_TRY_WAKEUP on the way, so we + // might as well just do it now. The message will + // be a no-op when it arrives. unlockClosure((StgClosure*)m, i); + tryWakeupThread_(cap, target); goto retry; } - target_cap = target->cap; - if (target_cap != cap) { + if (i != &stg_MSG_THROWTO_info) { + // if it's a MSG_NULL, this TSO has been woken up by another Cap unlockClosure((StgClosure*)m, i); - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + goto retry; } if ((target->flags & TSO_BLOCKEX) && @@ -349,10 +289,9 @@ check_target: } // nobody else can wake up this TSO after we claim the message - unlockClosure((StgClosure*)m, &stg_IND_info); + unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); return THROWTO_SUCCESS; } @@ -392,20 +331,25 @@ check_target: goto retry; } + if (target->_link == END_TSO_QUEUE) { + // the MVar operation has already completed. There is a + // MSG_TRY_WAKEUP on the way, but we can just wake up the + // thread now anyway and ignore the message when it + // arrives. + unlockClosure((StgClosure *)mvar, info); + tryWakeupThread_(cap, target); + goto retry; + } + if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); return THROWTO_BLOCKED; } else { - removeThreadFromMVarQueue(cap, mvar, target); + // revoke the MVar operation + removeFromMVarBlockedQueue(target); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; } @@ -413,29 +357,20 @@ check_target: case BlockedOnBlackHole: { - ACQUIRE_LOCK(&sched_mutex); - // double checking the status after the memory barrier: - if (target->why_blocked != BlockedOnBlackHole) { - RELEASE_LOCK(&sched_mutex); - goto retry; - } - if (target->flags & TSO_BLOCKEX) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } - RELEASE_LOCK(&sched_mutex); - return THROWTO_BLOCKED; // caller releases lock + // BlockedOnBlackHole is not interruptible. + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } else { - removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - RELEASE_LOCK(&sched_mutex); - return THROWTO_SUCCESS; - } + // Revoke the message by replacing it with IND. We're not + // locking anything here, so we might still get a TRY_WAKEUP + // message from the owner of the blackhole some time in the + // future, but that doesn't matter. + ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); + OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; + } } case BlockedOnSTM: @@ -448,35 +383,40 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockTSO(target); return THROWTO_BLOCKED; } else { raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; } - case BlockedOnCCall: - case BlockedOnCCall_NoUnblockExc: + case BlockedOnCCall_Interruptible: +#ifdef THREADED_RTS { - Capability *target_cap; - - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + Task *task = NULL; + // walk suspended_ccalls to find the correct worker thread + InCall *incall; + for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) { + if (incall->suspended_tso == target) { + task = incall->task; + break; + } } - + if (task != NULL) { + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + interruptWorkerTask(task); + return THROWTO_SUCCESS; + } else { + debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill"); + } + // fall to next + } +#endif + case BlockedOnCCall: blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; - } #ifndef THREADEDED_RTS case BlockedOnRead: @@ -509,9 +449,9 @@ throwToSendMsg (Capability *cap STG_UNUSED, { #ifdef THREADED_RTS - debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); - sendMessage(target_cap, (Message*)msg); + sendMessage(cap, target_cap, (Message*)msg); #endif } @@ -526,7 +466,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) ASSERT(target->cap == cap); - msg->link = (Message*)target->blocked_exceptions; + msg->link = target->blocked_exceptions; target->blocked_exceptions = msg; dirty_TSO(cap,target); // we modified the blocked_exceptions queue } @@ -565,7 +505,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { - debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); + debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE @@ -579,21 +519,21 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; i = lockClosure((StgClosure*)msg); tso->blocked_exceptions = (MessageThrowTo*)msg->link; - if (i == &stg_IND_info) { + if (i == &stg_MSG_NULL_info) { unlockClosure((StgClosure*)msg,i); goto loop; } - performBlockedException(cap, msg); - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); - unlockClosure((StgClosure*)msg,&stg_IND_info); + throwToSingleThreaded(cap, msg->target, msg->exception); + unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); return 1; } return 0; } // awakenBlockedExceptionQueue(): Just wake up the whole queue of -// blocked exceptions and let them try again. +// blocked exceptions. void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) @@ -604,31 +544,16 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; msg = (MessageThrowTo*)msg->link) { i = lockClosure((StgClosure *)msg); - if (i != &stg_IND_info) { - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + if (i != &stg_MSG_NULL_info) { + unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); + } else { + unlockClosure((StgClosure *)msg,i); } - unlockClosure((StgClosure *)msg,i); } tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } -static void -performBlockedException (Capability *cap, MessageThrowTo *msg) -{ - StgTSO *source; - - source = msg->source; - - ASSERT(source->why_blocked == BlockedOnMsgThrowTo); - ASSERT(source->block_info.closure == (StgClosure *)msg); - ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); - // check ids not pointers, because the thread might be relocated - - throwToSingleThreaded(cap, msg->target, msg->exception); - source->sp += 3; -} - /* ----------------------------------------------------------------------------- Remove a thread from blocking queues. @@ -640,11 +565,54 @@ performBlockedException (Capability *cap, MessageThrowTo *msg) -------------------------------------------------------------------------- */ static void +removeFromMVarBlockedQueue (StgTSO *tso) +{ + StgMVar *mvar = (StgMVar*)tso->block_info.closure; + StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link; + + if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) { + // already removed from this MVar + return; + } + + // Assume the MVar is locked. (not assertable; sometimes it isn't + // actually WHITEHOLE'd). + + // We want to remove the MVAR_TSO_QUEUE object from the queue. It + // isn't doubly-linked so we can't actually remove it; instead we + // just overwrite it with an IND if possible and let the GC short + // it out. However, we have to be careful to maintain the deque + // structure: + + if (mvar->head == q) { + mvar->head = q->link; + q->header.info = &stg_IND_info; + if (mvar->tail == q) { + mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE; + } + } + else if (mvar->tail == q) { + // we can't replace it with an IND in this case, because then + // we lose the tail pointer when the GC shorts out the IND. + // So we use MSG_NULL as a kind of non-dupable indirection; + // these are ignored by takeMVar/putMVar. + q->header.info = &stg_MSG_NULL_info; + } + else { + q->header.info = &stg_IND_info; + } + + // revoke the MVar operation + tso->_link = END_TSO_QUEUE; +} + +static void removeFromQueues(Capability *cap, StgTSO *tso) { switch (tso->why_blocked) { case NotBlocked: + case ThreadMigrating: return; case BlockedOnSTM: @@ -657,20 +625,13 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: - removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); + removeFromMVarBlockedQueue(tso); goto done; case BlockedOnBlackHole: - removeThreadFromQueue(cap, &blackhole_queue, tso); + // nothing to do goto done; - case BlockedOnMsgWakeup: - { - // kill the message, atomically: - tso->block_info.wakeup->header.info = &stg_IND_info; - break; - } - case BlockedOnMsgThrowTo: { MessageThrowTo *m = tso->block_info.throwto; @@ -680,7 +641,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) // ASSERT(m->header.info == &stg_WHITEHOLE_info); // unlock and revoke it at the same time - unlockClosure((StgClosure*)m,&stg_IND_info); + unlockClosure((StgClosure*)m,&stg_MSG_NULL_info); break; } @@ -709,7 +670,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - unblockOne(cap, tso); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); } /* ----------------------------------------------------------------------------- @@ -719,7 +681,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) * asynchronous exception in an existing thread. * * We first remove the thread from any queue on which it might be - * blocked. The possible blockages are MVARs and BLACKHOLE_BQs. + * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and + * TSO blocked_exception queues. * * We strip the stack down to the innermost CATCH_FRAME, building * thunks in the heap for all the active computations, so they can @@ -758,8 +721,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, StgClosure *updatee; nat i; - debugTrace(DEBUG_sched, - "raising exception in thread %ld.", (long)tso->id); + debugTraceCap(DEBUG_sched, cap, + "raising exception in thread %ld.", (long)tso->id); #if defined(PROFILING) /* @@ -772,20 +735,26 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, fprintCCS_stderr(tso->prof.CCCS); } #endif + // ASSUMES: the thread is not already complete or dead, or + // ThreadRelocated. Upper layers should deal with that. + ASSERT(tso->what_next != ThreadComplete && + tso->what_next != ThreadKilled && + tso->what_next != ThreadRelocated); - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } + // only if we own this TSO (except that deleteThread() calls this + ASSERT(tso->cap == cap); + + // wake it up + if (tso->why_blocked != NotBlocked) { + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + } // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); sp = tso->sp; - // ASSUMES: the thread is not already complete or dead. Upper - // layers should deal with that. - ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled); - if (stop_here != NULL) { updatee = stop_here->updatee; } else { @@ -868,7 +837,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // Perform the update // TODO: this may waste some work, if the thunk has // already been updated by another thread. - UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + updateThunk(cap, tso, + ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); } sp += sizeofW(StgUpdateFrame) - 1; @@ -891,9 +861,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // top of the CATCH_FRAME ready to enter. // { -#ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)frame; -#endif StgThunk *raise; if (exception == NULL) break; @@ -914,7 +882,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, * a surprise exception before we get around to executing the * handler. */ - tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE; + tso->flags |= TSO_BLOCKEX; + if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) { + tso->flags &= ~TSO_INTERRUPTIBLE; + } else { + tso->flags |= TSO_INTERRUPTIBLE; + } /* Put the newly-built THUNK on top of the stack, ready to execute * when the thread restarts. @@ -960,8 +933,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, { StgTRecHeader *trec = tso -> trec; StgTRecHeader *outer = trec -> enclosing_trec; - debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); + debugTraceCap(DEBUG_stm, cap, + "found atomically block delivering async exception"); stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer;