X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=f974f8c0d76c363170005ec1def96e27fa0057f9;hb=848797ebb9b60cf9c8a004c97afd008f5325c75f;hp=d54f823d6e506fa356dcce134612e05128000249;hpb=7408b39235bccdcde48df2a73337ff976fbc09b7;p=ghc-hetmet.git diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index d54f823..f974f8c 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -18,6 +18,7 @@ #include "STM.h" #include "sm/Sanity.h" #include "Profiling.h" +#include "Messages.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" #endif @@ -37,8 +38,6 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS, Capability *target_cap USED_IF_THREADS, MessageThrowTo *msg USED_IF_THREADS); -static void performBlockedException (Capability *cap, MessageThrowTo *msg); - /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -66,6 +65,8 @@ void throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; @@ -80,6 +81,8 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, void suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; @@ -143,7 +146,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) unlockClosure(msg, &stg_MSG_THROWTO_info); If it decides not to raise the exception after all, it can revoke it safely with - unlockClosure(msg, &stg_IND_info); + unlockClosure(msg, &stg_MSG_NULL_info); -------------------------------------------------------------------------- */ @@ -158,7 +161,7 @@ throwTo (Capability *cap, // the Capability we hold msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); // message starts locked; the caller has to unlock it when it is // ready. - msg->header.info = &stg_WHITEHOLE_info; + SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM); msg->source = source; msg->target = target; msg->exception = exception; @@ -179,14 +182,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; StgTSO *target = msg->target; + Capability *target_cap; + + goto check_target; +retry: + write_barrier(); + debugTrace(DEBUG_sched, "throwTo: retrying..."); + +check_target: ASSERT(target != END_TSO_QUEUE); // follow ThreadRelocated links in the target first - while (target->what_next == ThreadRelocated) { - target = target->_link; - // No, it might be a WHITEHOLE: - // ASSERT(get_itbl(target)->type == TSO); + target = deRefTSO(target); + + // Thread already dead? + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + return THROWTO_SUCCESS; } debugTraceCap(DEBUG_sched, cap, @@ -198,18 +211,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) traceThreadStatus(DEBUG_sched, target); #endif - goto check_target; -retry: - write_barrier(); - debugTrace(DEBUG_sched, "throwTo: retrying..."); - -check_target: - ASSERT(target != END_TSO_QUEUE); - - // Thread already dead? - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - return THROWTO_SUCCESS; + target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; } status = target->why_blocked; @@ -276,28 +281,19 @@ check_target: have also seen the write to Q. */ { - Capability *target_cap; - write_barrier(); - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } else { - if ((target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - blockedThrowTo(cap,target,msg); - return THROWTO_BLOCKED; - } + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } } case BlockedOnMsgThrowTo: { - Capability *target_cap; const StgInfoTable *i; MessageThrowTo *m; @@ -328,17 +324,19 @@ check_target: } } - if (i != &stg_MSG_THROWTO_info) { - // if it's an IND, this TSO has been woken up by another Cap + if (i == &stg_MSG_NULL_info) { + // we know there's a MSG_TRY_WAKEUP on the way, so we + // might as well just do it now. The message will + // be a no-op when it arrives. unlockClosure((StgClosure*)m, i); + tryWakeupThread(cap, target); goto retry; } - target_cap = target->cap; - if (target_cap != cap) { + if (i != &stg_MSG_THROWTO_info) { + // if it's a MSG_NULL, this TSO has been woken up by another Cap unlockClosure((StgClosure*)m, i); - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + goto retry; } if ((target->flags & TSO_BLOCKEX) && @@ -349,10 +347,9 @@ check_target: } // nobody else can wake up this TSO after we claim the message - unlockClosure((StgClosure*)m, &stg_IND_info); + unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); return THROWTO_SUCCESS; } @@ -394,48 +391,30 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); return THROWTO_BLOCKED; } else { removeThreadFromMVarQueue(cap, mvar, target); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - unlockClosure((StgClosure *)mvar, info); + if (info == &stg_MVAR_CLEAN_info) { + dirty_MVAR(&cap->r,(StgClosure*)mvar); + } + unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info); return THROWTO_SUCCESS; } } case BlockedOnBlackHole: { - ACQUIRE_LOCK(&sched_mutex); - // double checking the status after the memory barrier: - if (target->why_blocked != BlockedOnBlackHole) { - RELEASE_LOCK(&sched_mutex); - goto retry; - } - - if (target->flags & TSO_BLOCKEX) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } - RELEASE_LOCK(&sched_mutex); - return THROWTO_BLOCKED; // caller releases lock - } else { - removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - RELEASE_LOCK(&sched_mutex); - return THROWTO_SUCCESS; - } + // Revoke the message by replacing it with IND. We're not + // locking anything here, so we might still get a TRY_WAKEUP + // message from the owner of the blackhole some time in the + // future, but that doesn't matter. + ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); + OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } case BlockedOnSTM: @@ -448,35 +427,19 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockTSO(target); return THROWTO_BLOCKED; } else { raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; } case BlockedOnCCall: case BlockedOnCCall_NoUnblockExc: - { - Capability *target_cap; - - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; - } - blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; - } #ifndef THREADEDED_RTS case BlockedOnRead: @@ -509,9 +472,9 @@ throwToSendMsg (Capability *cap STG_UNUSED, { #ifdef THREADED_RTS - debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); - sendMessage(target_cap, (Message*)msg); + sendMessage(cap, target_cap, (Message*)msg); #endif } @@ -526,7 +489,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) ASSERT(target->cap == cap); - msg->link = (Message*)target->blocked_exceptions; + msg->link = target->blocked_exceptions; target->blocked_exceptions = msg; dirty_TSO(cap,target); // we modified the blocked_exceptions queue } @@ -565,7 +528,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { - debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); + debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE @@ -579,21 +542,21 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; i = lockClosure((StgClosure*)msg); tso->blocked_exceptions = (MessageThrowTo*)msg->link; - if (i == &stg_IND_info) { + if (i == &stg_MSG_NULL_info) { unlockClosure((StgClosure*)msg,i); goto loop; } - performBlockedException(cap, msg); - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); - unlockClosure((StgClosure*)msg,&stg_IND_info); + throwToSingleThreaded(cap, msg->target, msg->exception); + unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); return 1; } return 0; } // awakenBlockedExceptionQueue(): Just wake up the whole queue of -// blocked exceptions and let them try again. +// blocked exceptions. void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) @@ -604,31 +567,16 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; msg = (MessageThrowTo*)msg->link) { i = lockClosure((StgClosure *)msg); - if (i != &stg_IND_info) { - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + if (i != &stg_MSG_NULL_info) { + unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); + } else { + unlockClosure((StgClosure *)msg,i); } - unlockClosure((StgClosure *)msg,i); } tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } -static void -performBlockedException (Capability *cap, MessageThrowTo *msg) -{ - StgTSO *source; - - source = msg->source; - - ASSERT(source->why_blocked == BlockedOnMsgThrowTo); - ASSERT(source->block_info.closure == (StgClosure *)msg); - ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); - // check ids not pointers, because the thread might be relocated - - throwToSingleThreaded(cap, msg->target, msg->exception); - source->sp += 3; -} - /* ----------------------------------------------------------------------------- Remove a thread from blocking queues. @@ -658,16 +606,18 @@ removeFromQueues(Capability *cap, StgTSO *tso) case BlockedOnMVar: removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); + // we aren't doing a write barrier here: the MVar is supposed to + // be already locked, so replacing the info pointer would unlock it. goto done; case BlockedOnBlackHole: - removeThreadFromQueue(cap, &blackhole_queue, tso); + // nothing to do goto done; case BlockedOnMsgWakeup: { // kill the message, atomically: - tso->block_info.wakeup->header.info = &stg_IND_info; + OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info); break; } @@ -680,7 +630,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) // ASSERT(m->header.info == &stg_WHITEHOLE_info); // unlock and revoke it at the same time - unlockClosure((StgClosure*)m,&stg_IND_info); + unlockClosure((StgClosure*)m,&stg_MSG_NULL_info); break; } @@ -719,7 +669,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) * asynchronous exception in an existing thread. * * We first remove the thread from any queue on which it might be - * blocked. The possible blockages are MVARs and BLACKHOLE_BQs. + * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and + * TSO blocked_exception queues. * * We strip the stack down to the innermost CATCH_FRAME, building * thunks in the heap for all the active computations, so they can @@ -758,8 +709,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, StgClosure *updatee; nat i; - debugTrace(DEBUG_sched, - "raising exception in thread %ld.", (long)tso->id); + debugTraceCap(DEBUG_sched, cap, + "raising exception in thread %ld.", (long)tso->id); #if defined(PROFILING) /* @@ -772,20 +723,26 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, fprintCCS_stderr(tso->prof.CCCS); } #endif + // ASSUMES: the thread is not already complete or dead, or + // ThreadRelocated. Upper layers should deal with that. + ASSERT(tso->what_next != ThreadComplete && + tso->what_next != ThreadKilled && + tso->what_next != ThreadRelocated); - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } + // only if we own this TSO (except that deleteThread() calls this + ASSERT(tso->cap == cap); + + // wake it up + if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) { + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + } // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); sp = tso->sp; - // ASSUMES: the thread is not already complete or dead. Upper - // layers should deal with that. - ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled); - if (stop_here != NULL) { updatee = stop_here->updatee; } else { @@ -868,7 +825,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // Perform the update // TODO: this may waste some work, if the thunk has // already been updated by another thread. - UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + updateThunk(cap, tso, + ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); } sp += sizeofW(StgUpdateFrame) - 1; @@ -960,8 +918,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, { StgTRecHeader *trec = tso -> trec; StgTRecHeader *outer = trec -> enclosing_trec; - debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); + debugTraceCap(DEBUG_stm, cap, + "found atomically block delivering async exception"); stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer;