X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=39c973bd1de8b706eeb22af55de333defbb90fa3;hp=4a405e7b4909f23511e822873c50e3184670fe26;hb=a2a67cd520b9841114d69a87a423dabcb3b4368e;hpb=ab0e778ccfde61aed4c22679b24d175fc6cc9bf3 diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index 4a405e7..39c973b 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -8,16 +8,17 @@ #include "PosixSource.h" #include "Rts.h" + +#include "sm/Storage.h" #include "Threads.h" #include "Trace.h" #include "RaiseAsync.h" -#include "SMP.h" -#include "Storage.h" #include "Schedule.h" -#include "LdvProfile.h" #include "Updates.h" #include "STM.h" #include "Sanity.h" +#include "Profiling.h" +#include "eventlog/EventLog.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" #endif @@ -26,11 +27,11 @@ static void raiseAsync (Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically, - StgPtr stop_here); + StgUpdateFrame *stop_here); static void removeFromQueues(Capability *cap, StgTSO *tso); -static void blockedThrowTo (StgTSO *source, StgTSO *target); +static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target); static void performBlockedException (Capability *cap, StgTSO *source, StgTSO *target); @@ -55,12 +56,12 @@ static void performBlockedException (Capability *cap, void throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception) { - throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL); + throwToSingleThreaded_(cap, tso, exception, rtsFalse); } void throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically, StgPtr stop_here) + rtsBool stop_at_atomically) { // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { @@ -70,11 +71,11 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, // Remove it from any blocking queues removeFromQueues(cap,tso); - raiseAsync(cap, tso, exception, stop_at_atomically, stop_here); + raiseAsync(cap, tso, exception, stop_at_atomically, NULL); } void -suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here) +suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) { // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { @@ -152,7 +153,7 @@ throwTo (Capability *cap, // the Capability we hold // follow ThreadRelocated links in the target first while (target->what_next == ThreadRelocated) { - target = target->link; + target = target->_link; // No, it might be a WHITEHOLE: // ASSERT(get_itbl(target)->type == TSO); } @@ -261,10 +262,19 @@ check_target: // just moved this TSO. if (target->what_next == ThreadRelocated) { unlockTSO(target); - target = target->link; + target = target->_link; goto retry; } - blockedThrowTo(source,target); + // check again for ThreadComplete and ThreadKilled. This + // cooperates with scheduleHandleThreadFinished to ensure + // that we never miss any threads that are throwing an + // exception to a thread in the process of terminating. + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + unlockTSO(target); + return THROWTO_SUCCESS; + } + blockedThrowTo(cap,source,target); *out = target; return THROWTO_BLOCKED; } @@ -283,12 +293,18 @@ check_target: // ASSUMPTION: tso->block_info must always point to a // closure. In the threaded RTS it does. - if (get_itbl(mvar)->type != MVAR) goto retry; + switch (get_itbl(mvar)->type) { + case MVAR_CLEAN: + case MVAR_DIRTY: + break; + default: + goto retry; + } info = lockClosure((StgClosure *)mvar); if (target->what_next == ThreadRelocated) { - target = target->link; + target = target->_link; unlockClosure((StgClosure *)mvar,info); goto retry; } @@ -303,12 +319,12 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { lockClosure((StgClosure *)target); - blockedThrowTo(source,target); + blockedThrowTo(cap,source,target); unlockClosure((StgClosure *)mvar, info); *out = target; return THROWTO_BLOCKED; // caller releases TSO } else { - removeThreadFromMVarQueue(mvar, target); + removeThreadFromMVarQueue(cap, mvar, target); raiseAsync(cap, target, exception, rtsFalse, NULL); unblockOne(cap, target); unlockClosure((StgClosure *)mvar, info); @@ -327,12 +343,12 @@ check_target: if (target->flags & TSO_BLOCKEX) { lockTSO(target); - blockedThrowTo(source,target); + blockedThrowTo(cap,source,target); RELEASE_LOCK(&sched_mutex); *out = target; return THROWTO_BLOCKED; // caller releases TSO } else { - removeThreadFromQueue(&blackhole_queue, target); + removeThreadFromQueue(cap, &blackhole_queue, target); raiseAsync(cap, target, exception, rtsFalse, NULL); unblockOne(cap, target); RELEASE_LOCK(&sched_mutex); @@ -367,12 +383,12 @@ check_target: goto retry; } if (target->what_next == ThreadRelocated) { - target = target->link; + target = target->_link; unlockTSO(target2); goto retry; } if (target2->what_next == ThreadRelocated) { - target->block_info.tso = target2->link; + target->block_info.tso = target2->_link; unlockTSO(target2); goto retry; } @@ -391,12 +407,12 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { lockTSO(target); - blockedThrowTo(source,target); + blockedThrowTo(cap,source,target); unlockTSO(target2); *out = target; return THROWTO_BLOCKED; } else { - removeThreadFromQueue(&target2->blocked_exceptions, target); + removeThreadFromQueue(cap, &target2->blocked_exceptions, target); raiseAsync(cap, target, exception, rtsFalse, NULL); unblockOne(cap, target); unlockTSO(target2); @@ -409,11 +425,12 @@ check_target: // Unblocking BlockedOnSTM threads requires the TSO to be // locked; see STM.c:unpark_tso(). if (target->why_blocked != BlockedOnSTM) { + unlockTSO(target); goto retry; } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(source,target); + blockedThrowTo(cap,source,target); *out = target; return THROWTO_BLOCKED; } else { @@ -430,7 +447,12 @@ check_target: // thread is blocking exceptions, and block on its // blocked_exception queue. lockTSO(target); - blockedThrowTo(source,target); + if (target->why_blocked != BlockedOnCCall && + target->why_blocked != BlockedOnCCall_NoUnblockExc) { + unlockTSO(target); + goto retry; + } + blockedThrowTo(cap,source,target); *out = target; return THROWTO_BLOCKED; @@ -443,7 +465,7 @@ check_target: #endif if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(source,target); + blockedThrowTo(cap,source,target); return THROWTO_BLOCKED; } else { removeFromQueues(cap,target); @@ -463,12 +485,12 @@ check_target: // complex to achieve as there's no single lock on a TSO; see // throwTo()). static void -blockedThrowTo (StgTSO *source, StgTSO *target) +blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target) { debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id); - source->link = target->blocked_exceptions; + setTSOLink(cap, source, target->blocked_exceptions); target->blocked_exceptions = source; - dirtyTSO(target); // we modified the blocked_exceptions queue + dirty_TSO(cap,target); // we modified the blocked_exceptions queue source->block_info.tso = target; write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is. @@ -497,13 +519,29 @@ throwToReleaseTarget (void *tso) queue, but not perform any throwTo() immediately. This might be more appropriate when the target thread is the one actually running (see Exception.cmm). + + Returns: non-zero if an exception was raised, zero otherwise. -------------------------------------------------------------------------- */ -void +int maybePerformBlockedException (Capability *cap, StgTSO *tso) { StgTSO *source; + if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { + if (tso->blocked_exceptions != END_TSO_QUEUE) { + awakenBlockedExceptionQueue(cap,tso); + return 1; + } else { + return 0; + } + } + + if (tso->blocked_exceptions != END_TSO_QUEUE && + (tso->flags & TSO_BLOCKEX) != 0) { + debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); + } + if (tso->blocked_exceptions != END_TSO_QUEUE && ((tso->flags & TSO_BLOCKEX) == 0 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) { @@ -515,7 +553,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) // locked it. if (tso->blocked_exceptions == END_TSO_QUEUE) { unlockTSO(tso); - return; + return 0; } // We unblock just the first thread on the queue, and perform @@ -525,18 +563,21 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) tso->blocked_exceptions = unblockOne_(cap, source, rtsFalse/*no migrate*/); unlockTSO(tso); + return 1; } + return 0; } +// awakenBlockedExceptionQueue(): Just wake up the whole queue of +// blocked exceptions and let them try again. + void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { - if (tso->blocked_exceptions != END_TSO_QUEUE) { - lockTSO(tso); - awakenBlockedQueue(cap, tso->blocked_exceptions); - tso->blocked_exceptions = END_TSO_QUEUE; - unlockTSO(tso); - } + lockTSO(tso); + awakenBlockedQueue(cap, tso->blocked_exceptions); + tso->blocked_exceptions = END_TSO_QUEUE; + unlockTSO(tso); } static void @@ -560,161 +601,11 @@ performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) This is for use when we raise an exception in another thread, which may be blocked. - This has nothing to do with the UnblockThread event in GranSim. -- HWL - -------------------------------------------------------------------------- */ - -#if defined(GRAN) || defined(PARALLEL_HASKELL) -/* - NB: only the type of the blocking queue is different in GranSim and GUM - the operations on the queue-elements are the same - long live polymorphism! - - Locks: sched_mutex is held upon entry and exit. - -*/ -static void -removeFromQueues(Capability *cap, StgTSO *tso) -{ - StgBlockingQueueElement *t, **last; - - switch (tso->why_blocked) { - - case NotBlocked: - return; /* not blocked */ - - case BlockedOnSTM: - // Be careful: nothing to do here! We tell the scheduler that the thread - // is runnable and we leave it to the stack-walking code to abort the - // transaction while unwinding the stack. We should perhaps have a debugging - // test to make sure that this really happens and that the 'zombie' transaction - // does not get committed. - goto done; - - case BlockedOnMVar: - ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); - { - StgBlockingQueueElement *last_tso = END_BQ_QUEUE; - StgMVar *mvar = (StgMVar *)(tso->block_info.closure); - - last = (StgBlockingQueueElement **)&mvar->head; - for (t = (StgBlockingQueueElement *)mvar->head; - t != END_BQ_QUEUE; - last = &t->link, last_tso = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - if (mvar->tail == tso) { - mvar->tail = (StgTSO *)last_tso; - } - goto done; - } - } - barf("removeFromQueues (MVAR): TSO not found"); - } - - case BlockedOnBlackHole: - ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ); - { - StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure); - - last = &bq->blocking_queue; - for (t = bq->blocking_queue; - t != END_BQ_QUEUE; - last = &t->link, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - goto done; - } - } - barf("removeFromQueues (BLACKHOLE): TSO not found"); - } - - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - ASSERT(get_itbl(target)->type == TSO); - - while (target->what_next == ThreadRelocated) { - target = target2->link; - ASSERT(get_itbl(target)->type == TSO); - } - last = (StgBlockingQueueElement **)&target->blocked_exceptions; - for (t = (StgBlockingQueueElement *)target->blocked_exceptions; - t != END_BQ_QUEUE; - last = &t->link, t = t->link) { - ASSERT(get_itbl(t)->type == TSO); - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - goto done; - } - } - barf("removeFromQueues (Exception): TSO not found"); - } - - case BlockedOnRead: - case BlockedOnWrite: -#if defined(mingw32_HOST_OS) - case BlockedOnDoProc: -#endif - { - /* take TSO off blocked_queue */ - StgBlockingQueueElement *prev = NULL; - for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; - prev = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - if (prev == NULL) { - blocked_queue_hd = (StgTSO *)t->link; - if ((StgBlockingQueueElement *)blocked_queue_tl == t) { - blocked_queue_tl = END_TSO_QUEUE; - } - } else { - prev->link = t->link; - if ((StgBlockingQueueElement *)blocked_queue_tl == t) { - blocked_queue_tl = (StgTSO *)prev; - } - } -#if defined(mingw32_HOST_OS) - /* (Cooperatively) signal that the worker thread should abort - * the request. - */ - abandonWorkRequest(tso->block_info.async_result->reqID); -#endif - goto done; - } - } - barf("removeFromQueues (I/O): TSO not found"); - } - - case BlockedOnDelay: - { - /* take TSO off sleeping_queue */ - StgBlockingQueueElement *prev = NULL; - for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; - prev = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - if (prev == NULL) { - sleeping_queue = (StgTSO *)t->link; - } else { - prev->link = t->link; - } - goto done; - } - } - barf("removeFromQueues (delay): TSO not found"); - } - - default: - barf("removeFromQueues"); - } + Precondition: we have exclusive access to the TSO, via the same set + of conditions as throwToSingleThreaded() (c.f.). + -------------------------------------------------------------------------- */ - done: - tso->link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; - tso->block_info.closure = NULL; - pushOnRunQueue(cap,tso); -} -#else static void removeFromQueues(Capability *cap, StgTSO *tso) { @@ -733,11 +624,11 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: - removeThreadFromMVarQueue((StgMVar *)tso->block_info.closure, tso); + removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); goto done; case BlockedOnBlackHole: - removeThreadFromQueue(&blackhole_queue, tso); + removeThreadFromQueue(cap, &blackhole_queue, tso); goto done; case BlockedOnException: @@ -750,10 +641,10 @@ removeFromQueues(Capability *cap, StgTSO *tso) // ASSERT(get_itbl(target)->type == TSO); while (target->what_next == ThreadRelocated) { - target = target->link; + target = target->_link; } - removeThreadFromQueue(&target->blocked_exceptions, tso); + removeThreadFromQueue(cap, &target->blocked_exceptions, tso); goto done; } @@ -763,7 +654,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) #if defined(mingw32_HOST_OS) case BlockedOnDoProc: #endif - removeThreadFromDeQueue(&blocked_queue_hd, &blocked_queue_tl, tso); + removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso); #if defined(mingw32_HOST_OS) /* (Cooperatively) signal that the worker thread should abort * the request. @@ -773,27 +664,17 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnDelay: - removeThreadFromQueue(&sleeping_queue, tso); + removeThreadFromQueue(cap, &sleeping_queue, tso); goto done; #endif default: - barf("removeFromQueues"); + barf("removeFromQueues: %d", tso->why_blocked); } done: - tso->link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; - tso->block_info.closure = NULL; - appendToRunQueue(cap,tso); - - // We might have just migrated this TSO to our Capability: - if (tso->bound) { - tso->bound->cap = cap; - } - tso->cap = cap; + unblockOne(cap, tso); } -#endif /* ----------------------------------------------------------------------------- * raiseAsync() @@ -834,17 +715,30 @@ removeFromQueues(Capability *cap, StgTSO *tso) static void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically, StgPtr stop_here) + rtsBool stop_at_atomically, StgUpdateFrame *stop_here) { StgRetInfoTable *info; StgPtr sp, frame; + StgClosure *updatee; nat i; debugTrace(DEBUG_sched, "raising exception in thread %ld.", (long)tso->id); +#if defined(PROFILING) + /* + * Debugging tool: on raising an exception, show where we are. + * See also Exception.cmm:raisezh_fast. + * This wasn't done for asynchronous exceptions originally; see #1450 + */ + if (RtsFlags.ProfFlags.showCCSOnException) + { + fprintCCS_stderr(tso->prof.CCCS); + } +#endif + // mark it dirty; we're about to change its stack. - dirtyTSO(tso); + dirty_TSO(cap, tso); sp = tso->sp; @@ -852,6 +746,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // layers should deal with that. ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled); + if (stop_here != NULL) { + updatee = stop_here->updatee; + } else { + updatee = NULL; + } + // The stack freezing code assumes there's a closure pointer on // the top of the stack, so we have to arrange that this is the case... // @@ -863,7 +763,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, } frame = sp + 1; - while (stop_here == NULL || frame < stop_here) { + while (stop_here == NULL || frame < (StgPtr)stop_here) { // 1. Let the top of the stack be the "current closure" // @@ -917,21 +817,20 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // printObj((StgClosure *)ap); // ); - // Replace the updatee with an indirection - // - // Warning: if we're in a loop, more than one update frame on - // the stack may point to the same object. Be careful not to - // overwrite an IND_OLDGEN in this case, because we'll screw - // up the mutable lists. To be on the safe side, don't - // overwrite any kind of indirection at all. See also - // threadSqueezeStack in GC.c, where we have to make a similar - // check. - // - if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) { - // revert the black hole - UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee, - (StgClosure *)ap); - } + if (((StgUpdateFrame *)frame)->updatee == updatee) { + // If this update frame points to the same closure as + // the update frame further down the stack + // (stop_here), then don't perform the update. We + // want to keep the blackhole in this case, so we can + // detect and report the loop (#2783). + ap = (StgAP_STACK*)updatee; + } else { + // Perform the update + // TODO: this may waste some work, if the thunk has + // already been updated by another thread. + UPD_IND(((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + } + sp += sizeofW(StgUpdateFrame) - 1; sp[0] = (W_)ap; // push onto stack frame = sp + 1; @@ -939,10 +838,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, } case STOP_FRAME: + { // We've stripped the entire stack, the thread is now dead. tso->what_next = ThreadKilled; tso->sp = frame + sizeofW(StgStopFrame); return; + } case CATCH_FRAME: // If we find a CATCH_FRAME, and we've got an exception to raise, @@ -990,23 +891,14 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, if (stop_at_atomically) { ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); stmCondemnTransaction(cap, tso -> trec); -#ifdef REG_R1 tso->sp = frame; -#else - // R1 is not a register: the return convention for IO in - // this case puts the return value on the stack, so we - // need to set up the stack to return to the atomically - // frame properly... - tso->sp = frame - 2; - tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? - tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; -#endif tso->what_next = ThreadRunGHC; return; } // Not stop_at_atomically... fall through and abort the // transaction. + case CATCH_STM_FRAME: case CATCH_RETRY_FRAME: // IF we find an ATOMICALLY_FRAME then we abort the // current transaction and propagate the exception. In @@ -1014,15 +906,17 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // whether the transaction is valid or not because its // possible validity cannot have caused the exception // and will not be visible after the abort. - debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); + { StgTRecHeader *trec = tso -> trec; StgTRecHeader *outer = stmGetEnclosingTRec(trec); + debugTrace(DEBUG_stm, + "found atomically block delivering async exception"); stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer; break; + }; default: break;