#include "PosixSource.h"
#include "Rts.h"
+
+#include "sm/Storage.h"
#include "Threads.h"
#include "Trace.h"
#include "RaiseAsync.h"
-#include "SMP.h"
#include "Schedule.h"
-#include "Storage.h"
-#include "LdvProfile.h"
#include "Updates.h"
#include "STM.h"
-#include "Sanity.h"
+#include "sm/Sanity.h"
+#include "Profiling.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
StgTSO *tso,
StgClosure *exception,
rtsBool stop_at_atomically,
- StgPtr stop_here);
+ StgUpdateFrame *stop_here);
static void removeFromQueues(Capability *cap, StgTSO *tso);
-static void blockedThrowTo (StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
static void performBlockedException (Capability *cap,
StgTSO *source, StgTSO *target);
void
throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception)
{
- throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL);
+ throwToSingleThreaded_(cap, tso, exception, rtsFalse);
}
void
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically, StgPtr stop_here)
+ rtsBool stop_at_atomically)
{
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
// Remove it from any blocking queues
removeFromQueues(cap,tso);
- raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
+ raiseAsync(cap, tso, exception, stop_at_atomically, NULL);
}
void
-suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
{
StgWord status;
+ ASSERT(target != END_TSO_QUEUE);
+
// follow ThreadRelocated links in the target first
while (target->what_next == ThreadRelocated) {
- target = target->link;
+ target = target->_link;
// No, it might be a WHITEHOLE:
// ASSERT(get_itbl(target)->type == TSO);
}
(unsigned long)source->id, (unsigned long)target->id);
#ifdef DEBUG
- if (traceClass(DEBUG_sched)) {
- debugTraceBegin("throwTo: target");
- printThreadStatus(target);
- debugTraceEnd();
- }
+ traceThreadStatus(DEBUG_sched, target);
#endif
goto check_target;
debugTrace(DEBUG_sched, "throwTo: retrying...");
check_target:
+ ASSERT(target != END_TSO_QUEUE);
+
// Thread already dead?
if (target->what_next == ThreadComplete
|| target->what_next == ThreadKilled) {
// just moved this TSO.
if (target->what_next == ThreadRelocated) {
unlockTSO(target);
- target = target->link;
+ target = target->_link;
goto retry;
}
- blockedThrowTo(source,target);
+ // check again for ThreadComplete and ThreadKilled. This
+ // cooperates with scheduleHandleThreadFinished to ensure
+ // that we never miss any threads that are throwing an
+ // exception to a thread in the process of terminating.
+ if (target->what_next == ThreadComplete
+ || target->what_next == ThreadKilled) {
+ unlockTSO(target);
+ return THROWTO_SUCCESS;
+ }
+ blockedThrowTo(cap,source,target);
*out = target;
return THROWTO_BLOCKED;
}
// ASSUMPTION: tso->block_info must always point to a
// closure. In the threaded RTS it does.
- if (get_itbl(mvar)->type != MVAR) goto retry;
+ switch (get_itbl(mvar)->type) {
+ case MVAR_CLEAN:
+ case MVAR_DIRTY:
+ break;
+ default:
+ goto retry;
+ }
info = lockClosure((StgClosure *)mvar);
if (target->what_next == ThreadRelocated) {
- target = target->link;
+ target = target->_link;
unlockClosure((StgClosure *)mvar,info);
goto retry;
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
lockClosure((StgClosure *)target);
- blockedThrowTo(source,target);
+ blockedThrowTo(cap,source,target);
unlockClosure((StgClosure *)mvar, info);
*out = target;
return THROWTO_BLOCKED; // caller releases TSO
} else {
- removeThreadFromMVarQueue(mvar, target);
+ removeThreadFromMVarQueue(cap, mvar, target);
raiseAsync(cap, target, exception, rtsFalse, NULL);
unblockOne(cap, target);
unlockClosure((StgClosure *)mvar, info);
if (target->flags & TSO_BLOCKEX) {
lockTSO(target);
- blockedThrowTo(source,target);
+ blockedThrowTo(cap,source,target);
RELEASE_LOCK(&sched_mutex);
*out = target;
return THROWTO_BLOCKED; // caller releases TSO
} else {
- removeThreadFromQueue(&blackhole_queue, target);
+ removeThreadFromQueue(cap, &blackhole_queue, target);
raiseAsync(cap, target, exception, rtsFalse, NULL);
unblockOne(cap, target);
RELEASE_LOCK(&sched_mutex);
goto retry;
}
if (target->what_next == ThreadRelocated) {
- target = target->link;
+ target = target->_link;
unlockTSO(target2);
goto retry;
}
if (target2->what_next == ThreadRelocated) {
- target->block_info.tso = target2->link;
+ target->block_info.tso = target2->_link;
unlockTSO(target2);
goto retry;
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
lockTSO(target);
- blockedThrowTo(source,target);
+ blockedThrowTo(cap,source,target);
unlockTSO(target2);
*out = target;
return THROWTO_BLOCKED;
} else {
- removeThreadFromQueue(&target2->blocked_exceptions, target);
+ removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
raiseAsync(cap, target, exception, rtsFalse, NULL);
unblockOne(cap, target);
unlockTSO(target2);
// Unblocking BlockedOnSTM threads requires the TSO to be
// locked; see STM.c:unpark_tso().
if (target->why_blocked != BlockedOnSTM) {
+ unlockTSO(target);
goto retry;
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(source,target);
+ blockedThrowTo(cap,source,target);
*out = target;
return THROWTO_BLOCKED;
} else {
// thread is blocking exceptions, and block on its
// blocked_exception queue.
lockTSO(target);
- blockedThrowTo(source,target);
+ if (target->why_blocked != BlockedOnCCall &&
+ target->why_blocked != BlockedOnCCall_NoUnblockExc) {
+ unlockTSO(target);
+ goto retry;
+ }
+ blockedThrowTo(cap,source,target);
*out = target;
return THROWTO_BLOCKED;
#endif
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- blockedThrowTo(source,target);
+ blockedThrowTo(cap,source,target);
return THROWTO_BLOCKED;
} else {
removeFromQueues(cap,target);
// complex to achieve as there's no single lock on a TSO; see
// throwTo()).
static void
-blockedThrowTo (StgTSO *source, StgTSO *target)
+blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
{
debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
- source->link = target->blocked_exceptions;
+ setTSOLink(cap, source, target->blocked_exceptions);
target->blocked_exceptions = source;
- dirtyTSO(target); // we modified the blocked_exceptions queue
+ dirty_TSO(cap,target); // we modified the blocked_exceptions queue
source->block_info.tso = target;
write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
queue, but not perform any throwTo() immediately. This might be
more appropriate when the target thread is the one actually running
(see Exception.cmm).
+
+ Returns: non-zero if an exception was raised, zero otherwise.
-------------------------------------------------------------------------- */
-void
+int
maybePerformBlockedException (Capability *cap, StgTSO *tso)
{
StgTSO *source;
+ if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
+ if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ awakenBlockedExceptionQueue(cap,tso);
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ if (tso->blocked_exceptions != END_TSO_QUEUE &&
+ (tso->flags & TSO_BLOCKEX) != 0) {
+ debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ }
+
if (tso->blocked_exceptions != END_TSO_QUEUE
&& ((tso->flags & TSO_BLOCKEX) == 0
|| ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
// locked it.
if (tso->blocked_exceptions == END_TSO_QUEUE) {
unlockTSO(tso);
- return;
+ return 0;
}
// We unblock just the first thread on the queue, and perform
tso->blocked_exceptions = unblockOne_(cap, source,
rtsFalse/*no migrate*/);
unlockTSO(tso);
+ return 1;
}
+ return 0;
}
+// awakenBlockedExceptionQueue(): Just wake up the whole queue of
+// blocked exceptions and let them try again.
+
void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
{
- if (tso->blocked_exceptions != END_TSO_QUEUE) {
- lockTSO(tso);
- awakenBlockedQueue(cap, tso->blocked_exceptions);
- tso->blocked_exceptions = END_TSO_QUEUE;
- unlockTSO(tso);
- }
+ lockTSO(tso);
+ awakenBlockedQueue(cap, tso->blocked_exceptions);
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ unlockTSO(tso);
}
static void
This is for use when we raise an exception in another thread, which
may be blocked.
- This has nothing to do with the UnblockThread event in GranSim. -- HWL
- -------------------------------------------------------------------------- */
-
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-/*
- NB: only the type of the blocking queue is different in GranSim and GUM
- the operations on the queue-elements are the same
- long live polymorphism!
-
- Locks: sched_mutex is held upon entry and exit.
-
-*/
-static void
-removeFromQueues(Capability *cap, StgTSO *tso)
-{
- StgBlockingQueueElement *t, **last;
-
- switch (tso->why_blocked) {
-
- case NotBlocked:
- return; /* not blocked */
-
- case BlockedOnSTM:
- // Be careful: nothing to do here! We tell the scheduler that the thread
- // is runnable and we leave it to the stack-walking code to abort the
- // transaction while unwinding the stack. We should perhaps have a debugging
- // test to make sure that this really happens and that the 'zombie' transaction
- // does not get committed.
- goto done;
-
- case BlockedOnMVar:
- ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
- {
- StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
- StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
- last = (StgBlockingQueueElement **)&mvar->head;
- for (t = (StgBlockingQueueElement *)mvar->head;
- t != END_BQ_QUEUE;
- last = &t->link, last_tso = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- if (mvar->tail == tso) {
- mvar->tail = (StgTSO *)last_tso;
- }
- goto done;
- }
- }
- barf("removeFromQueues (MVAR): TSO not found");
- }
-
- case BlockedOnBlackHole:
- ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
- {
- StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
- last = &bq->blocking_queue;
- for (t = bq->blocking_queue;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("removeFromQueues (BLACKHOLE): TSO not found");
- }
-
- case BlockedOnException:
- {
- StgTSO *target = tso->block_info.tso;
-
- ASSERT(get_itbl(target)->type == TSO);
-
- while (target->what_next == ThreadRelocated) {
- target = target2->link;
- ASSERT(get_itbl(target)->type == TSO);
- }
-
- last = (StgBlockingQueueElement **)&target->blocked_exceptions;
- for (t = (StgBlockingQueueElement *)target->blocked_exceptions;
- t != END_BQ_QUEUE;
- last = &t->link, t = t->link) {
- ASSERT(get_itbl(t)->type == TSO);
- if (t == (StgBlockingQueueElement *)tso) {
- *last = (StgBlockingQueueElement *)tso->link;
- goto done;
- }
- }
- barf("removeFromQueues (Exception): TSO not found");
- }
-
- case BlockedOnRead:
- case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
- case BlockedOnDoProc:
-#endif
- {
- /* take TSO off blocked_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- blocked_queue_hd = (StgTSO *)t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = END_TSO_QUEUE;
- }
- } else {
- prev->link = t->link;
- if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
- blocked_queue_tl = (StgTSO *)prev;
- }
- }
-#if defined(mingw32_HOST_OS)
- /* (Cooperatively) signal that the worker thread should abort
- * the request.
- */
- abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
- goto done;
- }
- }
- barf("removeFromQueues (I/O): TSO not found");
- }
-
- case BlockedOnDelay:
- {
- /* take TSO off sleeping_queue */
- StgBlockingQueueElement *prev = NULL;
- for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE;
- prev = t, t = t->link) {
- if (t == (StgBlockingQueueElement *)tso) {
- if (prev == NULL) {
- sleeping_queue = (StgTSO *)t->link;
- } else {
- prev->link = t->link;
- }
- goto done;
- }
- }
- barf("removeFromQueues (delay): TSO not found");
- }
- default:
- barf("removeFromQueues");
- }
+ Precondition: we have exclusive access to the TSO, via the same set
+ of conditions as throwToSingleThreaded() (c.f.).
+ -------------------------------------------------------------------------- */
- done:
- tso->link = END_TSO_QUEUE;
- tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- pushOnRunQueue(cap,tso);
-}
-#else
static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
goto done;
case BlockedOnMVar:
- removeThreadFromMVarQueue((StgMVar *)tso->block_info.closure, tso);
+ removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(&blackhole_queue, tso);
+ removeThreadFromQueue(cap, &blackhole_queue, tso);
goto done;
case BlockedOnException:
// ASSERT(get_itbl(target)->type == TSO);
while (target->what_next == ThreadRelocated) {
- target = target->link;
+ target = target->_link;
}
- removeThreadFromQueue(&target->blocked_exceptions, tso);
+ removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
goto done;
}
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
#endif
- removeThreadFromDeQueue(&blocked_queue_hd, &blocked_queue_tl, tso);
+ removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
#if defined(mingw32_HOST_OS)
/* (Cooperatively) signal that the worker thread should abort
* the request.
goto done;
case BlockedOnDelay:
- removeThreadFromQueue(&sleeping_queue, tso);
+ removeThreadFromQueue(cap, &sleeping_queue, tso);
goto done;
#endif
default:
- barf("removeFromQueues");
+ barf("removeFromQueues: %d", tso->why_blocked);
}
done:
- tso->link = END_TSO_QUEUE;
- tso->why_blocked = NotBlocked;
- tso->block_info.closure = NULL;
- appendToRunQueue(cap,tso);
-
- // We might have just migrated this TSO to our Capability:
- if (tso->bound) {
- tso->bound->cap = cap;
- }
- tso->cap = cap;
+ unblockOne(cap, tso);
}
-#endif
/* -----------------------------------------------------------------------------
* raiseAsync()
static void
raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically, StgPtr stop_here)
+ rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
{
StgRetInfoTable *info;
StgPtr sp, frame;
+ StgClosure *updatee;
nat i;
debugTrace(DEBUG_sched,
"raising exception in thread %ld.", (long)tso->id);
+#if defined(PROFILING)
+ /*
+ * Debugging tool: on raising an exception, show where we are.
+ * See also Exception.cmm:stg_raisezh.
+ * This wasn't done for asynchronous exceptions originally; see #1450
+ */
+ if (RtsFlags.ProfFlags.showCCSOnException)
+ {
+ fprintCCS_stderr(tso->prof.CCCS);
+ }
+#endif
+
// mark it dirty; we're about to change its stack.
- dirtyTSO(tso);
+ dirty_TSO(cap, tso);
sp = tso->sp;
// layers should deal with that.
ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
+ if (stop_here != NULL) {
+ updatee = stop_here->updatee;
+ } else {
+ updatee = NULL;
+ }
+
// The stack freezing code assumes there's a closure pointer on
// the top of the stack, so we have to arrange that this is the case...
//
}
frame = sp + 1;
- while (stop_here == NULL || frame < stop_here) {
+ while (stop_here == NULL || frame < (StgPtr)stop_here) {
// 1. Let the top of the stack be the "current closure"
//
// fun field.
//
words = frame - sp - 1;
- ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
+ ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
ap->size = words;
ap->fun = (StgClosure *)sp[0];
// printObj((StgClosure *)ap);
// );
- // Replace the updatee with an indirection
- //
- // Warning: if we're in a loop, more than one update frame on
- // the stack may point to the same object. Be careful not to
- // overwrite an IND_OLDGEN in this case, because we'll screw
- // up the mutable lists. To be on the safe side, don't
- // overwrite any kind of indirection at all. See also
- // threadSqueezeStack in GC.c, where we have to make a similar
- // check.
- //
- if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
- // revert the black hole
- UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
- (StgClosure *)ap);
- }
+ if (((StgUpdateFrame *)frame)->updatee == updatee) {
+ // If this update frame points to the same closure as
+ // the update frame further down the stack
+ // (stop_here), then don't perform the update. We
+ // want to keep the blackhole in this case, so we can
+ // detect and report the loop (#2783).
+ ap = (StgAP_STACK*)updatee;
+ } else {
+ // Perform the update
+ // TODO: this may waste some work, if the thunk has
+ // already been updated by another thread.
+ UPD_IND(((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ }
+
sp += sizeofW(StgUpdateFrame) - 1;
sp[0] = (W_)ap; // push onto stack
frame = sp + 1;
}
case STOP_FRAME:
+ {
// We've stripped the entire stack, the thread is now dead.
tso->what_next = ThreadKilled;
tso->sp = frame + sizeofW(StgStopFrame);
return;
+ }
case CATCH_FRAME:
// If we find a CATCH_FRAME, and we've got an exception to raise,
// we've got an exception to raise, so let's pass it to the
// handler in this frame.
//
- raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+ raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
TICK_ALLOC_SE_THK(1,0);
SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
raise->payload[0] = exception;
case ATOMICALLY_FRAME:
if (stop_at_atomically) {
- ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ ASSERT(tso->trec->enclosing_trec == NO_TREC);
stmCondemnTransaction(cap, tso -> trec);
-#ifdef REG_R1
- tso->sp = frame;
-#else
- // R1 is not a register: the return convention for IO in
- // this case puts the return value on the stack, so we
- // need to set up the stack to return to the atomically
- // frame properly...
tso->sp = frame - 2;
- tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
- tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
+ // The ATOMICALLY_FRAME expects to be returned a
+ // result from the transaction, which it stores in the
+ // stack frame. Hence we arrange to return a dummy
+ // result, so that the GC doesn't get upset (#3578).
+ // Perhaps a better way would be to have a different
+ // ATOMICALLY_FRAME instance for condemned
+ // transactions, but I don't fully understand the
+ // interaction with STM invariants.
+ tso->sp[1] = (W_)&stg_NO_TREC_closure;
+ tso->sp[0] = (W_)&stg_gc_unpt_r1_info;
tso->what_next = ThreadRunGHC;
return;
}
// Not stop_at_atomically... fall through and abort the
// transaction.
+ case CATCH_STM_FRAME:
case CATCH_RETRY_FRAME:
// IF we find an ATOMICALLY_FRAME then we abort the
// current transaction and propagate the exception. In
// whether the transaction is valid or not because its
// possible validity cannot have caused the exception
// and will not be visible after the abort.
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ {
StgTRecHeader *trec = tso -> trec;
- StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+ StgTRecHeader *outer = trec -> enclosing_trec;
+ debugTrace(DEBUG_stm,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
break;
+ };
default:
break;