#include "STM.h"
#include "sm/Sanity.h"
#include "Profiling.h"
+#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
static void removeFromQueues(Capability *cap, StgTSO *tso);
-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void removeFromMVarBlockedQueue (StgTSO *tso);
-static void performBlockedException (Capability *cap,
- StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap,
+ StgTSO *target, MessageThrowTo *msg);
+
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS);
/* -----------------------------------------------------------------------------
throwToSingleThreaded
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
void
suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
may be blocked and could be woken up at any point by another CPU.
We have some delicate synchronisation to do.
- There is a completely safe fallback scheme: it is always possible
- to just block the source TSO on the target TSO's blocked_exceptions
- queue. This queue is locked using lockTSO()/unlockTSO(). It is
- checked at regular intervals: before and after running a thread
- (schedule() and threadPaused() respectively), and just before GC
- (scheduleDoGC()). Activating a thread on this queue should be done
- using maybePerformBlockedException(): this is done in the context
- of the target thread, so the exception can be raised eagerly.
-
- This fallback scheme works even if the target thread is complete or
- killed: scheduleDoGC() will discover the blocked thread before the
- target is GC'd.
-
- Blocking the source thread on the target thread's blocked_exception
- queue is also employed when the target thread is currently blocking
- exceptions (ie. inside Control.Exception.block).
-
- We could use the safe fallback scheme exclusively, but that
- wouldn't be ideal: most calls to throwTo would block immediately,
- possibly until the next GC, which might require the deadlock
- detection mechanism to kick in. So we try to provide promptness
- wherever possible.
-
- We can promptly deliver the exception if the target thread is:
-
- - runnable, on the same Capability as the source thread (because
- we own the run queue and therefore the target thread).
-
- - blocked, and we can obtain exclusive access to it. Obtaining
- exclusive access to the thread depends on how it is blocked.
-
- We must also be careful to not trip over threadStackOverflow(),
- which might be moving the TSO to enlarge its stack.
- lockTSO()/unlockTSO() are used here too.
-
+ The underlying scheme when multiple Capabilities are in use is
+ message passing: when the target of a throwTo is on another
+ Capability, we send a message (a MessageThrowTo closure) to that
+ Capability.
+
+ If the throwTo needs to block because the target TSO is masking
+ exceptions (the TSO_BLOCKEX flag), then the message is placed on
+ the blocked_exceptions queue attached to the target TSO. When the
+ target TSO enters the unmasked state again, it must check the
+ queue. The blocked_exceptions queue is not locked; only the
+ Capability owning the TSO may modify it.
+
+ To make things simpler for throwTo, we always create the message
+ first before deciding what to do. The message may get sent, or it
+ may get attached to a TSO's blocked_exceptions queue, or the
+ exception may get thrown immediately and the message dropped,
+ depending on the current state of the target.
+
+ Currently we send a message if the target belongs to another
+ Capability, and it is
+
+ - NotBlocked, BlockedOnMsgThrowTo,
+ BlockedOnCCall
+
+ - or it is masking exceptions (TSO_BLOCKEX)
+
+ Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+ BlockedOnBlackHole then we acquire ownership of the TSO by locking
+ its parent container (e.g. the MVar) and then raise the exception.
+ We might change these cases to be more message-passing-like in the
+ future.
+
Returns:
- THROWTO_SUCCESS exception was raised, ok to continue
+ NULL exception was raised, ok to continue
- THROWTO_BLOCKED exception was not raised; block the source
- thread then call throwToReleaseTarget() when
- the source thread is properly tidied away.
+ MessageThrowTo * exception was not raised; the source TSO
+ should now put itself in the state
+ BlockedOnMsgThrowTo, and when it is ready
+ it should unlock the mssage using
+ unlockClosure(msg, &stg_MSG_THROWTO_info);
+ If it decides not to raise the exception after
+ all, it can revoke it safely with
+ unlockClosure(msg, &stg_MSG_NULL_info);
-------------------------------------------------------------------------- */
-nat
+MessageThrowTo *
throwTo (Capability *cap, // the Capability we hold
StgTSO *source, // the TSO sending the exception (or NULL)
StgTSO *target, // the TSO receiving the exception
- StgClosure *exception, // the exception closure
- /*[out]*/ void **out USED_IF_THREADS)
+ StgClosure *exception) // the exception closure
{
- StgWord status;
+ MessageThrowTo *msg;
- ASSERT(target != END_TSO_QUEUE);
+ msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+ // message starts locked; the caller has to unlock it when it is
+ // ready.
+ SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
+ msg->source = source;
+ msg->target = target;
+ msg->exception = exception;
- // follow ThreadRelocated links in the target first
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- // No, it might be a WHITEHOLE:
- // ASSERT(get_itbl(target)->type == TSO);
- }
-
- if (source != NULL) {
- debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
- (unsigned long)source->id, (unsigned long)target->id);
- } else {
- debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu",
- (unsigned long)target->id);
+ switch (throwToMsg(cap, msg))
+ {
+ case THROWTO_SUCCESS:
+ return NULL;
+ case THROWTO_BLOCKED:
+ default:
+ return msg;
}
+}
+
-#ifdef DEBUG
- traceThreadStatus(DEBUG_sched, target);
-#endif
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
+{
+ StgWord status;
+ StgTSO *target = msg->target;
+ Capability *target_cap;
goto check_target;
+
retry:
+ write_barrier();
debugTrace(DEBUG_sched, "throwTo: retrying...");
check_target:
ASSERT(target != END_TSO_QUEUE);
+ // follow ThreadRelocated links in the target first
+ target = deRefTSO(target);
+
// Thread already dead?
if (target->what_next == ThreadComplete
|| target->what_next == ThreadKilled) {
return THROWTO_SUCCESS;
}
+ debugTraceCap(DEBUG_sched, cap,
+ "throwTo: from thread %lu to thread %lu",
+ (unsigned long)msg->source->id,
+ (unsigned long)msg->target->id);
+
+#ifdef DEBUG
+ traceThreadStatus(DEBUG_sched, target);
+#endif
+
+ target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ }
+
status = target->why_blocked;
switch (status) {
case NotBlocked:
- /* if status==NotBlocked, and target->cap == cap, then
- we own this TSO and can raise the exception.
-
- How do we establish this condition? Very carefully.
-
- Let
- P = (status == NotBlocked)
- Q = (tso->cap == cap)
-
- Now, if P & Q are true, then the TSO is locked and owned by
- this capability. No other OS thread can steal it.
-
- If P==0 and Q==1: the TSO is blocked, but attached to this
- capabilty, and it can be stolen by another capability.
-
- If P==1 and Q==0: the TSO is runnable on another
- capability. At any time, the TSO may change from runnable
- to blocked and vice versa, while it remains owned by
- another capability.
-
- Suppose we test like this:
-
- p = P
- q = Q
- if (p && q) ...
-
- this is defeated by another capability stealing a blocked
- TSO from us to wake it up (Schedule.c:unblockOne()). The
- other thread is doing
-
- Q = 0
- P = 1
-
- assuming arbitrary reordering, we could see this
- interleaving:
-
- start: P==0 && Q==1
- P = 1
- p = P
- q = Q
- Q = 0
- if (p && q) ...
-
- so we need a memory barrier:
-
- p = P
- mb()
- q = Q
- if (p && q) ...
-
- this avoids the problematic case. There are other cases
- to consider, but this is the tricky one.
-
- Note that we must be sure that unblockOne() does the
- writes in the correct order: Q before P. The memory
- barrier ensures that if we have seen the write to P, we
- have also seen the write to Q.
- */
{
- Capability *target_cap;
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ } else {
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
+ }
+ }
- write_barrier();
- target_cap = target->cap;
- if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- // Otherwise, just block on the blocked_exceptions queue
- // of the target thread. The queue will get looked at
- // soon enough: it is checked before and after running a
- // thread, and during GC.
- lockTSO(target);
-
- // Avoid race with threadStackOverflow, which may have
- // just moved this TSO.
- if (target->what_next == ThreadRelocated) {
- unlockTSO(target);
- target = target->_link;
- goto retry;
- }
- // check again for ThreadComplete and ThreadKilled. This
- // cooperates with scheduleHandleThreadFinished to ensure
- // that we never miss any threads that are throwing an
- // exception to a thread in the process of terminating.
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- unlockTSO(target);
- return THROWTO_SUCCESS;
+ case BlockedOnMsgThrowTo:
+ {
+ const StgInfoTable *i;
+ MessageThrowTo *m;
+
+ m = target->block_info.throwto;
+
+ // target is local to this cap, but has sent a throwto
+ // message to another cap.
+ //
+ // The source message is locked. We need to revoke the
+ // target's message so that we can raise the exception, so
+ // we attempt to lock it.
+
+ // There's a possibility of a deadlock if two threads are both
+ // trying to throwTo each other (or more generally, a cycle of
+ // threads). To break the symmetry we compare the addresses
+ // of the MessageThrowTo objects, and the one for which m <
+ // msg gets to spin, while the other can only try to lock
+ // once, but must then back off and unlock both before trying
+ // again.
+ if (m < msg) {
+ i = lockClosure((StgClosure *)m);
+ } else {
+ i = tryLockClosure((StgClosure *)m);
+ if (i == NULL) {
+// debugBelch("collision\n");
+ throwToSendMsg(cap, target->cap, msg);
+ return THROWTO_BLOCKED;
}
- blockedThrowTo(cap,source,target);
- *out = target;
- return THROWTO_BLOCKED;
- }
+ }
+
+ if (i == &stg_MSG_NULL_info) {
+ // we know there's a MSG_TRY_WAKEUP on the way, so we
+ // might as well just do it now. The message will
+ // be a no-op when it arrives.
+ unlockClosure((StgClosure*)m, i);
+ tryWakeupThread_(cap, target);
+ goto retry;
+ }
+
+ if (i != &stg_MSG_THROWTO_info) {
+ // if it's a MSG_NULL, this TSO has been woken up by another Cap
+ unlockClosure((StgClosure*)m, i);
+ goto retry;
+ }
+
+ if ((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+ unlockClosure((StgClosure*)m, i);
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
+ }
+
+ // nobody else can wake up this TSO after we claim the message
+ unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
+
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
}
case BlockedOnMVar:
goto retry;
}
+ if (target->_link == END_TSO_QUEUE) {
+ // the MVar operation has already completed. There is a
+ // MSG_TRY_WAKEUP on the way, but we can just wake up the
+ // thread now anyway and ignore the message when it
+ // arrives.
+ unlockClosure((StgClosure *)mvar, info);
+ tryWakeupThread_(cap, target);
+ goto retry;
+ }
+
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockClosure((StgClosure *)target);
- blockedThrowTo(cap,source,target);
+ blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
+ return THROWTO_BLOCKED;
} else {
- removeThreadFromMVarQueue(cap, mvar, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
+ // revoke the MVar operation
+ removeFromMVarBlockedQueue(target);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
}
case BlockedOnBlackHole:
{
- ACQUIRE_LOCK(&sched_mutex);
- // double checking the status after the memory barrier:
- if (target->why_blocked != BlockedOnBlackHole) {
- RELEASE_LOCK(&sched_mutex);
- goto retry;
- }
-
if (target->flags & TSO_BLOCKEX) {
- lockTSO(target);
- blockedThrowTo(cap,source,target);
- RELEASE_LOCK(&sched_mutex);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
- } else {
- removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_SUCCESS;
- }
- }
-
- case BlockedOnException:
- {
- StgTSO *target2;
- StgInfoTable *info;
-
- /*
- To obtain exclusive access to a BlockedOnException thread,
- we must call lockClosure() on the TSO on which it is blocked.
- Since the TSO might change underneath our feet, after we
- call lockClosure() we must check that
-
- (a) the closure we locked is actually a TSO
- (b) the original thread is still BlockedOnException,
- (c) the original thread is still blocked on the TSO we locked
- and (d) the target thread has not been relocated.
-
- We synchronise with threadStackOverflow() (which relocates
- threads) using lockClosure()/unlockClosure().
- */
- target2 = target->block_info.tso;
-
- info = lockClosure((StgClosure *)target2);
- if (info != &stg_TSO_info) {
- unlockClosure((StgClosure *)target2, info);
- goto retry;
- }
- if (target->what_next == ThreadRelocated) {
- target = target->_link;
- unlockTSO(target2);
- goto retry;
- }
- if (target2->what_next == ThreadRelocated) {
- target->block_info.tso = target2->_link;
- unlockTSO(target2);
- goto retry;
- }
- if (target->why_blocked != BlockedOnException
- || target->block_info.tso != target2) {
- unlockTSO(target2);
- goto retry;
- }
-
- /*
- Now we have exclusive rights to the target TSO...
-
- If it is blocking exceptions, add the source TSO to its
- blocked_exceptions queue. Otherwise, raise the exception.
- */
- if ((target->flags & TSO_BLOCKEX) &&
- ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockTSO(target);
- blockedThrowTo(cap,source,target);
- unlockTSO(target2);
- *out = target;
+ // BlockedOnBlackHole is not interruptible.
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
} else {
- removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockTSO(target2);
- return THROWTO_SUCCESS;
- }
- }
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ }
+ }
case BlockedOnSTM:
lockTSO(target);
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(cap,source,target);
- *out = target;
+ blockedThrowTo(cap,target,msg);
+ unlockTSO(target);
return THROWTO_BLOCKED;
} else {
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unlockTSO(target);
return THROWTO_SUCCESS;
}
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- // I don't think it's possible to acquire ownership of a
- // BlockedOnCCall thread. We just assume that the target
- // thread is blocking exceptions, and block on its
- // blocked_exception queue.
- lockTSO(target);
- if (target->why_blocked != BlockedOnCCall &&
- target->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unlockTSO(target);
- goto retry;
- }
- blockedThrowTo(cap,source,target);
- *out = target;
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
#ifndef THREADEDED_RTS
#endif
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(cap,source,target);
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
} else {
removeFromQueues(cap,target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
return THROWTO_SUCCESS;
}
#endif
barf("throwTo");
}
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
static void
-blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS)
+
{
- if (source != NULL) {
- debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
- setTSOLink(cap, source, target->blocked_exceptions);
- target->blocked_exceptions = source;
- dirty_TSO(cap,target); // we modified the blocked_exceptions queue
-
- source->block_info.tso = target;
- write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
- source->why_blocked = BlockedOnException;
- }
-}
+#ifdef THREADED_RTS
+ debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ sendMessage(cap, target_cap, (Message*)msg);
+#endif
+}
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue. The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
{
- unlockTSO((StgTSO *)tso);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+ (unsigned long)target->id);
+
+ ASSERT(target->cap == cap);
+
+ msg->link = target->blocked_exceptions;
+ target->blocked_exceptions = msg;
+ dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
-#endif
/* -----------------------------------------------------------------------------
Waking up threads blocked in throwTo
int
maybePerformBlockedException (Capability *cap, StgTSO *tso)
{
- StgTSO *source;
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
- if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
awakenBlockedExceptionQueue(cap,tso);
return 1;
} else {
}
}
- if (tso->blocked_exceptions != END_TSO_QUEUE &&
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
- debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
- if (tso->blocked_exceptions != END_TSO_QUEUE
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
&& ((tso->flags & TSO_BLOCKEX) == 0
|| ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
- // Lock the TSO, this gives us exclusive access to the queue
- lockTSO(tso);
-
- // Check the queue again; it might have changed before we
- // locked it.
- if (tso->blocked_exceptions == END_TSO_QUEUE) {
- unlockTSO(tso);
- return 0;
- }
-
// We unblock just the first thread on the queue, and perform
// its throw immediately.
- source = tso->blocked_exceptions;
- performBlockedException(cap, source, tso);
- tso->blocked_exceptions = unblockOne_(cap, source,
- rtsFalse/*no migrate*/);
- unlockTSO(tso);
+ loop:
+ msg = tso->blocked_exceptions;
+ if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+ i = lockClosure((StgClosure*)msg);
+ tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+ if (i == &stg_MSG_NULL_info) {
+ unlockClosure((StgClosure*)msg,i);
+ goto loop;
+ }
+
+ throwToSingleThreaded(cap, msg->target, msg->exception);
+ unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info);
+ tryWakeupThread(cap, msg->source);
return 1;
}
return 0;
}
// awakenBlockedExceptionQueue(): Just wake up the whole queue of
-// blocked exceptions and let them try again.
+// blocked exceptions.
void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
{
- lockTSO(tso);
- awakenBlockedQueue(cap, tso->blocked_exceptions);
- tso->blocked_exceptions = END_TSO_QUEUE;
- unlockTSO(tso);
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
+
+ for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+ msg = (MessageThrowTo*)msg->link) {
+ i = lockClosure((StgClosure *)msg);
+ if (i != &stg_MSG_NULL_info) {
+ unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info);
+ tryWakeupThread(cap, msg->source);
+ } else {
+ unlockClosure((StgClosure *)msg,i);
+ }
+ }
+ tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
}
-static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
-{
- StgClosure *exception;
-
- ASSERT(source->why_blocked == BlockedOnException);
- ASSERT(source->block_info.tso->id == target->id);
- ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
- ASSERT(((StgTSO *)source->sp[1])->id == target->id);
- // check ids not pointers, because the thread might be relocated
-
- exception = (StgClosure *)source->sp[2];
- throwToSingleThreaded(cap, target, exception);
- source->sp += 3;
-}
-
/* -----------------------------------------------------------------------------
Remove a thread from blocking queues.
-------------------------------------------------------------------------- */
static void
+removeFromMVarBlockedQueue (StgTSO *tso)
+{
+ StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+ StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
+
+ if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+ // already removed from this MVar
+ return;
+ }
+
+ // Assume the MVar is locked. (not assertable; sometimes it isn't
+ // actually WHITEHOLE'd).
+
+ // We want to remove the MVAR_TSO_QUEUE object from the queue. It
+ // isn't doubly-linked so we can't actually remove it; instead we
+ // just overwrite it with an IND if possible and let the GC short
+ // it out. However, we have to be careful to maintain the deque
+ // structure:
+
+ if (mvar->head == q) {
+ mvar->head = q->link;
+ q->header.info = &stg_IND_info;
+ if (mvar->tail == q) {
+ mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+ }
+ }
+ else if (mvar->tail == q) {
+ // we can't replace it with an IND in this case, because then
+ // we lose the tail pointer when the GC shorts out the IND.
+ // So we use MSG_NULL as a kind of non-dupable indirection;
+ // these are ignored by takeMVar/putMVar.
+ q->header.info = &stg_MSG_NULL_info;
+ }
+ else {
+ q->header.info = &stg_IND_info;
+ }
+
+ // revoke the MVar operation
+ tso->_link = END_TSO_QUEUE;
+}
+
+static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
switch (tso->why_blocked) {
case NotBlocked:
+ case ThreadMigrating:
return;
case BlockedOnSTM:
goto done;
case BlockedOnMVar:
- removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+ removeFromMVarBlockedQueue(tso);
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(cap, &blackhole_queue, tso);
+ // nothing to do
goto done;
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- // NO: when called by threadPaused(), we probably have this
- // TSO already locked (WHITEHOLEd) because we just placed
- // ourselves on its queue.
- // ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- }
-
- removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
- goto done;
- }
+ case BlockedOnMsgThrowTo:
+ {
+ MessageThrowTo *m = tso->block_info.throwto;
+ // The message is locked by us, unless we got here via
+ // deleteAllThreads(), in which case we own all the
+ // capabilities.
+ // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+ // unlock and revoke it at the same time
+ unlockClosure((StgClosure*)m,&stg_MSG_NULL_info);
+ break;
+ }
#if !defined(THREADED_RTS)
case BlockedOnRead:
}
done:
- unblockOne(cap, tso);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
}
/* -----------------------------------------------------------------------------
* asynchronous exception in an existing thread.
*
* We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
*
* We strip the stack down to the innermost CATCH_FRAME, building
* thunks in the heap for all the active computations, so they can
StgClosure *updatee;
nat i;
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
+ debugTraceCap(DEBUG_sched, cap,
+ "raising exception in thread %ld.", (long)tso->id);
#if defined(PROFILING)
/*
fprintCCS_stderr(tso->prof.CCCS);
}
#endif
+ // ASSUMES: the thread is not already complete or dead, or
+ // ThreadRelocated. Upper layers should deal with that.
+ ASSERT(tso->what_next != ThreadComplete &&
+ tso->what_next != ThreadKilled &&
+ tso->what_next != ThreadRelocated);
+
+ // only if we own this TSO (except that deleteThread() calls this
+ ASSERT(tso->cap == cap);
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
sp = tso->sp;
- // ASSUMES: the thread is not already complete or dead. Upper
- // layers should deal with that.
- ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
-
if (stop_here != NULL) {
updatee = stop_here->updatee;
} else {
// Perform the update
// TODO: this may waste some work, if the thunk has
// already been updated by another thread.
- UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ updateThunk(cap, tso,
+ ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
}
sp += sizeofW(StgUpdateFrame) - 1;
// top of the CATCH_FRAME ready to enter.
//
{
-#ifdef PROFILING
StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
StgThunk *raise;
if (exception == NULL) break;
* a surprise exception before we get around to executing the
* handler.
*/
- tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
+ tso->flags |= TSO_BLOCKEX;
+ if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
+ tso->flags &= ~TSO_INTERRUPTIBLE;
+ } else {
+ tso->flags |= TSO_INTERRUPTIBLE;
+ }
/* Put the newly-built THUNK on top of the stack, ready to execute
* when the thread restarts.
{
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = trec -> enclosing_trec;
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ debugTraceCap(DEBUG_stm, cap,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;