#include "win32/IOManager.h"
#endif
-static void raiseAsync (Capability *cap,
- StgTSO *tso,
- StgClosure *exception,
- rtsBool stop_at_atomically,
- StgUpdateFrame *stop_here);
+static StgTSO* raiseAsync (Capability *cap,
+ StgTSO *tso,
+ StgClosure *exception,
+ rtsBool stop_at_atomically,
+ StgUpdateFrame *stop_here);
static void removeFromQueues(Capability *cap, StgTSO *tso);
+static void removeFromMVarBlockedQueue (StgTSO *tso);
+
static void blockedThrowTo (Capability *cap,
StgTSO *target, MessageThrowTo *msg);
has been raised.
-------------------------------------------------------------------------- */
-void
-throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception)
-{
- throwToSingleThreaded_(cap, tso, exception, rtsFalse);
-}
-
-void
-throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically)
+static void
+throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
+ rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
{
- tso = deRefTSO(tso);
-
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
+ return;
}
// Remove it from any blocking queues
removeFromQueues(cap,tso);
- raiseAsync(cap, tso, exception, stop_at_atomically, NULL);
+ raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
}
void
-suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
+throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
{
- tso = deRefTSO(tso);
-
- // Thread already dead?
- if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
- }
+ throwToSingleThreaded__(cap, tso, exception, rtsFalse, NULL);
+}
- // Remove it from any blocking queues
- removeFromQueues(cap,tso);
+void
+throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
+ rtsBool stop_at_atomically)
+{
+ throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
+}
- raiseAsync(cap, tso, NULL, rtsFalse, stop_here);
+void // cannot return a different TSO
+suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
+{
+ throwToSingleThreaded__ (cap, tso, NULL, rtsFalse, stop_here);
}
/* -----------------------------------------------------------------------------
Currently we send a message if the target belongs to another
Capability, and it is
- - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
- BlockedOnCCall
+ - NotBlocked, BlockedOnMsgThrowTo,
+ BlockedOnCCall_Interruptible
- or it is masking exceptions (TSO_BLOCKEX)
check_target:
ASSERT(target != END_TSO_QUEUE);
- // follow ThreadRelocated links in the target first
- target = deRefTSO(target);
-
// Thread already dead?
if (target->what_next == ThreadComplete
|| target->what_next == ThreadKilled) {
switch (status) {
case NotBlocked:
- case BlockedOnMsgWakeup:
- /* if status==NotBlocked, and target->cap == cap, then
- we own this TSO and can raise the exception.
-
- How do we establish this condition? Very carefully.
-
- Let
- P = (status == NotBlocked)
- Q = (tso->cap == cap)
-
- Now, if P & Q are true, then the TSO is locked and owned by
- this capability. No other OS thread can steal it.
-
- If P==0 and Q==1: the TSO is blocked, but attached to this
- capabilty, and it can be stolen by another capability.
-
- If P==1 and Q==0: the TSO is runnable on another
- capability. At any time, the TSO may change from runnable
- to blocked and vice versa, while it remains owned by
- another capability.
-
- Suppose we test like this:
-
- p = P
- q = Q
- if (p && q) ...
-
- this is defeated by another capability stealing a blocked
- TSO from us to wake it up (Schedule.c:unblockOne()). The
- other thread is doing
-
- Q = 0
- P = 1
-
- assuming arbitrary reordering, we could see this
- interleaving:
-
- start: P==0 && Q==1
- P = 1
- p = P
- q = Q
- Q = 0
- if (p && q) ...
-
- so we need a memory barrier:
-
- p = P
- mb()
- q = Q
- if (p && q) ...
-
- this avoids the problematic case. There are other cases
- to consider, but this is the tricky one.
-
- Note that we must be sure that unblockOne() does the
- writes in the correct order: Q before P. The memory
- barrier ensures that if we have seen the write to P, we
- have also seen the write to Q.
- */
{
- write_barrier();
if ((target->flags & TSO_BLOCKEX) == 0) {
// It's on our run queue and not blocking exceptions
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
}
// nobody else can wake up this TSO after we claim the message
- unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
+ doneWithMsgThrowTo(m);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
return THROWTO_SUCCESS;
info = lockClosure((StgClosure *)mvar);
- if (target->what_next == ThreadRelocated) {
- target = target->_link;
- unlockClosure((StgClosure *)mvar,info);
- goto retry;
- }
- // we have the MVar, let's check whether the thread
+ // we have the MVar, let's check whether the thread
// is still blocked on the same MVar.
if (target->why_blocked != BlockedOnMVar
|| (StgMVar *)target->block_info.closure != mvar) {
goto retry;
}
+ if (target->_link == END_TSO_QUEUE) {
+ // the MVar operation has already completed. There is a
+ // MSG_TRY_WAKEUP on the way, but we can just wake up the
+ // thread now anyway and ignore the message when it
+ // arrives.
+ unlockClosure((StgClosure *)mvar, info);
+ tryWakeupThread(cap, target);
+ goto retry;
+ }
+
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
- removeThreadFromMVarQueue(cap, mvar, target);
+ // revoke the MVar operation
+ removeFromMVarBlockedQueue(target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- if (info == &stg_MVAR_CLEAN_info) {
- dirty_MVAR(&cap->r,(StgClosure*)mvar);
- }
- unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
+ unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
}
}
case BlockedOnBlackHole:
{
- // Revoke the message by replacing it with IND. We're not
- // locking anything here, so we might still get a TRY_WAKEUP
- // message from the owner of the blackhole some time in the
- // future, but that doesn't matter.
- ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
- OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
+ if (target->flags & TSO_BLOCKEX) {
+ // BlockedOnBlackHole is not interruptible.
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
+ } else {
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ }
}
case BlockedOnSTM:
return THROWTO_SUCCESS;
}
+ case BlockedOnCCall_Interruptible:
+#ifdef THREADED_RTS
+ {
+ Task *task = NULL;
+ // walk suspended_ccalls to find the correct worker thread
+ InCall *incall;
+ for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
+ if (incall->suspended_tso == target) {
+ task = incall->task;
+ break;
+ }
+ }
+ if (task != NULL) {
+ blockedThrowTo(cap, target, msg);
+ if (!((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
+ interruptWorkerTask(task);
+ }
+ return THROWTO_BLOCKED;
+ } else {
+ debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
+ }
+ // fall to next
+ }
+#endif
case BlockedOnCCall:
- case BlockedOnCCall_NoUnblockExc:
blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
}
#endif
+ case ThreadMigrating:
+ // if is is ThreadMigrating and tso->cap is ours, then it
+ // *must* be migrating *to* this capability. If it were
+ // migrating away from the capability, then tso->cap would
+ // point to the destination.
+ //
+ // There is a MSG_WAKEUP in the message queue for this thread,
+ // but we can just do it preemptively:
+ tryWakeupThread(cap, target);
+ // and now retry, the thread should be runnable.
+ goto retry;
+
default:
- barf("throwTo: unrecognised why_blocked value");
+ barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
}
barf("throwTo");
}
{
MessageThrowTo *msg;
const StgInfoTable *i;
-
+ StgTSO *source;
+
if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
awakenBlockedExceptionQueue(cap,tso);
}
throwToSingleThreaded(cap, msg->target, msg->exception);
- unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info);
- tryWakeupThread(cap, msg->source);
+ source = msg->source;
+ doneWithMsgThrowTo(msg);
+ tryWakeupThread(cap, source);
return 1;
}
return 0;
{
MessageThrowTo *msg;
const StgInfoTable *i;
+ StgTSO *source;
for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
msg = (MessageThrowTo*)msg->link) {
i = lockClosure((StgClosure *)msg);
if (i != &stg_MSG_NULL_info) {
- unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info);
- tryWakeupThread(cap, msg->source);
+ source = msg->source;
+ doneWithMsgThrowTo(msg);
+ tryWakeupThread(cap, source);
} else {
unlockClosure((StgClosure *)msg,i);
}
-------------------------------------------------------------------------- */
static void
+removeFromMVarBlockedQueue (StgTSO *tso)
+{
+ StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+ StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
+
+ if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+ // already removed from this MVar
+ return;
+ }
+
+ // Assume the MVar is locked. (not assertable; sometimes it isn't
+ // actually WHITEHOLE'd).
+
+ // We want to remove the MVAR_TSO_QUEUE object from the queue. It
+ // isn't doubly-linked so we can't actually remove it; instead we
+ // just overwrite it with an IND if possible and let the GC short
+ // it out. However, we have to be careful to maintain the deque
+ // structure:
+
+ if (mvar->head == q) {
+ mvar->head = q->link;
+ OVERWRITE_INFO(q, &stg_IND_info);
+ if (mvar->tail == q) {
+ mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+ }
+ }
+ else if (mvar->tail == q) {
+ // we can't replace it with an IND in this case, because then
+ // we lose the tail pointer when the GC shorts out the IND.
+ // So we use MSG_NULL as a kind of non-dupable indirection;
+ // these are ignored by takeMVar/putMVar.
+ OVERWRITE_INFO(q, &stg_MSG_NULL_info);
+ }
+ else {
+ OVERWRITE_INFO(q, &stg_IND_info);
+ }
+
+ // revoke the MVar operation
+ tso->_link = END_TSO_QUEUE;
+}
+
+static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
switch (tso->why_blocked) {
case NotBlocked:
+ case ThreadMigrating:
return;
case BlockedOnSTM:
goto done;
case BlockedOnMVar:
- removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
- // we aren't doing a write barrier here: the MVar is supposed to
- // be already locked, so replacing the info pointer would unlock it.
+ removeFromMVarBlockedQueue(tso);
goto done;
case BlockedOnBlackHole:
// nothing to do
goto done;
- case BlockedOnMsgWakeup:
- {
- // kill the message, atomically:
- OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
- break;
- }
-
case BlockedOnMsgThrowTo:
{
MessageThrowTo *m = tso->block_info.throwto;
// ASSERT(m->header.info == &stg_WHITEHOLE_info);
// unlock and revoke it at the same time
- unlockClosure((StgClosure*)m,&stg_MSG_NULL_info);
+ doneWithMsgThrowTo(m);
break;
}
}
done:
- unblockOne(cap, tso);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
}
/* -----------------------------------------------------------------------------
*
* -------------------------------------------------------------------------- */
-static void
+static StgTSO *
raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
{
StgPtr sp, frame;
StgClosure *updatee;
nat i;
+ StgStack *stack;
debugTraceCap(DEBUG_sched, cap,
"raising exception in thread %ld.", (long)tso->id);
fprintCCS_stderr(tso->prof.CCCS);
}
#endif
- // ASSUMES: the thread is not already complete or dead, or
- // ThreadRelocated. Upper layers should deal with that.
+ // ASSUMES: the thread is not already complete or dead
+ // Upper layers should deal with that.
ASSERT(tso->what_next != ThreadComplete &&
- tso->what_next != ThreadKilled &&
- tso->what_next != ThreadRelocated);
+ tso->what_next != ThreadKilled);
// only if we own this TSO (except that deleteThread() calls this
ASSERT(tso->cap == cap);
- // wake it up
- if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
- }
+ stack = tso->stackobj;
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
+ dirty_STACK(cap, stack);
- sp = tso->sp;
+ sp = stack->sp;
if (stop_here != NULL) {
updatee = stop_here->updatee;
//
// 5. If it's a STOP_FRAME, then kill the thread.
//
- // NB: if we pass an ATOMICALLY_FRAME then abort the associated
+ // 6. If it's an UNDERFLOW_FRAME, then continue with the next
+ // stack chunk.
+ //
+ // NB: if we pass an ATOMICALLY_FRAME then abort the associated
// transaction
- info = get_ret_itbl((StgClosure *)frame);
+ info = get_ret_itbl((StgClosure *)frame);
switch (info->i.type) {
continue; //no need to bump frame
}
- case STOP_FRAME:
+ case UNDERFLOW_FRAME:
+ {
+ StgAP_STACK * ap;
+ nat words;
+
+ // First build an AP_STACK consisting of the stack chunk above the
+ // current update frame, with the top word on the stack as the
+ // fun field.
+ //
+ words = frame - sp - 1;
+ ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
+
+ ap->size = words;
+ ap->fun = (StgClosure *)sp[0];
+ sp++;
+ for(i=0; i < (nat)words; ++i) {
+ ap->payload[i] = (StgClosure *)*sp++;
+ }
+
+ SET_HDR(ap,&stg_AP_STACK_NOUPD_info,
+ ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
+ TICK_ALLOC_SE_THK(words+1,0);
+
+ stack->sp = sp;
+ threadStackUnderflow(cap,tso);
+ stack = tso->stackobj;
+ sp = stack->sp;
+
+ sp--;
+ sp[0] = (W_)ap;
+ frame = sp + 1;
+ continue;
+ }
+
+ case STOP_FRAME:
{
// We've stripped the entire stack, the thread is now dead.
tso->what_next = ThreadKilled;
- tso->sp = frame + sizeofW(StgStopFrame);
- return;
+ stack->sp = frame + sizeofW(StgStopFrame);
+ goto done;
}
case CATCH_FRAME:
// top of the CATCH_FRAME ready to enter.
//
{
-#ifdef PROFILING
StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
StgThunk *raise;
if (exception == NULL) break;
* a surprise exception before we get around to executing the
* handler.
*/
- tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
+ tso->flags |= TSO_BLOCKEX;
+ if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
+ tso->flags &= ~TSO_INTERRUPTIBLE;
+ } else {
+ tso->flags |= TSO_INTERRUPTIBLE;
+ }
/* Put the newly-built THUNK on top of the stack, ready to execute
* when the thread restarts.
*/
sp[0] = (W_)raise;
sp[-1] = (W_)&stg_enter_info;
- tso->sp = sp-1;
+ stack->sp = sp-1;
tso->what_next = ThreadRunGHC;
- IF_DEBUG(sanity, checkTSO(tso));
- return;
+ goto done;
}
case ATOMICALLY_FRAME:
if (stop_at_atomically) {
ASSERT(tso->trec->enclosing_trec == NO_TREC);
stmCondemnTransaction(cap, tso -> trec);
- tso->sp = frame - 2;
+ stack->sp = frame - 2;
// The ATOMICALLY_FRAME expects to be returned a
// result from the transaction, which it stores in the
// stack frame. Hence we arrange to return a dummy
// ATOMICALLY_FRAME instance for condemned
// transactions, but I don't fully understand the
// interaction with STM invariants.
- tso->sp[1] = (W_)&stg_NO_TREC_closure;
- tso->sp[0] = (W_)&stg_gc_unpt_r1_info;
- tso->what_next = ThreadRunGHC;
- return;
+ stack->sp[1] = (W_)&stg_NO_TREC_closure;
+ stack->sp[0] = (W_)&stg_gc_unpt_r1_info;
+ tso->what_next = ThreadRunGHC;
+ goto done;
}
// Not stop_at_atomically... fall through and abort the
// transaction.
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
- break;
+ break;
};
default:
frame += stack_frame_sizeW((StgClosure *)frame);
}
- // if we got here, then we stopped at stop_here
- ASSERT(stop_here != NULL);
+done:
+ IF_DEBUG(sanity, checkTSO(tso));
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
+
+ return tso;
}