#include "PosixSource.h"
#include "Rts.h"
+
+#include "sm/Storage.h"
#include "Threads.h"
#include "Trace.h"
#include "RaiseAsync.h"
-#include "SMP.h"
#include "Schedule.h"
-#include "LdvProfile.h"
#include "Updates.h"
#include "STM.h"
-#include "Sanity.h"
+#include "sm/Sanity.h"
+#include "Profiling.h"
+#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
StgTSO *tso,
StgClosure *exception,
rtsBool stop_at_atomically,
- StgPtr stop_here);
+ StgUpdateFrame *stop_here);
static void removeFromQueues(Capability *cap, StgTSO *tso);
-static void blockedThrowTo (StgTSO *source, StgTSO *target);
+static void removeFromMVarBlockedQueue (StgTSO *tso);
+
+static void blockedThrowTo (Capability *cap,
+ StgTSO *target, MessageThrowTo *msg);
-static void performBlockedException (Capability *cap,
- StgTSO *source, StgTSO *target);
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS);
/* -----------------------------------------------------------------------------
throwToSingleThreaded
void
throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception)
{
- throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL);
+ throwToSingleThreaded_(cap, tso, exception, rtsFalse);
}
void
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically, StgPtr stop_here)
+ rtsBool stop_at_atomically)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
// Remove it from any blocking queues
removeFromQueues(cap,tso);
- raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
+ raiseAsync(cap, tso, exception, stop_at_atomically, NULL);
}
void
-suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
may be blocked and could be woken up at any point by another CPU.
We have some delicate synchronisation to do.
- There is a completely safe fallback scheme: it is always possible
- to just block the source TSO on the target TSO's blocked_exceptions
- queue. This queue is locked using lockTSO()/unlockTSO(). It is
- checked at regular intervals: before and after running a thread
- (schedule() and threadPaused() respectively), and just before GC
- (scheduleDoGC()). Activating a thread on this queue should be done
- using maybePerformBlockedException(): this is done in the context
- of the target thread, so the exception can be raised eagerly.
-
- This fallback scheme works even if the target thread is complete or
- killed: scheduleDoGC() will discover the blocked thread before the
- target is GC'd.
-
- Blocking the source thread on the target thread's blocked_exception
- queue is also employed when the target thread is currently blocking
- exceptions (ie. inside Control.Exception.block).
-
- We could use the safe fallback scheme exclusively, but that
- wouldn't be ideal: most calls to throwTo would block immediately,
- possibly until the next GC, which might require the deadlock
- detection mechanism to kick in. So we try to provide promptness
- wherever possible.
-
- We can promptly deliver the exception if the target thread is:
-
- - runnable, on the same Capability as the source thread (because
- we own the run queue and therefore the target thread).
-
- - blocked, and we can obtain exclusive access to it. Obtaining
- exclusive access to the thread depends on how it is blocked.
-
- We must also be careful to not trip over threadStackOverflow(),
- which might be moving the TSO to enlarge its stack.
- lockTSO()/unlockTSO() are used here too.
-
+ The underlying scheme when multiple Capabilities are in use is
+ message passing: when the target of a throwTo is on another
+ Capability, we send a message (a MessageThrowTo closure) to that
+ Capability.
+
+ If the throwTo needs to block because the target TSO is masking
+ exceptions (the TSO_BLOCKEX flag), then the message is placed on
+ the blocked_exceptions queue attached to the target TSO. When the
+ target TSO enters the unmasked state again, it must check the
+ queue. The blocked_exceptions queue is not locked; only the
+ Capability owning the TSO may modify it.
+
+ To make things simpler for throwTo, we always create the message
+ first before deciding what to do. The message may get sent, or it
+ may get attached to a TSO's blocked_exceptions queue, or the
+ exception may get thrown immediately and the message dropped,
+ depending on the current state of the target.
+
+ Currently we send a message if the target belongs to another
+ Capability, and it is
+
+ - NotBlocked, BlockedOnMsgThrowTo,
+ BlockedOnCCall
+
+ - or it is masking exceptions (TSO_BLOCKEX)
+
+ Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+ BlockedOnBlackHole then we acquire ownership of the TSO by locking
+ its parent container (e.g. the MVar) and then raise the exception.
+ We might change these cases to be more message-passing-like in the
+ future.
+
Returns:
- THROWTO_SUCCESS exception was raised, ok to continue
+ NULL exception was raised, ok to continue
- THROWTO_BLOCKED exception was not raised; block the source
- thread then call throwToReleaseTarget() when
- the source thread is properly tidied away.
+ MessageThrowTo * exception was not raised; the source TSO
+ should now put itself in the state
+ BlockedOnMsgThrowTo, and when it is ready
+ it should unlock the message using
+ unlockClosure(msg, &stg_MSG_THROWTO_info);
+ If it decides not to raise the exception after
+ all, it can revoke it safely with
+ unlockClosure(msg, &stg_MSG_NULL_info);
-------------------------------------------------------------------------- */
-nat
+MessageThrowTo *
throwTo (Capability *cap, // the Capability we hold
- StgTSO *source, // the TSO sending the exception
+ StgTSO *source, // the TSO sending the exception (or NULL)
StgTSO *target, // the TSO receiving the exception
- StgClosure *exception, // the exception closure
- /*[out]*/ void **out USED_IF_THREADS)
+ StgClosure *exception) // the exception closure
{
- StgWord status;
-
- // follow ThreadRelocated links in the target first
- while (target->what_next == ThreadRelocated) {
- target = target->link;
- // No, it might be a WHITEHOLE:
- // ASSERT(get_itbl(target)->type == TSO);
- }
+ MessageThrowTo *msg;
- debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
- (unsigned long)source->id, (unsigned long)target->id);
+ msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+ // message starts locked; the caller has to unlock it when it is
+ // ready.
+ SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
+ msg->source = source;
+ msg->target = target;
+ msg->exception = exception;
-#ifdef DEBUG
- if (traceClass(DEBUG_sched)) {
- debugTraceBegin("throwTo: target");
- printThreadStatus(target);
- debugTraceEnd();
+ switch (throwToMsg(cap, msg))
+ {
+ case THROWTO_SUCCESS:
+ return NULL;
+ case THROWTO_BLOCKED:
+ default:
+ return msg;
}
-#endif
+}
+
+
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
+{
+ StgWord status;
+ StgTSO *target = msg->target;
+ Capability *target_cap;
goto check_target;
+
retry:
+ write_barrier();
debugTrace(DEBUG_sched, "throwTo: retrying...");
check_target:
+ ASSERT(target != END_TSO_QUEUE);
+
+ // follow ThreadRelocated links in the target first
+ target = deRefTSO(target);
+
// Thread already dead?
if (target->what_next == ThreadComplete
|| target->what_next == ThreadKilled) {
return THROWTO_SUCCESS;
}
+ debugTraceCap(DEBUG_sched, cap,
+ "throwTo: from thread %lu to thread %lu",
+ (unsigned long)msg->source->id,
+ (unsigned long)msg->target->id);
+
+#ifdef DEBUG
+ traceThreadStatus(DEBUG_sched, target);
+#endif
+
+ target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
+ }
+
status = target->why_blocked;
switch (status) {
case NotBlocked:
- /* if status==NotBlocked, and target->cap == cap, then
- we own this TSO and can raise the exception.
-
- How do we establish this condition? Very carefully.
-
- Let
- P = (status == NotBlocked)
- Q = (tso->cap == cap)
-
- Now, if P & Q are true, then the TSO is locked and owned by
- this capability. No other OS thread can steal it.
-
- If P==0 and Q==1: the TSO is blocked, but attached to this
- capabilty, and it can be stolen by another capability.
-
- If P==1 and Q==0: the TSO is runnable on another
- capability. At any time, the TSO may change from runnable
- to blocked and vice versa, while it remains owned by
- another capability.
-
- Suppose we test like this:
-
- p = P
- q = Q
- if (p && q) ...
-
- this is defeated by another capability stealing a blocked
- TSO from us to wake it up (Schedule.c:unblockOne()). The
- other thread is doing
-
- Q = 0
- P = 1
-
- assuming arbitrary reordering, we could see this
- interleaving:
-
- start: P==0 && Q==1
- P = 1
- p = P
- q = Q
- Q = 0
- if (p && q) ...
-
- so we need a memory barrier:
-
- p = P
- mb()
- q = Q
- if (p && q) ...
-
- this avoids the problematic case. There are other cases
- to consider, but this is the tricky one.
-
- Note that we must be sure that unblockOne() does the
- writes in the correct order: Q before P. The memory
- barrier ensures that if we have seen the write to P, we
- have also seen the write to Q.
- */
{
- Capability *target_cap;
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ } else {
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
+ }
+ }
- write_barrier();
- target_cap = target->cap;
- if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- // Otherwise, just block on the blocked_exceptions queue
- // of the target thread. The queue will get looked at
- // soon enough: it is checked before and after running a
- // thread, and during GC.
- lockTSO(target);
-
- // Avoid race with threadStackOverflow, which may have
- // just moved this TSO.
- if (target->what_next == ThreadRelocated) {
- unlockTSO(target);
- target = target->link;
- goto retry;
- }
- blockedThrowTo(source,target);
- *out = target;
- return THROWTO_BLOCKED;
- }
+ case BlockedOnMsgThrowTo:
+ {
+ const StgInfoTable *i;
+ MessageThrowTo *m;
+
+ m = target->block_info.throwto;
+
+ // target is local to this cap, but has sent a throwto
+ // message to another cap.
+ //
+ // The source message is locked. We need to revoke the
+ // target's message so that we can raise the exception, so
+ // we attempt to lock it.
+
+ // There's a possibility of a deadlock if two threads are both
+ // trying to throwTo each other (or more generally, a cycle of
+ // threads). To break the symmetry we compare the addresses
+ // of the MessageThrowTo objects, and the one for which m <
+ // msg gets to spin, while the other can only try to lock
+ // once, but must then back off and unlock both before trying
+ // again.
+ if (m < msg) {
+ i = lockClosure((StgClosure *)m);
+ } else {
+ i = tryLockClosure((StgClosure *)m);
+ if (i == NULL) {
+// debugBelch("collision\n");
+ throwToSendMsg(cap, target->cap, msg);
+ return THROWTO_BLOCKED;
+ }
+ }
+
+ if (i == &stg_MSG_NULL_info) {
+ // we know there's a MSG_TRY_WAKEUP on the way, so we
+ // might as well just do it now. The message will
+ // be a no-op when it arrives.
+ unlockClosure((StgClosure*)m, i);
+ tryWakeupThread_(cap, target);
+ goto retry;
+ }
+
+ if (i != &stg_MSG_THROWTO_info) {
+ // if it's a MSG_NULL, this TSO has been woken up by another Cap
+ unlockClosure((StgClosure*)m, i);
+ goto retry;
+ }
+
+ if ((target->flags & TSO_BLOCKEX) &&
+ ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+ unlockClosure((StgClosure*)m, i);
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
+ }
+
+ // nobody else can wake up this TSO after we claim the message
+ unlockClosure((StgClosure*)m, &stg_MSG_NULL_info);
+
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
}
case BlockedOnMVar:
// ASSUMPTION: tso->block_info must always point to a
// closure. In the threaded RTS it does.
- if (get_itbl(mvar)->type != MVAR) goto retry;
+ switch (get_itbl(mvar)->type) {
+ case MVAR_CLEAN:
+ case MVAR_DIRTY:
+ break;
+ default:
+ goto retry;
+ }
info = lockClosure((StgClosure *)mvar);
if (target->what_next == ThreadRelocated) {
- target = target->link;
+ target = target->_link;
unlockClosure((StgClosure *)mvar,info);
goto retry;
}
goto retry;
}
+ if (target->_link == END_TSO_QUEUE) {
+ // the MVar operation has already completed. There is a
+ // MSG_TRY_WAKEUP on the way, but we can just wake up the
+ // thread now anyway and ignore the message when it
+ // arrives.
+ unlockClosure((StgClosure *)mvar, info);
+ tryWakeupThread_(cap, target);
+ goto retry;
+ }
+
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockClosure((StgClosure *)target);
- blockedThrowTo(source,target);
+ blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
+ return THROWTO_BLOCKED;
} else {
- removeThreadFromMVarQueue(mvar, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
+ // revoke the MVar operation
+ removeFromMVarBlockedQueue(target);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_SUCCESS;
}
case BlockedOnBlackHole:
{
- ACQUIRE_LOCK(&sched_mutex);
- // double checking the status after the memory barrier:
- if (target->why_blocked != BlockedOnBlackHole) {
- RELEASE_LOCK(&sched_mutex);
- goto retry;
- }
-
if (target->flags & TSO_BLOCKEX) {
- lockTSO(target);
- blockedThrowTo(source,target);
- RELEASE_LOCK(&sched_mutex);
- *out = target;
- return THROWTO_BLOCKED; // caller releases TSO
- } else {
- removeThreadFromQueue(&blackhole_queue, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_SUCCESS;
- }
- }
-
- case BlockedOnException:
- {
- StgTSO *target2;
- StgInfoTable *info;
-
- /*
- To obtain exclusive access to a BlockedOnException thread,
- we must call lockClosure() on the TSO on which it is blocked.
- Since the TSO might change underneath our feet, after we
- call lockClosure() we must check that
-
- (a) the closure we locked is actually a TSO
- (b) the original thread is still BlockedOnException,
- (c) the original thread is still blocked on the TSO we locked
- and (d) the target thread has not been relocated.
-
- We synchronise with threadStackOverflow() (which relocates
- threads) using lockClosure()/unlockClosure().
- */
- target2 = target->block_info.tso;
-
- info = lockClosure((StgClosure *)target2);
- if (info != &stg_TSO_info) {
- unlockClosure((StgClosure *)target2, info);
- goto retry;
- }
- if (target->what_next == ThreadRelocated) {
- target = target->link;
- unlockTSO(target2);
- goto retry;
- }
- if (target2->what_next == ThreadRelocated) {
- target->block_info.tso = target2->link;
- unlockTSO(target2);
- goto retry;
- }
- if (target->why_blocked != BlockedOnException
- || target->block_info.tso != target2) {
- unlockTSO(target2);
- goto retry;
- }
-
- /*
- Now we have exclusive rights to the target TSO...
-
- If it is blocking exceptions, add the source TSO to its
- blocked_exceptions queue. Otherwise, raise the exception.
- */
- if ((target->flags & TSO_BLOCKEX) &&
- ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- lockTSO(target);
- blockedThrowTo(source,target);
- unlockTSO(target2);
- *out = target;
+ // BlockedOnBlackHole is not interruptible.
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
} else {
- removeThreadFromQueue(&target2->blocked_exceptions, target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockTSO(target2);
- return THROWTO_SUCCESS;
- }
- }
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
+ }
+ }
case BlockedOnSTM:
lockTSO(target);
// Unblocking BlockedOnSTM threads requires the TSO to be
// locked; see STM.c:unpark_tso().
if (target->why_blocked != BlockedOnSTM) {
+ unlockTSO(target);
goto retry;
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(source,target);
- *out = target;
+ blockedThrowTo(cap,target,msg);
+ unlockTSO(target);
return THROWTO_BLOCKED;
} else {
- raiseAsync(cap, target, exception, rtsFalse, NULL);
- unblockOne(cap, target);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
unlockTSO(target);
return THROWTO_SUCCESS;
}
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- // I don't think it's possible to acquire ownership of a
- // BlockedOnCCall thread. We just assume that the target
- // thread is blocking exceptions, and block on its
- // blocked_exception queue.
- lockTSO(target);
- blockedThrowTo(source,target);
- *out = target;
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
#ifndef THREADEDED_RTS
#endif
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(source,target);
+ blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
} else {
removeFromQueues(cap,target);
- raiseAsync(cap, target, exception, rtsFalse, NULL);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
return THROWTO_SUCCESS;
}
#endif
barf("throwTo");
}
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
static void
-blockedThrowTo (StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+ Capability *target_cap USED_IF_THREADS,
+ MessageThrowTo *msg USED_IF_THREADS)
+
{
- debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
- source->link = target->blocked_exceptions;
- target->blocked_exceptions = source;
- dirtyTSO(target); // we modified the blocked_exceptions queue
-
- source->block_info.tso = target;
- write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
- source->why_blocked = BlockedOnException;
-}
+#ifdef THREADED_RTS
+ debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ sendMessage(cap, target_cap, (Message*)msg);
+#endif
+}
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue. The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
{
- unlockTSO((StgTSO *)tso);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+ (unsigned long)target->id);
+
+ ASSERT(target->cap == cap);
+
+ msg->link = target->blocked_exceptions;
+ target->blocked_exceptions = msg;
+ dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
-#endif
/* -----------------------------------------------------------------------------
Waking up threads blocked in throwTo
int
maybePerformBlockedException (Capability *cap, StgTSO *tso)
{
- StgTSO *source;
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
- if (tso->blocked_exceptions != END_TSO_QUEUE
- && ((tso->flags & TSO_BLOCKEX) == 0
- || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
+ awakenBlockedExceptionQueue(cap,tso);
+ return 1;
+ } else {
+ return 0;
+ }
+ }
- // Lock the TSO, this gives us exclusive access to the queue
- lockTSO(tso);
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
+ (tso->flags & TSO_BLOCKEX) != 0) {
+ debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ }
- // Check the queue again; it might have changed before we
- // locked it.
- if (tso->blocked_exceptions == END_TSO_QUEUE) {
- unlockTSO(tso);
- return 0;
- }
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
+ && ((tso->flags & TSO_BLOCKEX) == 0
+ || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
// We unblock just the first thread on the queue, and perform
// its throw immediately.
- source = tso->blocked_exceptions;
- performBlockedException(cap, source, tso);
- tso->blocked_exceptions = unblockOne_(cap, source,
- rtsFalse/*no migrate*/);
- unlockTSO(tso);
+ loop:
+ msg = tso->blocked_exceptions;
+ if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+ i = lockClosure((StgClosure*)msg);
+ tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+ if (i == &stg_MSG_NULL_info) {
+ unlockClosure((StgClosure*)msg,i);
+ goto loop;
+ }
+
+ throwToSingleThreaded(cap, msg->target, msg->exception);
+ unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info);
+ tryWakeupThread(cap, msg->source);
return 1;
}
return 0;
}
+// awakenBlockedExceptionQueue(): Just wake up the whole queue of
+// blocked exceptions.
+
void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
{
- if (tso->blocked_exceptions != END_TSO_QUEUE) {
- lockTSO(tso);
- awakenBlockedQueue(cap, tso->blocked_exceptions);
- tso->blocked_exceptions = END_TSO_QUEUE;
- unlockTSO(tso);
+ MessageThrowTo *msg;
+ const StgInfoTable *i;
+
+ for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+ msg = (MessageThrowTo*)msg->link) {
+ i = lockClosure((StgClosure *)msg);
+ if (i != &stg_MSG_NULL_info) {
+ unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info);
+ tryWakeupThread(cap, msg->source);
+ } else {
+ unlockClosure((StgClosure *)msg,i);
+ }
}
+ tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
}
-static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
-{
- StgClosure *exception;
-
- ASSERT(source->why_blocked == BlockedOnException);
- ASSERT(source->block_info.tso->id == target->id);
- ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
- ASSERT(((StgTSO *)source->sp[1])->id == target->id);
- // check ids not pointers, because the thread might be relocated
-
- exception = (StgClosure *)source->sp[2];
- throwToSingleThreaded(cap, target, exception);
- source->sp += 3;
-}
-
/* -----------------------------------------------------------------------------
Remove a thread from blocking queues.
This is for use when we raise an exception in another thread, which
may be blocked.
- This has nothing to do with the UnblockThread event in GranSim. -- HWL
- -------------------------------------------------------------------------- */
-
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-/*
- NB: only the type of the blocking queue is different in GranSim and GUM
- the operations on the queue-elements are the same
- long live polymorphism!
- Locks: sched_mutex is held upon entry and exit.
+ Precondition: we have exclusive access to the TSO, via the same set
+ of conditions as throwToSingleThreaded() (c.f.).
+ -------------------------------------------------------------------------- */
-*/
static void
-removeFromQueues(Capability *cap, StgTSO *tso)
+removeFromMVarBlockedQueue (StgTSO *tso)
{
- StgBlockingQueueElement *t, **last;
-
- switch (tso->why_blocked) {
-
- case NotBlocked:
- return; /* not blocked */
+ StgMVar *mvar = (StgMVar*)tso->block_info.closure;
+ StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
- case BlockedOnSTM:
- // Be careful: nothing to do here! We tell the scheduler that the thread
- // is runnable and we leave it to the stack-walking code to abort the
- // transaction while unwinding the stack. We should perhaps have a debugging
- // test to make sure that this really happens and that the 'zombie' transaction
- // does not get committed.
- goto done;
-
- case BlockedOnMVar:
- ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
- {
- StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
- StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
- last = (StgBlockingQueueElement **)&mvar->head;
- for (t = (StgBlockingQueueElement *)mvar->head;
- t != END_BQ_QUEUE;
- last = &t->link, last_tso = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- if (mvar->tail == tso) {
- mvar->tail = (StgTSO *)last_tso;
- }
- goto done;
- }
- }
- barf("removeFromQueues (MVAR): TSO not found");
- }
-
- case BlockedOnBlackHole:
- ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
- {
- StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
- last = &bq->blocking_queue;
- for (t = bq->blocking_queue;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("removeFromQueues (BLACKHOLE): TSO not found");
+ if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
+ // already removed from this MVar
+ return;
}
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target2->link;
- ASSERT(get_itbl(target)->type == TSO);
- }
-
- last = (StgBlockingQueueElement **)&target->blocked_exceptions;
- for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- ASSERT(get_itbl(t)->type == TSO);
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("removeFromQueues (Exception): TSO not found");
+ // Assume the MVar is locked. (not assertable; sometimes it isn't
+ // actually WHITEHOLE'd).
+
+ // We want to remove the MVAR_TSO_QUEUE object from the queue. It
+ // isn't doubly-linked so we can't actually remove it; instead we
+ // just overwrite it with an IND if possible and let the GC short
+ // it out. However, we have to be careful to maintain the deque
+ // structure:
+
+ if (mvar->head == q) {
+ mvar->head = q->link;
+ q->header.info = &stg_IND_info;
+ if (mvar->tail == q) {
+ mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
+ }
}
-
- case BlockedOnRead:
- case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
-#endif
- {
- /* take TSO off blocked_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- blocked_queue_hd = (StgTSO *)t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = END_TSO_QUEUE;
- }
- } else {
- prev->link = t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = (StgTSO *)prev;
- }
- }
-#if defined(mingw32_HOST_OS)
- /* (Cooperatively) signal that the worker thread should abort
- * the request.
- */
- abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
- goto done;
- }
- }
- barf("removeFromQueues (I/O): TSO not found");
+ else if (mvar->tail == q) {
+ // we can't replace it with an IND in this case, because then
+ // we lose the tail pointer when the GC shorts out the IND.
+ // So we use MSG_NULL as a kind of non-dupable indirection;
+ // these are ignored by takeMVar/putMVar.
+ q->header.info = &stg_MSG_NULL_info;
}
-
- case BlockedOnDelay:
- {
- /* take TSO off sleeping_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- sleeping_queue = (StgTSO *)t->link;
- } else {
- prev->link = t->link;
- }
- goto done;
- }
- }
- barf("removeFromQueues (delay): TSO not found");
+ else {
+ q->header.info = &stg_IND_info;
}
- default:
- barf("removeFromQueues");
- }
-
- done:
- tso->link = END_TSO_QUEUE;
- tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- pushOnRunQueue(cap,tso);
+ // revoke the MVar operation
+ tso->_link = END_TSO_QUEUE;
}
-#else
+
static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
switch (tso->why_blocked) {
case NotBlocked:
+ case ThreadMigrating:
return;
case BlockedOnSTM:
goto done;
case BlockedOnMVar:
- removeThreadFromMVarQueue((StgMVar *)tso->block_info.closure, tso);
+ removeFromMVarBlockedQueue(tso);
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(&blackhole_queue, tso);
+ // nothing to do
goto done;
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- // NO: when called by threadPaused(), we probably have this
- // TSO already locked (WHITEHOLEd) because we just placed
- // ourselves on its queue.
- // ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target->link;
- }
-
- removeThreadFromQueue(&target->blocked_exceptions, tso);
- goto done;
- }
+ case BlockedOnMsgThrowTo:
+ {
+ MessageThrowTo *m = tso->block_info.throwto;
+ // The message is locked by us, unless we got here via
+ // deleteAllThreads(), in which case we own all the
+ // capabilities.
+ // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+ // unlock and revoke it at the same time
+ unlockClosure((StgClosure*)m,&stg_MSG_NULL_info);
+ break;
+ }
#if !defined(THREADED_RTS)
case BlockedOnRead:
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
#endif
- removeThreadFromDeQueue(&blocked_queue_hd, &blocked_queue_tl, tso);
+ removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
#if defined(mingw32_HOST_OS)
/* (Cooperatively) signal that the worker thread should abort
* the request.
goto done;
case BlockedOnDelay:
- removeThreadFromQueue(&sleeping_queue, tso);
+ removeThreadFromQueue(cap, &sleeping_queue, tso);
goto done;
#endif
default:
- barf("removeFromQueues");
+ barf("removeFromQueues: %d", tso->why_blocked);
}
done:
- tso->link = END_TSO_QUEUE;
tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- appendToRunQueue(cap,tso);
-
- // We might have just migrated this TSO to our Capability:
- if (tso->bound) {
- tso->bound->cap = cap;
- }
- tso->cap = cap;
+ appendToRunQueue(cap, tso);
}
-#endif
/* -----------------------------------------------------------------------------
* raiseAsync()
* asynchronous exception in an existing thread.
*
* We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
*
* We strip the stack down to the innermost CATCH_FRAME, building
* thunks in the heap for all the active computations, so they can
static void
raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically, StgPtr stop_here)
+ rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
{
StgRetInfoTable *info;
StgPtr sp, frame;
+ StgClosure *updatee;
nat i;
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
+ debugTraceCap(DEBUG_sched, cap,
+ "raising exception in thread %ld.", (long)tso->id);
+#if defined(PROFILING)
+ /*
+ * Debugging tool: on raising an exception, show where we are.
+ * See also Exception.cmm:stg_raisezh.
+ * This wasn't done for asynchronous exceptions originally; see #1450
+ */
+ if (RtsFlags.ProfFlags.showCCSOnException)
+ {
+ fprintCCS_stderr(tso->prof.CCCS);
+ }
+#endif
+ // ASSUMES: the thread is not already complete or dead, or
+ // ThreadRelocated. Upper layers should deal with that.
+ ASSERT(tso->what_next != ThreadComplete &&
+ tso->what_next != ThreadKilled &&
+ tso->what_next != ThreadRelocated);
+
+ // only if we own this TSO (except when called via deleteThread()
+ // during shutdown, in which case we own all the capabilities)
+ ASSERT(tso->cap == cap);
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
+
// mark it dirty; we're about to change its stack.
- dirtyTSO(tso);
+ dirty_TSO(cap, tso);
sp = tso->sp;
- // ASSUMES: the thread is not already complete or dead. Upper
- // layers should deal with that.
- ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
+ if (stop_here != NULL) {
+ updatee = stop_here->updatee;
+ } else {
+ updatee = NULL;
+ }
// The stack freezing code assumes there's a closure pointer on
// the top of the stack, so we have to arrange that this is the case...
}
frame = sp + 1;
- while (stop_here == NULL || frame < stop_here) {
+ while (stop_here == NULL || frame < (StgPtr)stop_here) {
// 1. Let the top of the stack be the "current closure"
//
// fun field.
//
words = frame - sp - 1;
- ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
+ ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
ap->size = words;
ap->fun = (StgClosure *)sp[0];
// printObj((StgClosure *)ap);
// );
- // Replace the updatee with an indirection
- //
- // Warning: if we're in a loop, more than one update frame on
- // the stack may point to the same object. Be careful not to
- // overwrite an IND_OLDGEN in this case, because we'll screw
- // up the mutable lists. To be on the safe side, don't
- // overwrite any kind of indirection at all. See also
- // threadSqueezeStack in GC.c, where we have to make a similar
- // check.
- //
- if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
- // revert the black hole
- UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
- (StgClosure *)ap);
- }
+ if (((StgUpdateFrame *)frame)->updatee == updatee) {
+ // If this update frame points to the same closure as
+ // the update frame further down the stack
+ // (stop_here), then don't perform the update. We
+ // want to keep the blackhole in this case, so we can
+ // detect and report the loop (#2783).
+ ap = (StgAP_STACK*)updatee;
+ } else {
+ // Perform the update
+ // TODO: this may waste some work, if the thunk has
+ // already been updated by another thread.
+ updateThunk(cap, tso,
+ ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ }
+
sp += sizeofW(StgUpdateFrame) - 1;
sp[0] = (W_)ap; // push onto stack
frame = sp + 1;
}
case STOP_FRAME:
+ {
// We've stripped the entire stack, the thread is now dead.
tso->what_next = ThreadKilled;
tso->sp = frame + sizeofW(StgStopFrame);
return;
+ }
case CATCH_FRAME:
// If we find a CATCH_FRAME, and we've got an exception to raise,
// top of the CATCH_FRAME ready to enter.
//
{
-#ifdef PROFILING
StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
StgThunk *raise;
if (exception == NULL) break;
// we've got an exception to raise, so let's pass it to the
// handler in this frame.
//
- raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+ raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
TICK_ALLOC_SE_THK(1,0);
SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
raise->payload[0] = exception;
* a surprise exception before we get around to executing the
* handler.
*/
- tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
+ tso->flags |= TSO_BLOCKEX;
+ if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
+ tso->flags &= ~TSO_INTERRUPTIBLE;
+ } else {
+ tso->flags |= TSO_INTERRUPTIBLE;
+ }
/* Put the newly-built THUNK on top of the stack, ready to execute
* when the thread restarts.
case ATOMICALLY_FRAME:
if (stop_at_atomically) {
- ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ ASSERT(tso->trec->enclosing_trec == NO_TREC);
stmCondemnTransaction(cap, tso -> trec);
-#ifdef REG_R1
- tso->sp = frame;
-#else
- // R1 is not a register: the return convention for IO in
- // this case puts the return value on the stack, so we
- // need to set up the stack to return to the atomically
- // frame properly...
tso->sp = frame - 2;
- tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
- tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
+ // The ATOMICALLY_FRAME expects to be returned a
+ // result from the transaction, which it stores in the
+ // stack frame. Hence we arrange to return a dummy
+ // result, so that the GC doesn't get upset (#3578).
+ // Perhaps a better way would be to have a different
+ // ATOMICALLY_FRAME instance for condemned
+ // transactions, but I don't fully understand the
+ // interaction with STM invariants.
+ tso->sp[1] = (W_)&stg_NO_TREC_closure;
+ tso->sp[0] = (W_)&stg_gc_unpt_r1_info;
tso->what_next = ThreadRunGHC;
return;
}
// Not stop_at_atomically... fall through and abort the
// transaction.
+ case CATCH_STM_FRAME:
case CATCH_RETRY_FRAME:
// IF we find an ATOMICALLY_FRAME then we abort the
// current transaction and propagate the exception. In
// whether the transaction is valid or not because its
// possible validity cannot have caused the exception
// and will not be visible after the abort.
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ {
StgTRecHeader *trec = tso -> trec;
- StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+ StgTRecHeader *outer = trec -> enclosing_trec;
+ debugTraceCap(DEBUG_stm, cap,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
break;
+ };
default:
break;