#include "STM.h"
#include "sm/Sanity.h"
#include "Profiling.h"
+#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
static void removeFromQueues(Capability *cap, StgTSO *tso);
+static void removeFromMVarBlockedQueue (StgTSO *tso);
+
static void blockedThrowTo (Capability *cap,
StgTSO *target, MessageThrowTo *msg);
Capability *target_cap USED_IF_THREADS,
MessageThrowTo *msg USED_IF_THREADS);
-static void performBlockedException (Capability *cap, MessageThrowTo *msg);
-
/* -----------------------------------------------------------------------------
throwToSingleThreaded
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
void
suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
Currently we send a message if the target belongs to another
Capability, and it is
- - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+ - NotBlocked, BlockedOnMsgThrowTo,
BlockedOnCCall
- or it is masking exceptions (TSO_BLOCKEX)
unlockClosure(msg, &stg_MSG_THROWTO_info);
If it decides not to raise the exception after
all, it can revoke it safely with
- unlockClosure(msg, &stg_IND_info);
+ unlockClosure(msg, &stg_MSG_NULL_info);
-------------------------------------------------------------------------- */
msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
// message starts locked; the caller has to unlock it when it is
// ready.
- msg->header.info = &stg_WHITEHOLE_info;
+ SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
msg->source = source;
msg->target = target;
msg->exception = exception;
{
StgWord status;
StgTSO *target = msg->target;
+ Capability *target_cap;
+ goto check_target;
+
+retry:
+ write_barrier();
+ debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
ASSERT(target != END_TSO_QUEUE);
// follow ThreadRelocated links in the target first
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- // No, it might be a WHITEHOLE:
- // ASSERT(get_itbl(target)->type == TSO);
+ target = deRefTSO(target);
+
+ // Thread already dead?
+ if (target->what_next == ThreadComplete
+ || target->what_next == ThreadKilled) {
+ return THROWTO_SUCCESS;
}
debugTraceCap(DEBUG_sched, cap,
traceThreadStatus(DEBUG_sched, target);
#endif
- goto check_target;
-retry:
- write_barrier();
- debugTrace(DEBUG_sched, "throwTo: retrying...");
-
-check_target:
- ASSERT(target != END_TSO_QUEUE);
-
- // Thread already dead?
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- return THROWTO_SUCCESS;
+ target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
}
status = target->why_blocked;
switch (status) {
case NotBlocked:
- case BlockedOnMsgWakeup:
- /* if status==NotBlocked, and target->cap == cap, then
- we own this TSO and can raise the exception.
-
- How do we establish this condition? Very carefully.
-
- Let
- P = (status == NotBlocked)
- Q = (tso->cap == cap)
-
- Now, if P & Q are true, then the TSO is locked and owned by
- this capability. No other OS thread can steal it.
-
- If P==0 and Q==1: the TSO is blocked, but attached to this
- capabilty, and it can be stolen by another capability.
-
- If P==1 and Q==0: the TSO is runnable on another
- capability. At any time, the TSO may change from runnable
- to blocked and vice versa, while it remains owned by
- another capability.
-
- Suppose we test like this:
-
- p = P
- q = Q
- if (p && q) ...
-
- this is defeated by another capability stealing a blocked
- TSO from us to wake it up (Schedule.c:unblockOne()). The
- other thread is doing
-
- Q = 0
- P = 1
-
- assuming arbitrary reordering, we could see this
- interleaving:
-
- start: P==0 && Q==1
- P = 1
- p = P
- q = Q
- Q = 0
- if (p && q) ...
-
- so we need a memory barrier:
-
- p = P
- mb()
- q = Q
- if (p && q) ...
-
- this avoids the problematic case. There are other cases
- to consider, but this is the tricky one.
-
- Note that we must be sure that unblockOne() does the
- writes in the correct order: Q before P. The memory
- barrier ensures that if we have seen the write to P, we
- have also seen the write to Q.
- */
{
- Capability *target_cap;
-
- write_barrier();
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
} else {
- if ((target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- blockedThrowTo(cap,target,msg);
- return THROWTO_BLOCKED;
- }
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
}
}
case BlockedOnMsgThrowTo:
{
- Capability *target_cap;
const StgInfoTable *i;
MessageThrowTo *m;
}
}
- if (i != &stg_MSG_THROWTO_info) {
- // if it's an IND, this TSO has been woken up by another Cap
+ if (i == &stg_MSG_NULL_info) {
+ // we know there's a MSG_TRY_WAKEUP on the way, so we
+ // might as well just do it now. The message will
+ // be a no-op when it arrives.
unlockClosure((StgClosure*)m, i);
+ tryWakeupThread_(cap, target);
goto retry;
}
- target_cap = target->cap;
- if (target_cap != cap) {
+ if (i != &stg_MSG_THROWTO_info) {
+ // if it's a MSG_NULL, this TSO has been woken up by another Cap
unlockClosure((StgClosure*)m, i);
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
+ goto retry;
}
if ((target->flags & TSO_BLOCKEX) &&
}
// nobody else can wake up this TSO after we claim the message
- unlockClosure((StgClosure*)m, &stg_IND_info);
+ unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
return THROWTO_SUCCESS;
}
goto retry;
}
+ if (target->_link == END_TSO_QUEUE) {
+ // the MVar operation has already completed. There is a
+ // MSG_TRY_WAKEUP on the way, but we can just wake up the
+ // thread now anyway and ignore the message when it
+ // arrives.
+ unlockClosure((StgClosure *)mvar, info);
+ tryWakeupThread_(cap, target);
+ goto retry;
+ }
+
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
- removeThreadFromMVarQueue(cap, mvar, target);
+ // revoke the MVar operation
+ removeFromMVarBlockedQueue(target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
}
case BlockedOnBlackHole:
{
- ACQUIRE_LOCK(&sched_mutex);
- // double checking the status after the memory barrier:
- if (target->why_blocked != BlockedOnBlackHole) {
- RELEASE_LOCK(&sched_mutex);
- goto retry;
- }
-
if (target->flags & TSO_BLOCKEX) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_BLOCKED; // caller releases lock
+ // BlockedOnBlackHole is not interruptible.
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
} else {
- removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_SUCCESS;
- }
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ }
}
case BlockedOnSTM:
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockTSO(target);
return THROWTO_BLOCKED;
} else {
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
unlockTSO(target);
return THROWTO_SUCCESS;
}
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- {
- Capability *target_cap;
-
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
- }
#ifndef THREADEDED_RTS
case BlockedOnRead:
{
#ifdef THREADED_RTS
- debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
- sendMessage(target_cap, (Message*)msg);
+ sendMessage(cap, target_cap, (Message*)msg);
#endif
}
ASSERT(target->cap == cap);
- msg->link = (Message*)target->blocked_exceptions;
+ msg->link = target->blocked_exceptions;
target->blocked_exceptions = msg;
dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
- debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
i = lockClosure((StgClosure*)msg);
tso->blocked_exceptions = (MessageThrowTo*)msg->link;
- if (i == &stg_IND_info) {
+ if (i == &stg_MSG_NULL_info) {
unlockClosure((StgClosure*)msg,i);
goto loop;
}
- performBlockedException(cap, msg);
- unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
- unlockClosure((StgClosure*)msg,&stg_IND_info);
+ throwToSingleThreaded(cap, msg->target, msg->exception);
+ unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info);
+ tryWakeupThread(cap, msg->source);
return 1;
}
return 0;
}
// awakenBlockedExceptionQueue(): Just wake up the whole queue of
-// blocked exceptions and let them try again.
+// blocked exceptions.
void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
msg = (MessageThrowTo*)msg->link) {
i = lockClosure((StgClosure *)msg);
- if (i != &stg_IND_info) {
- unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+ if (i != &stg_MSG_NULL_info) {
+ unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info);
+ tryWakeupThread(cap, msg->source);
+ } else {
+ unlockClosure((StgClosure *)msg,i);
}
- unlockClosure((StgClosure *)msg,i);
}
tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
}
-static void
-performBlockedException (Capability *cap, MessageThrowTo *msg)
-{
- StgTSO *source;
-
- source = msg->source;
-
- ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
- ASSERT(source->block_info.closure == (StgClosure *)msg);
- ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
- ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
- // check ids not pointers, because the thread might be relocated
-
- throwToSingleThreaded(cap, msg->target, msg->exception);
- source->sp += 3;
-}
-
/* -----------------------------------------------------------------------------
Remove a thread from blocking queues.
-------------------------------------------------------------------------- */
static void
+removeFromMVarBlockedQueue (StgTSO *tso)
+{
+ StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+ StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
+
+ if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+ // already removed from this MVar
+ return;
+ }
+
+ // Assume the MVar is locked. (not assertable; sometimes it isn't
+ // actually WHITEHOLE'd).
+
+ // We want to remove the MVAR_TSO_QUEUE object from the queue. It
+ // isn't doubly-linked so we can't actually remove it; instead we
+ // just overwrite it with an IND if possible and let the GC short
+ // it out. However, we have to be careful to maintain the deque
+ // structure:
+
+ if (mvar->head == q) {
+ mvar->head = q->link;
+ q->header.info = &stg_IND_info;
+ if (mvar->tail == q) {
+ mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+ }
+ }
+ else if (mvar->tail == q) {
+ // we can't replace it with an IND in this case, because then
+ // we lose the tail pointer when the GC shorts out the IND.
+ // So we use MSG_NULL as a kind of non-dupable indirection;
+ // these are ignored by takeMVar/putMVar.
+ q->header.info = &stg_MSG_NULL_info;
+ }
+ else {
+ q->header.info = &stg_IND_info;
+ }
+
+ // revoke the MVar operation
+ tso->_link = END_TSO_QUEUE;
+}
+
+static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
switch (tso->why_blocked) {
case NotBlocked:
+ case ThreadMigrating:
return;
case BlockedOnSTM:
goto done;
case BlockedOnMVar:
- removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+ removeFromMVarBlockedQueue(tso);
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(cap, &blackhole_queue, tso);
+ // nothing to do
goto done;
- case BlockedOnMsgWakeup:
- {
- // kill the message, atomically:
- tso->block_info.wakeup->header.info = &stg_IND_info;
- break;
- }
-
case BlockedOnMsgThrowTo:
{
MessageThrowTo *m = tso->block_info.throwto;
// ASSERT(m->header.info == &stg_WHITEHOLE_info);
// unlock and revoke it at the same time
- unlockClosure((StgClosure*)m,&stg_IND_info);
+ unlockClosure((StgClosure*)m,&stg_MSG_NULL_info);
break;
}
}
done:
- unblockOne(cap, tso);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
}
/* -----------------------------------------------------------------------------
* asynchronous exception in an existing thread.
*
* We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
*
* We strip the stack down to the innermost CATCH_FRAME, building
* thunks in the heap for all the active computations, so they can
StgClosure *updatee;
nat i;
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
+ debugTraceCap(DEBUG_sched, cap,
+ "raising exception in thread %ld.", (long)tso->id);
#if defined(PROFILING)
/*
fprintCCS_stderr(tso->prof.CCCS);
}
#endif
+ // ASSUMES: the thread is not already complete or dead, or
+ // ThreadRelocated. Upper layers should deal with that.
+ ASSERT(tso->what_next != ThreadComplete &&
+ tso->what_next != ThreadKilled &&
+ tso->what_next != ThreadRelocated);
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
+ // only if we own this TSO (except that deleteThread() calls this
+ ASSERT(tso->cap == cap);
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
sp = tso->sp;
- // ASSUMES: the thread is not already complete or dead. Upper
- // layers should deal with that.
- ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
-
if (stop_here != NULL) {
updatee = stop_here->updatee;
} else {
// Perform the update
// TODO: this may waste some work, if the thunk has
// already been updated by another thread.
- UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ updateThunk(cap, tso,
+ ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
}
sp += sizeofW(StgUpdateFrame) - 1;
// top of the CATCH_FRAME ready to enter.
//
{
-#ifdef PROFILING
StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
StgThunk *raise;
if (exception == NULL) break;
* a surprise exception before we get around to executing the
* handler.
*/
- tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
+ tso->flags |= TSO_BLOCKEX;
+ if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
+ tso->flags &= ~TSO_INTERRUPTIBLE;
+ } else {
+ tso->flags |= TSO_INTERRUPTIBLE;
+ }
/* Put the newly-built THUNK on top of the stack, ready to execute
* when the thread restarts.
{
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = trec -> enclosing_trec;
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ debugTraceCap(DEBUG_stm, cap,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;