X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=c8a38565afcd9aa9af2c506cbc695803562e63f7;hb=21146fa414558ee31b08b14792feed71778bccdf;hp=10d91a7d80a9fbefdc29f45b0a95adec589ec266;hpb=aac6f4e51b57094702ca21ea55c48c91019deed8;p=ghc-hetmet.git diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index 10d91a7..c8a3856 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -8,15 +8,15 @@ #include "PosixSource.h" #include "Rts.h" + +#include "sm/Storage.h" #include "Threads.h" #include "Trace.h" #include "RaiseAsync.h" -#include "SMP.h" #include "Schedule.h" -#include "LdvProfile.h" #include "Updates.h" #include "STM.h" -#include "Sanity.h" +#include "sm/Sanity.h" #include "Profiling.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" @@ -26,7 +26,7 @@ static void raiseAsync (Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically, - StgPtr stop_here); + StgUpdateFrame *stop_here); static void removeFromQueues(Capability *cap, StgTSO *tso); @@ -55,12 +55,12 @@ static void performBlockedException (Capability *cap, void throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception) { - throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL); + throwToSingleThreaded_(cap, tso, exception, rtsFalse); } void throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically, StgPtr stop_here) + rtsBool stop_at_atomically) { // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { @@ -70,11 +70,11 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, // Remove it from any blocking queues removeFromQueues(cap,tso); - raiseAsync(cap, tso, exception, stop_at_atomically, stop_here); + raiseAsync(cap, tso, exception, stop_at_atomically, NULL); } void -suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here) +suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) { // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { @@ -150,6 +150,8 @@ throwTo (Capability *cap, // the Capability we hold { StgWord status; + ASSERT(target != END_TSO_QUEUE); + // follow ThreadRelocated links in the target first while (target->what_next == ThreadRelocated) { target = target->_link; @@ -161,11 +163,7 @@ throwTo (Capability *cap, // the Capability we hold (unsigned long)source->id, (unsigned long)target->id); #ifdef DEBUG - if (traceClass(DEBUG_sched)) { - debugTraceBegin("throwTo: target"); - printThreadStatus(target); - debugTraceEnd(); - } + traceThreadStatus(DEBUG_sched, target); #endif goto check_target; @@ -173,6 +171,8 @@ retry: debugTrace(DEBUG_sched, "throwTo: retrying..."); check_target: + ASSERT(target != END_TSO_QUEUE); + // Thread already dead? if (target->what_next == ThreadComplete || target->what_next == ThreadKilled) { @@ -264,6 +264,15 @@ check_target: target = target->_link; goto retry; } + // check again for ThreadComplete and ThreadKilled. This + // cooperates with scheduleHandleThreadFinished to ensure + // that we never miss any threads that are throwing an + // exception to a thread in the process of terminating. + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + unlockTSO(target); + return THROWTO_SUCCESS; + } blockedThrowTo(cap,source,target); *out = target; return THROWTO_BLOCKED; @@ -415,6 +424,7 @@ check_target: // Unblocking BlockedOnSTM threads requires the TSO to be // locked; see STM.c:unpark_tso(). if (target->why_blocked != BlockedOnSTM) { + unlockTSO(target); goto retry; } if ((target->flags & TSO_BLOCKEX) && @@ -436,6 +446,11 @@ check_target: // thread is blocking exceptions, and block on its // blocked_exception queue. lockTSO(target); + if (target->why_blocked != BlockedOnCCall && + target->why_blocked != BlockedOnCCall_NoUnblockExc) { + unlockTSO(target); + goto retry; + } blockedThrowTo(cap,source,target); *out = target; return THROWTO_BLOCKED; @@ -512,6 +527,15 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) { StgTSO *source; + if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { + if (tso->blocked_exceptions != END_TSO_QUEUE) { + awakenBlockedExceptionQueue(cap,tso); + return 1; + } else { + return 0; + } + } + if (tso->blocked_exceptions != END_TSO_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); @@ -543,15 +567,16 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) return 0; } +// awakenBlockedExceptionQueue(): Just wake up the whole queue of +// blocked exceptions and let them try again. + void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { - if (tso->blocked_exceptions != END_TSO_QUEUE) { - lockTSO(tso); - awakenBlockedQueue(cap, tso->blocked_exceptions); - tso->blocked_exceptions = END_TSO_QUEUE; - unlockTSO(tso); - } + lockTSO(tso); + awakenBlockedQueue(cap, tso->blocked_exceptions); + tso->blocked_exceptions = END_TSO_QUEUE; + unlockTSO(tso); } static void @@ -576,166 +601,10 @@ performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) This is for use when we raise an exception in another thread, which may be blocked. - Precondition: we have exclusive access to the TSO, which entails - holding a lock on the object that owns the queue, if the TSO is - blocked. e.g. if the thread is blocked on an MVar, we must hold a - lock on the MVar before calling removeFromQueues(). - - This has nothing to do with the UnblockThread event in GranSim. -- HWL + Precondition: we have exclusive access to the TSO, via the same set + of conditions as throwToSingleThreaded() (c.f.). -------------------------------------------------------------------------- */ -#if defined(GRAN) || defined(PARALLEL_HASKELL) -/* - NB: only the type of the blocking queue is different in GranSim and GUM - the operations on the queue-elements are the same - long live polymorphism! - - Locks: sched_mutex is held upon entry and exit. - -*/ -static void -removeFromQueues(Capability *cap, StgTSO *tso) -{ - StgBlockingQueueElement *t, **last; - - switch (tso->why_blocked) { - - case NotBlocked: - return; /* not blocked */ - - case BlockedOnSTM: - // Be careful: nothing to do here! We tell the scheduler that the thread - // is runnable and we leave it to the stack-walking code to abort the - // transaction while unwinding the stack. We should perhaps have a debugging - // test to make sure that this really happens and that the 'zombie' transaction - // does not get committed. - goto done; - - case BlockedOnMVar: - ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); - { - StgBlockingQueueElement *last_tso = END_BQ_QUEUE; - StgMVar *mvar = (StgMVar *)(tso->block_info.closure); - - last = (StgBlockingQueueElement **)&mvar->head; - for (t = (StgBlockingQueueElement *)mvar->head; - t != END_BQ_QUEUE; - last = &t->link, last_tso = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - if (mvar->tail == tso) { - mvar->tail = (StgTSO *)last_tso; - } - goto done; - } - } - barf("removeFromQueues (MVAR): TSO not found"); - } - - case BlockedOnBlackHole: - ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ); - { - StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure); - - last = &bq->blocking_queue; - for (t = bq->blocking_queue; - t != END_BQ_QUEUE; - last = &t->link, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - goto done; - } - } - barf("removeFromQueues (BLACKHOLE): TSO not found"); - } - - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - ASSERT(get_itbl(target)->type == TSO); - - while (target->what_next == ThreadRelocated) { - target = target2->link; - ASSERT(get_itbl(target)->type == TSO); - } - - last = (StgBlockingQueueElement **)&target->blocked_exceptions; - for (t = (StgBlockingQueueElement *)target->blocked_exceptions; - t != END_BQ_QUEUE; - last = &t->link, t = t->link) { - ASSERT(get_itbl(t)->type == TSO); - if (t == (StgBlockingQueueElement *)tso) { - *last = (StgBlockingQueueElement *)tso->link; - goto done; - } - } - barf("removeFromQueues (Exception): TSO not found"); - } - - case BlockedOnRead: - case BlockedOnWrite: -#if defined(mingw32_HOST_OS) - case BlockedOnDoProc: -#endif - { - /* take TSO off blocked_queue */ - StgBlockingQueueElement *prev = NULL; - for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; - prev = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - if (prev == NULL) { - blocked_queue_hd = (StgTSO *)t->link; - if ((StgBlockingQueueElement *)blocked_queue_tl == t) { - blocked_queue_tl = END_TSO_QUEUE; - } - } else { - prev->link = t->link; - if ((StgBlockingQueueElement *)blocked_queue_tl == t) { - blocked_queue_tl = (StgTSO *)prev; - } - } -#if defined(mingw32_HOST_OS) - /* (Cooperatively) signal that the worker thread should abort - * the request. - */ - abandonWorkRequest(tso->block_info.async_result->reqID); -#endif - goto done; - } - } - barf("removeFromQueues (I/O): TSO not found"); - } - - case BlockedOnDelay: - { - /* take TSO off sleeping_queue */ - StgBlockingQueueElement *prev = NULL; - for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; - prev = t, t = t->link) { - if (t == (StgBlockingQueueElement *)tso) { - if (prev == NULL) { - sleeping_queue = (StgTSO *)t->link; - } else { - prev->link = t->link; - } - goto done; - } - } - barf("removeFromQueues (delay): TSO not found"); - } - - default: - barf("removeFromQueues: %d", tso->why_blocked); - } - - done: - tso->link = END_TSO_QUEUE; - tso->why_blocked = NotBlocked; - tso->block_info.closure = NULL; - pushOnRunQueue(cap,tso); -} -#else static void removeFromQueues(Capability *cap, StgTSO *tso) { @@ -758,9 +627,6 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnBlackHole: - // we have exclusive access to this TSO, which implies that we - // must hold sched_mutex: - ASSERT_LOCK_HELD(&sched_mutex); removeThreadFromQueue(cap, &blackhole_queue, tso); goto done; @@ -806,18 +672,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - tso->_link = END_TSO_QUEUE; // no write barrier reqd - tso->why_blocked = NotBlocked; - tso->block_info.closure = NULL; - appendToRunQueue(cap,tso); - - // We might have just migrated this TSO to our Capability: - if (tso->bound) { - tso->bound->cap = cap; - } - tso->cap = cap; + unblockOne(cap, tso); } -#endif /* ----------------------------------------------------------------------------- * raiseAsync() @@ -858,10 +714,11 @@ removeFromQueues(Capability *cap, StgTSO *tso) static void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically, StgPtr stop_here) + rtsBool stop_at_atomically, StgUpdateFrame *stop_here) { StgRetInfoTable *info; StgPtr sp, frame; + StgClosure *updatee; nat i; debugTrace(DEBUG_sched, @@ -870,7 +727,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, #if defined(PROFILING) /* * Debugging tool: on raising an exception, show where we are. - * See also Exception.cmm:raisezh_fast. + * See also Exception.cmm:stg_raisezh. * This wasn't done for asynchronous exceptions originally; see #1450 */ if (RtsFlags.ProfFlags.showCCSOnException) @@ -888,6 +745,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // layers should deal with that. ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled); + if (stop_here != NULL) { + updatee = stop_here->updatee; + } else { + updatee = NULL; + } + // The stack freezing code assumes there's a closure pointer on // the top of the stack, so we have to arrange that this is the case... // @@ -899,7 +762,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, } frame = sp + 1; - while (stop_here == NULL || frame < stop_here) { + while (stop_here == NULL || frame < (StgPtr)stop_here) { // 1. Let the top of the stack be the "current closure" // @@ -933,7 +796,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // fun field. // words = frame - sp - 1; - ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words)); + ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words)); ap->size = words; ap->fun = (StgClosure *)sp[0]; @@ -953,11 +816,19 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // printObj((StgClosure *)ap); // ); - // Perform the update - // TODO: this may waste some work, if the thunk has - // already been updated by another thread. - UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee, - (StgClosure *)ap); + if (((StgUpdateFrame *)frame)->updatee == updatee) { + // If this update frame points to the same closure as + // the update frame further down the stack + // (stop_here), then don't perform the update. We + // want to keep the blackhole in this case, so we can + // detect and report the loop (#2783). + ap = (StgAP_STACK*)updatee; + } else { + // Perform the update + // TODO: this may waste some work, if the thunk has + // already been updated by another thread. + UPD_IND(((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + } sp += sizeofW(StgUpdateFrame) - 1; sp[0] = (W_)ap; // push onto stack @@ -989,7 +860,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); + raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -1017,9 +888,19 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, case ATOMICALLY_FRAME: if (stop_at_atomically) { - ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); + ASSERT(tso->trec->enclosing_trec == NO_TREC); stmCondemnTransaction(cap, tso -> trec); - tso->sp = frame; + tso->sp = frame - 2; + // The ATOMICALLY_FRAME expects to be returned a + // result from the transaction, which it stores in the + // stack frame. Hence we arrange to return a dummy + // result, so that the GC doesn't get upset (#3578). + // Perhaps a better way would be to have a different + // ATOMICALLY_FRAME instance for condemned + // transactions, but I don't fully understand the + // interaction with STM invariants. + tso->sp[1] = (W_)&stg_NO_TREC_closure; + tso->sp[0] = (W_)&stg_gc_unpt_r1_info; tso->what_next = ThreadRunGHC; return; } @@ -1037,7 +918,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, { StgTRecHeader *trec = tso -> trec; - StgTRecHeader *outer = stmGetEnclosingTRec(trec); + StgTRecHeader *outer = trec -> enclosing_trec; debugTrace(DEBUG_stm, "found atomically block delivering async exception"); stmAbortTransaction(cap, trec);