X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=b94ccea283ce12fc5b1cfe8cd2a6164452b55a96;hp=501da2f55f3dc9ef836fdb681ca1f183d9d3de5e;hb=83d563cb9ede0ba792836e529b1e2929db926355;hpb=b55e7b53eb4af373764969ab9cfd5a4ef4bc9b8d diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index 501da2f..b94ccea 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -8,16 +8,17 @@ #include "PosixSource.h" #include "Rts.h" + +#include "sm/Storage.h" #include "Threads.h" #include "Trace.h" #include "RaiseAsync.h" -#include "SMP.h" #include "Schedule.h" -#include "LdvProfile.h" #include "Updates.h" #include "STM.h" -#include "Sanity.h" +#include "sm/Sanity.h" #include "Profiling.h" +#include "Messages.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" #endif @@ -30,10 +31,14 @@ static void raiseAsync (Capability *cap, static void removeFromQueues(Capability *cap, StgTSO *tso); -static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target); +static void removeFromMVarBlockedQueue (StgTSO *tso); + +static void blockedThrowTo (Capability *cap, + StgTSO *target, MessageThrowTo *msg); -static void performBlockedException (Capability *cap, - StgTSO *source, StgTSO *target); +static void throwToSendMsg (Capability *cap USED_IF_THREADS, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS); /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -62,6 +67,8 @@ void throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; @@ -76,6 +83,8 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, void suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) { + tso = deRefTSO(tso); + // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { return; @@ -96,178 +105,194 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) may be blocked and could be woken up at any point by another CPU. We have some delicate synchronisation to do. - There is a completely safe fallback scheme: it is always possible - to just block the source TSO on the target TSO's blocked_exceptions - queue. This queue is locked using lockTSO()/unlockTSO(). It is - checked at regular intervals: before and after running a thread - (schedule() and threadPaused() respectively), and just before GC - (scheduleDoGC()). Activating a thread on this queue should be done - using maybePerformBlockedException(): this is done in the context - of the target thread, so the exception can be raised eagerly. - - This fallback scheme works even if the target thread is complete or - killed: scheduleDoGC() will discover the blocked thread before the - target is GC'd. - - Blocking the source thread on the target thread's blocked_exception - queue is also employed when the target thread is currently blocking - exceptions (ie. inside Control.Exception.block). - - We could use the safe fallback scheme exclusively, but that - wouldn't be ideal: most calls to throwTo would block immediately, - possibly until the next GC, which might require the deadlock - detection mechanism to kick in. So we try to provide promptness - wherever possible. - - We can promptly deliver the exception if the target thread is: - - - runnable, on the same Capability as the source thread (because - we own the run queue and therefore the target thread). - - - blocked, and we can obtain exclusive access to it. Obtaining - exclusive access to the thread depends on how it is blocked. - - We must also be careful to not trip over threadStackOverflow(), - which might be moving the TSO to enlarge its stack. - lockTSO()/unlockTSO() are used here too. - + The underlying scheme when multiple Capabilities are in use is + message passing: when the target of a throwTo is on another + Capability, we send a message (a MessageThrowTo closure) to that + Capability. + + If the throwTo needs to block because the target TSO is masking + exceptions (the TSO_BLOCKEX flag), then the message is placed on + the blocked_exceptions queue attached to the target TSO. When the + target TSO enters the unmasked state again, it must check the + queue. The blocked_exceptions queue is not locked; only the + Capability owning the TSO may modify it. + + To make things simpler for throwTo, we always create the message + first before deciding what to do. The message may get sent, or it + may get attached to a TSO's blocked_exceptions queue, or the + exception may get thrown immediately and the message dropped, + depending on the current state of the target. + + Currently we send a message if the target belongs to another + Capability, and it is + + - NotBlocked, BlockedOnMsgThrowTo, + BlockedOnCCall_Interruptible + + - or it is masking exceptions (TSO_BLOCKEX) + + Currently, if the target is BlockedOnMVar, BlockedOnSTM, or + BlockedOnBlackHole then we acquire ownership of the TSO by locking + its parent container (e.g. the MVar) and then raise the exception. + We might change these cases to be more message-passing-like in the + future. + Returns: - THROWTO_SUCCESS exception was raised, ok to continue + NULL exception was raised, ok to continue - THROWTO_BLOCKED exception was not raised; block the source - thread then call throwToReleaseTarget() when - the source thread is properly tidied away. + MessageThrowTo * exception was not raised; the source TSO + should now put itself in the state + BlockedOnMsgThrowTo, and when it is ready + it should unlock the mssage using + unlockClosure(msg, &stg_MSG_THROWTO_info); + If it decides not to raise the exception after + all, it can revoke it safely with + unlockClosure(msg, &stg_MSG_NULL_info); -------------------------------------------------------------------------- */ -nat +MessageThrowTo * throwTo (Capability *cap, // the Capability we hold - StgTSO *source, // the TSO sending the exception + StgTSO *source, // the TSO sending the exception (or NULL) StgTSO *target, // the TSO receiving the exception - StgClosure *exception, // the exception closure - /*[out]*/ void **out USED_IF_THREADS) + StgClosure *exception) // the exception closure { - StgWord status; - - // follow ThreadRelocated links in the target first - while (target->what_next == ThreadRelocated) { - target = target->_link; - // No, it might be a WHITEHOLE: - // ASSERT(get_itbl(target)->type == TSO); - } + MessageThrowTo *msg; - debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu", - (unsigned long)source->id, (unsigned long)target->id); + msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); + // message starts locked; the caller has to unlock it when it is + // ready. + SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM); + msg->source = source; + msg->target = target; + msg->exception = exception; -#ifdef DEBUG - if (traceClass(DEBUG_sched)) { - debugTraceBegin("throwTo: target"); - printThreadStatus(target); - debugTraceEnd(); + switch (throwToMsg(cap, msg)) + { + case THROWTO_SUCCESS: + return NULL; + case THROWTO_BLOCKED: + default: + return msg; } -#endif +} + + +nat +throwToMsg (Capability *cap, MessageThrowTo *msg) +{ + StgWord status; + StgTSO *target = msg->target; + Capability *target_cap; goto check_target; + retry: + write_barrier(); debugTrace(DEBUG_sched, "throwTo: retrying..."); check_target: + ASSERT(target != END_TSO_QUEUE); + + // follow ThreadRelocated links in the target first + target = deRefTSO(target); + // Thread already dead? if (target->what_next == ThreadComplete || target->what_next == ThreadKilled) { return THROWTO_SUCCESS; } + debugTraceCap(DEBUG_sched, cap, + "throwTo: from thread %lu to thread %lu", + (unsigned long)msg->source->id, + (unsigned long)msg->target->id); + +#ifdef DEBUG + traceThreadStatus(DEBUG_sched, target); +#endif + + target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } + status = target->why_blocked; switch (status) { case NotBlocked: - /* if status==NotBlocked, and target->cap == cap, then - we own this TSO and can raise the exception. - - How do we establish this condition? Very carefully. - - Let - P = (status == NotBlocked) - Q = (tso->cap == cap) - - Now, if P & Q are true, then the TSO is locked and owned by - this capability. No other OS thread can steal it. - - If P==0 and Q==1: the TSO is blocked, but attached to this - capabilty, and it can be stolen by another capability. - - If P==1 and Q==0: the TSO is runnable on another - capability. At any time, the TSO may change from runnable - to blocked and vice versa, while it remains owned by - another capability. - - Suppose we test like this: - - p = P - q = Q - if (p && q) ... - - this is defeated by another capability stealing a blocked - TSO from us to wake it up (Schedule.c:unblockOne()). The - other thread is doing - - Q = 0 - P = 1 - - assuming arbitrary reordering, we could see this - interleaving: - - start: P==0 && Q==1 - P = 1 - p = P - q = Q - Q = 0 - if (p && q) ... - - so we need a memory barrier: - - p = P - mb() - q = Q - if (p && q) ... - - this avoids the problematic case. There are other cases - to consider, but this is the tricky one. - - Note that we must be sure that unblockOne() does the - writes in the correct order: Q before P. The memory - barrier ensures that if we have seen the write to P, we - have also seen the write to Q. - */ { - Capability *target_cap; + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; + } else { + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; + } + } - write_barrier(); - target_cap = target->cap; - if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - // Otherwise, just block on the blocked_exceptions queue - // of the target thread. The queue will get looked at - // soon enough: it is checked before and after running a - // thread, and during GC. - lockTSO(target); - - // Avoid race with threadStackOverflow, which may have - // just moved this TSO. - if (target->what_next == ThreadRelocated) { - unlockTSO(target); - target = target->_link; - goto retry; - } - blockedThrowTo(cap,source,target); - *out = target; - return THROWTO_BLOCKED; - } + case BlockedOnMsgThrowTo: + { + const StgInfoTable *i; + MessageThrowTo *m; + + m = target->block_info.throwto; + + // target is local to this cap, but has sent a throwto + // message to another cap. + // + // The source message is locked. We need to revoke the + // target's message so that we can raise the exception, so + // we attempt to lock it. + + // There's a possibility of a deadlock if two threads are both + // trying to throwTo each other (or more generally, a cycle of + // threads). To break the symmetry we compare the addresses + // of the MessageThrowTo objects, and the one for which m < + // msg gets to spin, while the other can only try to lock + // once, but must then back off and unlock both before trying + // again. + if (m < msg) { + i = lockClosure((StgClosure *)m); + } else { + i = tryLockClosure((StgClosure *)m); + if (i == NULL) { +// debugBelch("collision\n"); + throwToSendMsg(cap, target->cap, msg); + return THROWTO_BLOCKED; + } + } + + if (i == &stg_MSG_NULL_info) { + // we know there's a MSG_TRY_WAKEUP on the way, so we + // might as well just do it now. The message will + // be a no-op when it arrives. + unlockClosure((StgClosure*)m, i); + tryWakeupThread_(cap, target); + goto retry; + } + + if (i != &stg_MSG_THROWTO_info) { + // if it's a MSG_NULL, this TSO has been woken up by another Cap + unlockClosure((StgClosure*)m, i); + goto retry; + } + + if ((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0)) { + unlockClosure((StgClosure*)m, i); + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; + } + + // nobody else can wake up this TSO after we claim the message + unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); + + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } case BlockedOnMVar: @@ -306,17 +331,25 @@ check_target: goto retry; } + if (target->_link == END_TSO_QUEUE) { + // the MVar operation has already completed. There is a + // MSG_TRY_WAKEUP on the way, but we can just wake up the + // thread now anyway and ignore the message when it + // arrives. + unlockClosure((StgClosure *)mvar, info); + tryWakeupThread_(cap, target); + goto retry; + } + if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockClosure((StgClosure *)target); - blockedThrowTo(cap,source,target); + blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO + return THROWTO_BLOCKED; } else { - removeThreadFromMVarQueue(cap, mvar, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); - unblockOne(cap, target); + // revoke the MVar operation + removeFromMVarBlockedQueue(target); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; } @@ -324,120 +357,65 @@ check_target: case BlockedOnBlackHole: { - ACQUIRE_LOCK(&sched_mutex); - // double checking the status after the memory barrier: - if (target->why_blocked != BlockedOnBlackHole) { - RELEASE_LOCK(&sched_mutex); - goto retry; - } - if (target->flags & TSO_BLOCKEX) { - lockTSO(target); - blockedThrowTo(cap,source,target); - RELEASE_LOCK(&sched_mutex); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO - } else { - removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); - unblockOne(cap, target); - RELEASE_LOCK(&sched_mutex); - return THROWTO_SUCCESS; - } - } - - case BlockedOnException: - { - StgTSO *target2; - StgInfoTable *info; - - /* - To obtain exclusive access to a BlockedOnException thread, - we must call lockClosure() on the TSO on which it is blocked. - Since the TSO might change underneath our feet, after we - call lockClosure() we must check that - - (a) the closure we locked is actually a TSO - (b) the original thread is still BlockedOnException, - (c) the original thread is still blocked on the TSO we locked - and (d) the target thread has not been relocated. - - We synchronise with threadStackOverflow() (which relocates - threads) using lockClosure()/unlockClosure(). - */ - target2 = target->block_info.tso; - - info = lockClosure((StgClosure *)target2); - if (info != &stg_TSO_info) { - unlockClosure((StgClosure *)target2, info); - goto retry; - } - if (target->what_next == ThreadRelocated) { - target = target->_link; - unlockTSO(target2); - goto retry; - } - if (target2->what_next == ThreadRelocated) { - target->block_info.tso = target2->_link; - unlockTSO(target2); - goto retry; - } - if (target->why_blocked != BlockedOnException - || target->block_info.tso != target2) { - unlockTSO(target2); - goto retry; - } - - /* - Now we have exclusive rights to the target TSO... - - If it is blocking exceptions, add the source TSO to its - blocked_exceptions queue. Otherwise, raise the exception. - */ - if ((target->flags & TSO_BLOCKEX) && - ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockTSO(target); - blockedThrowTo(cap,source,target); - unlockTSO(target2); - *out = target; + // BlockedOnBlackHole is not interruptible. + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; } else { - removeThreadFromQueue(cap, &target2->blocked_exceptions, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); - unblockOne(cap, target); - unlockTSO(target2); - return THROWTO_SUCCESS; - } - } + // Revoke the message by replacing it with IND. We're not + // locking anything here, so we might still get a TRY_WAKEUP + // message from the owner of the blackhole some time in the + // future, but that doesn't matter. + ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); + OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; + } + } case BlockedOnSTM: lockTSO(target); // Unblocking BlockedOnSTM threads requires the TSO to be // locked; see STM.c:unpark_tso(). if (target->why_blocked != BlockedOnSTM) { + unlockTSO(target); goto retry; } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); - *out = target; + blockedThrowTo(cap,target,msg); + unlockTSO(target); return THROWTO_BLOCKED; } else { - raiseAsync(cap, target, exception, rtsFalse, NULL); - unblockOne(cap, target); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unlockTSO(target); return THROWTO_SUCCESS; } + case BlockedOnCCall_Interruptible: +#ifdef THREADED_RTS + { + Task *task = NULL; + // walk suspended_ccalls to find the correct worker thread + InCall *incall; + for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) { + if (incall->suspended_tso == target) { + task = incall->task; + break; + } + } + if (task != NULL) { + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + interruptWorkerTask(task); + return THROWTO_SUCCESS; + } else { + debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill"); + } + // fall to next + } +#endif case BlockedOnCCall: - case BlockedOnCCall_NoUnblockExc: - // I don't think it's possible to acquire ownership of a - // BlockedOnCCall thread. We just assume that the target - // thread is blocking exceptions, and block on its - // blocked_exception queue. - lockTSO(target); - blockedThrowTo(cap,source,target); - *out = target; + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; #ifndef THREADEDED_RTS @@ -449,11 +427,11 @@ check_target: #endif if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; } else { removeFromQueues(cap,target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; } #endif @@ -464,31 +442,34 @@ check_target: barf("throwTo"); } -// Block a TSO on another TSO's blocked_exceptions queue. -// Precondition: we hold an exclusive lock on the target TSO (this is -// complex to achieve as there's no single lock on a TSO; see -// throwTo()). static void -blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target) +throwToSendMsg (Capability *cap STG_UNUSED, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS) + { - debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id); - setTSOLink(cap, source, target->blocked_exceptions); - target->blocked_exceptions = source; - dirty_TSO(cap,target); // we modified the blocked_exceptions queue - - source->block_info.tso = target; - write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is. - source->why_blocked = BlockedOnException; -} +#ifdef THREADED_RTS + debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + sendMessage(cap, target_cap, (Message*)msg); +#endif +} -#ifdef THREADED_RTS -void -throwToReleaseTarget (void *tso) +// Block a throwTo message on the target TSO's blocked_exceptions +// queue. The current Capability must own the target TSO in order to +// modify the blocked_exceptions queue. +static void +blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) { - unlockTSO((StgTSO *)tso); + debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu", + (unsigned long)target->id); + + ASSERT(target->cap == cap); + + msg->link = target->blocked_exceptions; + target->blocked_exceptions = msg; + dirty_TSO(cap,target); // we modified the blocked_exceptions queue } -#endif /* ----------------------------------------------------------------------------- Waking up threads blocked in throwTo @@ -510,66 +491,69 @@ throwToReleaseTarget (void *tso) int maybePerformBlockedException (Capability *cap, StgTSO *tso) { - StgTSO *source; + MessageThrowTo *msg; + const StgInfoTable *i; - if (tso->blocked_exceptions != END_TSO_QUEUE && + if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { + awakenBlockedExceptionQueue(cap,tso); + return 1; + } else { + return 0; + } + } + + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { - debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); + debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } - if (tso->blocked_exceptions != END_TSO_QUEUE + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && ((tso->flags & TSO_BLOCKEX) == 0 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) { - // Lock the TSO, this gives us exclusive access to the queue - lockTSO(tso); - - // Check the queue again; it might have changed before we - // locked it. - if (tso->blocked_exceptions == END_TSO_QUEUE) { - unlockTSO(tso); - return 0; - } - // We unblock just the first thread on the queue, and perform // its throw immediately. - source = tso->blocked_exceptions; - performBlockedException(cap, source, tso); - tso->blocked_exceptions = unblockOne_(cap, source, - rtsFalse/*no migrate*/); - unlockTSO(tso); + loop: + msg = tso->blocked_exceptions; + if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; + i = lockClosure((StgClosure*)msg); + tso->blocked_exceptions = (MessageThrowTo*)msg->link; + if (i == &stg_MSG_NULL_info) { + unlockClosure((StgClosure*)msg,i); + goto loop; + } + + throwToSingleThreaded(cap, msg->target, msg->exception); + unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); return 1; } return 0; } +// awakenBlockedExceptionQueue(): Just wake up the whole queue of +// blocked exceptions. + void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { - if (tso->blocked_exceptions != END_TSO_QUEUE) { - lockTSO(tso); - awakenBlockedQueue(cap, tso->blocked_exceptions); - tso->blocked_exceptions = END_TSO_QUEUE; - unlockTSO(tso); + MessageThrowTo *msg; + const StgInfoTable *i; + + for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; + msg = (MessageThrowTo*)msg->link) { + i = lockClosure((StgClosure *)msg); + if (i != &stg_MSG_NULL_info) { + unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); + } else { + unlockClosure((StgClosure *)msg,i); + } } + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } -static void -performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) -{ - StgClosure *exception; - - ASSERT(source->why_blocked == BlockedOnException); - ASSERT(source->block_info.tso->id == target->id); - ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == target->id); - // check ids not pointers, because the thread might be relocated - - exception = (StgClosure *)source->sp[2]; - throwToSingleThreaded(cap, target, exception); - source->sp += 3; -} - /* ----------------------------------------------------------------------------- Remove a thread from blocking queues. @@ -581,11 +565,54 @@ performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) -------------------------------------------------------------------------- */ static void +removeFromMVarBlockedQueue (StgTSO *tso) +{ + StgMVar *mvar = (StgMVar*)tso->block_info.closure; + StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link; + + if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) { + // already removed from this MVar + return; + } + + // Assume the MVar is locked. (not assertable; sometimes it isn't + // actually WHITEHOLE'd). + + // We want to remove the MVAR_TSO_QUEUE object from the queue. It + // isn't doubly-linked so we can't actually remove it; instead we + // just overwrite it with an IND if possible and let the GC short + // it out. However, we have to be careful to maintain the deque + // structure: + + if (mvar->head == q) { + mvar->head = q->link; + q->header.info = &stg_IND_info; + if (mvar->tail == q) { + mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE; + } + } + else if (mvar->tail == q) { + // we can't replace it with an IND in this case, because then + // we lose the tail pointer when the GC shorts out the IND. + // So we use MSG_NULL as a kind of non-dupable indirection; + // these are ignored by takeMVar/putMVar. + q->header.info = &stg_MSG_NULL_info; + } + else { + q->header.info = &stg_IND_info; + } + + // revoke the MVar operation + tso->_link = END_TSO_QUEUE; +} + +static void removeFromQueues(Capability *cap, StgTSO *tso) { switch (tso->why_blocked) { case NotBlocked: + case ThreadMigrating: return; case BlockedOnSTM: @@ -598,29 +625,25 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: - removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); + removeFromMVarBlockedQueue(tso); goto done; case BlockedOnBlackHole: - removeThreadFromQueue(cap, &blackhole_queue, tso); + // nothing to do goto done; - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - // NO: when called by threadPaused(), we probably have this - // TSO already locked (WHITEHOLEd) because we just placed - // ourselves on its queue. - // ASSERT(get_itbl(target)->type == TSO); - - while (target->what_next == ThreadRelocated) { - target = target->_link; - } - - removeThreadFromQueue(cap, &target->blocked_exceptions, tso); - goto done; - } + case BlockedOnMsgThrowTo: + { + MessageThrowTo *m = tso->block_info.throwto; + // The message is locked by us, unless we got here via + // deleteAllThreads(), in which case we own all the + // capabilities. + // ASSERT(m->header.info == &stg_WHITEHOLE_info); + + // unlock and revoke it at the same time + unlockClosure((StgClosure*)m,&stg_MSG_NULL_info); + break; + } #if !defined(THREADED_RTS) case BlockedOnRead: @@ -647,16 +670,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - tso->_link = END_TSO_QUEUE; // no write barrier reqd tso->why_blocked = NotBlocked; - tso->block_info.closure = NULL; - appendToRunQueue(cap,tso); - - // We might have just migrated this TSO to our Capability: - if (tso->bound) { - tso->bound->cap = cap; - } - tso->cap = cap; + appendToRunQueue(cap, tso); } /* ----------------------------------------------------------------------------- @@ -666,7 +681,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) * asynchronous exception in an existing thread. * * We first remove the thread from any queue on which it might be - * blocked. The possible blockages are MVARs and BLACKHOLE_BQs. + * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and + * TSO blocked_exception queues. * * We strip the stack down to the innermost CATCH_FRAME, building * thunks in the heap for all the active computations, so they can @@ -705,13 +721,13 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, StgClosure *updatee; nat i; - debugTrace(DEBUG_sched, - "raising exception in thread %ld.", (long)tso->id); + debugTraceCap(DEBUG_sched, cap, + "raising exception in thread %ld.", (long)tso->id); #if defined(PROFILING) /* * Debugging tool: on raising an exception, show where we are. - * See also Exception.cmm:raisezh_fast. + * See also Exception.cmm:stg_raisezh. * This wasn't done for asynchronous exceptions originally; see #1450 */ if (RtsFlags.ProfFlags.showCCSOnException) @@ -719,16 +735,26 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, fprintCCS_stderr(tso->prof.CCCS); } #endif + // ASSUMES: the thread is not already complete or dead, or + // ThreadRelocated. Upper layers should deal with that. + ASSERT(tso->what_next != ThreadComplete && + tso->what_next != ThreadKilled && + tso->what_next != ThreadRelocated); + + // only if we own this TSO (except that deleteThread() calls this + ASSERT(tso->cap == cap); + + // wake it up + if (tso->why_blocked != NotBlocked) { + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + } // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); sp = tso->sp; - // ASSUMES: the thread is not already complete or dead. Upper - // layers should deal with that. - ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled); - if (stop_here != NULL) { updatee = stop_here->updatee; } else { @@ -780,7 +806,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // fun field. // words = frame - sp - 1; - ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words)); + ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words)); ap->size = words; ap->fun = (StgClosure *)sp[0]; @@ -811,7 +837,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // Perform the update // TODO: this may waste some work, if the thunk has // already been updated by another thread. - UPD_IND(((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + updateThunk(cap, tso, + ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); } sp += sizeofW(StgUpdateFrame) - 1; @@ -834,9 +861,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // top of the CATCH_FRAME ready to enter. // { -#ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)frame; -#endif StgThunk *raise; if (exception == NULL) break; @@ -844,7 +869,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); + raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -857,7 +882,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, * a surprise exception before we get around to executing the * handler. */ - tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE; + tso->flags |= TSO_BLOCKEX; + if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) { + tso->flags &= ~TSO_INTERRUPTIBLE; + } else { + tso->flags |= TSO_INTERRUPTIBLE; + } /* Put the newly-built THUNK on top of the stack, ready to execute * when the thread restarts. @@ -872,9 +902,19 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, case ATOMICALLY_FRAME: if (stop_at_atomically) { - ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); + ASSERT(tso->trec->enclosing_trec == NO_TREC); stmCondemnTransaction(cap, tso -> trec); - tso->sp = frame; + tso->sp = frame - 2; + // The ATOMICALLY_FRAME expects to be returned a + // result from the transaction, which it stores in the + // stack frame. Hence we arrange to return a dummy + // result, so that the GC doesn't get upset (#3578). + // Perhaps a better way would be to have a different + // ATOMICALLY_FRAME instance for condemned + // transactions, but I don't fully understand the + // interaction with STM invariants. + tso->sp[1] = (W_)&stg_NO_TREC_closure; + tso->sp[0] = (W_)&stg_gc_unpt_r1_info; tso->what_next = ThreadRunGHC; return; } @@ -892,9 +932,9 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, { StgTRecHeader *trec = tso -> trec; - StgTRecHeader *outer = stmGetEnclosingTRec(trec); - debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); + StgTRecHeader *outer = trec -> enclosing_trec; + debugTraceCap(DEBUG_stm, cap, + "found atomically block delivering async exception"); stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer;