X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=775505f88780ab6a8ebb376516ae60a736f9b76f;hp=cbbdc95beb5c92d3d75b373d837531009974626a;hb=c5b178be60a5a44abd2f4ddf8c399857678326e2;hpb=539d3adec64f51a3fb13bb65b7a494d7eded01a0 diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index cbbdc95..775505f 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -23,11 +23,11 @@ #include "win32/IOManager.h" #endif -static void raiseAsync (Capability *cap, - StgTSO *tso, - StgClosure *exception, - rtsBool stop_at_atomically, - StgUpdateFrame *stop_here); +static StgTSO* raiseAsync (Capability *cap, + StgTSO *tso, + StgClosure *exception, + rtsBool stop_at_atomically, + StgUpdateFrame *stop_here); static void removeFromQueues(Capability *cap, StgTSO *tso); @@ -57,43 +57,38 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS, has been raised. -------------------------------------------------------------------------- */ -void -throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception) -{ - throwToSingleThreaded_(cap, tso, exception, rtsFalse); -} - -void -throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically) +static void +throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception, + rtsBool stop_at_atomically, StgUpdateFrame *stop_here) { - tso = deRefTSO(tso); - // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; + return; } // Remove it from any blocking queues removeFromQueues(cap,tso); - raiseAsync(cap, tso, exception, stop_at_atomically, NULL); + raiseAsync(cap, tso, exception, stop_at_atomically, stop_here); } void -suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) +throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception) { - tso = deRefTSO(tso); - - // Thread already dead? - if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; - } + throwToSingleThreaded__(cap, tso, exception, rtsFalse, NULL); +} - // Remove it from any blocking queues - removeFromQueues(cap,tso); +void +throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception, + rtsBool stop_at_atomically) +{ + throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL); +} - raiseAsync(cap, tso, NULL, rtsFalse, stop_here); +void // cannot return a different TSO +suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) +{ + throwToSingleThreaded__ (cap, tso, NULL, rtsFalse, stop_here); } /* ----------------------------------------------------------------------------- @@ -195,9 +190,6 @@ retry: check_target: ASSERT(target != END_TSO_QUEUE); - // follow ThreadRelocated links in the target first - target = deRefTSO(target); - // Thread already dead? if (target->what_next == ThreadComplete || target->what_next == ThreadKilled) { @@ -271,7 +263,7 @@ check_target: // might as well just do it now. The message will // be a no-op when it arrives. 
unlockClosure((StgClosure*)m, i); - tryWakeupThread_(cap, target); + tryWakeupThread(cap, target); goto retry; } @@ -289,7 +281,7 @@ check_target: } // nobody else can wake up this TSO after we claim the message - unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); + doneWithMsgThrowTo(m); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; @@ -318,12 +310,7 @@ check_target: info = lockClosure((StgClosure *)mvar); - if (target->what_next == ThreadRelocated) { - target = target->_link; - unlockClosure((StgClosure *)mvar,info); - goto retry; - } - // we have the MVar, let's check whether the thread + // we have the MVar, let's check whether the thread // is still blocked on the same MVar. if (target->why_blocked != BlockedOnMVar || (StgMVar *)target->block_info.closure != mvar) { @@ -337,7 +324,7 @@ check_target: // thread now anyway and ignore the message when it // arrives. unlockClosure((StgClosure *)mvar, info); - tryWakeupThread_(cap, target); + tryWakeupThread(cap, target); goto retry; } @@ -406,7 +393,8 @@ check_target: } if (task != NULL) { blockedThrowTo(cap, target, msg); - if (!((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0))) { + if (!((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0))) { interruptWorkerTask(task); } return THROWTO_BLOCKED; @@ -438,8 +426,20 @@ check_target: } #endif + case ThreadMigrating: + // if is is ThreadMigrating and tso->cap is ours, then it + // *must* be migrating *to* this capability. If it were + // migrating away from the capability, then tso->cap would + // point to the destination. + // + // There is a MSG_WAKEUP in the message queue for this thread, + // but we can just do it preemptively: + tryWakeupThread(cap, target); + // and now retry, the thread should be runnable. 
+ goto retry; + default: - barf("throwTo: unrecognised why_blocked value"); + barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked); } barf("throwTo"); } @@ -495,7 +495,8 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) { MessageThrowTo *msg; const StgInfoTable *i; - + StgTSO *source; + if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); @@ -527,8 +528,9 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) } throwToSingleThreaded(cap, msg->target, msg->exception); - unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info); - tryWakeupThread(cap, msg->source); + source = msg->source; + doneWithMsgThrowTo(msg); + tryWakeupThread(cap, source); return 1; } return 0; @@ -542,13 +544,15 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { MessageThrowTo *msg; const StgInfoTable *i; + StgTSO *source; for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; msg = (MessageThrowTo*)msg->link) { i = lockClosure((StgClosure *)msg); if (i != &stg_MSG_NULL_info) { - unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info); - tryWakeupThread(cap, msg->source); + source = msg->source; + doneWithMsgThrowTo(msg); + tryWakeupThread(cap, source); } else { unlockClosure((StgClosure *)msg,i); } @@ -588,7 +592,7 @@ removeFromMVarBlockedQueue (StgTSO *tso) if (mvar->head == q) { mvar->head = q->link; - q->header.info = &stg_IND_info; + OVERWRITE_INFO(q, &stg_IND_info); if (mvar->tail == q) { mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE; } @@ -598,10 +602,10 @@ removeFromMVarBlockedQueue (StgTSO *tso) // we lose the tail pointer when the GC shorts out the IND. // So we use MSG_NULL as a kind of non-dupable indirection; // these are ignored by takeMVar/putMVar. - q->header.info = &stg_MSG_NULL_info; + OVERWRITE_INFO(q, &stg_MSG_NULL_info); } else { - q->header.info = &stg_IND_info; + OVERWRITE_INFO(q, &stg_IND_info); } // revoke the MVar operation @@ -643,7 +647,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) // ASSERT(m->header.info == &stg_WHITEHOLE_info); // unlock and revoke it at the same time - unlockClosure((StgClosure*)m,&stg_MSG_NULL_info); + doneWithMsgThrowTo(m); break; } @@ -667,14 +671,6 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; #endif - case BlockedOnCCall_Interruptible: - case BlockedOnCCall: - // ccall shouldn't be put on the run queue, because whenever - // we raise an exception for such a blocked thread, it's only - // when we're /exiting/ the call. - tso->why_blocked = NotBlocked; - return; - default: barf("removeFromQueues: %d", tso->why_blocked); } @@ -722,7 +718,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) * * -------------------------------------------------------------------------- */ -static void +static StgTSO * raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically, StgUpdateFrame *stop_here) { @@ -730,6 +726,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, StgPtr sp, frame; StgClosure *updatee; nat i; + StgStack *stack; debugTraceCap(DEBUG_sched, cap, "raising exception in thread %ld.", (long)tso->id); @@ -745,25 +742,21 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, fprintCCS_stderr(tso->prof.CCCS); } #endif - // ASSUMES: the thread is not already complete or dead, or - // ThreadRelocated. Upper layers should deal with that. 
+ // ASSUMES: the thread is not already complete or dead + // Upper layers should deal with that. ASSERT(tso->what_next != ThreadComplete && - tso->what_next != ThreadKilled && - tso->what_next != ThreadRelocated); + tso->what_next != ThreadKilled); // only if we own this TSO (except that deleteThread() calls this ASSERT(tso->cap == cap); - // wake it up - if (tso->why_blocked != NotBlocked) { - tso->why_blocked = NotBlocked; - appendToRunQueue(cap,tso); - } + stack = tso->stackobj; // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); + dirty_STACK(cap, stack); - sp = tso->sp; + sp = stack->sp; if (stop_here != NULL) { updatee = stop_here->updatee; @@ -799,10 +792,13 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // // 5. If it's a STOP_FRAME, then kill the thread. // - // NB: if we pass an ATOMICALLY_FRAME then abort the associated + // 6. If it's an UNDERFLOW_FRAME, then continue with the next + // stack chunk. + // + // NB: if we pass an ATOMICALLY_FRAME then abort the associated // transaction - info = get_ret_itbl((StgClosure *)frame); + info = get_ret_itbl((StgClosure *)frame); switch (info->i.type) { @@ -857,12 +853,46 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, continue; //no need to bump frame } - case STOP_FRAME: + case UNDERFLOW_FRAME: + { + StgAP_STACK * ap; + nat words; + + // First build an AP_STACK consisting of the stack chunk above the + // current update frame, with the top word on the stack as the + // fun field. + // + words = frame - sp - 1; + ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words)); + + ap->size = words; + ap->fun = (StgClosure *)sp[0]; + sp++; + for(i=0; i < (nat)words; ++i) { + ap->payload[i] = (StgClosure *)*sp++; + } + + SET_HDR(ap,&stg_AP_STACK_NOUPD_info, + ((StgClosure *)frame)->header.prof.ccs /* ToDo */); + TICK_ALLOC_SE_THK(words+1,0); + + stack->sp = sp; + threadStackUnderflow(cap,tso); + stack = tso->stackobj; + sp = stack->sp; + + sp--; + sp[0] = (W_)ap; + frame = sp + 1; + continue; + } + + case STOP_FRAME: { // We've stripped the entire stack, the thread is now dead. tso->what_next = ThreadKilled; - tso->sp = frame + sizeofW(StgStopFrame); - return; + stack->sp = frame + sizeofW(StgStopFrame); + goto done; } case CATCH_FRAME: @@ -904,17 +934,16 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, */ sp[0] = (W_)raise; sp[-1] = (W_)&stg_enter_info; - tso->sp = sp-1; + stack->sp = sp-1; tso->what_next = ThreadRunGHC; - IF_DEBUG(sanity, checkTSO(tso)); - return; + goto done; } case ATOMICALLY_FRAME: if (stop_at_atomically) { ASSERT(tso->trec->enclosing_trec == NO_TREC); stmCondemnTransaction(cap, tso -> trec); - tso->sp = frame - 2; + stack->sp = frame - 2; // The ATOMICALLY_FRAME expects to be returned a // result from the transaction, which it stores in the // stack frame. Hence we arrange to return a dummy @@ -923,10 +952,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // ATOMICALLY_FRAME instance for condemned // transactions, but I don't fully understand the // interaction with STM invariants. - tso->sp[1] = (W_)&stg_NO_TREC_closure; - tso->sp[0] = (W_)&stg_gc_unpt_r1_info; - tso->what_next = ThreadRunGHC; - return; + stack->sp[1] = (W_)&stg_NO_TREC_closure; + stack->sp[0] = (W_)&stg_gc_unpt_r1_info; + tso->what_next = ThreadRunGHC; + goto done; } // Not stop_at_atomically... fall through and abort the // transaction. 
@@ -948,7 +977,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer; - break; + break; }; default: @@ -959,8 +988,16 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, frame += stack_frame_sizeW((StgClosure *)frame); } - // if we got here, then we stopped at stop_here - ASSERT(stop_here != NULL); +done: + IF_DEBUG(sanity, checkTSO(tso)); + + // wake it up + if (tso->why_blocked != NotBlocked) { + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + } + + return tso; }
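
Notes on the patch:

The first hunk folds the duplicated bodies of throwToSingleThreaded(), throwToSingleThreaded_() and suspendComputation() into a single static worker, throwToSingleThreaded__(), leaving the public entry points as one-line wrappers. The sketch below shows that shape in isolation; Cap, TSO, Closure, Frame and the snake_case function names are stand-ins invented for the sketch, not the real Capability/StgTSO types or the RTS API, and the callee bodies are reduced to no-ops:

#include <stdbool.h>
#include <stddef.h>

/* Stand-in types for Capability, StgTSO, StgClosure and StgUpdateFrame;
 * invented for this sketch, not the RTS definitions. */
typedef struct Cap Cap;
typedef struct TSO { int what_next; } TSO;
typedef struct Closure Closure;
typedef struct Frame Frame;

enum { ThreadRunGHC, ThreadComplete, ThreadKilled };

/* No-op stand-ins for removeFromQueues() and raiseAsync(). */
static void remove_from_queues(Cap *cap, TSO *tso) { (void)cap; (void)tso; }
static void raise_async(Cap *cap, TSO *tso, Closure *exc,
                        bool stop_at_atomically, Frame *stop_here)
{
    (void)cap; (void)tso; (void)exc;
    (void)stop_at_atomically; (void)stop_here;
}

/* The single static worker: give up if the thread is already dead,
 * pull it off any blocking queue, then raise. */
static void throw_to_single_threaded__(Cap *cap, TSO *tso, Closure *exc,
                                       bool stop_at_atomically,
                                       Frame *stop_here)
{
    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled)
        return;
    remove_from_queues(cap, tso);
    raise_async(cap, tso, exc, stop_at_atomically, stop_here);
}

/* The three former entry points become one-line wrappers. */
void throw_to_single_threaded(Cap *cap, TSO *tso, Closure *exc)
{
    throw_to_single_threaded__(cap, tso, exc, false, NULL);
}

void throw_to_single_threaded_(Cap *cap, TSO *tso, Closure *exc,
                               bool stop_at_atomically)
{
    throw_to_single_threaded__(cap, tso, exc, stop_at_atomically, NULL);
}

void suspend_computation(Cap *cap, TSO *tso, Frame *stop_here)
{
    throw_to_single_threaded__(cap, tso, NULL, false, stop_here);
}

Keeping the dead-thread check and the removeFromQueues() call in one place is what lets suspendComputation() drop its own copy of both and simply pass stop_here through.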
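
Several hunks replace unlockClosure((StgClosure*)msg, &stg_MSG_NULL_info) with doneWithMsgThrowTo(msg), and each caller now copies msg->source into a local before retiring the message, since the message closure is overwritten at that point and its fields can no longer be trusted. A tiny model of that ordering; Msg, TSO and the helper names are invented for the sketch, not the RTS API:

#include <stddef.h>

typedef struct TSO { int id; } TSO;        /* stand-in for StgTSO          */
typedef struct Msg { TSO *source; } Msg;   /* stand-in for MessageThrowTo  */

/* Stand-ins for doneWithMsgThrowTo() and tryWakeupThread(); retiring the
 * message is modelled by clobbering its payload. */
static void done_with_msg(Msg *msg) { msg->source = NULL; }
static void try_wakeup(TSO *tso)    { (void)tso; }

/* Wake the sender of a claimed throwTo message: read the field first,
 * then retire the message, then wake the sender via the saved copy. */
void finish_throwto(Msg *msg)
{
    TSO *source = msg->source;
    done_with_msg(msg);
    try_wakeup(source);
}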
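
The largest functional change is the new UNDERFLOW_FRAME case in raiseAsync(), needed now that a thread's stack lives in separate, chainable StgStack chunks: when the unwind reaches the bottom of a chunk, the live words above the frame are captured into an AP_STACK_NOUPD closure, threadStackUnderflow() moves the walk to the older chunk, and the closure is pushed there so stripping can continue. The toy model below follows that shape under simplified assumptions (one flat word array per chunk, no separate fun field, malloc in place of the RTS allocator); StackChunk, Packed and underflow() are invented names, not RTS definitions:

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define CHUNK_WORDS 64

typedef struct StackChunk {            /* stand-in for StgStack               */
    struct StackChunk *older;          /* chunk the underflow frame links to  */
    uintptr_t         *sp;             /* grows downwards, as in the RTS      */
    uintptr_t          words[CHUNK_WORDS];
} StackChunk;

typedef struct {                       /* stand-in for an AP_STACK closure    */
    size_t    size;
    uintptr_t payload[];
} Packed;

/* 'frame' points at the underflow frame of 'chunk'.  Capture everything
 * between sp and the frame, hop to the older chunk (threadStackUnderflow()
 * in the real code), and push the capture there so unwinding can resume.
 * The sketch assumes the older chunk exists and has a free word of space. */
StackChunk *underflow(StackChunk *chunk, uintptr_t *frame)
{
    size_t words = (size_t)(frame - chunk->sp);

    Packed *ap = malloc(sizeof *ap + words * sizeof(uintptr_t));
    ap->size = words;
    memcpy(ap->payload, chunk->sp, words * sizeof(uintptr_t));

    chunk->sp = frame;                 /* this chunk is now fully unwound     */

    StackChunk *next = chunk->older;   /* continue on the older chunk ...     */
    *--next->sp = (uintptr_t)ap;       /* ... with the capture on its top     */
    return next;
}

In the real code the same loop then keeps walking frames on the new chunk, which is why raiseAsync() now tracks stack = tso->stackobj and re-reads it after each underflow.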