X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FRaiseAsync.c;h=775505f88780ab6a8ebb376516ae60a736f9b76f;hp=d54f823d6e506fa356dcce134612e05128000249;hb=e5c3b478b3cd1707cf122833822f44b2ac09b8e9;hpb=7408b39235bccdcde48df2a73337ff976fbc09b7 diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index d54f823..775505f 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -18,18 +18,21 @@ #include "STM.h" #include "sm/Sanity.h" #include "Profiling.h" +#include "Messages.h" #if defined(mingw32_HOST_OS) #include "win32/IOManager.h" #endif -static void raiseAsync (Capability *cap, - StgTSO *tso, - StgClosure *exception, - rtsBool stop_at_atomically, - StgUpdateFrame *stop_here); +static StgTSO* raiseAsync (Capability *cap, + StgTSO *tso, + StgClosure *exception, + rtsBool stop_at_atomically, + StgUpdateFrame *stop_here); static void removeFromQueues(Capability *cap, StgTSO *tso); +static void removeFromMVarBlockedQueue (StgTSO *tso); + static void blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg); @@ -37,8 +40,6 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS, Capability *target_cap USED_IF_THREADS, MessageThrowTo *msg USED_IF_THREADS); -static void performBlockedException (Capability *cap, MessageThrowTo *msg); - /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -56,39 +57,38 @@ static void performBlockedException (Capability *cap, MessageThrowTo *msg); has been raised. -------------------------------------------------------------------------- */ -void -throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception) -{ - throwToSingleThreaded_(cap, tso, exception, rtsFalse); -} - -void -throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically) +static void +throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception, + rtsBool stop_at_atomically, StgUpdateFrame *stop_here) { // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; + return; } // Remove it from any blocking queues removeFromQueues(cap,tso); - raiseAsync(cap, tso, exception, stop_at_atomically, NULL); + raiseAsync(cap, tso, exception, stop_at_atomically, stop_here); } void -suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) +throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception) { - // Thread already dead? - if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; - } + throwToSingleThreaded__(cap, tso, exception, rtsFalse, NULL); +} - // Remove it from any blocking queues - removeFromQueues(cap,tso); +void +throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception, + rtsBool stop_at_atomically) +{ + throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL); +} - raiseAsync(cap, tso, NULL, rtsFalse, stop_here); +void // cannot return a different TSO +suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) +{ + throwToSingleThreaded__ (cap, tso, NULL, rtsFalse, stop_here); } /* ----------------------------------------------------------------------------- @@ -121,8 +121,8 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) Currently we send a message if the target belongs to another Capability, and it is - - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo, - BlockedOnCCall + - NotBlocked, BlockedOnMsgThrowTo, + BlockedOnCCall_Interruptible - or it is masking exceptions (TSO_BLOCKEX) @@ -143,7 +143,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) unlockClosure(msg, &stg_MSG_THROWTO_info); If it decides not to raise the exception after all, it can revoke it safely with - unlockClosure(msg, &stg_IND_info); + unlockClosure(msg, &stg_MSG_NULL_info); -------------------------------------------------------------------------- */ @@ -158,7 +158,7 @@ throwTo (Capability *cap, // the Capability we hold msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); // message starts locked; the caller has to unlock it when it is // ready. - msg->header.info = &stg_WHITEHOLE_info; + SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM); msg->source = source; msg->target = target; msg->exception = exception; @@ -179,14 +179,21 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; StgTSO *target = msg->target; + Capability *target_cap; + + goto check_target; +retry: + write_barrier(); + debugTrace(DEBUG_sched, "throwTo: retrying..."); + +check_target: ASSERT(target != END_TSO_QUEUE); - // follow ThreadRelocated links in the target first - while (target->what_next == ThreadRelocated) { - target = target->_link; - // No, it might be a WHITEHOLE: - // ASSERT(get_itbl(target)->type == TSO); + // Thread already dead? + if (target->what_next == ThreadComplete + || target->what_next == ThreadKilled) { + return THROWTO_SUCCESS; } debugTraceCap(DEBUG_sched, cap, @@ -198,106 +205,29 @@ throwToMsg (Capability *cap, MessageThrowTo *msg) traceThreadStatus(DEBUG_sched, target); #endif - goto check_target; -retry: - write_barrier(); - debugTrace(DEBUG_sched, "throwTo: retrying..."); - -check_target: - ASSERT(target != END_TSO_QUEUE); - - // Thread already dead? - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - return THROWTO_SUCCESS; + target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; } status = target->why_blocked; switch (status) { case NotBlocked: - case BlockedOnMsgWakeup: - /* if status==NotBlocked, and target->cap == cap, then - we own this TSO and can raise the exception. - - How do we establish this condition? Very carefully. - - Let - P = (status == NotBlocked) - Q = (tso->cap == cap) - - Now, if P & Q are true, then the TSO is locked and owned by - this capability. No other OS thread can steal it. - - If P==0 and Q==1: the TSO is blocked, but attached to this - capabilty, and it can be stolen by another capability. - - If P==1 and Q==0: the TSO is runnable on another - capability. At any time, the TSO may change from runnable - to blocked and vice versa, while it remains owned by - another capability. - - Suppose we test like this: - - p = P - q = Q - if (p && q) ... - - this is defeated by another capability stealing a blocked - TSO from us to wake it up (Schedule.c:unblockOne()). The - other thread is doing - - Q = 0 - P = 1 - - assuming arbitrary reordering, we could see this - interleaving: - - start: P==0 && Q==1 - P = 1 - p = P - q = Q - Q = 0 - if (p && q) ... - - so we need a memory barrier: - - p = P - mb() - q = Q - if (p && q) ... - - this avoids the problematic case. There are other cases - to consider, but this is the tricky one. - - Note that we must be sure that unblockOne() does the - writes in the correct order: Q before P. The memory - barrier ensures that if we have seen the write to P, we - have also seen the write to Q. - */ { - Capability *target_cap; - - write_barrier(); - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; } else { - if ((target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - blockedThrowTo(cap,target,msg); - return THROWTO_BLOCKED; - } + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } } case BlockedOnMsgThrowTo: { - Capability *target_cap; const StgInfoTable *i; MessageThrowTo *m; @@ -328,17 +258,19 @@ check_target: } } - if (i != &stg_MSG_THROWTO_info) { - // if it's an IND, this TSO has been woken up by another Cap + if (i == &stg_MSG_NULL_info) { + // we know there's a MSG_TRY_WAKEUP on the way, so we + // might as well just do it now. The message will + // be a no-op when it arrives. unlockClosure((StgClosure*)m, i); + tryWakeupThread(cap, target); goto retry; } - target_cap = target->cap; - if (target_cap != cap) { + if (i != &stg_MSG_THROWTO_info) { + // if it's a MSG_NULL, this TSO has been woken up by another Cap unlockClosure((StgClosure*)m, i); - throwToSendMsg(cap, target_cap, msg); - return THROWTO_BLOCKED; + goto retry; } if ((target->flags & TSO_BLOCKEX) && @@ -349,10 +281,9 @@ check_target: } // nobody else can wake up this TSO after we claim the message - unlockClosure((StgClosure*)m, &stg_IND_info); + doneWithMsgThrowTo(m); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); return THROWTO_SUCCESS; } @@ -379,12 +310,7 @@ check_target: info = lockClosure((StgClosure *)mvar); - if (target->what_next == ThreadRelocated) { - target = target->_link; - unlockClosure((StgClosure *)mvar,info); - goto retry; - } - // we have the MVar, let's check whether the thread + // we have the MVar, let's check whether the thread // is still blocked on the same MVar. if (target->why_blocked != BlockedOnMVar || (StgMVar *)target->block_info.closure != mvar) { @@ -392,20 +318,25 @@ check_target: goto retry; } + if (target->_link == END_TSO_QUEUE) { + // the MVar operation has already completed. There is a + // MSG_TRY_WAKEUP on the way, but we can just wake up the + // thread now anyway and ignore the message when it + // arrives. + unlockClosure((StgClosure *)mvar, info); + tryWakeupThread(cap, target); + goto retry; + } + if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockClosure((StgClosure *)mvar, info); return THROWTO_BLOCKED; } else { - removeThreadFromMVarQueue(cap, mvar, target); + // revoke the MVar operation + removeFromMVarBlockedQueue(target); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; } @@ -413,29 +344,20 @@ check_target: case BlockedOnBlackHole: { - ACQUIRE_LOCK(&sched_mutex); - // double checking the status after the memory barrier: - if (target->why_blocked != BlockedOnBlackHole) { - RELEASE_LOCK(&sched_mutex); - goto retry; - } - if (target->flags & TSO_BLOCKEX) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } - RELEASE_LOCK(&sched_mutex); - return THROWTO_BLOCKED; // caller releases lock + // BlockedOnBlackHole is not interruptible. + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } else { - removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); - RELEASE_LOCK(&sched_mutex); - return THROWTO_SUCCESS; - } + // Revoke the message by replacing it with IND. We're not + // locking anything here, so we might still get a TRY_WAKEUP + // message from the owner of the blackhole some time in the + // future, but that doesn't matter. + ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info); + OVERWRITE_INFO(target->block_info.bh, &stg_IND_info); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + return THROWTO_SUCCESS; + } } case BlockedOnSTM: @@ -448,35 +370,43 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - Capability *target_cap = target->cap; - if (target->cap != cap) { - throwToSendMsg(cap,target_cap,msg); - } else { - blockedThrowTo(cap,target,msg); - } + blockedThrowTo(cap,target,msg); unlockTSO(target); return THROWTO_BLOCKED; } else { raiseAsync(cap, target, msg->exception, rtsFalse, NULL); - unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; } - case BlockedOnCCall: - case BlockedOnCCall_NoUnblockExc: + case BlockedOnCCall_Interruptible: +#ifdef THREADED_RTS { - Capability *target_cap; - - target_cap = target->cap; - if (target_cap != cap) { - throwToSendMsg(cap, target_cap, msg); + Task *task = NULL; + // walk suspended_ccalls to find the correct worker thread + InCall *incall; + for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) { + if (incall->suspended_tso == target) { + task = incall->task; + break; + } + } + if (task != NULL) { + blockedThrowTo(cap, target, msg); + if (!((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0))) { + interruptWorkerTask(task); + } return THROWTO_BLOCKED; + } else { + debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill"); } - + // fall to next + } +#endif + case BlockedOnCCall: blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; - } #ifndef THREADEDED_RTS case BlockedOnRead: @@ -496,8 +426,20 @@ check_target: } #endif + case ThreadMigrating: + // if is is ThreadMigrating and tso->cap is ours, then it + // *must* be migrating *to* this capability. If it were + // migrating away from the capability, then tso->cap would + // point to the destination. + // + // There is a MSG_WAKEUP in the message queue for this thread, + // but we can just do it preemptively: + tryWakeupThread(cap, target); + // and now retry, the thread should be runnable. + goto retry; + default: - barf("throwTo: unrecognised why_blocked value"); + barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked); } barf("throwTo"); } @@ -509,9 +451,9 @@ throwToSendMsg (Capability *cap STG_UNUSED, { #ifdef THREADED_RTS - debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); - sendMessage(target_cap, (Message*)msg); + sendMessage(cap, target_cap, (Message*)msg); #endif } @@ -526,7 +468,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) ASSERT(target->cap == cap); - msg->link = (Message*)target->blocked_exceptions; + msg->link = target->blocked_exceptions; target->blocked_exceptions = msg; dirty_TSO(cap,target); // we modified the blocked_exceptions queue } @@ -553,7 +495,8 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) { MessageThrowTo *msg; const StgInfoTable *i; - + StgTSO *source; + if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); @@ -565,7 +508,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { - debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); + debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE @@ -579,56 +522,44 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; i = lockClosure((StgClosure*)msg); tso->blocked_exceptions = (MessageThrowTo*)msg->link; - if (i == &stg_IND_info) { + if (i == &stg_MSG_NULL_info) { unlockClosure((StgClosure*)msg,i); goto loop; } - performBlockedException(cap, msg); - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); - unlockClosure((StgClosure*)msg,&stg_IND_info); + throwToSingleThreaded(cap, msg->target, msg->exception); + source = msg->source; + doneWithMsgThrowTo(msg); + tryWakeupThread(cap, source); return 1; } return 0; } // awakenBlockedExceptionQueue(): Just wake up the whole queue of -// blocked exceptions and let them try again. +// blocked exceptions. void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { MessageThrowTo *msg; const StgInfoTable *i; + StgTSO *source; for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; msg = (MessageThrowTo*)msg->link) { i = lockClosure((StgClosure *)msg); - if (i != &stg_IND_info) { - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + if (i != &stg_MSG_NULL_info) { + source = msg->source; + doneWithMsgThrowTo(msg); + tryWakeupThread(cap, source); + } else { + unlockClosure((StgClosure *)msg,i); } - unlockClosure((StgClosure *)msg,i); } tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } -static void -performBlockedException (Capability *cap, MessageThrowTo *msg) -{ - StgTSO *source; - - source = msg->source; - - ASSERT(source->why_blocked == BlockedOnMsgThrowTo); - ASSERT(source->block_info.closure == (StgClosure *)msg); - ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); - // check ids not pointers, because the thread might be relocated - - throwToSingleThreaded(cap, msg->target, msg->exception); - source->sp += 3; -} - /* ----------------------------------------------------------------------------- Remove a thread from blocking queues. @@ -640,11 +571,54 @@ performBlockedException (Capability *cap, MessageThrowTo *msg) -------------------------------------------------------------------------- */ static void +removeFromMVarBlockedQueue (StgTSO *tso) +{ + StgMVar *mvar = (StgMVar*)tso->block_info.closure; + StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link; + + if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) { + // already removed from this MVar + return; + } + + // Assume the MVar is locked. (not assertable; sometimes it isn't + // actually WHITEHOLE'd). + + // We want to remove the MVAR_TSO_QUEUE object from the queue. It + // isn't doubly-linked so we can't actually remove it; instead we + // just overwrite it with an IND if possible and let the GC short + // it out. However, we have to be careful to maintain the deque + // structure: + + if (mvar->head == q) { + mvar->head = q->link; + OVERWRITE_INFO(q, &stg_IND_info); + if (mvar->tail == q) { + mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE; + } + } + else if (mvar->tail == q) { + // we can't replace it with an IND in this case, because then + // we lose the tail pointer when the GC shorts out the IND. + // So we use MSG_NULL as a kind of non-dupable indirection; + // these are ignored by takeMVar/putMVar. + OVERWRITE_INFO(q, &stg_MSG_NULL_info); + } + else { + OVERWRITE_INFO(q, &stg_IND_info); + } + + // revoke the MVar operation + tso->_link = END_TSO_QUEUE; +} + +static void removeFromQueues(Capability *cap, StgTSO *tso) { switch (tso->why_blocked) { case NotBlocked: + case ThreadMigrating: return; case BlockedOnSTM: @@ -657,20 +631,13 @@ removeFromQueues(Capability *cap, StgTSO *tso) goto done; case BlockedOnMVar: - removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso); + removeFromMVarBlockedQueue(tso); goto done; case BlockedOnBlackHole: - removeThreadFromQueue(cap, &blackhole_queue, tso); + // nothing to do goto done; - case BlockedOnMsgWakeup: - { - // kill the message, atomically: - tso->block_info.wakeup->header.info = &stg_IND_info; - break; - } - case BlockedOnMsgThrowTo: { MessageThrowTo *m = tso->block_info.throwto; @@ -680,7 +647,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) // ASSERT(m->header.info == &stg_WHITEHOLE_info); // unlock and revoke it at the same time - unlockClosure((StgClosure*)m,&stg_IND_info); + doneWithMsgThrowTo(m); break; } @@ -709,7 +676,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) } done: - unblockOne(cap, tso); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); } /* ----------------------------------------------------------------------------- @@ -719,7 +687,8 @@ removeFromQueues(Capability *cap, StgTSO *tso) * asynchronous exception in an existing thread. * * We first remove the thread from any queue on which it might be - * blocked. The possible blockages are MVARs and BLACKHOLE_BQs. + * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and + * TSO blocked_exception queues. * * We strip the stack down to the innermost CATCH_FRAME, building * thunks in the heap for all the active computations, so they can @@ -749,7 +718,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) * * -------------------------------------------------------------------------- */ -static void +static StgTSO * raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically, StgUpdateFrame *stop_here) { @@ -757,9 +726,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, StgPtr sp, frame; StgClosure *updatee; nat i; + StgStack *stack; - debugTrace(DEBUG_sched, - "raising exception in thread %ld.", (long)tso->id); + debugTraceCap(DEBUG_sched, cap, + "raising exception in thread %ld.", (long)tso->id); #if defined(PROFILING) /* @@ -772,20 +742,22 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, fprintCCS_stderr(tso->prof.CCCS); } #endif + // ASSUMES: the thread is not already complete or dead + // Upper layers should deal with that. + ASSERT(tso->what_next != ThreadComplete && + tso->what_next != ThreadKilled); - while (tso->what_next == ThreadRelocated) { - tso = tso->_link; - } + // only if we own this TSO (except that deleteThread() calls this + ASSERT(tso->cap == cap); + + stack = tso->stackobj; // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); + dirty_STACK(cap, stack); - sp = tso->sp; + sp = stack->sp; - // ASSUMES: the thread is not already complete or dead. Upper - // layers should deal with that. - ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled); - if (stop_here != NULL) { updatee = stop_here->updatee; } else { @@ -820,10 +792,13 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // // 5. If it's a STOP_FRAME, then kill the thread. // - // NB: if we pass an ATOMICALLY_FRAME then abort the associated + // 6. If it's an UNDERFLOW_FRAME, then continue with the next + // stack chunk. + // + // NB: if we pass an ATOMICALLY_FRAME then abort the associated // transaction - info = get_ret_itbl((StgClosure *)frame); + info = get_ret_itbl((StgClosure *)frame); switch (info->i.type) { @@ -868,7 +843,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // Perform the update // TODO: this may waste some work, if the thunk has // already been updated by another thread. - UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); + updateThunk(cap, tso, + ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap); } sp += sizeofW(StgUpdateFrame) - 1; @@ -877,12 +853,46 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, continue; //no need to bump frame } - case STOP_FRAME: + case UNDERFLOW_FRAME: + { + StgAP_STACK * ap; + nat words; + + // First build an AP_STACK consisting of the stack chunk above the + // current update frame, with the top word on the stack as the + // fun field. + // + words = frame - sp - 1; + ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words)); + + ap->size = words; + ap->fun = (StgClosure *)sp[0]; + sp++; + for(i=0; i < (nat)words; ++i) { + ap->payload[i] = (StgClosure *)*sp++; + } + + SET_HDR(ap,&stg_AP_STACK_NOUPD_info, + ((StgClosure *)frame)->header.prof.ccs /* ToDo */); + TICK_ALLOC_SE_THK(words+1,0); + + stack->sp = sp; + threadStackUnderflow(cap,tso); + stack = tso->stackobj; + sp = stack->sp; + + sp--; + sp[0] = (W_)ap; + frame = sp + 1; + continue; + } + + case STOP_FRAME: { // We've stripped the entire stack, the thread is now dead. tso->what_next = ThreadKilled; - tso->sp = frame + sizeofW(StgStopFrame); - return; + stack->sp = frame + sizeofW(StgStopFrame); + goto done; } case CATCH_FRAME: @@ -891,9 +901,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // top of the CATCH_FRAME ready to enter. // { -#ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)frame; -#endif StgThunk *raise; if (exception == NULL) break; @@ -914,24 +922,28 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, * a surprise exception before we get around to executing the * handler. */ - tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE; + tso->flags |= TSO_BLOCKEX; + if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) { + tso->flags &= ~TSO_INTERRUPTIBLE; + } else { + tso->flags |= TSO_INTERRUPTIBLE; + } /* Put the newly-built THUNK on top of the stack, ready to execute * when the thread restarts. */ sp[0] = (W_)raise; sp[-1] = (W_)&stg_enter_info; - tso->sp = sp-1; + stack->sp = sp-1; tso->what_next = ThreadRunGHC; - IF_DEBUG(sanity, checkTSO(tso)); - return; + goto done; } case ATOMICALLY_FRAME: if (stop_at_atomically) { ASSERT(tso->trec->enclosing_trec == NO_TREC); stmCondemnTransaction(cap, tso -> trec); - tso->sp = frame - 2; + stack->sp = frame - 2; // The ATOMICALLY_FRAME expects to be returned a // result from the transaction, which it stores in the // stack frame. Hence we arrange to return a dummy @@ -940,10 +952,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, // ATOMICALLY_FRAME instance for condemned // transactions, but I don't fully understand the // interaction with STM invariants. - tso->sp[1] = (W_)&stg_NO_TREC_closure; - tso->sp[0] = (W_)&stg_gc_unpt_r1_info; - tso->what_next = ThreadRunGHC; - return; + stack->sp[1] = (W_)&stg_NO_TREC_closure; + stack->sp[0] = (W_)&stg_gc_unpt_r1_info; + tso->what_next = ThreadRunGHC; + goto done; } // Not stop_at_atomically... fall through and abort the // transaction. @@ -960,12 +972,12 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, { StgTRecHeader *trec = tso -> trec; StgTRecHeader *outer = trec -> enclosing_trec; - debugTrace(DEBUG_stm, - "found atomically block delivering async exception"); + debugTraceCap(DEBUG_stm, cap, + "found atomically block delivering async exception"); stmAbortTransaction(cap, trec); stmFreeAbortedTRec(cap, trec); tso -> trec = outer; - break; + break; }; default: @@ -976,8 +988,16 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, frame += stack_frame_sizeW((StgClosure *)frame); } - // if we got here, then we stopped at stop_here - ASSERT(stop_here != NULL); +done: + IF_DEBUG(sanity, checkTSO(tso)); + + // wake it up + if (tso->why_blocked != NotBlocked) { + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); + } + + return tso; }