From 848797ebb9b60cf9c8a004c97afd008f5325c75f Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Mon, 29 Mar 2010 14:46:13 +0000 Subject: [PATCH] change throwTo to use tryWakeupThread rather than unblockOne --- includes/stg/MiscClosures.h | 2 ++ rts/Messages.c | 8 +++---- rts/RaiseAsync.c | 54 ++++++++++++++++++------------------------- rts/StgMiscClosures.cmm | 4 ++++ rts/Threads.c | 19 +++++++++++++++ 5 files changed, 51 insertions(+), 36 deletions(-) diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h index 9834c4b..4e5667b 100644 --- a/includes/stg/MiscClosures.h +++ b/includes/stg/MiscClosures.h @@ -120,6 +120,7 @@ RTS_INFO(stg_MSG_WAKEUP_info); RTS_INFO(stg_MSG_TRY_WAKEUP_info); RTS_INFO(stg_MSG_THROWTO_info); RTS_INFO(stg_MSG_BLACKHOLE_info); +RTS_INFO(stg_MSG_NULL_info); RTS_INFO(stg_MUT_CONS_info); RTS_INFO(stg_catch_info); RTS_INFO(stg_PAP_info); @@ -171,6 +172,7 @@ RTS_ENTRY(stg_MSG_WAKEUP_entry); RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry); RTS_ENTRY(stg_MSG_THROWTO_entry); RTS_ENTRY(stg_MSG_BLACKHOLE_entry); +RTS_ENTRY(stg_MSG_NULL_entry); RTS_ENTRY(stg_MUT_CONS_entry); RTS_ENTRY(stg_catch_entry); RTS_ENTRY(stg_PAP_entry); diff --git a/rts/Messages.c b/rts/Messages.c index 2b40f76..6a7c64d 100644 --- a/rts/Messages.c +++ b/rts/Messages.c @@ -114,11 +114,9 @@ loop: switch (r) { case THROWTO_SUCCESS: - ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); - t->source->sp += 3; - unblockOne(cap, t->source); // this message is done - unlockClosure((StgClosure*)m, &stg_IND_info); + unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); + tryWakeupThread(cap, t->source); break; case THROWTO_BLOCKED: // unlock the message @@ -137,7 +135,7 @@ loop: } return; } - else if (i == &stg_IND_info) + else if (i == &stg_IND_info || i == &stg_MSG_NULL_info) { // message was revoked return; diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index d5a4918..f974f8c 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -38,8 +38,6 @@ static void throwToSendMsg (Capability *cap USED_IF_THREADS, Capability *target_cap USED_IF_THREADS, MessageThrowTo *msg USED_IF_THREADS); -static void performBlockedException (Capability *cap, MessageThrowTo *msg); - /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -148,7 +146,7 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) unlockClosure(msg, &stg_MSG_THROWTO_info); If it decides not to raise the exception after all, it can revoke it safely with - unlockClosure(msg, &stg_IND_info); + unlockClosure(msg, &stg_MSG_NULL_info); -------------------------------------------------------------------------- */ @@ -326,8 +324,17 @@ check_target: } } + if (i == &stg_MSG_NULL_info) { + // we know there's a MSG_TRY_WAKEUP on the way, so we + // might as well just do it now. The message will + // be a no-op when it arrives. + unlockClosure((StgClosure*)m, i); + tryWakeupThread(cap, target); + goto retry; + } + if (i != &stg_MSG_THROWTO_info) { - // if it's an IND, this TSO has been woken up by another Cap + // if it's a MSG_NULL, this TSO has been woken up by another Cap unlockClosure((StgClosure*)m, i); goto retry; } @@ -340,7 +347,7 @@ check_target: } // nobody else can wake up this TSO after we claim the message - unlockClosure((StgClosure*)m, &stg_IND_info); + unlockClosure((StgClosure*)m, &stg_MSG_NULL_info); raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; @@ -535,21 +542,21 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; i = lockClosure((StgClosure*)msg); tso->blocked_exceptions = (MessageThrowTo*)msg->link; - if (i == &stg_IND_info) { + if (i == &stg_MSG_NULL_info) { unlockClosure((StgClosure*)msg,i); goto loop; } - performBlockedException(cap, msg); - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); - unlockClosure((StgClosure*)msg,&stg_IND_info); + throwToSingleThreaded(cap, msg->target, msg->exception); + unlockClosure((StgClosure*)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); return 1; } return 0; } // awakenBlockedExceptionQueue(): Just wake up the whole queue of -// blocked exceptions and let them try again. +// blocked exceptions. void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) @@ -560,31 +567,16 @@ awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; msg = (MessageThrowTo*)msg->link) { i = lockClosure((StgClosure *)msg); - if (i != &stg_IND_info) { - unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + if (i != &stg_MSG_NULL_info) { + unlockClosure((StgClosure *)msg,&stg_MSG_NULL_info); + tryWakeupThread(cap, msg->source); + } else { + unlockClosure((StgClosure *)msg,i); } - unlockClosure((StgClosure *)msg,i); } tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } -static void -performBlockedException (Capability *cap, MessageThrowTo *msg) -{ - StgTSO *source; - - source = msg->source; - - ASSERT(source->why_blocked == BlockedOnMsgThrowTo); - ASSERT(source->block_info.closure == (StgClosure *)msg); - ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); - // check ids not pointers, because the thread might be relocated - - throwToSingleThreaded(cap, msg->target, msg->exception); - source->sp += 3; -} - /* ----------------------------------------------------------------------------- Remove a thread from blocking queues. @@ -638,7 +630,7 @@ removeFromQueues(Capability *cap, StgTSO *tso) // ASSERT(m->header.info == &stg_WHITEHOLE_info); // unlock and revoke it at the same time - unlockClosure((StgClosure*)m,&stg_IND_info); + unlockClosure((StgClosure*)m,&stg_MSG_NULL_info); break; } diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm index 830bde5..7e1e6a7 100644 --- a/rts/StgMiscClosures.cmm +++ b/rts/StgMiscClosures.cmm @@ -503,6 +503,10 @@ INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO") INFO_TABLE_CONSTR(stg_MSG_BLACKHOLE,3,0,0,PRIM,"MSG_BLACKHOLE","MSG_BLACKHOLE") { foreign "C" barf("MSG_BLACKHOLE object entered!") never returns; } +// used to overwrite a MSG_THROWTO when the message has been used/revoked +INFO_TABLE_CONSTR(stg_MSG_NULL,1,0,0,PRIM,"MSG_NULL","MSG_NULL") +{ foreign "C" barf("MSG_NULL object entered!") never returns; } + /* ---------------------------------------------------------------------------- END_TSO_QUEUE diff --git a/rts/Threads.c b/rts/Threads.c index 0c3e591..05a13c7 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -290,12 +290,31 @@ tryWakeupThread (Capability *cap, StgTSO *tso) SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM); msg->tso = tso; sendMessage(cap, tso->cap, (Message*)msg); + debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d", + (lnat)tso->id, tso->cap->no); return; } #endif switch (tso->why_blocked) { + case BlockedOnMsgThrowTo: + { + const StgInfoTable *i; + + i = lockClosure(tso->block_info.closure); + unlockClosure(tso->block_info.closure, i); + if (i != &stg_MSG_NULL_info) { + debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)", + (lnat)tso->id, tso->block_info.throwto->header.info); + break; // still blocked + } + + // remove the block frame from the stack + ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info); + tso->sp += 3; + // fall through... + } case BlockedOnBlackHole: case BlockedOnSTM: { -- 1.7.10.4