From: Simon Marlow Date: Thu, 11 Mar 2010 09:57:44 +0000 (+0000) Subject: Use message-passing to implement throwTo in the RTS X-Git-Tag: 2010-03-16~9 X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=commitdiff_plain;h=7408b39235bccdcde48df2a73337ff976fbc09b7 Use message-passing to implement throwTo in the RTS This replaces some complicated locking schemes with message-passing in the implementation of throwTo. The benefits are - previously it was impossible to guarantee that a throwTo from a thread running on one CPU to a thread running on another CPU would be noticed, and we had to rely on the GC to pick up these forgotten exceptions. This no longer happens. - the locking regime is simpler (though the code is about the same size) - threads can be unblocked from a blocked_exceptions queue without having to traverse the whole queue now. It's a rare case, but replaces an O(n) operation with an O(1). - generally we move in the direction of sharing less between Capabilities (aka HECs), which will become important with other changes we have planned. Also in this patch I replaced several STM-specific closure types with a generic MUT_PRIM closure type, which allowed a lot of code in the GC and other places to go away, hence the line-count reduction. The message-passing changes resulted in about a net zero line-count difference. --- diff --git a/includes/rts/Constants.h b/includes/rts/Constants.h index 54a1ca7..bfc77fa 100644 --- a/includes/rts/Constants.h +++ b/includes/rts/Constants.h @@ -208,25 +208,27 @@ #define NotBlocked 0 #define BlockedOnMVar 1 #define BlockedOnBlackHole 2 -#define BlockedOnException 3 -#define BlockedOnRead 4 -#define BlockedOnWrite 5 -#define BlockedOnDelay 6 -#define BlockedOnSTM 7 +#define BlockedOnRead 3 +#define BlockedOnWrite 4 +#define BlockedOnDelay 5 +#define BlockedOnSTM 6 /* Win32 only: */ -#define BlockedOnDoProc 8 +#define BlockedOnDoProc 7 /* Only relevant for PAR: */ /* blocked on a remote closure represented by a Global Address: */ -#define BlockedOnGA 9 +#define BlockedOnGA 8 /* same as above but without sending a Fetch message */ -#define BlockedOnGA_NoSend 10 +#define BlockedOnGA_NoSend 9 /* Only relevant for THREADED_RTS: */ -#define BlockedOnCCall 11 -#define BlockedOnCCall_NoUnblockExc 12 +#define BlockedOnCCall 10 +#define BlockedOnCCall_NoUnblockExc 11 /* same as above but don't unblock async exceptions in resumeThread() */ +/* Involved in a message sent to tso->msg_cap */ +#define BlockedOnMsgWakeup 12 +#define BlockedOnMsgThrowTo 13 /* * These constants are returned to the scheduler by a thread that has * stopped for one reason or another. See typedef StgThreadReturnCode diff --git a/includes/rts/storage/ClosureMacros.h b/includes/rts/storage/ClosureMacros.h index f73d2c5..a115f6f 100644 --- a/includes/rts/storage/ClosureMacros.h +++ b/includes/rts/storage/ClosureMacros.h @@ -335,18 +335,8 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info) return tso_sizeW((StgTSO *)p); case BCO: return bco_sizeW((StgBCO *)p); - case TVAR_WATCH_QUEUE: - return sizeofW(StgTVarWatchQueue); - case TVAR: - return sizeofW(StgTVar); case TREC_CHUNK: return sizeofW(StgTRecChunk); - case TREC_HEADER: - return sizeofW(StgTRecHeader); - case ATOMIC_INVARIANT: - return sizeofW(StgAtomicInvariant); - case INVARIANT_CHECK_QUEUE: - return sizeofW(StgInvariantCheckQueue); default: return sizeW_fromITBL(info); } diff --git a/includes/rts/storage/ClosureTypes.h b/includes/rts/storage/ClosureTypes.h index 508dce2..6a76772 100644 --- a/includes/rts/storage/ClosureTypes.h +++ b/includes/rts/storage/ClosureTypes.h @@ -74,18 +74,14 @@ #define MUT_VAR_CLEAN 50 #define MUT_VAR_DIRTY 51 #define WEAK 52 -#define STABLE_NAME 53 -#define TSO 54 -#define TVAR_WATCH_QUEUE 55 -#define INVARIANT_CHECK_QUEUE 56 -#define ATOMIC_INVARIANT 57 -#define TVAR 58 -#define TREC_CHUNK 59 -#define TREC_HEADER 60 -#define ATOMICALLY_FRAME 61 -#define CATCH_RETRY_FRAME 62 -#define CATCH_STM_FRAME 63 -#define WHITEHOLE 64 -#define N_CLOSURE_TYPES 65 +#define PRIM 53 +#define MUT_PRIM 54 +#define TSO 55 +#define TREC_CHUNK 56 +#define ATOMICALLY_FRAME 57 +#define CATCH_RETRY_FRAME 58 +#define CATCH_STM_FRAME 59 +#define WHITEHOLE 60 +#define N_CLOSURE_TYPES 61 #endif /* RTS_STORAGE_CLOSURETYPES_H */ diff --git a/includes/rts/storage/Closures.h b/includes/rts/storage/Closures.h index 8928268..d7498e2 100644 --- a/includes/rts/storage/Closures.h +++ b/includes/rts/storage/Closures.h @@ -390,10 +390,10 @@ typedef struct StgInvariantCheckQueue_ { struct StgTRecHeader_ { StgHeader header; - TRecState state; struct StgTRecHeader_ *enclosing_trec; StgTRecChunk *current_chunk; StgInvariantCheckQueue *invariants_to_check; + TRecState state; }; typedef struct { @@ -416,4 +416,27 @@ typedef struct { StgClosure *alt_code; } StgCatchRetryFrame; +/* ---------------------------------------------------------------------------- + Messages + ------------------------------------------------------------------------- */ + +typedef struct Message_ { + StgHeader header; + struct Message_ *link; +} Message; + +typedef struct MessageWakeup_ { + StgHeader header; + Message *link; + StgTSO *tso; +} MessageWakeup; + +typedef struct MessageThrowTo_ { + StgHeader header; + Message *link; + StgTSO *source; + StgTSO *target; + StgClosure *exception; +} MessageThrowTo; + #endif /* RTS_STORAGE_CLOSURES_H */ diff --git a/includes/rts/storage/SMPClosureOps.h b/includes/rts/storage/SMPClosureOps.h index 582ec0e..8dee7cb 100644 --- a/includes/rts/storage/SMPClosureOps.h +++ b/includes/rts/storage/SMPClosureOps.h @@ -18,6 +18,7 @@ #else EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p); +EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p); EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info); #if defined(THREADED_RTS) @@ -43,11 +44,15 @@ EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p) } while (1); } -EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info) +EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p) { - // This is a strictly ordered write, so we need a write_barrier(): - write_barrier(); - p->header.info = info; + StgWord info; + info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info); + if (info != (W_)&stg_WHITEHOLE_info) { + return (StgInfoTable *)info; + } else { + return NULL; + } } #else /* !THREADED_RTS */ @@ -56,12 +61,19 @@ EXTERN_INLINE StgInfoTable * lockClosure(StgClosure *p) { return (StgInfoTable *)p->header.info; } -EXTERN_INLINE void -unlockClosure(StgClosure *p STG_UNUSED, const StgInfoTable *info STG_UNUSED) -{ /* nothing */ } +EXTERN_INLINE StgInfoTable * +tryLockClosure(StgClosure *p) +{ return (StgInfoTable *)p->header.info; } #endif /* THREADED_RTS */ +EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info) +{ + // This is a strictly ordered write, so we need a write_barrier(): + write_barrier(); + p->header.info = info; +} + // Handy specialised versions of lockClosure()/unlockClosure() EXTERN_INLINE void lockTSO(StgTSO *tso); EXTERN_INLINE void lockTSO(StgTSO *tso) diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h index e8d97c5..e2015f2 100644 --- a/includes/rts/storage/TSO.h +++ b/includes/rts/storage/TSO.h @@ -46,7 +46,8 @@ typedef struct { /* Reason for thread being blocked. See comment above struct StgTso_. */ typedef union { StgClosure *closure; - struct StgTSO_ *tso; + struct MessageThrowTo_ *throwto; + struct MessageWakeup_ *wakeup; StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */ #if defined(mingw32_HOST_OS) StgAsyncIOResult *async_result; @@ -87,7 +88,8 @@ typedef struct StgTSO_ { will already be dirty. */ - struct StgTSO_* global_link; /* Links all threads together */ + struct StgTSO_* global_link; // Links threads on the + // generation->threads lists StgWord dirty; /* non-zero => dirty */ /* @@ -108,9 +110,9 @@ typedef struct StgTSO_ { * setTSOLink(). */ - StgWord16 what_next; /* Values defined in Constants.h */ - StgWord16 why_blocked; /* Values defined in Constants.h */ - StgWord32 flags; + StgWord16 what_next; // Values defined in Constants.h + StgWord16 why_blocked; // Values defined in Constants.h + StgWord32 flags; // Values defined in Constants.h StgTSOBlockInfo block_info; StgThreadID id; int saved_errno; @@ -123,7 +125,7 @@ typedef struct StgTSO_ { exceptions. In order to access this field, the TSO must be locked using lockClosure/unlockClosure (see SMP.h). */ - struct StgTSO_ * blocked_exceptions; + struct MessageThrowTo_ * blocked_exceptions; #ifdef TICKY_TICKY /* TICKY-specific stuff would go here. */ @@ -167,7 +169,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target); tso->why_blocked tso->block_info location ---------------------------------------------------------------------- - NotBlocked NULL runnable_queue, or running + NotBlocked END_TSO_QUEUE runnable_queue, or running BlockedOnBlackHole the BLACKHOLE blackhole_queue @@ -175,7 +177,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target); BlockedOnSTM END_TSO_QUEUE STM wait queue(s) - BlockedOnException the TSO TSO->blocked_exception + BlockedOnMsgThrowTo MessageThrowTo * TSO->blocked_exception BlockedOnRead NULL blocked_queue BlockedOnWrite NULL blocked_queue @@ -189,7 +191,6 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target); tso->what_next == ThreadComplete or ThreadKilled tso->link == (could be on some queue somewhere) - tso->su == tso->stack + tso->stack_size tso->sp == tso->stack + tso->stack_size - 1 (i.e. top stack word) tso->sp[0] == return value of thread, if what_next == ThreadComplete, exception , if what_next == ThreadKilled diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h index e68282e..42e878f 100644 --- a/includes/stg/MiscClosures.h +++ b/includes/stg/MiscClosures.h @@ -114,6 +114,8 @@ RTS_INFO(stg_MUT_ARR_PTRS_FROZEN0_info); RTS_INFO(stg_MUT_VAR_CLEAN_info); RTS_INFO(stg_MUT_VAR_DIRTY_info); RTS_INFO(stg_END_TSO_QUEUE_info); +RTS_INFO(stg_MSG_WAKEUP_info); +RTS_INFO(stg_MSG_THROWTO_info); RTS_INFO(stg_MUT_CONS_info); RTS_INFO(stg_catch_info); RTS_INFO(stg_PAP_info); @@ -163,6 +165,8 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0_entry); RTS_ENTRY(stg_MUT_VAR_CLEAN_entry); RTS_ENTRY(stg_MUT_VAR_DIRTY_entry); RTS_ENTRY(stg_END_TSO_QUEUE_entry); +RTS_ENTRY(stg_MSG_WAKEUP_entry); +RTS_ENTRY(stg_MSG_THROWTO_entry); RTS_ENTRY(stg_MUT_CONS_entry); RTS_ENTRY(stg_catch_entry); RTS_ENTRY(stg_PAP_entry); @@ -205,8 +209,6 @@ RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure); RTS_CLOSURE(stg_NO_TREC_closure); RTS_ENTRY(stg_NO_FINALIZER_entry); -RTS_ENTRY(stg_END_EXCEPTION_LIST_entry); -RTS_ENTRY(stg_EXCEPTION_CONS_entry); #if IN_STG_CODE extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure; diff --git a/rts/Capability.c b/rts/Capability.c index ce6eceb..5f54eca 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i ) cap->suspended_ccalls = NULL; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; - cap->wakeup_queue_hd = END_TSO_QUEUE; - cap->wakeup_queue_tl = END_TSO_QUEUE; + cap->inbox = (Message*)END_TSO_QUEUE; cap->sparks_created = 0; cap->sparks_converted = 0; cap->sparks_pruned = 0; @@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap, // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. if (always_wakeup || - !emptyRunQueue(cap) || !emptyWakeupQueue(cap) || + !emptyRunQueue(cap) || !emptyInbox(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); @@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task) * ------------------------------------------------------------------------- */ void -wakeupThreadOnCapability (Capability *my_cap, +wakeupThreadOnCapability (Capability *cap, Capability *other_cap, StgTSO *tso) { - ACQUIRE_LOCK(&other_cap->lock); + MessageWakeup *msg; // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) if (tso->bound) { @@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap, } tso->cap = other_cap; - ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1); + ASSERT(tso->why_blocked != BlockedOnMsgWakeup || + tso->block_info.closure->header.info == &stg_IND_info); - if (other_cap->running_task == NULL) { - // nobody is running this Capability, we can add our thread - // directly onto the run queue and start up a Task to run it. + ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info); - other_cap->running_task = myTask(); - // precond for releaseCapability_() and appendToRunQueue() + msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup)); + msg->header.info = &stg_MSG_WAKEUP_info; + msg->tso = tso; + tso->block_info.closure = (StgClosure *)msg; + dirty_TSO(cap, tso); + write_barrier(); + tso->why_blocked = BlockedOnMsgWakeup; - appendToRunQueue(other_cap,tso); - - releaseCapability_(other_cap,rtsFalse); - } else { - appendToWakeupQueue(my_cap,other_cap,tso); - other_cap->context_switch = 1; - // someone is running on this Capability, so it cannot be - // freed without first checking the wakeup queue (see - // releaseCapability_). - } - - RELEASE_LOCK(&other_cap->lock); + sendMessage(other_cap, (Message*)msg); } /* ---------------------------------------------------------------------------- @@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, evac(user, (StgClosure **)(void *)&cap->run_queue_hd); evac(user, (StgClosure **)(void *)&cap->run_queue_tl); #if defined(THREADED_RTS) - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd); - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl); + evac(user, (StgClosure **)(void *)&cap->inbox); #endif for (incall = cap->suspended_ccalls; incall != NULL; incall=incall->next) { @@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user) { markSomeCapabilities(evac, user, 0, 1, rtsFalse); } + +/* ----------------------------------------------------------------------------- + Messages + -------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void sendMessage(Capability *cap, Message *msg) +{ + ACQUIRE_LOCK(&cap->lock); + + msg->link = cap->inbox; + cap->inbox = msg; + + if (cap->running_task == NULL) { + cap->running_task = myTask(); + // precond for releaseCapability_() + releaseCapability_(cap,rtsFalse); + } else { + contextSwitchCapability(cap); + } + + RELEASE_LOCK(&cap->lock); +} + +#endif // THREADED_RTS diff --git a/rts/Capability.h b/rts/Capability.h index 41974dc..4030b5e 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -88,11 +88,8 @@ struct Capability_ { Task *returning_tasks_hd; // Singly-linked, with head/tail Task *returning_tasks_tl; - // A list of threads to append to this Capability's run queue at - // the earliest opportunity. These are threads that have been - // woken up by another Capability. - StgTSO *wakeup_queue_hd; - StgTSO *wakeup_queue_tl; + // Messages, or END_TSO_QUEUE. + Message *inbox; SparkPool *sparks; @@ -285,6 +282,18 @@ void markCapabilities (evac_fn evac, void *user); void traverseSparkQueues (evac_fn evac, void *user); /* ----------------------------------------------------------------------------- + Messages + -------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +INLINE_HEADER rtsBool emptyInbox(Capability *cap);; + +void sendMessage (Capability *cap, Message *msg); + +#endif // THREADED_RTS + +/* ----------------------------------------------------------------------------- * INLINE functions... private below here * -------------------------------------------------------------------------- */ @@ -333,6 +342,15 @@ contextSwitchCapability (Capability *cap) cap->context_switch = 1; } +#ifdef THREADED_RTS + +INLINE_HEADER rtsBool emptyInbox(Capability *cap) +{ + return (cap->inbox == (Message*)END_TSO_QUEUE); +} + +#endif + END_RTS_PRIVATE #endif /* CAPABILITY_H */ diff --git a/rts/ClosureFlags.c b/rts/ClosureFlags.c index 477a892..358cb40 100644 --- a/rts/ClosureFlags.c +++ b/rts/ClosureFlags.c @@ -74,20 +74,16 @@ StgWord16 closure_flags[] = { [MUT_VAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ), [MUT_VAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ), [WEAK] = (_HNF| _NS| _UPT ), - [STABLE_NAME] = (_HNF| _NS| _UPT ), + [PRIM] = (_HNF| _NS| _UPT ), + [MUT_PRIM] = (_HNF| _NS| _MUT|_UPT ), [TSO] = (_HNF| _NS| _MUT|_UPT ), - [TVAR_WATCH_QUEUE] = ( _NS| _MUT|_UPT ), - [INVARIANT_CHECK_QUEUE]= ( _NS| _MUT|_UPT ), - [ATOMIC_INVARIANT] = ( _NS| _MUT|_UPT ), - [TVAR] = (_HNF| _NS| _MUT|_UPT ), [TREC_CHUNK] = ( _NS| _MUT|_UPT ), - [TREC_HEADER] = ( _NS| _MUT|_UPT ), [ATOMICALLY_FRAME] = ( _BTM ), [CATCH_RETRY_FRAME] = ( _BTM ), [CATCH_STM_FRAME] = ( _BTM ), [WHITEHOLE] = ( 0 ) }; -#if N_CLOSURE_TYPES != 65 +#if N_CLOSURE_TYPES != 61 #error Closure types changed: update ClosureFlags.c! #endif diff --git a/rts/Exception.cmm b/rts/Exception.cmm index 6c887c2..55c79ce 100644 --- a/rts/Exception.cmm +++ b/rts/Exception.cmm @@ -56,7 +56,7 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL ) CInt r; StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & - ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32); + %lobits32(~(TSO_BLOCKEX|TSO_INTERRUPTIBLE)); /* Eagerly raise a blocked exception, if there is one */ if (StgTSO_blocked_exceptions(CurrentTSO) != END_TSO_QUEUE) { @@ -99,8 +99,8 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL ) INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, RET_SMALL ) { - StgTSO_flags(CurrentTSO) = - StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE); Sp_adj(1); jump %ENTRY_CODE(Sp(0)); @@ -113,8 +113,8 @@ stg_blockAsyncExceptionszh if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) { - StgTSO_flags(CurrentTSO) = - StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE); /* avoid growing the stack unnecessarily */ if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) { @@ -142,8 +142,8 @@ stg_unblockAsyncExceptionszh /* If exceptions are already unblocked, there's nothing to do */ if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) != 0) { - StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & - ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32); + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) & ~(TSO_BLOCKEX|TSO_INTERRUPTIBLE)); /* avoid growing the stack unnecessarily */ if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) { @@ -252,27 +252,22 @@ stg_killThreadzh } } else { W_ out; - W_ retcode; + W_ msg; out = Sp - WDS(1); /* ok to re-use stack space here */ - (retcode) = foreign "C" throwTo(MyCapability() "ptr", - CurrentTSO "ptr", - target "ptr", - exception "ptr", - out "ptr") [R1,R2]; + (msg) = foreign "C" throwTo(MyCapability() "ptr", + CurrentTSO "ptr", + target "ptr", + exception "ptr") [R1,R2]; - switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) { - - case THROWTO_SUCCESS: { + if (msg == NULL) { jump %ENTRY_CODE(Sp(0)); - } - - case THROWTO_BLOCKED: { - R3 = W_[out]; - // we must block, and call throwToReleaseTarget() before returning + } else { + StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo; + StgTSO_block_info(CurrentTSO) = msg; + // we must block, and unlock the message before returning jump stg_block_throwto; } - } } } @@ -507,8 +502,8 @@ retry_pop_stack: /* Ensure that async excpetions are blocked when running the handler. */ - StgTSO_flags(CurrentTSO) = - StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32; + StgTSO_flags(CurrentTSO) = %lobits32( + TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE); /* Call the handler, passing the exception value and a realworld * token as arguments. diff --git a/rts/FrontPanel.c b/rts/FrontPanel.c index 163a7c0..ebba405 100644 --- a/rts/FrontPanel.c +++ b/rts/FrontPanel.c @@ -697,7 +697,7 @@ residencyCensus( void ) break; case WEAK: - case STABLE_NAME: + case PRIM: case MVAR: case MUT_VAR: /* case MUT_CONS: FIXME: case does not exist */ diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm index b516ef2..a528a3f 100644 --- a/rts/HeapStackCheck.cmm +++ b/rts/HeapStackCheck.cmm @@ -631,9 +631,8 @@ INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused ) stg_block_throwto_finally { -#ifdef THREADED_RTS - foreign "C" throwToReleaseTarget (R3 "ptr"); -#endif + // unlock the throwto message + unlockClosure(StgTSO_block_info(CurrentTSO), stg_MSG_THROWTO_info); jump StgReturn; } diff --git a/rts/LdvProfile.c b/rts/LdvProfile.c index c2e7d7e..ccaf10c 100644 --- a/rts/LdvProfile.c +++ b/rts/LdvProfile.c @@ -109,7 +109,7 @@ processHeapClosureForDead( StgClosure *c ) case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: case BCO: - case STABLE_NAME: + case PRIM: case TVAR_WATCH_QUEUE: case TVAR: case TREC_HEADER: diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm index 5325c85..bf81eee 100644 --- a/rts/PrimOps.cmm +++ b/rts/PrimOps.cmm @@ -542,9 +542,9 @@ stg_forkzh closure "ptr") []; /* start blocked if the current thread is blocked */ - StgTSO_flags(threadid) = - StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) & - (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32)); + StgTSO_flags(threadid) = %lobits16( + TO_W_(StgTSO_flags(threadid)) | + TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE)); foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") []; @@ -572,9 +572,9 @@ stg_forkOnzh closure "ptr") []; /* start blocked if the current thread is blocked */ - StgTSO_flags(threadid) = - StgTSO_flags(threadid) | (StgTSO_flags(CurrentTSO) & - (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32)); + StgTSO_flags(threadid) = %lobits16( + TO_W_(StgTSO_flags(threadid)) | + TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE)); foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") []; diff --git a/rts/Printer.c b/rts/Printer.c index 1b8a6dd..e981329 100644 --- a/rts/Printer.c +++ b/rts/Printer.c @@ -160,6 +160,12 @@ printClosure( StgClosure *obj ) printStdObjPayload(obj); break; + case PRIM: + debugBelch("PRIM("); + printPtr((StgPtr)obj->header.info); + printStdObjPayload(obj); + break; + case THUNK: case THUNK_1_0: case THUNK_0_1: case THUNK_1_1: case THUNK_0_2: case THUNK_2_0: @@ -356,10 +362,6 @@ printClosure( StgClosure *obj ) /* ToDo: chase 'link' ? */ break; - case STABLE_NAME: - debugBelch("STABLE_NAME(%lu)\n", (lnat)((StgStableName*)obj)->sn); - break; - case TSO: debugBelch("TSO("); debugBelch("%lu (%p)",(unsigned long)(((StgTSO*)obj)->id), (StgTSO*)obj); @@ -1132,14 +1134,10 @@ char *closure_type_names[] = { [MUT_VAR_CLEAN] = "MUT_VAR_CLEAN", [MUT_VAR_DIRTY] = "MUT_VAR_DIRTY", [WEAK] = "WEAK", - [STABLE_NAME] = "STABLE_NAME", + [PRIM] = "PRIM", + [MUT_PRIM] = "MUT_PRIM", [TSO] = "TSO", - [TVAR_WATCH_QUEUE] = "TVAR_WATCH_QUEUE", - [INVARIANT_CHECK_QUEUE] = "INVARIANT_CHECK_QUEUE", - [ATOMIC_INVARIANT] = "ATOMIC_INVARIANT", - [TVAR] = "TVAR", [TREC_CHUNK] = "TREC_CHUNK", - [TREC_HEADER] = "TREC_HEADER", [ATOMICALLY_FRAME] = "ATOMICALLY_FRAME", [CATCH_RETRY_FRAME] = "CATCH_RETRY_FRAME", [CATCH_STM_FRAME] = "CATCH_STM_FRAME", diff --git a/rts/ProfHeap.c b/rts/ProfHeap.c index 15337d4..e90051c 100644 --- a/rts/ProfHeap.c +++ b/rts/ProfHeap.c @@ -912,7 +912,8 @@ heapCensusChain( Census *census, bdescr *bd ) case MVAR_CLEAN: case MVAR_DIRTY: case WEAK: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: prim = rtsTrue; @@ -960,31 +961,6 @@ heapCensusChain( Census *census, bdescr *bd ) break; #endif - case TREC_HEADER: - prim = rtsTrue; - size = sizeofW(StgTRecHeader); - break; - - case TVAR_WATCH_QUEUE: - prim = rtsTrue; - size = sizeofW(StgTVarWatchQueue); - break; - - case INVARIANT_CHECK_QUEUE: - prim = rtsTrue; - size = sizeofW(StgInvariantCheckQueue); - break; - - case ATOMIC_INVARIANT: - prim = rtsTrue; - size = sizeofW(StgAtomicInvariant); - break; - - case TVAR: - prim = rtsTrue; - size = sizeofW(StgTVar); - break; - case TREC_CHUNK: prim = rtsTrue; size = sizeofW(StgTRecChunk); diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index ca5e5ea..d54f823 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap, static void removeFromQueues(Capability *cap, StgTSO *tso); -static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target); +static void blockedThrowTo (Capability *cap, + StgTSO *target, MessageThrowTo *msg); -static void performBlockedException (Capability *cap, - StgTSO *source, StgTSO *target); +static void throwToSendMsg (Capability *cap USED_IF_THREADS, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS); + +static void performBlockedException (Capability *cap, MessageThrowTo *msg); /* ----------------------------------------------------------------------------- throwToSingleThreaded @@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here) may be blocked and could be woken up at any point by another CPU. We have some delicate synchronisation to do. - There is a completely safe fallback scheme: it is always possible - to just block the source TSO on the target TSO's blocked_exceptions - queue. This queue is locked using lockTSO()/unlockTSO(). It is - checked at regular intervals: before and after running a thread - (schedule() and threadPaused() respectively), and just before GC - (scheduleDoGC()). Activating a thread on this queue should be done - using maybePerformBlockedException(): this is done in the context - of the target thread, so the exception can be raised eagerly. - - This fallback scheme works even if the target thread is complete or - killed: scheduleDoGC() will discover the blocked thread before the - target is GC'd. - - Blocking the source thread on the target thread's blocked_exception - queue is also employed when the target thread is currently blocking - exceptions (ie. inside Control.Exception.block). - - We could use the safe fallback scheme exclusively, but that - wouldn't be ideal: most calls to throwTo would block immediately, - possibly until the next GC, which might require the deadlock - detection mechanism to kick in. So we try to provide promptness - wherever possible. - - We can promptly deliver the exception if the target thread is: - - - runnable, on the same Capability as the source thread (because - we own the run queue and therefore the target thread). - - - blocked, and we can obtain exclusive access to it. Obtaining - exclusive access to the thread depends on how it is blocked. - - We must also be careful to not trip over threadStackOverflow(), - which might be moving the TSO to enlarge its stack. - lockTSO()/unlockTSO() are used here too. - + The underlying scheme when multiple Capabilities are in use is + message passing: when the target of a throwTo is on another + Capability, we send a message (a MessageThrowTo closure) to that + Capability. + + If the throwTo needs to block because the target TSO is masking + exceptions (the TSO_BLOCKEX flag), then the message is placed on + the blocked_exceptions queue attached to the target TSO. When the + target TSO enters the unmasked state again, it must check the + queue. The blocked_exceptions queue is not locked; only the + Capability owning the TSO may modify it. + + To make things simpler for throwTo, we always create the message + first before deciding what to do. The message may get sent, or it + may get attached to a TSO's blocked_exceptions queue, or the + exception may get thrown immediately and the message dropped, + depending on the current state of the target. + + Currently we send a message if the target belongs to another + Capability, and it is + + - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo, + BlockedOnCCall + + - or it is masking exceptions (TSO_BLOCKEX) + + Currently, if the target is BlockedOnMVar, BlockedOnSTM, or + BlockedOnBlackHole then we acquire ownership of the TSO by locking + its parent container (e.g. the MVar) and then raise the exception. + We might change these cases to be more message-passing-like in the + future. + Returns: - THROWTO_SUCCESS exception was raised, ok to continue + NULL exception was raised, ok to continue - THROWTO_BLOCKED exception was not raised; block the source - thread then call throwToReleaseTarget() when - the source thread is properly tidied away. + MessageThrowTo * exception was not raised; the source TSO + should now put itself in the state + BlockedOnMsgThrowTo, and when it is ready + it should unlock the mssage using + unlockClosure(msg, &stg_MSG_THROWTO_info); + If it decides not to raise the exception after + all, it can revoke it safely with + unlockClosure(msg, &stg_IND_info); -------------------------------------------------------------------------- */ -nat +MessageThrowTo * throwTo (Capability *cap, // the Capability we hold StgTSO *source, // the TSO sending the exception (or NULL) StgTSO *target, // the TSO receiving the exception - StgClosure *exception, // the exception closure - /*[out]*/ void **out USED_IF_THREADS) + StgClosure *exception) // the exception closure +{ + MessageThrowTo *msg; + + msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo)); + // message starts locked; the caller has to unlock it when it is + // ready. + msg->header.info = &stg_WHITEHOLE_info; + msg->source = source; + msg->target = target; + msg->exception = exception; + + switch (throwToMsg(cap, msg)) + { + case THROWTO_SUCCESS: + return NULL; + case THROWTO_BLOCKED: + default: + return msg; + } +} + + +nat +throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; + StgTSO *target = msg->target; ASSERT(target != END_TSO_QUEUE); @@ -159,13 +189,10 @@ throwTo (Capability *cap, // the Capability we hold // ASSERT(get_itbl(target)->type == TSO); } - if (source != NULL) { - debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu", - (unsigned long)source->id, (unsigned long)target->id); - } else { - debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu", - (unsigned long)target->id); - } + debugTraceCap(DEBUG_sched, cap, + "throwTo: from thread %lu to thread %lu", + (unsigned long)msg->source->id, + (unsigned long)msg->target->id); #ifdef DEBUG traceThreadStatus(DEBUG_sched, target); @@ -173,6 +200,7 @@ throwTo (Capability *cap, // the Capability we hold goto check_target; retry: + write_barrier(); debugTrace(DEBUG_sched, "throwTo: retrying..."); check_target: @@ -188,6 +216,7 @@ check_target: switch (status) { case NotBlocked: + case BlockedOnMsgWakeup: /* if status==NotBlocked, and target->cap == cap, then we own this TSO and can raise the exception. @@ -251,37 +280,80 @@ check_target: write_barrier(); target_cap = target->cap; - if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) { - // It's on our run queue and not blocking exceptions - raiseAsync(cap, target, exception, rtsFalse, NULL); - return THROWTO_SUCCESS; - } else { - // Otherwise, just block on the blocked_exceptions queue - // of the target thread. The queue will get looked at - // soon enough: it is checked before and after running a - // thread, and during GC. - lockTSO(target); - - // Avoid race with threadStackOverflow, which may have - // just moved this TSO. - if (target->what_next == ThreadRelocated) { - unlockTSO(target); - target = target->_link; - goto retry; - } - // check again for ThreadComplete and ThreadKilled. This - // cooperates with scheduleHandleThreadFinished to ensure - // that we never miss any threads that are throwing an - // exception to a thread in the process of terminating. - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { - unlockTSO(target); + if (target_cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } else { + if ((target->flags & TSO_BLOCKEX) == 0) { + // It's on our run queue and not blocking exceptions + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; + } else { + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; } - blockedThrowTo(cap,source,target); - *out = target; - return THROWTO_BLOCKED; - } + } + } + + case BlockedOnMsgThrowTo: + { + Capability *target_cap; + const StgInfoTable *i; + MessageThrowTo *m; + + m = target->block_info.throwto; + + // target is local to this cap, but has sent a throwto + // message to another cap. + // + // The source message is locked. We need to revoke the + // target's message so that we can raise the exception, so + // we attempt to lock it. + + // There's a possibility of a deadlock if two threads are both + // trying to throwTo each other (or more generally, a cycle of + // threads). To break the symmetry we compare the addresses + // of the MessageThrowTo objects, and the one for which m < + // msg gets to spin, while the other can only try to lock + // once, but must then back off and unlock both before trying + // again. + if (m < msg) { + i = lockClosure((StgClosure *)m); + } else { + i = tryLockClosure((StgClosure *)m); + if (i == NULL) { +// debugBelch("collision\n"); + throwToSendMsg(cap, target->cap, msg); + return THROWTO_BLOCKED; + } + } + + if (i != &stg_MSG_THROWTO_info) { + // if it's an IND, this TSO has been woken up by another Cap + unlockClosure((StgClosure*)m, i); + goto retry; + } + + target_cap = target->cap; + if (target_cap != cap) { + unlockClosure((StgClosure*)m, i); + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } + + if ((target->flags & TSO_BLOCKEX) && + ((target->flags & TSO_INTERRUPTIBLE) == 0)) { + unlockClosure((StgClosure*)m, i); + blockedThrowTo(cap,target,msg); + return THROWTO_BLOCKED; + } + + // nobody else can wake up this TSO after we claim the message + unlockClosure((StgClosure*)m, &stg_IND_info); + + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); + unblockOne(cap, target); + return THROWTO_SUCCESS; } case BlockedOnMVar: @@ -322,14 +394,17 @@ check_target: if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockClosure((StgClosure *)target); - blockedThrowTo(cap,source,target); + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } unlockClosure((StgClosure *)mvar, info); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO + return THROWTO_BLOCKED; } else { removeThreadFromMVarQueue(cap, mvar, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); unlockClosure((StgClosure *)mvar, info); return THROWTO_SUCCESS; @@ -346,84 +421,23 @@ check_target: } if (target->flags & TSO_BLOCKEX) { - lockTSO(target); - blockedThrowTo(cap,source,target); + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } RELEASE_LOCK(&sched_mutex); - *out = target; - return THROWTO_BLOCKED; // caller releases TSO + return THROWTO_BLOCKED; // caller releases lock } else { removeThreadFromQueue(cap, &blackhole_queue, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); RELEASE_LOCK(&sched_mutex); return THROWTO_SUCCESS; } } - case BlockedOnException: - { - StgTSO *target2; - StgInfoTable *info; - - /* - To obtain exclusive access to a BlockedOnException thread, - we must call lockClosure() on the TSO on which it is blocked. - Since the TSO might change underneath our feet, after we - call lockClosure() we must check that - - (a) the closure we locked is actually a TSO - (b) the original thread is still BlockedOnException, - (c) the original thread is still blocked on the TSO we locked - and (d) the target thread has not been relocated. - - We synchronise with threadStackOverflow() (which relocates - threads) using lockClosure()/unlockClosure(). - */ - target2 = target->block_info.tso; - - info = lockClosure((StgClosure *)target2); - if (info != &stg_TSO_info) { - unlockClosure((StgClosure *)target2, info); - goto retry; - } - if (target->what_next == ThreadRelocated) { - target = target->_link; - unlockTSO(target2); - goto retry; - } - if (target2->what_next == ThreadRelocated) { - target->block_info.tso = target2->_link; - unlockTSO(target2); - goto retry; - } - if (target->why_blocked != BlockedOnException - || target->block_info.tso != target2) { - unlockTSO(target2); - goto retry; - } - - /* - Now we have exclusive rights to the target TSO... - - If it is blocking exceptions, add the source TSO to its - blocked_exceptions queue. Otherwise, raise the exception. - */ - if ((target->flags & TSO_BLOCKEX) && - ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - lockTSO(target); - blockedThrowTo(cap,source,target); - unlockTSO(target2); - *out = target; - return THROWTO_BLOCKED; - } else { - removeThreadFromQueue(cap, &target2->blocked_exceptions, target); - raiseAsync(cap, target, exception, rtsFalse, NULL); - unblockOne(cap, target); - unlockTSO(target2); - return THROWTO_SUCCESS; - } - } - case BlockedOnSTM: lockTSO(target); // Unblocking BlockedOnSTM threads requires the TSO to be @@ -434,11 +448,16 @@ check_target: } if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); - *out = target; + Capability *target_cap = target->cap; + if (target->cap != cap) { + throwToSendMsg(cap,target_cap,msg); + } else { + blockedThrowTo(cap,target,msg); + } + unlockTSO(target); return THROWTO_BLOCKED; } else { - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); unblockOne(cap, target); unlockTSO(target); return THROWTO_SUCCESS; @@ -446,19 +465,18 @@ check_target: case BlockedOnCCall: case BlockedOnCCall_NoUnblockExc: - // I don't think it's possible to acquire ownership of a - // BlockedOnCCall thread. We just assume that the target - // thread is blocking exceptions, and block on its - // blocked_exception queue. - lockTSO(target); - if (target->why_blocked != BlockedOnCCall && - target->why_blocked != BlockedOnCCall_NoUnblockExc) { - unlockTSO(target); - goto retry; - } - blockedThrowTo(cap,source,target); - *out = target; + { + Capability *target_cap; + + target_cap = target->cap; + if (target_cap != cap) { + throwToSendMsg(cap, target_cap, msg); + return THROWTO_BLOCKED; + } + + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; + } #ifndef THREADEDED_RTS case BlockedOnRead: @@ -469,11 +487,11 @@ check_target: #endif if ((target->flags & TSO_BLOCKEX) && ((target->flags & TSO_INTERRUPTIBLE) == 0)) { - blockedThrowTo(cap,source,target); + blockedThrowTo(cap,target,msg); return THROWTO_BLOCKED; } else { removeFromQueues(cap,target); - raiseAsync(cap, target, exception, rtsFalse, NULL); + raiseAsync(cap, target, msg->exception, rtsFalse, NULL); return THROWTO_SUCCESS; } #endif @@ -484,33 +502,34 @@ check_target: barf("throwTo"); } -// Block a TSO on another TSO's blocked_exceptions queue. -// Precondition: we hold an exclusive lock on the target TSO (this is -// complex to achieve as there's no single lock on a TSO; see -// throwTo()). static void -blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target) +throwToSendMsg (Capability *cap STG_UNUSED, + Capability *target_cap USED_IF_THREADS, + MessageThrowTo *msg USED_IF_THREADS) + { - if (source != NULL) { - debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id); - setTSOLink(cap, source, target->blocked_exceptions); - target->blocked_exceptions = source; - dirty_TSO(cap,target); // we modified the blocked_exceptions queue - - source->block_info.tso = target; - write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is. - source->why_blocked = BlockedOnException; - } -} +#ifdef THREADED_RTS + debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no); + sendMessage(target_cap, (Message*)msg); +#endif +} -#ifdef THREADED_RTS -void -throwToReleaseTarget (void *tso) +// Block a throwTo message on the target TSO's blocked_exceptions +// queue. The current Capability must own the target TSO in order to +// modify the blocked_exceptions queue. +static void +blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg) { - unlockTSO((StgTSO *)tso); + debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu", + (unsigned long)target->id); + + ASSERT(target->cap == cap); + + msg->link = (Message*)target->blocked_exceptions; + target->blocked_exceptions = msg; + dirty_TSO(cap,target); // we modified the blocked_exceptions queue } -#endif /* ----------------------------------------------------------------------------- Waking up threads blocked in throwTo @@ -532,10 +551,11 @@ throwToReleaseTarget (void *tso) int maybePerformBlockedException (Capability *cap, StgTSO *tso) { - StgTSO *source; + MessageThrowTo *msg; + const StgInfoTable *i; if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) { - if (tso->blocked_exceptions != END_TSO_QUEUE) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); return 1; } else { @@ -543,32 +563,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) } } - if (tso->blocked_exceptions != END_TSO_QUEUE && + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && (tso->flags & TSO_BLOCKEX) != 0) { debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id); } - if (tso->blocked_exceptions != END_TSO_QUEUE + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && ((tso->flags & TSO_BLOCKEX) == 0 || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) { - // Lock the TSO, this gives us exclusive access to the queue - lockTSO(tso); - - // Check the queue again; it might have changed before we - // locked it. - if (tso->blocked_exceptions == END_TSO_QUEUE) { - unlockTSO(tso); - return 0; - } - // We unblock just the first thread on the queue, and perform // its throw immediately. - source = tso->blocked_exceptions; - performBlockedException(cap, source, tso); - tso->blocked_exceptions = unblockOne_(cap, source, - rtsFalse/*no migrate*/); - unlockTSO(tso); + loop: + msg = tso->blocked_exceptions; + if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0; + i = lockClosure((StgClosure*)msg); + tso->blocked_exceptions = (MessageThrowTo*)msg->link; + if (i == &stg_IND_info) { + unlockClosure((StgClosure*)msg,i); + goto loop; + } + + performBlockedException(cap, msg); + unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + unlockClosure((StgClosure*)msg,&stg_IND_info); return 1; } return 0; @@ -580,25 +598,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso) void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso) { - lockTSO(tso); - awakenBlockedQueue(cap, tso->blocked_exceptions); - tso->blocked_exceptions = END_TSO_QUEUE; - unlockTSO(tso); + MessageThrowTo *msg; + const StgInfoTable *i; + + for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE; + msg = (MessageThrowTo*)msg->link) { + i = lockClosure((StgClosure *)msg); + if (i != &stg_IND_info) { + unblockOne_(cap, msg->source, rtsFalse/*no migrate*/); + } + unlockClosure((StgClosure *)msg,i); + } + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; } static void -performBlockedException (Capability *cap, StgTSO *source, StgTSO *target) +performBlockedException (Capability *cap, MessageThrowTo *msg) { - StgClosure *exception; + StgTSO *source; + + source = msg->source; - ASSERT(source->why_blocked == BlockedOnException); - ASSERT(source->block_info.tso->id == target->id); + ASSERT(source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(source->block_info.closure == (StgClosure *)msg); ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info); - ASSERT(((StgTSO *)source->sp[1])->id == target->id); + ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id); // check ids not pointers, because the thread might be relocated - exception = (StgClosure *)source->sp[2]; - throwToSingleThreaded(cap, target, exception); + throwToSingleThreaded(cap, msg->target, msg->exception); source->sp += 3; } @@ -637,22 +664,25 @@ removeFromQueues(Capability *cap, StgTSO *tso) removeThreadFromQueue(cap, &blackhole_queue, tso); goto done; - case BlockedOnException: - { - StgTSO *target = tso->block_info.tso; - - // NO: when called by threadPaused(), we probably have this - // TSO already locked (WHITEHOLEd) because we just placed - // ourselves on its queue. - // ASSERT(get_itbl(target)->type == TSO); - - while (target->what_next == ThreadRelocated) { - target = target->_link; - } - - removeThreadFromQueue(cap, &target->blocked_exceptions, tso); - goto done; - } + case BlockedOnMsgWakeup: + { + // kill the message, atomically: + tso->block_info.wakeup->header.info = &stg_IND_info; + break; + } + + case BlockedOnMsgThrowTo: + { + MessageThrowTo *m = tso->block_info.throwto; + // The message is locked by us, unless we got here via + // deleteAllThreads(), in which case we own all the + // capabilities. + // ASSERT(m->header.info == &stg_WHITEHOLE_info); + + // unlock and revoke it at the same time + unlockClosure((StgClosure*)m,&stg_IND_info); + break; + } #if !defined(THREADED_RTS) case BlockedOnRead: @@ -743,6 +773,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, } #endif + while (tso->what_next == ThreadRelocated) { + tso = tso->_link; + } + // mark it dirty; we're about to change its stack. dirty_TSO(cap, tso); diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h index 96eb96e..5137d41 100644 --- a/rts/RaiseAsync.h +++ b/rts/RaiseAsync.h @@ -29,16 +29,13 @@ void suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here); -nat throwTo (Capability *cap, // the Capability we hold - StgTSO *source, // the TSO sending the exception - StgTSO *target, // the TSO receiving the exception - StgClosure *exception, // the exception closure - /*[out]*/ void **out // pass to throwToReleaseTarget() - ); +MessageThrowTo *throwTo (Capability *cap, // the Capability we hold + StgTSO *source, + StgTSO *target, + StgClosure *exception); // the exception closure -#ifdef THREADED_RTS -void throwToReleaseTarget (void *tso); -#endif +nat throwToMsg (Capability *cap, + MessageThrowTo *msg); int maybePerformBlockedException (Capability *cap, StgTSO *tso); void awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso); @@ -52,7 +49,7 @@ interruptible(StgTSO *t) { switch (t->why_blocked) { case BlockedOnMVar: - case BlockedOnException: + case BlockedOnMsgThrowTo: case BlockedOnRead: case BlockedOnWrite: #if defined(mingw32_HOST_OS) diff --git a/rts/RetainerProfile.c b/rts/RetainerProfile.c index 4fca19c..b7bc909 100644 --- a/rts/RetainerProfile.c +++ b/rts/RetainerProfile.c @@ -509,7 +509,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child ) // layout.payload.ptrs, no SRT case CONSTR: - case STABLE_NAME: + case PRIM: case BCO: case CONSTR_STATIC: init_ptrs(&se.info, get_itbl(c)->layout.payload.ptrs, @@ -883,7 +883,7 @@ pop( StgClosure **c, StgClosure **cp, retainer *r ) } case CONSTR: - case STABLE_NAME: + case PRIM: case BCO: case CONSTR_STATIC: // StgMutArrPtr.ptrs, no SRT @@ -1108,7 +1108,7 @@ isRetainer( StgClosure *c ) case CONSTR_STATIC: case FUN_STATIC: // misc - case STABLE_NAME: + case PRIM: case BCO: case ARR_WORDS: // STM diff --git a/rts/RtsMessages.c b/rts/RtsMessages.c index e2a30a6..6e75abc 100644 --- a/rts/RtsMessages.c +++ b/rts/RtsMessages.c @@ -9,6 +9,8 @@ #include "PosixSource.h" #include "Rts.h" +#include "eventlog/EventLog.h" + #include #include #include @@ -161,6 +163,10 @@ rtsFatalInternalErrorFn(const char *s, va_list ap) fflush(stderr); } +#ifdef TRACING + if (RtsFlags.TraceFlags.tracing == TRACE_EVENTLOG) endEventLogging(); +#endif + abort(); // stg_exit(EXIT_INTERNAL_ERROR); } diff --git a/rts/STM.c b/rts/STM.c index ed5a722..be61538 100644 --- a/rts/STM.c +++ b/rts/STM.c @@ -352,8 +352,7 @@ static StgBool watcher_is_tso(StgTVarWatchQueue *q) { static StgBool watcher_is_invariant(StgTVarWatchQueue *q) { StgClosure *c = q -> closure; - StgInfoTable *info = get_itbl(c); - return (info -> type) == ATOMIC_INVARIANT; + return (c->header.info == &stg_ATOMIC_INVARIANT_info); } /*......................................................................*/ diff --git a/rts/Schedule.c b/rts/Schedule.c index 4cca469..70e0246 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); -static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); +static void scheduleProcessInbox(Capability *cap); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); @@ -618,7 +618,7 @@ scheduleFindWork (Capability *cap) // list each time around the scheduler. if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - scheduleCheckWakeupThreads(cap); + scheduleProcessInbox(cap); scheduleCheckBlockedThreads(cap); @@ -673,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) if (!force_yield && !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || - !emptyWakeupQueue(cap) || + !emptyInbox(cap) || blackholes_need_checking || sched_state >= SCHED_INTERRUPTING)) return; @@ -725,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, for (i=0, n_free_caps=0; i < n_capabilities; i++) { cap0 = &capabilities[i]; if (cap != cap0 && tryGrabCapability(cap0,task)) { - if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) { + if (!emptyRunQueue(cap0) + || cap->returning_tasks_hd != NULL + || cap->inbox != (Message*)END_TSO_QUEUE) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! releaseCapability(cap0); @@ -871,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) * Check for threads woken up by other Capabilities * ------------------------------------------------------------------------- */ +#if defined(THREADED_RTS) +static void +executeMessage (Capability *cap, Message *m) +{ + const StgInfoTable *i; + +loop: + write_barrier(); // allow m->header to be modified by another thread + i = m->header.info; + if (i == &stg_MSG_WAKEUP_info) + { + MessageWakeup *w = (MessageWakeup *)m; + StgTSO *tso = w->tso; + debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", + (lnat)tso->id); + ASSERT(tso->cap == cap); + ASSERT(tso->why_blocked == BlockedOnMsgWakeup); + ASSERT(tso->block_info.closure == (StgClosure *)m); + tso->why_blocked = NotBlocked; + appendToRunQueue(cap, tso); + } + else if (i == &stg_MSG_THROWTO_info) + { + MessageThrowTo *t = (MessageThrowTo *)m; + nat r; + const StgInfoTable *i; + + i = lockClosure((StgClosure*)m); + if (i != &stg_MSG_THROWTO_info) { + unlockClosure((StgClosure*)m, i); + goto loop; + } + + debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", + (lnat)t->source->id, (lnat)t->target->id); + + ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo); + ASSERT(t->source->block_info.closure == (StgClosure *)m); + + r = throwToMsg(cap, t); + + switch (r) { + case THROWTO_SUCCESS: + ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info); + t->source->sp += 3; + unblockOne(cap, t->source); + // this message is done + unlockClosure((StgClosure*)m, &stg_IND_info); + break; + case THROWTO_BLOCKED: + // unlock the message + unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info); + break; + } + } + else if (i == &stg_IND_info) + { + // message was revoked + return; + } + else if (i == &stg_WHITEHOLE_info) + { + goto loop; + } + else + { + barf("executeMessage: %p", i); + } +} +#endif + static void -scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) +scheduleProcessInbox (Capability *cap USED_IF_THREADS) { #if defined(THREADED_RTS) - // Any threads that were woken up by other Capabilities get - // appended to our run queue. - if (!emptyWakeupQueue(cap)) { - ACQUIRE_LOCK(&cap->lock); - if (emptyRunQueue(cap)) { - cap->run_queue_hd = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } else { - setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); - cap->run_queue_tl = cap->wakeup_queue_tl; - } - cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; - RELEASE_LOCK(&cap->lock); + Message *m; + + while (!emptyInbox(cap)) { + ACQUIRE_LOCK(&cap->lock); + m = cap->inbox; + cap->inbox = m->link; + RELEASE_LOCK(&cap->lock); + executeMessage(cap, (Message *)m); } #endif } @@ -983,7 +1051,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) switch (task->incall->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: - case BlockedOnException: + case BlockedOnMsgThrowTo: case BlockedOnMVar: throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); @@ -1268,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) */ // blocked exceptions can now complete, even if the thread was in - // blocked mode (see #2910). This unconditionally calls - // lockTSO(), which ensures that we don't miss any threads that - // are engaged in throwTo() with this thread as a target. + // blocked mode (see #2910). awakenBlockedExceptionQueue (cap, t); // @@ -1884,7 +1950,7 @@ resumeThread (void *task_) if (tso->why_blocked == BlockedOnCCall) { // avoid locking the TSO if we don't have to - if (tso->blocked_exceptions != END_TSO_QUEUE) { + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { awakenBlockedExceptionQueue(cap,tso); } tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); @@ -2187,10 +2253,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) IF_DEBUG(sanity,checkTSO(tso)); - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) { // NB. never raise a StackOverflow exception if the thread is @@ -2201,7 +2263,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // if (tso->flags & TSO_SQUEEZED) { - unlockTSO(tso); return tso; } // #3677: In a stack overflow situation, stack squeezing may @@ -2223,7 +2284,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp+64))); // Send this thread the StackOverflow exception - unlockTSO(tso); throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure); return tso; } @@ -2239,7 +2299,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) // the stack anyway. if ((tso->flags & TSO_SQUEEZED) && ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) { - unlockTSO(tso); return tso; } @@ -2289,9 +2348,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; - unlockTSO(dest); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(dest)); #if 0 IF_DEBUG(scheduler,printTSO(dest)); @@ -2324,10 +2380,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) return tso; } - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - // this is the number of words we'll free free_w = round_to_mblocks(tso_size_w/2); @@ -2358,9 +2410,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) task->incall->tso = new_tso; } - unlockTSO(new_tso); - unlockTSO(tso); - IF_DEBUG(sanity,checkTSO(new_tso)); return new_tso; @@ -2691,61 +2740,9 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; - case BlockedOnException: - // throwTo should never block indefinitely: if the target - // thread dies or completes, throwTo returns. - barf("resurrectThreads: thread BlockedOnException"); - break; default: - barf("resurrectThreads: thread blocked in a strange way"); + barf("resurrectThreads: thread blocked in a strange way: %d", + tso->why_blocked); } } } - -/* ----------------------------------------------------------------------------- - performPendingThrowTos is called after garbage collection, and - passed a list of threads that were found to have pending throwTos - (tso->blocked_exceptions was not empty), and were blocked. - Normally this doesn't happen, because we would deliver the - exception directly if the target thread is blocked, but there are - small windows where it might occur on a multiprocessor (see - throwTo()). - - NB. we must be holding all the capabilities at this point, just - like resurrectThreads(). - -------------------------------------------------------------------------- */ - -void -performPendingThrowTos (StgTSO *threads) -{ - StgTSO *tso, *next; - Capability *cap; - Task *task, *saved_task;; - generation *gen; - - task = myTask(); - cap = task->cap; - - for (tso = threads; tso != END_TSO_QUEUE; tso = next) { - next = tso->global_link; - - gen = Bdescr((P_)tso)->gen; - tso->global_link = gen->threads; - gen->threads = tso; - - debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - - // We must pretend this Capability belongs to the current Task - // for the time being, as invariants will be broken otherwise. - // In fact the current Task has exclusive access to the systme - // at this point, so this is just bookkeeping: - task->cap = tso->cap; - saved_task = tso->cap->running_task; - tso->cap->running_task = task; - maybePerformBlockedException(tso->cap, tso); - tso->cap->running_task = saved_task; - } - - // Restore our original Capability: - task->cap = cap; -} diff --git a/rts/Schedule.h b/rts/Schedule.h index af322d8..76138b6 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -105,7 +105,6 @@ extern Mutex sched_mutex; void interruptStgRts (void); void resurrectThreads (StgTSO *); -void performPendingThrowTos (StgTSO *); /* ----------------------------------------------------------------------------- * Some convenient macros/inline functions... @@ -179,25 +178,6 @@ appendToBlockedQueue(StgTSO *tso) } #endif -#if defined(THREADED_RTS) -// Assumes: my_cap is owned by the current Task. We hold -// other_cap->lock, but we do not necessarily own other_cap; another -// Task may be running on it. -INLINE_HEADER void -appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso) -{ - ASSERT(tso->_link == END_TSO_QUEUE); - if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) { - other_cap->wakeup_queue_hd = tso; - } else { - // my_cap is passed to setTSOLink() because it may need to - // write to the mutable list. - setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso); - } - other_cap->wakeup_queue_tl = tso; -} -#endif - /* Check whether various thread queues are empty */ INLINE_HEADER rtsBool @@ -212,14 +192,6 @@ emptyRunQueue(Capability *cap) return emptyQueue(cap->run_queue_hd); } -#if defined(THREADED_RTS) -INLINE_HEADER rtsBool -emptyWakeupQueue(Capability *cap) -{ - return emptyQueue(cap->wakeup_queue_hd); -} -#endif - #if !defined(THREADED_RTS) #define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd)) #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue)) diff --git a/rts/StgMiscClosures.cmm b/rts/StgMiscClosures.cmm index 8fd96c1..f111875 100644 --- a/rts/StgMiscClosures.cmm +++ b/rts/StgMiscClosures.cmm @@ -419,7 +419,7 @@ CLOSURE(stg_NO_FINALIZER_closure,stg_NO_FINALIZER); Stable Names are unlifted too. ------------------------------------------------------------------------- */ -INFO_TABLE(stg_STABLE_NAME,0,1,STABLE_NAME,"STABLE_NAME","STABLE_NAME") +INFO_TABLE(stg_STABLE_NAME,0,1,PRIM,"STABLE_NAME","STABLE_NAME") { foreign "C" barf("STABLE_NAME object entered!") never returns; } /* ---------------------------------------------------------------------------- @@ -439,22 +439,22 @@ INFO_TABLE(stg_MVAR_DIRTY,3,0,MVAR_DIRTY,"MVAR","MVAR") STM -------------------------------------------------------------------------- */ -INFO_TABLE(stg_TVAR, 0, 0, TVAR, "TVAR", "TVAR") +INFO_TABLE(stg_TVAR, 2, 1, MUT_PRIM, "TVAR", "TVAR") { foreign "C" barf("TVAR object entered!") never returns; } -INFO_TABLE(stg_TVAR_WATCH_QUEUE, 0, 0, TVAR_WATCH_QUEUE, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE") +INFO_TABLE(stg_TVAR_WATCH_QUEUE, 3, 0, MUT_PRIM, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE") { foreign "C" barf("TVAR_WATCH_QUEUE object entered!") never returns; } -INFO_TABLE(stg_ATOMIC_INVARIANT, 0, 0, ATOMIC_INVARIANT, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT") +INFO_TABLE(stg_ATOMIC_INVARIANT, 2, 1, MUT_PRIM, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT") { foreign "C" barf("ATOMIC_INVARIANT object entered!") never returns; } -INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 0, 0, INVARIANT_CHECK_QUEUE, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE") +INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 3, 0, MUT_PRIM, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE") { foreign "C" barf("INVARIANT_CHECK_QUEUE object entered!") never returns; } INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK") { foreign "C" barf("TREC_CHUNK object entered!") never returns; } -INFO_TABLE(stg_TREC_HEADER, 0, 0, TREC_HEADER, "TREC_HEADER", "TREC_HEADER") +INFO_TABLE(stg_TREC_HEADER, 3, 1, MUT_PRIM, "TREC_HEADER", "TREC_HEADER") { foreign "C" barf("TREC_HEADER object entered!") never returns; } INFO_TABLE_CONSTR(stg_END_STM_WATCH_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WATCH_QUEUE","END_STM_WATCH_QUEUE") @@ -478,6 +478,17 @@ CLOSURE(stg_END_STM_CHUNK_LIST_closure,stg_END_STM_CHUNK_LIST); CLOSURE(stg_NO_TREC_closure,stg_NO_TREC); /* ---------------------------------------------------------------------------- + Messages + ------------------------------------------------------------------------- */ + +// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC. +INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP") +{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; } + +INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO") +{ foreign "C" barf("MSG_THROWTO object entered!") never returns; } + +/* ---------------------------------------------------------------------------- END_TSO_QUEUE This is a static nullary constructor (like []) that we use to mark the @@ -490,18 +501,6 @@ INFO_TABLE_CONSTR(stg_END_TSO_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_TSO_QUEUE","E CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE); /* ---------------------------------------------------------------------------- - Exception lists - ------------------------------------------------------------------------- */ - -INFO_TABLE_CONSTR(stg_END_EXCEPTION_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_EXCEPTION_LIST","END_EXCEPTION_LIST") -{ foreign "C" barf("END_EXCEPTION_LIST object entered!") never returns; } - -CLOSURE(stg_END_EXCEPTION_LIST_closure,stg_END_EXCEPTION_LIST); - -INFO_TABLE(stg_EXCEPTION_CONS,1,1,CONSTR,"EXCEPTION_CONS","EXCEPTION_CONS") -{ foreign "C" barf("EXCEPTION_CONS object entered!") never returns; } - -/* ---------------------------------------------------------------------------- Arrays These come in two basic flavours: arrays of data (StgArrWords) and arrays of diff --git a/rts/Threads.c b/rts/Threads.c index 08b7aab..f824d02 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -74,7 +74,7 @@ createThread(Capability *cap, nat size) tso->what_next = ThreadRunGHC; tso->why_blocked = NotBlocked; - tso->blocked_exceptions = END_TSO_QUEUE; + tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE; tso->flags = 0; tso->dirty = 1; @@ -218,8 +218,9 @@ unblockOne_ (Capability *cap, StgTSO *tso, // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO); ASSERT(tso->why_blocked != NotBlocked); + ASSERT(tso->why_blocked != BlockedOnMsgWakeup || + tso->block_info.closure->header.info == &stg_IND_info); - tso->why_blocked = NotBlocked; next = tso->_link; tso->_link = END_TSO_QUEUE; @@ -235,6 +236,8 @@ unblockOne_ (Capability *cap, StgTSO *tso, } tso->cap = cap; + write_barrier(); + tso->why_blocked = NotBlocked; appendToRunQueue(cap,tso); // context-switch soonish so we can migrate the new thread if @@ -246,6 +249,7 @@ unblockOne_ (Capability *cap, StgTSO *tso, wakeupThreadOnCapability(cap, tso->cap, tso); } #else + tso->why_blocked = NotBlocked; appendToRunQueue(cap,tso); // context-switch soonish so we can migrate the new thread if @@ -327,13 +331,15 @@ printThreadBlockage(StgTSO *tso) case BlockedOnMVar: debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; - case BlockedOnException: - debugBelch("is blocked on delivering an exception to thread %lu", - (unsigned long)tso->block_info.tso->id); - break; case BlockedOnBlackHole: debugBelch("is blocked on a black hole"); break; + case BlockedOnMsgWakeup: + debugBelch("is blocked on a wakeup message"); + break; + case BlockedOnMsgThrowTo: + debugBelch("is blocked on a throwto message"); + break; case NotBlocked: debugBelch("is not blocked"); break; diff --git a/rts/Threads.h b/rts/Threads.h index 8e0ee26..dfe879e 100644 --- a/rts/Threads.h +++ b/rts/Threads.h @@ -11,6 +11,8 @@ BEGIN_RTS_PRIVATE +#define END_BLOCKED_EXCEPTIONS_QUEUE ((MessageThrowTo*)END_TSO_QUEUE) + StgTSO * unblockOne (Capability *cap, StgTSO *tso); StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate); diff --git a/rts/Trace.h b/rts/Trace.h index f8b6ad4..69ea3d3 100644 --- a/rts/Trace.h +++ b/rts/Trace.h @@ -135,6 +135,15 @@ void traceUserMsg(Capability *cap, char *msg); #define debugTrace(class, str, ...) /* nothing */ #endif +#ifdef DEBUG +#define debugTraceCap(class, cap, msg, ...) \ + if (RTS_UNLIKELY(class)) { \ + traceCap_(cap, msg, ##__VA_ARGS__); \ + } +#else +#define debugTraceCap(class, cap, str, ...) /* nothing */ +#endif + /* * Emit a message/event describing the state of a thread */ @@ -152,6 +161,7 @@ void traceThreadStatus_ (StgTSO *tso); #define traceCap(class, cap, msg, ...) /* nothing */ #define trace(class, msg, ...) /* nothing */ #define debugTrace(class, str, ...) /* nothing */ +#define debugTraceCap(class, cap, str, ...) /* nothing */ #define traceThreadStatus(class, tso) /* nothing */ #endif /* TRACING */ diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h index fd87820..6ebf33d 100644 --- a/rts/eventlog/EventLog.h +++ b/rts/eventlog/EventLog.h @@ -59,7 +59,7 @@ INLINE_HEADER void postMsg (char *msg STG_UNUSED, va_list ap STG_UNUSED) { /* nothing */ } -INLINE_HEADER void postCapMsg (Capability *cap, +INLINE_HEADER void postCapMsg (Capability *cap STG_UNUSED, char *msg STG_UNUSED, va_list ap STG_UNUSED) { /* nothing */ } diff --git a/rts/sm/Compact.c b/rts/sm/Compact.c index e55ae2b..39284f9 100644 --- a/rts/sm/Compact.c +++ b/rts/sm/Compact.c @@ -471,7 +471,8 @@ thread_TSO (StgTSO *tso) if ( tso->why_blocked == BlockedOnMVar || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnException + || tso->why_blocked == BlockedOnMsgThrowTo + || tso->why_blocked == BlockedOnMsgWakeup ) { thread_(&tso->block_info.closure); } @@ -622,7 +623,8 @@ thread_obj (StgInfoTable *info, StgPtr p) case FUN: case CONSTR: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: case IND_PERM: case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: @@ -705,32 +707,6 @@ thread_obj (StgInfoTable *info, StgPtr p) case TSO: return thread_TSO((StgTSO *)p); - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p; - thread_(&wq->closure); - thread_(&wq->next_queue_entry); - thread_(&wq->prev_queue_entry); - return p + sizeofW(StgTVarWatchQueue); - } - - case TVAR: - { - StgTVar *tvar = (StgTVar *)p; - thread((void *)&tvar->current_value); - thread((void *)&tvar->first_watch_queue_entry); - return p + sizeofW(StgTVar); - } - - case TREC_HEADER: - { - StgTRecHeader *trec = (StgTRecHeader *)p; - thread_(&trec->enclosing_trec); - thread_(&trec->current_chunk); - thread_(&trec->invariants_to_check); - return p + sizeofW(StgTRecHeader); - } - case TREC_CHUNK: { StgWord i; @@ -745,23 +721,6 @@ thread_obj (StgInfoTable *info, StgPtr p) return p + sizeofW(StgTRecChunk); } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = (StgAtomicInvariant *)p; - thread_(&invariant->code); - thread_(&invariant->last_execution); - return p + sizeofW(StgAtomicInvariant); - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = (StgInvariantCheckQueue *)p; - thread_(&queue->invariant); - thread_(&queue->my_execution); - thread_(&queue->next_queue_entry); - return p + sizeofW(StgInvariantCheckQueue); - } - default: barf("update_fwd: unknown/strange object %d", (int)(info->type)); return NULL; diff --git a/rts/sm/Evac.c b/rts/sm/Evac.c index 76026b0..21017a6 100644 --- a/rts/sm/Evac.c +++ b/rts/sm/Evac.c @@ -626,7 +626,8 @@ loop: return; case WEAK: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag); return; @@ -721,30 +722,10 @@ loop: } } - case TREC_HEADER: - copy(p,info,q,sizeofW(StgTRecHeader),gen); - return; - - case TVAR_WATCH_QUEUE: - copy(p,info,q,sizeofW(StgTVarWatchQueue),gen); - return; - - case TVAR: - copy(p,info,q,sizeofW(StgTVar),gen); - return; - case TREC_CHUNK: copy(p,info,q,sizeofW(StgTRecChunk),gen); return; - case ATOMIC_INVARIANT: - copy(p,info,q,sizeofW(StgAtomicInvariant),gen); - return; - - case INVARIANT_CHECK_QUEUE: - copy(p,info,q,sizeofW(StgInvariantCheckQueue),gen); - return; - default: barf("evacuate: strange closure type %d", (int)(INFO_PTR_TO_STRUCT(info)->type)); } diff --git a/rts/sm/GC.c b/rts/sm/GC.c index 2eabdab..ae6fc99 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -732,7 +732,6 @@ SET_GCT(gc_threads[0]); // send exceptions to any threads which were about to die RELEASE_SM_LOCK; resurrectThreads(resurrected_threads); - performPendingThrowTos(exception_threads); ACQUIRE_SM_LOCK; // Update the stable pointer hash table. diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c index 7b7187c..9df39b9 100644 --- a/rts/sm/MarkWeak.c +++ b/rts/sm/MarkWeak.c @@ -22,6 +22,7 @@ #include "Schedule.h" #include "Weak.h" #include "Storage.h" +#include "Threads.h" /* ----------------------------------------------------------------------------- Weak Pointers @@ -80,9 +81,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list // List of threads found to be unreachable StgTSO *resurrected_threads; -// List of blocked threads found to have pending throwTos -StgTSO *exception_threads; - static void resurrectUnreachableThreads (generation *gen); static rtsBool tidyThreadList (generation *gen); @@ -93,7 +91,6 @@ initWeakForGC(void) weak_ptr_list = NULL; weak_stage = WeakPtrs; resurrected_threads = END_TSO_QUEUE; - exception_threads = END_TSO_QUEUE; } rtsBool @@ -286,35 +283,11 @@ static rtsBool tidyThreadList (generation *gen) next = t->global_link; - // This is a good place to check for blocked - // exceptions. It might be the case that a thread is - // blocked on delivering an exception to a thread that - // is also blocked - we try to ensure that this - // doesn't happen in throwTo(), but it's too hard (or - // impossible) to close all the race holes, so we - // accept that some might get through and deal with - // them here. A GC will always happen at some point, - // even if the system is otherwise deadlocked. - // - // If an unreachable thread has blocked - // exceptions, we really want to perform the - // blocked exceptions rather than throwing - // BlockedIndefinitely exceptions. This is the - // only place we can discover such threads. - // The target thread might even be - // ThreadFinished or ThreadKilled. Bugs here - // will only be seen when running on a - // multiprocessor. - if (t->blocked_exceptions != END_TSO_QUEUE) { - if (tmp == NULL) { - evacuate((StgClosure **)&t); - flag = rtsTrue; - } - t->global_link = exception_threads; - exception_threads = t; - *prev = next; - continue; - } + // if the thread is not masking exceptions but there are + // pending exceptions on its queue, then something has gone + // wrong: + ASSERT(t->blocked_exceptions == END_BLOCKED_EXCEPTIONS_QUEUE + || (t->flags & TSO_BLOCKEX)); if (tmp == NULL) { // not alive (yet): leave this thread on the diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index 442fee1..11d5424 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -307,7 +307,8 @@ checkClosure( StgClosure* p ) case IND_OLDGEN_PERM: case BLACKHOLE: case CAF_BLACKHOLE: - case STABLE_NAME: + case PRIM: + case MUT_PRIM: case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: case CONSTR_STATIC: @@ -416,39 +417,6 @@ checkClosure( StgClosure* p ) checkTSO((StgTSO *)p); return tso_sizeW((StgTSO *)p); - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->next_queue_entry)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->prev_queue_entry)); - return sizeofW(StgTVarWatchQueue); - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *q = (StgInvariantCheckQueue *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->invariant)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->my_execution)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->next_queue_entry)); - return sizeofW(StgInvariantCheckQueue); - } - - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = (StgAtomicInvariant *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->code)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->last_execution)); - return sizeofW(StgAtomicInvariant); - } - - case TVAR: - { - StgTVar *tv = (StgTVar *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->current_value)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_watch_queue_entry)); - return sizeofW(StgTVar); - } - case TREC_CHUNK: { nat i; @@ -461,14 +429,6 @@ checkClosure( StgClosure* p ) } return sizeofW(StgTRecChunk); } - - case TREC_HEADER: - { - StgTRecHeader *trec = (StgTRecHeader *)p; - ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> enclosing_trec)); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> current_chunk)); - return sizeofW(StgTRecHeader); - } default: barf("checkClosure (closure type %d)", info->type); diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index 466b9b4..1b671a0 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -82,7 +82,8 @@ scavengeTSO (StgTSO *tso) if ( tso->why_blocked == BlockedOnMVar || tso->why_blocked == BlockedOnBlackHole - || tso->why_blocked == BlockedOnException + || tso->why_blocked == BlockedOnMsgWakeup + || tso->why_blocked == BlockedOnMsgThrowTo ) { evacuate(&tso->block_info.closure); } @@ -394,7 +395,6 @@ scavenge_block (bdescr *bd) { StgPtr p, q; StgInfoTable *info; - generation *saved_evac_gen; rtsBool saved_eager_promotion; gen_workspace *ws; @@ -403,7 +403,6 @@ scavenge_block (bdescr *bd) gct->scan_bd = bd; gct->evac_gen = bd->gen; - saved_evac_gen = gct->evac_gen; saved_eager_promotion = gct->eager_promotion; gct->failed_to_evac = rtsFalse; @@ -532,7 +531,7 @@ scavenge_block (bdescr *bd) gen_obj: case CONSTR: case WEAK: - case STABLE_NAME: + case PRIM: { StgPtr end; @@ -672,42 +671,21 @@ scavenge_block (bdescr *bd) break; } - case TVAR_WATCH_QUEUE: + case MUT_PRIM: { - StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&wq->closure); - evacuate((StgClosure **)&wq->next_queue_entry); - evacuate((StgClosure **)&wq->prev_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgTVarWatchQueue); - break; - } + StgPtr end; - case TVAR: - { - StgTVar *tvar = ((StgTVar *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&tvar->current_value); - evacuate((StgClosure **)&tvar->first_watch_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgTVar); - break; - } + gct->eager_promotion = rtsFalse; - case TREC_HEADER: - { - StgTRecHeader *trec = ((StgTRecHeader *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&trec->enclosing_trec); - evacuate((StgClosure **)&trec->current_chunk); - evacuate((StgClosure **)&trec->invariants_to_check); - gct->evac_gen = saved_evac_gen; + end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs; + for (p = (P_)((StgClosure *)p)->payload; p < end; p++) { + evacuate((StgClosure **)p); + } + p += info->layout.payload.nptrs; + + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgTRecHeader); - break; + break; } case TREC_CHUNK: @@ -715,44 +693,19 @@ scavenge_block (bdescr *bd) StgWord i; StgTRecChunk *tc = ((StgTRecChunk *) p); TRecEntry *e = &(tc -> entries[0]); - gct->evac_gen = 0; + gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&tc->prev_chunk); for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { evacuate((StgClosure **)&e->tvar); evacuate((StgClosure **)&e->expected_value); evacuate((StgClosure **)&e->new_value); } - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable p += sizeofW(StgTRecChunk); break; } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p); - gct->evac_gen = 0; - evacuate(&invariant->code); - evacuate((StgClosure **)&invariant->last_execution); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgAtomicInvariant); - break; - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&queue->invariant); - evacuate((StgClosure **)&queue->my_execution); - evacuate((StgClosure **)&queue->next_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - p += sizeofW(StgInvariantCheckQueue); - break; - } - default: barf("scavenge: unimplemented/strange closure type %d @ %p", info->type, p); @@ -806,10 +759,10 @@ scavenge_mark_stack(void) { StgPtr p, q; StgInfoTable *info; - generation *saved_evac_gen; + rtsBool saved_eager_promotion; gct->evac_gen = oldest_gen; - saved_evac_gen = gct->evac_gen; + saved_eager_promotion = gct->eager_promotion; while ((p = pop_mark_stack())) { @@ -822,8 +775,6 @@ scavenge_mark_stack(void) case MVAR_CLEAN: case MVAR_DIRTY: { - rtsBool saved_eager_promotion = gct->eager_promotion; - StgMVar *mvar = ((StgMVar *)p); gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&mvar->head); @@ -906,7 +857,7 @@ scavenge_mark_stack(void) gen_obj: case CONSTR: case WEAK: - case STABLE_NAME: + case PRIM: { StgPtr end; @@ -938,8 +889,6 @@ scavenge_mark_stack(void) case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: { - rtsBool saved_eager_promotion = gct->eager_promotion; - gct->eager_promotion = rtsFalse; evacuate(&((StgMutVar *)p)->var); gct->eager_promotion = saved_eager_promotion; @@ -986,13 +935,10 @@ scavenge_mark_stack(void) case MUT_ARR_PTRS_DIRTY: // follow everything { - rtsBool saved_eager; - // We don't eagerly promote objects pointed to by a mutable // array, but if we find the array only points to objects in // the same or an older generation, we mark it "clean" and // avoid traversing it during minor GCs. - saved_eager = gct->eager_promotion; gct->eager_promotion = rtsFalse; scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); @@ -1003,7 +949,7 @@ scavenge_mark_stack(void) ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; } - gct->eager_promotion = saved_eager; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable anyhow. break; } @@ -1032,81 +978,39 @@ scavenge_mark_stack(void) break; } - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&wq->closure); - evacuate((StgClosure **)&wq->next_queue_entry); - evacuate((StgClosure **)&wq->prev_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case TVAR: - { - StgTVar *tvar = ((StgTVar *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&tvar->current_value); - evacuate((StgClosure **)&tvar->first_watch_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - + case MUT_PRIM: + { + StgPtr end; + + gct->eager_promotion = rtsFalse; + + end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs; + for (p = (P_)((StgClosure *)p)->payload; p < end; p++) { + evacuate((StgClosure **)p); + } + + gct->eager_promotion = saved_eager_promotion; + gct->failed_to_evac = rtsTrue; // mutable + break; + } + case TREC_CHUNK: { StgWord i; StgTRecChunk *tc = ((StgTRecChunk *) p); TRecEntry *e = &(tc -> entries[0]); - gct->evac_gen = 0; + gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&tc->prev_chunk); for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { evacuate((StgClosure **)&e->tvar); evacuate((StgClosure **)&e->expected_value); evacuate((StgClosure **)&e->new_value); } - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case TREC_HEADER: - { - StgTRecHeader *trec = ((StgTRecHeader *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&trec->enclosing_trec); - evacuate((StgClosure **)&trec->current_chunk); - evacuate((StgClosure **)&trec->invariants_to_check); - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable break; } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p); - gct->evac_gen = 0; - evacuate(&invariant->code); - evacuate((StgClosure **)&invariant->last_execution); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&queue->invariant); - evacuate((StgClosure **)&queue->my_execution); - evacuate((StgClosure **)&queue->next_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - default: barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p", info->type, p); @@ -1133,9 +1037,11 @@ static rtsBool scavenge_one(StgPtr p) { const StgInfoTable *info; - generation *saved_evac_gen = gct->evac_gen; rtsBool no_luck; + rtsBool saved_eager_promotion; + saved_eager_promotion = gct->eager_promotion; + ASSERT(LOOKS_LIKE_CLOSURE_PTR(p)); info = get_itbl((StgClosure *)p); @@ -1144,8 +1050,6 @@ scavenge_one(StgPtr p) case MVAR_CLEAN: case MVAR_DIRTY: { - rtsBool saved_eager_promotion = gct->eager_promotion; - StgMVar *mvar = ((StgMVar *)p); gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&mvar->head); @@ -1190,6 +1094,7 @@ scavenge_one(StgPtr p) case CONSTR_0_2: case CONSTR_2_0: case WEAK: + case PRIM: case IND_PERM: { StgPtr q, end; @@ -1204,7 +1109,6 @@ scavenge_one(StgPtr p) case MUT_VAR_CLEAN: case MUT_VAR_DIRTY: { StgPtr q = p; - rtsBool saved_eager_promotion = gct->eager_promotion; gct->eager_promotion = rtsFalse; evacuate(&((StgMutVar *)p)->var); @@ -1254,13 +1158,10 @@ scavenge_one(StgPtr p) case MUT_ARR_PTRS_CLEAN: case MUT_ARR_PTRS_DIRTY: { - rtsBool saved_eager; - // We don't eagerly promote objects pointed to by a mutable // array, but if we find the array only points to objects in // the same or an older generation, we mark it "clean" and // avoid traversing it during minor GCs. - saved_eager = gct->eager_promotion; gct->eager_promotion = rtsFalse; scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); @@ -1271,7 +1172,7 @@ scavenge_one(StgPtr p) ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; } - gct->eager_promotion = saved_eager; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; break; } @@ -1298,81 +1199,40 @@ scavenge_one(StgPtr p) break; } - case TVAR_WATCH_QUEUE: - { - StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&wq->closure); - evacuate((StgClosure **)&wq->next_queue_entry); - evacuate((StgClosure **)&wq->prev_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } + case MUT_PRIM: + { + StgPtr end; + + gct->eager_promotion = rtsFalse; + + end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs; + for (p = (P_)((StgClosure *)p)->payload; p < end; p++) { + evacuate((StgClosure **)p); + } - case TVAR: - { - StgTVar *tvar = ((StgTVar *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&tvar->current_value); - evacuate((StgClosure **)&tvar->first_watch_queue_entry); - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable break; - } - case TREC_HEADER: - { - StgTRecHeader *trec = ((StgTRecHeader *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&trec->enclosing_trec); - evacuate((StgClosure **)&trec->current_chunk); - evacuate((StgClosure **)&trec->invariants_to_check); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } + } case TREC_CHUNK: { StgWord i; StgTRecChunk *tc = ((StgTRecChunk *) p); TRecEntry *e = &(tc -> entries[0]); - gct->evac_gen = 0; + gct->eager_promotion = rtsFalse; evacuate((StgClosure **)&tc->prev_chunk); for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) { evacuate((StgClosure **)&e->tvar); evacuate((StgClosure **)&e->expected_value); evacuate((StgClosure **)&e->new_value); } - gct->evac_gen = saved_evac_gen; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsTrue; // mutable break; } - case ATOMIC_INVARIANT: - { - StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p); - gct->evac_gen = 0; - evacuate(&invariant->code); - evacuate((StgClosure **)&invariant->last_execution); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - - case INVARIANT_CHECK_QUEUE: - { - StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p); - gct->evac_gen = 0; - evacuate((StgClosure **)&queue->invariant); - evacuate((StgClosure **)&queue->my_execution); - evacuate((StgClosure **)&queue->next_queue_entry); - gct->evac_gen = saved_evac_gen; - gct->failed_to_evac = rtsTrue; // mutable - break; - } - case IND: // IND can happen, for example, when the interpreter allocates // a gigantic AP closure (more than one block), which ends up @@ -1470,8 +1330,8 @@ scavenge_mutable_list(bdescr *bd, generation *gen) continue; case MUT_ARR_PTRS_DIRTY: { - rtsBool saved_eager; - saved_eager = gct->eager_promotion; + rtsBool saved_eager_promotion; + saved_eager_promotion = gct->eager_promotion; gct->eager_promotion = rtsFalse; scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p); @@ -1482,7 +1342,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen) ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; } - gct->eager_promotion = saved_eager; + gct->eager_promotion = saved_eager_promotion; gct->failed_to_evac = rtsFalse; recordMutableGen_GC((StgClosure *)p,gen->no); continue;