mkSplitMarkerLabel,
mkDirty_MUT_VAR_Label,
mkUpdInfoLabel,
+ mkBHUpdInfoLabel,
mkIndStaticInfoLabel,
mkMainCapabilityLabel,
mkMAP_FROZEN_infoLabel,
mkSplitMarkerLabel = CmmLabel rtsPackageId (fsLit "__stg_split_marker") CmmCode
mkDirty_MUT_VAR_Label = CmmLabel rtsPackageId (fsLit "dirty_MUT_VAR") CmmCode
mkUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_upd_frame") CmmInfo
+mkBHUpdInfoLabel = CmmLabel rtsPackageId (fsLit "stg_bh_upd_frame" ) CmmInfo
mkIndStaticInfoLabel = CmmLabel rtsPackageId (fsLit "stg_IND_STATIC") CmmInfo
mkMainCapabilityLabel = CmmLabel rtsPackageId (fsLit "MainCapability") CmmData
mkMAP_FROZEN_infoLabel = CmmLabel rtsPackageId (fsLit "stg_MUT_ARR_PTRS_FROZEN0") CmmInfo
OnStack -> do { sp_rel <- getSpRelOffset virt_sp
; returnFC (CmmLoad sp_rel bWord) }
- UpdateCode -> returnFC (CmmLit (CmmLabel mkUpdInfoLabel))
CaseAlts lbl _ _ -> returnFC (CmmLit (CmmLabel lbl))
}
then do
tickyBlackHole (not is_single_entry)
let bh_info = CmmReg (CmmGlobal EagerBlackholeInfo)
- stmtC (CmmStore (CmmReg nodeReg) bh_info)
+ stmtsC [
+ CmmStore (cmmOffsetW (CmmReg nodeReg) fixedHdrSize)
+ (CmmReg (CmmGlobal CurrentTSO)),
+ CmmCall (CmmPrim MO_WriteBarrier) [] [] CmmUnsafe CmmMayReturn,
+ CmmStore (CmmReg nodeReg) bh_info
+ ]
else
nopC
\end{code}
= code
| not (isStaticClosure closure_info)
- = if closureUpdReqd closure_info
- then do { tickyPushUpdateFrame; pushUpdateFrame (CmmReg nodeReg) code }
- else do { tickyUpdateFrameOmitted; code }
-
+ = do
+ if not (closureUpdReqd closure_info)
+ then do tickyUpdateFrameOmitted; code
+ else do
+ tickyPushUpdateFrame
+ dflags <- getDynFlags
+ if not opt_SccProfilingOn && dopt Opt_EagerBlackHoling dflags
+ then pushBHUpdateFrame (CmmReg nodeReg) code
+ else pushUpdateFrame (CmmReg nodeReg) code
+
| otherwise -- A static closure
= do { tickyUpdateBhCaf closure_info
; if closureUpdReqd closure_info
then do -- Blackhole the (updatable) CAF:
{ upd_closure <- link_caf closure_info True
- ; pushUpdateFrame upd_closure code }
+ ; pushBHUpdateFrame upd_closure code }
else do
{ -- krc: removed some ticky-related code here.
; tickyUpdateFrameOmitted
{ -- Alloc black hole specifying CC_HDR(Node) as the cost centre
; let use_cc = costCentreFrom (CmmReg nodeReg)
blame_cc = use_cc
- ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc []
+ tso = CmmReg (CmmGlobal CurrentTSO)
+ ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc [(tso,fixedHdrSize)]
; hp_rel <- getHpRelOffset hp_offset
-- Call the RTS function newCAF to add the CAF to the CafList
\begin{code}
data Sequel
= OnStack -- Continuation is on the stack
- | UpdateCode -- Continuation is update
| CaseAlts
CLabel -- Jump to this; if the continuation is for a vectored
setStackFrame, getStackFrame,
mkVirtStkOffsets, mkStkAmodes,
freeStackSlots,
- pushUpdateFrame, emitPushUpdateFrame,
+ pushUpdateFrame, pushBHUpdateFrame, emitPushUpdateFrame,
) where
#include "HsVersions.h"
\begin{code}
+-- | Push a standard update frame (info table @stg_upd_frame@, see
+-- mkUpdInfoLabel) for the given updatee, then generate @code@.
pushUpdateFrame :: CmmExpr -> Code -> Code
pushUpdateFrame updatee code
+ = pushSpecUpdateFrame mkUpdInfoLabel updatee code
+
+-- | Push an update frame whose info table is @stg_bh_upd_frame@
+-- (mkBHUpdInfoLabel), used when eager blackholing is on and the
+-- updatee has been blackholed at thunk entry.
+pushBHUpdateFrame :: CmmExpr -> Code -> Code
+pushBHUpdateFrame updatee code
+ = pushSpecUpdateFrame mkBHUpdInfoLabel updatee code
+
+pushSpecUpdateFrame :: CLabel -> CmmExpr -> Code -> Code
+pushSpecUpdateFrame lbl updatee code
= do {
when debugIsOn $ do
{ EndOfBlockInfo _ sequel <- getEndOfBlockInfo ;
-- The location of the lowest-address
-- word of the update frame itself
- ; setEndOfBlockInfo (EndOfBlockInfo vsp UpdateCode) $
- do { emitPushUpdateFrame frame_addr updatee
+ -- NB. we used to set the Sequel to 'UpdateCode' so
+ -- that we could jump directly to the update code if
+ -- we know that the next frame on the stack is an
+ -- update frame. However, the RTS can sometimes
+ -- change an update frame into something else (see
+ -- e.g. Note [upd-black-hole] in rts/sm/Scav.c), so we
+ -- no longer make this assumption.
+ ; setEndOfBlockInfo (EndOfBlockInfo vsp OnStack) $
+ do { emitSpecPushUpdateFrame lbl frame_addr updatee
; code }
}
+-- | Emit the stores that initialise an update frame at @frame_addr@
+-- with the standard @stg_upd_frame@ info table.
emitPushUpdateFrame :: CmmExpr -> CmmExpr -> Code
-emitPushUpdateFrame frame_addr updatee = do
+emitPushUpdateFrame = emitSpecPushUpdateFrame mkUpdInfoLabel
+
+-- | Emit the stores that initialise an update frame at @frame_addr@:
+-- the given info-table label in the frame's info word, and the
+-- updatee in the frame's updatee slot.
+emitSpecPushUpdateFrame :: CLabel -> CmmExpr -> CmmExpr -> Code
+emitSpecPushUpdateFrame lbl frame_addr updatee = do
stmtsC [ -- Set the info word
- CmmStore frame_addr (mkLblExpr mkUpdInfoLabel)
+ CmmStore frame_addr (mkLblExpr lbl)
, -- And the updatee
CmmStore (cmmOffsetB frame_addr off_updatee) updatee ]
initUpdFrameProf frame_addr
else \
_assertFail(__FILE__, __LINE__)
+/* CHECKM(predicate, msg, ...): like CHECK(predicate), but on failure
+ * calls barf() with a printf-style message (and optional arguments)
+ * instead of _assertFail(). */
+#define CHECKM(predicate, msg, ...) \
+ if (predicate) \
+ /*null*/; \
+ else \
+ barf(msg, ##__VA_ARGS__)
+
#ifndef DEBUG
#define ASSERT(predicate) /* nothing */
+/* ASSERTM: ASSERT with a barf() message; compiled out unless DEBUG. */
+#define ASSERTM(predicate,msg,...) /* nothing */
#else
#define ASSERT(predicate) CHECK(predicate)
+#define ASSERTM(predicate,msg,...) CHECKM(predicate,msg,##__VA_ARGS__)
#endif /* DEBUG */
/*
closure_field(StgTSO, trec);
closure_field(StgTSO, flags);
closure_field(StgTSO, dirty);
+ closure_field(StgTSO, bq);
closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
tso_field(StgTSO, sp);
tso_field_offset(StgTSO, stack);
closure_size(StgStableName);
closure_field(StgStableName,sn);
+ closure_size(StgBlockingQueue);
+ closure_field(StgBlockingQueue, bh);
+ closure_field(StgBlockingQueue, owner);
+ closure_field(StgBlockingQueue, queue);
+ closure_field(StgBlockingQueue, link);
+
+ closure_size(MessageBlackHole);
+ closure_field(MessageBlackHole, link);
+ closure_field(MessageBlackHole, tso);
+ closure_field(MessageBlackHole, bh);
+
struct_field_("RtsFlags_ProfFlags_showCCSOnException",
RTS_FLAGS, ProfFlags.showCCSOnException);
struct_field_("RtsFlags_DebugFlags_apply",
SET_HDR(c,info,costCentreStack); \
(c)->words = n_words;
+// OVERWRITE_INFO(c, new_info): use when changing a closure from one
+// kind to another in place.  For LDV profiling this records the death
+// of the old closure (filling any slop) before installing the new info
+// pointer, then records the creation of the new closure.
+#define OVERWRITE_INFO(c, new_info) \
+ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)(c)); \
+ SET_INFO((c), (new_info)); \
+ LDV_RECORD_CREATE(c);
+
/* -----------------------------------------------------------------------------
How to get hold of the static link field for a static closure.
-------------------------------------------------------------------------- */
{ return sizeofW(StgSelector); }
+// Size in words of a BLACKHOLE closure.
INLINE_HEADER StgOffset BLACKHOLE_sizeW ( void )
-{ return sizeofW(StgHeader)+MIN_PAYLOAD_SIZE; }
+{ return sizeofW(StgInd); } // a BLACKHOLE is a kind of indirection
/* --------------------------------------------------------------------------
Sizes of closures
#define UPDATE_FRAME 38
#define CATCH_FRAME 39
#define STOP_FRAME 40
-#define CAF_BLACKHOLE 41
+#define BLOCKING_QUEUE 41
#define BLACKHOLE 42
#define MVAR_CLEAN 43
#define MVAR_DIRTY 44
StgInfoTable *saved_info;
} StgIndStatic;
+// A BLOCKING_QUEUE closure: installed as the indirectee of a BLACKHOLE
+// when a thread blocks on it (see messageBlackHole()).  All the
+// BLOCKING_QUEUEs owned by a thread are chained on that TSO's bq field.
+typedef struct StgBlockingQueue_ {
+ StgHeader header;
+ struct StgBlockingQueue_ *link; // here so it looks like an IND
+ StgClosure *bh; // the BLACKHOLE
+ StgTSO *owner; // the owner of the BLACKHOLE
+ struct MessageBlackHole_ *queue; // MessageBlackHoles of blocked threads
+} StgBlockingQueue;
+
typedef struct {
StgHeader header;
StgWord words;
typedef struct MessageThrowTo_ {
StgHeader header;
- Message *link;
+ struct MessageThrowTo_ *link;
StgTSO *source;
StgTSO *target;
StgClosure *exception;
} MessageThrowTo;
+// Message sent (or handled locally) when a thread blocks on a
+// BLACKHOLE; processed by messageBlackHole().
+typedef struct MessageBlackHole_ {
+ StgHeader header;
+ struct MessageBlackHole_ *link; // next message on a BLOCKING_QUEUE's queue
+ StgTSO *tso; // the blocked thread
+ StgClosure *bh; // the BLACKHOLE being blocked on
+} MessageBlackHole;
+
#endif /* RTS_STORAGE_CLOSURES_H */
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
+ struct MessageBlackHole_ *bh;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
StgInt fd; /* StgInt instead of int, so that it's the same size as the ptrs */
*/
struct StgTSO_* _link;
/*
+ Currently used for linking TSOs on:
+ * cap->run_queue_{hd,tl}
+ * MVAR queue
+ * (non-THREADED_RTS); the blocked_queue
+ * and pointing to the relocated version of a ThreadRelocated
+
NOTE!!! do not modify _link directly, it is subject to
a write barrier for generational GC. Instead use the
setTSOLink() function. Exceptions to this rule are:
* setting the link field to END_TSO_QUEUE
- * putting a TSO on the blackhole_queue
* setting the link field of the currently running TSO, as it
will already be dirty.
*/
*/
struct MessageThrowTo_ * blocked_exceptions;
+ /*
+ A list of StgBlockingQueue objects, representing threads blocked
+ on thunks that are under evaluation by this thread.
+ */
+ struct StgBlockingQueue_ *bq;
+
#ifdef TICKY_TICKY
/* TICKY-specific stuff would go here. */
#endif
void dirty_TSO (Capability *cap, StgTSO *tso);
void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
+// Apply to a TSO before looking at it if you are not sure whether it
+// might be ThreadRelocated or not (basically, that's most of the time
+// unless the TSO is the current TSO).
+//
+// Follows the _link chain of ThreadRelocated TSOs and returns the live
+// TSO at the end of it.
+INLINE_HEADER StgTSO * deRefTSO(StgTSO *tso)
+{
+ while (tso->what_next == ThreadRelocated) {
+ tso = tso->_link;
+ }
+ return tso;
+}
+
/* -----------------------------------------------------------------------------
Invariants:
/* Stack frames */
RTS_RET_INFO(stg_upd_frame_info);
+RTS_RET_INFO(stg_bh_upd_frame_info);
RTS_RET_INFO(stg_marked_upd_frame_info);
RTS_RET_INFO(stg_noupd_frame_info);
RTS_RET_INFO(stg_catch_frame_info);
RTS_RET_INFO(stg_unblockAsyncExceptionszh_ret_info);
RTS_ENTRY(stg_upd_frame_ret);
+RTS_ENTRY(stg_bh_upd_frame_ret);
RTS_ENTRY(stg_marked_upd_frame_ret);
// RTS_FUN(stg_interp_constr_entry);
RTS_INFO(stg_IND_PERM_info);
RTS_INFO(stg_IND_OLDGEN_info);
RTS_INFO(stg_IND_OLDGEN_PERM_info);
-RTS_INFO(stg_CAF_UNENTERED_info);
-RTS_INFO(stg_CAF_ENTERED_info);
-RTS_INFO(stg_WHITEHOLE_info);
RTS_INFO(stg_BLACKHOLE_info);
-RTS_INFO(__stg_EAGER_BLACKHOLE_info);
RTS_INFO(stg_CAF_BLACKHOLE_info);
+RTS_INFO(__stg_EAGER_BLACKHOLE_info);
+RTS_INFO(stg_WHITEHOLE_info);
+RTS_INFO(stg_BLOCKING_QUEUE_CLEAN_info);
+RTS_INFO(stg_BLOCKING_QUEUE_DIRTY_info);
RTS_FUN_INFO(stg_BCO_info);
RTS_INFO(stg_EVACUATED_info);
RTS_INFO(stg_MUT_VAR_DIRTY_info);
RTS_INFO(stg_END_TSO_QUEUE_info);
RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_TRY_WAKEUP_info);
RTS_INFO(stg_MSG_THROWTO_info);
+RTS_INFO(stg_MSG_BLACKHOLE_info);
RTS_INFO(stg_MUT_CONS_info);
RTS_INFO(stg_catch_info);
RTS_INFO(stg_PAP_info);
RTS_ENTRY(stg_IND_PERM_entry);
RTS_ENTRY(stg_IND_OLDGEN_entry);
RTS_ENTRY(stg_IND_OLDGEN_PERM_entry);
-RTS_ENTRY(stg_CAF_UNENTERED_entry);
-RTS_ENTRY(stg_CAF_ENTERED_entry);
RTS_ENTRY(stg_WHITEHOLE_entry);
RTS_ENTRY(stg_BLACKHOLE_entry);
-RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_CAF_BLACKHOLE_entry);
+RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
RTS_ENTRY(stg_BCO_entry);
RTS_ENTRY(stg_EVACUATED_entry);
RTS_ENTRY(stg_WEAK_entry);
RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
RTS_ENTRY(stg_END_TSO_QUEUE_entry);
RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry);
RTS_ENTRY(stg_MSG_THROWTO_entry);
+RTS_ENTRY(stg_MSG_BLACKHOLE_entry);
RTS_ENTRY(stg_MUT_CONS_entry);
RTS_ENTRY(stg_catch_entry);
RTS_ENTRY(stg_PAP_entry);
RTS_RET_INFO(stg_enter_info);
RTS_ENTRY(stg_enter_ret);
+RTS_RET_INFO(stg_enter_checkbh_info);
+RTS_ENTRY(stg_enter_checkbh_ret);
RTS_RET_INFO(stg_gc_void_info);
RTS_ENTRY(stg_gc_void_ret);
+// Is there global work that should pull a Capability out of its normal
+// scheduling loop?  True when the scheduler state is at least
+// SCHED_INTERRUPTING, or when recent inactivity means we need to run a
+// deadlock check.
STATIC_INLINE rtsBool
globalWorkToDo (void)
{
- return blackholes_need_checking
- || sched_state >= SCHED_INTERRUPTING
- ;
+ return sched_state >= SCHED_INTERRUPTING
+ || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
}
/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-void
-wakeupThreadOnCapability (Capability *cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- MessageWakeup *msg;
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
- msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
- msg->header.info = &stg_MSG_WAKEUP_info;
- msg->tso = tso;
- tso->block_info.closure = (StgClosure *)msg;
- dirty_TSO(cap, tso);
- write_barrier();
- tso->why_blocked = BlockedOnMsgWakeup;
-
- sendMessage(other_cap, (Message*)msg);
-}
-
-/* ----------------------------------------------------------------------------
* prodCapability
*
* If a Capability is currently idle, wake up a Task on it. Used to
Messages
-------------------------------------------------------------------------- */
-#ifdef THREADED_RTS
-
-void sendMessage(Capability *cap, Message *msg)
-{
- ACQUIRE_LOCK(&cap->lock);
-
- msg->link = cap->inbox;
- cap->inbox = msg;
-
- if (cap->running_task == NULL) {
- cap->running_task = myTask();
- // precond for releaseCapability_()
- releaseCapability_(cap,rtsFalse);
- } else {
- contextSwitchCapability(cap);
- }
-
- RELEASE_LOCK(&cap->lock);
-}
-
-#endif // THREADED_RTS
INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+
#if defined(THREADED_RTS)
// Gives up the current capability IFF there is a higher-priority
//
void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
-// Wakes up a thread on a Capability (probably a different Capability
-// from the one held by the current Task).
-//
-void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
- StgTSO *tso);
-
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
-void sendMessage (Capability *cap, Message *msg);
-
#endif // THREADED_RTS
/* -----------------------------------------------------------------------------
*bd->free++ = (StgWord)p;
}
+// Record that closure p has been mutated.  If p lives outside
+// generation 0, it is added to cap's mutable list for its generation;
+// gen-0 closures get no entry (NOTE(review): presumably because the
+// youngest generation is always scanned -- confirm against the GC's
+// mut-list invariants).
+INLINE_HEADER void
+recordClosureMutated (Capability *cap, StgClosure *p)
+{
+ bdescr *bd;
+ bd = Bdescr((StgPtr)p);
+ if (bd->gen_no != 0) recordMutableCap(p,cap,bd->gen_no);
+}
+
+
#if defined(THREADED_RTS)
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap)
[UPDATE_FRAME] = ( _BTM ),
[CATCH_FRAME] = ( _BTM ),
[STOP_FRAME] = ( _BTM ),
- [CAF_BLACKHOLE] = ( _BTM|_NS| _UPT ),
[BLACKHOLE] = ( _NS| _UPT ),
+ [BLOCKING_QUEUE] = ( _NS| _MUT|_UPT ),
[MVAR_CLEAN] = (_HNF| _NS| _MUT|_UPT ),
[MVAR_DIRTY] = (_HNF| _NS| _MUT|_UPT ),
[ARR_WORDS] = (_HNF| _NS| _UPT ),
type = Thunk;
break;
- case CAF_BLACKHOLE:
- case EAGER_BLACKHOLE:
case BLACKHOLE:
/* case BLACKHOLE_BQ: FIXME: case does not exist */
size = sizeW_fromITBL(info);
}
/* -----------------------------------------------------------------------------
+ stg_enter_checkbh is just like stg_enter, except that we also call
+ checkBlockingQueues(). The point of this is that the GC can
+ replace an stg_marked_upd_frame with an stg_enter_checkbh if it
+ finds that the BLACKHOLE has already been updated by another
+ thread. It would be unsafe to use stg_enter, because there might
+ be an orphaned BLOCKING_QUEUE now.
+ -------------------------------------------------------------------------- */
+
+INFO_TABLE_RET( stg_enter_checkbh, RET_SMALL, P_ unused)
+{
+ R1 = Sp(1); // the closure to enter, saved in the frame
+ Sp_adj(2); // pop this frame
+ // check for orphaned BLOCKING_QUEUEs before entering (see note above)
+ foreign "C" checkBlockingQueues(MyCapability() "ptr",
+ CurrentTSO) [R1];
+ ENTER();
+}
+
+/* -----------------------------------------------------------------------------
Heap checks in Primitive case alternatives
A primitive case alternative is entered with a value either in
// code fragment executed just before we return to the scheduler
stg_block_putmvar_finally
{
-#ifdef THREADED_RTS
unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
jump StgReturn;
}
BLOCK_BUT_FIRST(stg_block_putmvar_finally);
}
-// code fragment executed just before we return to the scheduler
-stg_block_blackhole_finally
-{
-#if defined(THREADED_RTS)
- // The last thing we do is release sched_lock, which is
- // preventing other threads from accessing blackhole_queue and
- // picking up this thread before we are finished with it.
- RELEASE_LOCK(sched_mutex "ptr");
-#endif
- jump StgReturn;
-}
-
stg_block_blackhole
{
Sp_adj(-2);
Sp(1) = R1;
Sp(0) = stg_enter_info;
- BLOCK_BUT_FIRST(stg_block_blackhole_finally);
+ BLOCK_GENERIC;
}
INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
#include "Disassembler.h"
#include "Interpreter.h"
#include "ThreadPaused.h"
+#include "Threads.h"
#include <string.h> /* for memcpy */
#ifdef HAVE_ERRNO_H
// to a PAP by the GC, violating the invariant that PAPs
// always contain a tagged pointer to the function.
INTERP_TICK(it_retto_UPDATE);
- UPD_IND(cap, ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
+ updateThunk(cap, cap->r.rCurrentTSO,
+ ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
Sp += sizeofW(StgUpdateFrame);
goto do_return;
case FUN_1_1:
case FUN_0_2:
case BLACKHOLE:
- case CAF_BLACKHOLE:
+ case BLOCKING_QUEUE:
case IND_PERM:
case IND_OLDGEN_PERM:
/*
SymI_HasProto(stable_ptr_table) \
SymI_HasProto(stackOverflow) \
SymI_HasProto(stg_CAF_BLACKHOLE_info) \
+ SymI_HasProto(stg_BLACKHOLE_info) \
SymI_HasProto(__stg_EAGER_BLACKHOLE_info) \
+ SymI_HasProto(stg_BLOCKING_QUEUE_CLEAN_info) \
+ SymI_HasProto(stg_BLOCKING_QUEUE_DIRTY_info) \
SymI_HasProto(startTimer) \
SymI_HasProto(stg_MVAR_CLEAN_info) \
SymI_HasProto(stg_MVAR_DIRTY_info) \
SymI_HasProto(stg_sel_8_upd_info) \
SymI_HasProto(stg_sel_9_upd_info) \
SymI_HasProto(stg_upd_frame_info) \
+ SymI_HasProto(stg_bh_upd_frame_info) \
SymI_HasProto(suspendThread) \
SymI_HasProto(stg_takeMVarzh) \
SymI_HasProto(stg_threadStatuszh) \
--- /dev/null
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "Rts.h"
+#include "Messages.h"
+#include "Trace.h"
+#include "Capability.h"
+#include "Schedule.h"
+#include "Threads.h"
+#include "RaiseAsync.h"
+#include "sm/Storage.h"
+
+/* ----------------------------------------------------------------------------
+ Send a message to another Capability
+ ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+// Send 'msg' from 'from_cap' to 'to_cap': push it onto to_cap's inbox
+// under to_cap's lock, then make sure a task on to_cap will notice it.
+void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
+{
+ ACQUIRE_LOCK(&to_cap->lock);
+
+#ifdef DEBUG
+ {
+ // sanity: only known message kinds may travel between Capabilities
+ const StgInfoTable *i = msg->header.info;
+ if (i != &stg_MSG_WAKEUP_info &&
+ i != &stg_MSG_THROWTO_info &&
+ i != &stg_MSG_BLACKHOLE_info &&
+ i != &stg_MSG_TRY_WAKEUP_info &&
+ i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
+ i != &stg_WHITEHOLE_info) {
+ barf("sendMessage: %p", i);
+ }
+ }
+#endif
+
+ msg->link = to_cap->inbox;
+ to_cap->inbox = msg;
+
+ // msg->link was mutated above; put msg on from_cap's mutable list
+ // if it lives outside generation 0
+ recordClosureMutated(from_cap,(StgClosure*)msg);
+
+ // If to_cap is idle, start a task on it; otherwise request a context
+ // switch (NOTE(review): presumably so the running task polls its
+ // inbox soon -- confirm)
+ if (to_cap->running_task == NULL) {
+ to_cap->running_task = myTask();
+ // precond for releaseCapability_()
+ releaseCapability_(to_cap,rtsFalse);
+ } else {
+ contextSwitchCapability(to_cap);
+ }
+
+ RELEASE_LOCK(&to_cap->lock);
+}
+
+#endif /* THREADED_RTS */
+
+/* ----------------------------------------------------------------------------
+ Handle a message
+ ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+// Perform the action requested by a message found in cap's inbox.
+// Dispatches on the message's info pointer; may loop if the message is
+// locked (WHITEHOLE) or its info pointer changes under us.
+void
+executeMessage (Capability *cap, Message *m)
+{
+ const StgInfoTable *i;
+
+loop:
+ write_barrier(); // allow m->header to be modified by another thread
+ i = m->header.info;
+ if (i == &stg_MSG_WAKEUP_info)
+ {
+ // the plan is to eventually get rid of these and use
+ // TRY_WAKEUP instead.
+ MessageWakeup *w = (MessageWakeup *)m;
+ StgTSO *tso = w->tso;
+ debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
+ (lnat)tso->id);
+ ASSERT(tso->cap == cap);
+ ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+ ASSERT(tso->block_info.closure == (StgClosure *)m);
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap, tso);
+ }
+ else if (i == &stg_MSG_TRY_WAKEUP_info)
+ {
+ StgTSO *tso = ((MessageWakeup *)m)->tso;
+ debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
+ (lnat)tso->id);
+ tryWakeupThread(cap, tso);
+ }
+ else if (i == &stg_MSG_THROWTO_info)
+ {
+ MessageThrowTo *t = (MessageThrowTo *)m;
+ nat r;
+ const StgInfoTable *i;
+
+ // lock the message; if it changed kind meanwhile, re-dispatch
+ i = lockClosure((StgClosure*)m);
+ if (i != &stg_MSG_THROWTO_info) {
+ unlockClosure((StgClosure*)m, i);
+ goto loop;
+ }
+
+ debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
+ (lnat)t->source->id, (lnat)t->target->id);
+
+ ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+ ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+ r = throwToMsg(cap, t);
+
+ switch (r) {
+ case THROWTO_SUCCESS:
+ ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
+ t->source->sp += 3;
+ unblockOne(cap, t->source);
+ // this message is done
+ unlockClosure((StgClosure*)m, &stg_IND_info);
+ break;
+ case THROWTO_BLOCKED:
+ // unlock the message
+ unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+ break;
+ }
+ }
+ else if (i == &stg_MSG_BLACKHOLE_info)
+ {
+ nat r;
+ MessageBlackHole *b = (MessageBlackHole*)m;
+
+ // returns 0 if the blocked thread can be woken up right away
+ r = messageBlackHole(cap, b);
+ if (r == 0) {
+ tryWakeupThread(cap, b->tso);
+ }
+ return;
+ }
+ else if (i == &stg_IND_info)
+ {
+ // message was revoked
+ return;
+ }
+ else if (i == &stg_WHITEHOLE_info)
+ {
+ // another thread holds the message locked; retry
+ goto loop;
+ }
+ else
+ {
+ barf("executeMessage: %p", i);
+ }
+}
+
+#endif
+
+/* ----------------------------------------------------------------------------
+ Handle a MSG_BLACKHOLE message
+
+ This is called from two places: either we just entered a BLACKHOLE
+ (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
+ cap->inbox.
+
+ We need to establish whether the BLACKHOLE belongs to
+ this Capability, and
+ - if so, arrange to block the current thread on it
+ - otherwise, forward the message to the right place
+
+ Returns:
+ - 0 if the blocked thread can be woken up by the caller
+ - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
+ at some point in the future.
+
+ ------------------------------------------------------------------------- */
+
+// Handle a MSG_BLACKHOLE: thread msg->tso wants to block on the
+// BLACKHOLE msg->bh.  Find the owner of the BLACKHOLE; if it lives on
+// another Capability, forward the message there; otherwise attach the
+// message to the BLACKHOLE's BLOCKING_QUEUE, creating one if this is
+// the first thread to block.
+//
+// Returns 0 if the caller may wake msg->tso up immediately (the
+// BLACKHOLE has already been updated with a value); 1 if the thread is
+// now blocked and will be sent a MSG_TRY_WAKEUP later.
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
+{
+ const StgInfoTable *info;
+ StgClosure *p;
+ StgBlockingQueue *bq;
+ StgClosure *bh = msg->bh;
+ StgTSO *owner;
+
+ debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p",
+ (lnat)msg->tso->id, msg->bh);
+
+ info = bh->header.info;
+
+ // If we got this message in our inbox, it might be that the
+ // BLACKHOLE has already been updated, and GC has shorted out the
+ // indirection, so the pointer no longer points to a BLACKHOLE at
+ // all.
+ if (info != &stg_BLACKHOLE_info &&
+ info != &stg_CAF_BLACKHOLE_info &&
+ info != &stg_WHITEHOLE_info) {
+ // if it is a WHITEHOLE, then a thread is in the process of
+ // trying to BLACKHOLE it. But we know that it was once a
+ // BLACKHOLE, so there is at least a valid pointer in the
+ // payload, so we can carry on.
+ return 0;
+ }
+
+ // At this point we know the closure is (or was until very recently)
+ // a BLACKHOLE, so its payload holds a valid indirectee pointer.
+loop:
+ p = ((StgInd*)bh)->indirectee;
+ info = p->header.info;
+
+ if (info == &stg_IND_info)
+ {
+ // This could happen, if e.g. we got a BLOCKING_QUEUE that has
+ // just been replaced with an IND by another thread in
+ // updateThunk(). In which case, if we read the indirectee
+ // again we should get the value.
+ goto loop;
+ }
+
+ else if (info == &stg_TSO_info)
+ {
+ owner = deRefTSO((StgTSO *)p);
+
+#ifdef THREADED_RTS
+ if (owner->cap != cap) {
+ sendMessage(cap, owner->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+ return 1;
+ }
+#endif
+ // owner is the owner of the BLACKHOLE, and resides on this
+ // Capability. msg->tso is the first thread to block on this
+ // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
+
+ bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
+
+ // initialise the BLOCKING_QUEUE object
+ SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
+ bq->bh = bh;
+ bq->queue = msg;
+ bq->owner = owner;
+
+ msg->link = (MessageBlackHole*)END_TSO_QUEUE;
+
+ // All BLOCKING_QUEUES are linked in a list on owner->bq, so
+ // that we can search through them in the event that there is
+ // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
+ // becomes orphaned (see updateThunk()).
+ bq->link = owner->bq;
+ owner->bq = bq;
+ dirty_TSO(cap, owner); // we modified owner->bq
+
+ // point to the BLOCKING_QUEUE from the BLACKHOLE
+ write_barrier(); // make the BQ visible
+ ((StgInd*)bh)->indirectee = (StgClosure *)bq;
+ recordClosureMutated(cap,bh); // bh was mutated
+
+ debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
+ (lnat)msg->tso->id, (lnat)owner->id);
+
+ return 1; // blocked
+ }
+ else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
+ info == &stg_BLOCKING_QUEUE_DIRTY_info)
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ ASSERT(bq->bh == bh);
+
+ owner = deRefTSO(bq->owner);
+
+ ASSERT(owner != END_TSO_QUEUE);
+
+#ifdef THREADED_RTS
+ if (owner->cap != cap) {
+ sendMessage(cap, owner->cap, (Message*)msg);
+ debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+ return 1;
+ }
+#endif
+
+ // add this message to the existing queue
+ msg->link = bq->queue;
+ bq->queue = msg;
+ recordClosureMutated(cap,(StgClosure*)msg);
+
+ if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ // cast added: recordClosureMutated() takes StgClosure*, and bq
+ // is StgBlockingQueue* (matches the cast on the call above)
+ recordClosureMutated(cap,(StgClosure*)bq);
+ }
+
+ debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
+ (lnat)msg->tso->id, (lnat)owner->id);
+
+ return 1; // blocked
+ }
+
+ return 0; // not blocked
+}
+
+
--- /dev/null
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+BEGIN_RTS_PRIVATE
+
+// Block a thread on a BLACKHOLE, or forward the request to the owner's
+// Capability.  Returns 0 if the thread can be woken immediately, 1 if
+// it is now blocked (see Messages.c).
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg);
+
+#ifdef THREADED_RTS
+// Perform the action requested by a message in cap's inbox.
+void executeMessage (Capability *cap, Message *m);
+// Push msg onto to_cap's inbox and wake it up.
+void sendMessage (Capability *from_cap, Capability *to_cap, Message *msg);
+#endif
+
+END_RTS_PRIVATE
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
RET_P(val);
}
else
/* No further putMVars, MVar is now empty */
StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
RET_P(val);
}
if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
else
{
/* No further putMVars, MVar is now empty */
StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
RET_NP(1, val);
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
jump %ENTRY_CODE(Sp(0));
}
else
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = val;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
jump %ENTRY_CODE(Sp(0));
}
StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
}
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
else
{
/* No further takes, the MVar is now full. */
StgMVar_value(mvar) = R2;
-#if defined(THREADED_RTS)
unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
- SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
}
RET_N(1);
debugBelch(")\n");
break;
+ case BLACKHOLE:
+ debugBelch("BLACKHOLE(");
+ printPtr((StgPtr)((StgInd*)obj)->indirectee);
+ debugBelch(")\n");
+ break;
+
/* Cannot happen -- use default case.
case RET_BCO:
case RET_SMALL:
break;
}
- case CAF_BLACKHOLE:
- debugBelch("CAF_BH");
- break;
-
- case BLACKHOLE:
- debugBelch("BH\n");
- break;
-
case ARR_WORDS:
{
StgWord i;
[UPDATE_FRAME] = "UPDATE_FRAME",
[CATCH_FRAME] = "CATCH_FRAME",
[STOP_FRAME] = "STOP_FRAME",
- [CAF_BLACKHOLE] = "CAF_BLACKHOLE",
[BLACKHOLE] = "BLACKHOLE",
+ [BLOCKING_QUEUE] = "BLOCKING_QUEUE",
[MVAR_CLEAN] = "MVAR_CLEAN",
[MVAR_DIRTY] = "MVAR_DIRTY",
[ARR_WORDS] = "ARR_WORDS",
case IND_PERM:
case IND_OLDGEN:
case IND_OLDGEN_PERM:
- case CAF_BLACKHOLE:
case BLACKHOLE:
+ case BLOCKING_QUEUE:
case FUN_1_0:
case FUN_0_1:
case FUN_1_1:
#include "STM.h"
#include "sm/Sanity.h"
#include "Profiling.h"
+#include "Messages.h"
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
rtsBool stop_at_atomically)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
void
suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
+ tso = deRefTSO(tso);
+
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
return;
}
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
// Remove it from any blocking queues
removeFromQueues(cap,tso);
msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
// message starts locked; the caller has to unlock it when it is
// ready.
- msg->header.info = &stg_WHITEHOLE_info;
+ SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
msg->source = source;
msg->target = target;
msg->exception = exception;
{
StgWord status;
StgTSO *target = msg->target;
+ Capability *target_cap;
+ goto check_target;
+
+retry:
+ write_barrier();
+ debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
ASSERT(target != END_TSO_QUEUE);
// follow ThreadRelocated links in the target first
- while (target->what_next == ThreadRelocated) {
- target = target->_link;
- // No, it might be a WHITEHOLE:
- // ASSERT(get_itbl(target)->type == TSO);
+ target = deRefTSO(target);
+
+ // Thread already dead?
+ if (target->what_next == ThreadComplete
+ || target->what_next == ThreadKilled) {
+ return THROWTO_SUCCESS;
}
debugTraceCap(DEBUG_sched, cap,
traceThreadStatus(DEBUG_sched, target);
#endif
- goto check_target;
-retry:
- write_barrier();
- debugTrace(DEBUG_sched, "throwTo: retrying...");
-
-check_target:
- ASSERT(target != END_TSO_QUEUE);
-
- // Thread already dead?
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
- return THROWTO_SUCCESS;
+ target_cap = target->cap;
+ if (target->cap != cap) {
+ throwToSendMsg(cap, target_cap, msg);
+ return THROWTO_BLOCKED;
}
status = target->why_blocked;
have also seen the write to Q.
*/
{
- Capability *target_cap;
-
write_barrier();
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
+ if ((target->flags & TSO_BLOCKEX) == 0) {
+ // It's on our run queue and not blocking exceptions
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
} else {
- if ((target->flags & TSO_BLOCKEX) == 0) {
- // It's on our run queue and not blocking exceptions
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- return THROWTO_SUCCESS;
- } else {
- blockedThrowTo(cap,target,msg);
- return THROWTO_BLOCKED;
- }
+ blockedThrowTo(cap,target,msg);
+ return THROWTO_BLOCKED;
}
}
case BlockedOnMsgThrowTo:
{
- Capability *target_cap;
const StgInfoTable *i;
MessageThrowTo *m;
goto retry;
}
- target_cap = target->cap;
- if (target_cap != cap) {
- unlockClosure((StgClosure*)m, i);
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
unlockClosure((StgClosure*)m, i);
unlockClosure((StgClosure*)m, &stg_IND_info);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
return THROWTO_SUCCESS;
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockClosure((StgClosure *)mvar, info);
return THROWTO_BLOCKED;
} else {
removeThreadFromMVarQueue(cap, mvar, target);
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- unlockClosure((StgClosure *)mvar, info);
+ if (info == &stg_MVAR_CLEAN_info) {
+ dirty_MVAR(&cap->r,(StgClosure*)mvar);
+ }
+ unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
return THROWTO_SUCCESS;
}
}
case BlockedOnBlackHole:
{
- ACQUIRE_LOCK(&sched_mutex);
- // double checking the status after the memory barrier:
- if (target->why_blocked != BlockedOnBlackHole) {
- RELEASE_LOCK(&sched_mutex);
- goto retry;
- }
-
- if (target->flags & TSO_BLOCKEX) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_BLOCKED; // caller releases lock
- } else {
- removeThreadFromQueue(cap, &blackhole_queue, target);
- raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
- RELEASE_LOCK(&sched_mutex);
- return THROWTO_SUCCESS;
- }
+ // Revoke the message by replacing it with IND. We're not
+ // locking anything here, so we might still get a TRY_WAKEUP
+ // message from the owner of the blackhole some time in the
+ // future, but that doesn't matter.
+ ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+ OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+ raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+ return THROWTO_SUCCESS;
}
case BlockedOnSTM:
}
if ((target->flags & TSO_BLOCKEX) &&
((target->flags & TSO_INTERRUPTIBLE) == 0)) {
- Capability *target_cap = target->cap;
- if (target->cap != cap) {
- throwToSendMsg(cap,target_cap,msg);
- } else {
- blockedThrowTo(cap,target,msg);
- }
+ blockedThrowTo(cap,target,msg);
unlockTSO(target);
return THROWTO_BLOCKED;
} else {
raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
- unblockOne(cap, target);
unlockTSO(target);
return THROWTO_SUCCESS;
}
case BlockedOnCCall:
case BlockedOnCCall_NoUnblockExc:
- {
- Capability *target_cap;
-
- target_cap = target->cap;
- if (target_cap != cap) {
- throwToSendMsg(cap, target_cap, msg);
- return THROWTO_BLOCKED;
- }
-
blockedThrowTo(cap,target,msg);
return THROWTO_BLOCKED;
- }
#ifndef THREADEDED_RTS
case BlockedOnRead:
{
#ifdef THREADED_RTS
- debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
- sendMessage(target_cap, (Message*)msg);
+ sendMessage(cap, target_cap, (Message*)msg);
#endif
}
ASSERT(target->cap == cap);
- msg->link = (Message*)target->blocked_exceptions;
+ msg->link = target->blocked_exceptions;
target->blocked_exceptions = msg;
dirty_TSO(cap,target); // we modified the blocked_exceptions queue
}
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
(tso->flags & TSO_BLOCKEX) != 0) {
- debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+ debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
}
if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
case BlockedOnMVar:
removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+ // we aren't doing a write barrier here: the MVar is supposed to
+ // be already locked, so replacing the info pointer would unlock it.
goto done;
case BlockedOnBlackHole:
- removeThreadFromQueue(cap, &blackhole_queue, tso);
+ // nothing to do
goto done;
case BlockedOnMsgWakeup:
{
// kill the message, atomically:
- tso->block_info.wakeup->header.info = &stg_IND_info;
+ OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
break;
}
* asynchronous exception in an existing thread.
*
* We first remove the thread from any queue on which it might be
- * blocked. The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked. The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
*
* We strip the stack down to the innermost CATCH_FRAME, building
* thunks in the heap for all the active computations, so they can
StgClosure *updatee;
nat i;
- debugTrace(DEBUG_sched,
- "raising exception in thread %ld.", (long)tso->id);
+ debugTraceCap(DEBUG_sched, cap,
+ "raising exception in thread %ld.", (long)tso->id);
#if defined(PROFILING)
/*
tso->what_next != ThreadKilled &&
tso->what_next != ThreadRelocated);
+ // only if we own this TSO (deleteThread() is an exception: it may
+ // call this for a TSO that is not owned by the current Capability
+ // -- TODO confirm its locking story)
+ ASSERT(tso->cap == cap);
+
+ // wake it up
+ if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ }
+
// mark it dirty; we're about to change its stack.
dirty_TSO(cap, tso);
// Perform the update
// TODO: this may waste some work, if the thunk has
// already been updated by another thread.
- UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+ updateThunk(cap, tso,
+ ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
}
sp += sizeofW(StgUpdateFrame) - 1;
{
StgTRecHeader *trec = tso -> trec;
StgTRecHeader *outer = trec -> enclosing_trec;
- debugTrace(DEBUG_stm,
- "found atomically block delivering async exception");
+ debugTraceCap(DEBUG_stm, cap,
+ "found atomically block delivering async exception");
stmAbortTransaction(cap, trec);
stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
// no child, no SRT
case CONSTR_0_1:
case CONSTR_0_2:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
case ARR_WORDS:
*first_child = NULL;
return;
case IND_PERM:
case IND_OLDGEN_PERM:
case IND_OLDGEN:
+ case BLACKHOLE:
*first_child = ((StgInd *)c)->indirectee;
return;
case CONSTR_1_0:
// no child (fixed), no SRT
case CONSTR_0_1:
case CONSTR_0_2:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
case ARR_WORDS:
// one child (fixed), no SRT
case MUT_VAR_CLEAN:
case FUN_0_2:
// partial applications
case PAP:
- // blackholes
- case CAF_BLACKHOLE:
- case BLACKHOLE:
// indirection
case IND_PERM:
case IND_OLDGEN_PERM:
case IND_OLDGEN:
+ case BLACKHOLE:
// static objects
case CONSTR_STATIC:
case FUN_STATIC:
lockTSO(tso);
if (tso -> why_blocked == BlockedOnSTM) {
TRACE("unpark_tso on tso=%p", tso);
- unblockOne(cap,tso);
+ tryWakeupThread(cap,tso);
} else {
TRACE("spurious unpark_tso on tso=%p", tso);
}
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
+#include "Messages.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
#endif
-/* Threads blocked on blackholes.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *blackhole_queue = NULL;
-
-/* The blackhole_queue should be checked for threads to wake up. See
- * Schedule.h for more thorough comment.
- * LOCK: none (doesn't matter if we miss an update)
- */
-rtsBool blackholes_need_checking = rtsFalse;
-
/* Set to true when the latest garbage collection failed to reclaim
* enough space, and the runtime should proceed to shut itself down in
* an orderly fashion (emitting profiling info etc.)
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
-static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major);
-static rtsBool checkBlackHoles(Capability *cap);
-
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
startHeapProfTimer();
- // Check for exceptions blocked on this thread
- maybePerformBlockedException (cap, t);
-
// ----------------------------------------------------------------------
// Run the current thread
// happened. So find the new location:
t = cap->r.rCurrentTSO;
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- // Lock-free test here should be ok, we're just setting a flag.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
- }
-
// And save the current errno in this thread.
// XXX: possibly bogus for SMP because this thread might already
// be running again, see code below.
{
scheduleStartSignalHandlers(cap);
- // Only check the black holes here if we've nothing else to do.
- // During normal execution, the black hole list only gets checked
- // at GC time, to avoid repeatedly traversing this possibly long
- // list each time around the scheduler.
- if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
scheduleProcessInbox(cap);
scheduleCheckBlockedThreads(cap);
!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
- blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
//
if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
{
- awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
+ awaitEvent (emptyRunQueue(cap));
}
#endif
}
-
-/* ----------------------------------------------------------------------------
- * Check for threads woken up by other Capabilities
- * ------------------------------------------------------------------------- */
-
-#if defined(THREADED_RTS)
-static void
-executeMessage (Capability *cap, Message *m)
-{
- const StgInfoTable *i;
-
-loop:
- write_barrier(); // allow m->header to be modified by another thread
- i = m->header.info;
- if (i == &stg_MSG_WAKEUP_info)
- {
- MessageWakeup *w = (MessageWakeup *)m;
- StgTSO *tso = w->tso;
- debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld",
- (lnat)tso->id);
- ASSERT(tso->cap == cap);
- ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
- ASSERT(tso->block_info.closure == (StgClosure *)m);
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap, tso);
- }
- else if (i == &stg_MSG_THROWTO_info)
- {
- MessageThrowTo *t = (MessageThrowTo *)m;
- nat r;
- const StgInfoTable *i;
-
- i = lockClosure((StgClosure*)m);
- if (i != &stg_MSG_THROWTO_info) {
- unlockClosure((StgClosure*)m, i);
- goto loop;
- }
-
- debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
- (lnat)t->source->id, (lnat)t->target->id);
-
- ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
- ASSERT(t->source->block_info.closure == (StgClosure *)m);
-
- r = throwToMsg(cap, t);
-
- switch (r) {
- case THROWTO_SUCCESS:
- ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
- t->source->sp += 3;
- unblockOne(cap, t->source);
- // this message is done
- unlockClosure((StgClosure*)m, &stg_IND_info);
- break;
- case THROWTO_BLOCKED:
- // unlock the message
- unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
- break;
- }
- }
- else if (i == &stg_IND_info)
- {
- // message was revoked
- return;
- }
- else if (i == &stg_WHITEHOLE_info)
- {
- goto loop;
- }
- else
- {
- barf("executeMessage: %p", i);
- }
-}
-#endif
-
-static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- Message *m;
-
- while (!emptyInbox(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- m = cap->inbox;
- cap->inbox = m->link;
- RELEASE_LOCK(&cap->lock);
- executeMessage(cap, (Message *)m);
- }
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- * Check for threads blocked on BLACKHOLEs that can be woken up
- * ------------------------------------------------------------------------- */
-static void
-scheduleCheckBlackHoles (Capability *cap)
-{
- if ( blackholes_need_checking ) // check without the lock first
- {
- ACQUIRE_LOCK(&sched_mutex);
- if ( blackholes_need_checking ) {
- blackholes_need_checking = rtsFalse;
- // important that we reset the flag *before* checking the
- // blackhole queue, otherwise we could get deadlock. This
- // happens as follows: we wake up a thread that
- // immediately runs on another Capability, blocks on a
- // blackhole, and then we reset the blackholes_need_checking flag.
- checkBlackHoles(cap);
- }
- RELEASE_LOCK(&sched_mutex);
- }
-}
-
/* ----------------------------------------------------------------------------
* Detect deadlock conditions and attempt to resolve them.
* ------------------------------------------------------------------------- */
#endif
/* ----------------------------------------------------------------------------
+ * Process message in the current Capability's inbox
+ * ------------------------------------------------------------------------- */
+
+// Drain this Capability's message inbox, executing each message in
+// turn. Only the detach of a message from cap->inbox is done under
+// cap->lock; executeMessage() runs outside the lock, so handlers may
+// themselves allocate, take locks, or send further messages.
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ Message *m;
+
+ while (!emptyInbox(cap)) {
+ ACQUIRE_LOCK(&cap->lock);
+ m = cap->inbox;
+ cap->inbox = m->link;
+ RELEASE_LOCK(&cap->lock);
+ executeMessage(cap, (Message *)m);
+ }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
if (gc_type == PENDING_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
waitForGcThreads(cap);
}
-#else /* !THREADED_RTS */
-
- // do this while the other Capabilities stop:
- if (cap) scheduleCheckBlackHoles(cap);
-
#endif
IF_DEBUG(scheduler, printAllThreads());
sleeping_queue = END_TSO_QUEUE;
#endif
- blackhole_queue = END_TSO_QUEUE;
-
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
* of the stack, so we don't attempt to scavenge any part of the
* dead TSO's stack.
*/
- tso->what_next = ThreadRelocated;
setTSOLink(cap,tso,dest);
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
tso->sp = (P_)&(tso->stack[tso->stack_size]);
tso->why_blocked = NotBlocked;
debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
(long)tso->id, tso_size_w, tso_sizeW(new_tso));
- tso->what_next = ThreadRelocated;
tso->_link = new_tso; // no write barrier reqd: same generation
+ write_barrier(); // other threads seeing ThreadRelocated will look at _link
+ tso->what_next = ThreadRelocated;
// The TSO attached to this Task may have moved, so update the
// pointer to it.
#endif
/* -----------------------------------------------------------------------------
- * checkBlackHoles()
- *
- * Check the blackhole_queue for threads that can be woken up. We do
- * this periodically: before every GC, and whenever the run queue is
- * empty.
- *
- * An elegant solution might be to just wake up all the blocked
- * threads with awakenBlockedQueue occasionally: they'll go back to
- * sleep again if the object is still a BLACKHOLE. Unfortunately this
- * doesn't give us a way to tell whether we've actually managed to
- * wake up any threads, so we would be busy-waiting.
- *
- * -------------------------------------------------------------------------- */
-
-static rtsBool
-checkBlackHoles (Capability *cap)
-{
- StgTSO **prev, *t;
- rtsBool any_woke_up = rtsFalse;
- StgHalfWord type;
-
- // blackhole_queue is global:
- ASSERT_LOCK_HELD(&sched_mutex);
-
- debugTrace(DEBUG_sched, "checking threads blocked on black holes");
-
- // ASSUMES: sched_mutex
- prev = &blackhole_queue;
- t = blackhole_queue;
- while (t != END_TSO_QUEUE) {
- if (t->what_next == ThreadRelocated) {
- t = t->_link;
- continue;
- }
- ASSERT(t->why_blocked == BlockedOnBlackHole);
- type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
- if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
- IF_DEBUG(sanity,checkTSO(t));
- t = unblockOne(cap, t);
- *prev = t;
- any_woke_up = rtsTrue;
- } else {
- prev = &t->_link;
- t = t->_link;
- }
- }
-
- return any_woke_up;
-}
-
-/* -----------------------------------------------------------------------------
Deleting threads
This is used for interruption (^C) and forking, and corresponds to
-------------------------------------------------------------------------- */
static void
-deleteThread (Capability *cap, StgTSO *tso)
+deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
// NOTE: must only be called on a TSO that we have exclusive
// access to, because we will call throwToSingleThreaded() below.
if (tso->why_blocked != BlockedOnCCall &&
tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- throwToSingleThreaded(cap,tso,NULL);
+ throwToSingleThreaded(tso->cap,tso,NULL);
}
}
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
- (StgClosure *)raise_closure);
+ updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
+ (StgClosure *)raise_closure);
p = next;
continue;
extern StgTSO *sleeping_queue;
#endif
-/* Set to rtsTrue if there are threads on the blackhole_queue, and
- * it is possible that one or more of them may be available to run.
- * This flag is set to rtsFalse after we've checked the queue, and
- * set to rtsTrue just before we run some Haskell code. It is used
- * to decide whether we should yield the Capability or not.
- * Locks required : none (see scheduleCheckBlackHoles()).
- */
-extern rtsBool blackholes_need_checking;
-
extern rtsBool heap_overflow;
#if defined(THREADED_RTS)
waiting for the evaluation of the closure to finish.
------------------------------------------------------------------------- */
-/* Note: a BLACKHOLE must be big enough to be
- * overwritten with an indirection/evacuee/catch. Thus we claim it
- * has 1 non-pointer word of payload.
- */
-INFO_TABLE(stg_BLACKHOLE,0,1,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
+INFO_TABLE(stg_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
-
-#ifdef THREADED_RTS
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
- /* Actually this is not necessary because R1 is about to be destroyed. */
- LDV_ENTER(R1);
+ W_ r, p, info, bq, msg, owner, bd;
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
-
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
+ TICK_ENT_DYN_IND(); /* tick */
- jump stg_block_blackhole;
+// The BLACKHOLE's payload (indirectee) is one of:
+//   * a tagged pointer: the thunk has been evaluated; return the value
+//   * a TSO or BLOCKING_QUEUE: still under evaluation; block on it by
+//     sending the owner a MSG_BLACKHOLE
+//   * anything else: an ordinary indirectee; enter it
+retry:
+ p = StgInd_indirectee(R1);
+ if (GETTAG(p) != 0) {
+ // tagged pointer => evaluated value; return it to the continuation
+ R1 = p;
+ jump %ENTRY_CODE(Sp(0));
+ }
+
+ info = StgHeader_info(p);
+ if (info == stg_IND_info) {
+ // This could happen, if e.g. we got a BLOCKING_QUEUE that has
+ // just been replaced with an IND by another thread in
+ // wakeBlockingQueue().
+ goto retry;
+ }
+
+ if (info == stg_TSO_info ||
+ info == stg_BLOCKING_QUEUE_CLEAN_info ||
+ info == stg_BLOCKING_QUEUE_DIRTY_info)
+ {
+ // Closure is still being evaluated by another thread: build a
+ // MSG_BLACKHOLE and hand it to the owner via messageBlackHole().
+ ("ptr" msg) = foreign "C" allocate(MyCapability() "ptr",
+ BYTES_TO_WDS(SIZEOF_MessageBlackHole)) [R1];
+
+ StgHeader_info(msg) = stg_MSG_BLACKHOLE_info;
+ MessageBlackHole_tso(msg) = CurrentTSO;
+ MessageBlackHole_bh(msg) = R1;
+
+ (r) = foreign "C" messageBlackHole(MyCapability() "ptr", msg "ptr") [R1];
+
+ if (r == 0) {
+ // message could not be delivered (e.g. the BH was updated
+ // meanwhile); re-examine the indirectee
+ goto retry;
+ } else {
+ StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
+ StgTSO_block_info(CurrentTSO) = msg;
+ jump stg_block_blackhole;
+ }
+ }
+ else
+ {
+ // plain indirectee: evaluate it
+ R1 = p;
+ ENTER();
+ }
}
-/* identical to BLACKHOLEs except for the infotag */
-INFO_TABLE(stg_CAF_BLACKHOLE,0,1,CAF_BLACKHOLE,"CAF_BLACKHOLE","CAF_BLACKHOLE")
+// The info pointer written by eager blackholing. Entry behaviour is
+// identical to stg_BLACKHOLE, so just defer to its entry code.
+INFO_TABLE(__stg_EAGER_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
- LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
-
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
-
- jump stg_block_blackhole;
+ jump ENTRY_LBL(stg_BLACKHOLE);
}
-INFO_TABLE(__stg_EAGER_BLACKHOLE,0,1,BLACKHOLE,"EAGER_BLACKHOLE","EAGER_BLACKHOLE")
+// CAF_BLACKHOLE is allocated when entering a CAF. The reason it is
+// distinct from BLACKHOLE is so that we can tell the difference
+// between an update frame on the stack that points to a CAF under
+// evaluation, and one that points to a closure that is under
+// evaluation by another thread (a BLACKHOLE). See threadPaused().
+//
+INFO_TABLE(stg_CAF_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
{
- TICK_ENT_BH();
-
-#ifdef THREADED_RTS
- // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
- /* Actually this is not necessary because R1 is about to be destroyed. */
- LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(sched_mutex "ptr");
- // released in stg_block_blackhole_finally
-#endif
-
- /* Put ourselves on the blackhole queue */
- StgTSO__link(CurrentTSO) = W_[blackhole_queue];
- W_[blackhole_queue] = CurrentTSO;
+ jump ENTRY_LBL(stg_BLACKHOLE);
+}
- /* jot down why and on what closure we are blocked */
- StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
- StgTSO_block_info(CurrentTSO) = R1;
+INFO_TABLE(stg_BLOCKING_QUEUE_CLEAN,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_CLEAN object entered!") never returns; }
+
- jump stg_block_blackhole;
-}
+INFO_TABLE(stg_BLOCKING_QUEUE_DIRTY,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_DIRTY object entered!") never returns; }
+
/* ----------------------------------------------------------------------------
Whiteholes are used for the "locked" state of a closure (see lockClosure())
------------------------------------------------------------------------- */
INFO_TABLE(stg_WHITEHOLE, 0,0, WHITEHOLE, "WHITEHOLE", "WHITEHOLE")
-{ foreign "C" barf("WHITEHOLE object entered!") never returns; }
+{
+#if defined(THREADED_RTS)
+ // A WHITEHOLE marks a closure that another thread has temporarily
+ // locked (its real info pointer will be restored shortly), so
+ // busy-wait for the info pointer to change and then re-dispatch.
+ W_ info, i;
+
+ i = 0;
+loop:
+ // spin until the WHITEHOLE is updated
+ info = StgHeader_info(R1);
+ if (info == stg_WHITEHOLE_info) {
+ i = i + 1;
+ if (i == SPIN_COUNT) {
+ // yield the OS thread periodically so we don't burn a core
+ // while the lock holder is descheduled
+ i = 0;
+ foreign "C" yieldThread() [R1];
+ }
+ goto loop;
+ }
+ // info pointer changed: enter the closure via its new entry code
+ jump %ENTRY_CODE(info);
+#else
+ foreign "C" barf("WHITEHOLE object entered!") never returns;
+#endif
+}
/* ----------------------------------------------------------------------------
Some static info tables for things that don't get entered, and
INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
+INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP")
+{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; }
+
INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
{ foreign "C" barf("MSG_THROWTO object entered!") never returns; }
+INFO_TABLE_CONSTR(stg_MSG_BLACKHOLE,3,0,0,PRIM,"MSG_BLACKHOLE","MSG_BLACKHOLE")
+{ foreign "C" barf("MSG_BLACKHOLE object entered!") never returns; }
+
/* ----------------------------------------------------------------------------
END_TSO_QUEUE
#include "Updates.h"
#include "RaiseAsync.h"
#include "Trace.h"
+#include "Threads.h"
#include <string.h> // for memmove()
* screw us up if we don't check.
*/
if (upd->updatee != updatee && !closure_IND(upd->updatee)) {
- UPD_IND(cap, upd->updatee, updatee);
+ updateThunk(cap, tso, upd->updatee, updatee);
}
// now mark this update frame as a stack gap. The gap
maybePerformBlockedException (cap, tso);
if (tso->what_next == ThreadKilled) { return; }
- // NB. Blackholing is *not* optional, we must either do lazy
+ // NB. Blackholing is *compulsory*, we must either do lazy
// blackholing, or eager blackholing consistently. See Note
// [upd-black-hole] in sm/Scav.c.
#ifdef THREADED_RTS
retry:
#endif
- if (closure_flags[INFO_PTR_TO_STRUCT(bh_info)->type] & _IND
- || bh_info == &stg_BLACKHOLE_info) {
+ if (bh_info == &stg_BLACKHOLE_info ||
+ bh_info == &stg_WHITEHOLE_info)
+ {
debugTrace(DEBUG_squeeze,
"suspending duplicate work: %ld words of stack",
(long)((StgPtr)frame - tso->sp));
// the value to the frame underneath:
tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2;
tso->sp[1] = (StgWord)bh;
+ ASSERT(bh->header.info != &stg_TSO_info);
tso->sp[0] = (W_)&stg_enter_info;
// And continue with threadPaused; there might be
continue;
}
- if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
- // zero out the slop so that the sanity checker can tell
- // where the next closure is.
- DEBUG_FILL_SLOP(bh);
-#ifdef PROFILING
- // @LDV profiling
- // We pretend that bh is now dead.
- LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
-#endif
- // an EAGER_BLACKHOLE gets turned into a BLACKHOLE here.
+ // zero out the slop so that the sanity checker can tell
+ // where the next closure is.
+ DEBUG_FILL_SLOP(bh);
+
+ // @LDV profiling
+ // We pretend that bh is now dead.
+ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)bh);
+
+ // an EAGER_BLACKHOLE or CAF_BLACKHOLE gets turned into a
+ // BLACKHOLE here.
#ifdef THREADED_RTS
- cur_bh_info = (const StgInfoTable *)
- cas((StgVolatilePtr)&bh->header.info,
- (StgWord)bh_info,
- (StgWord)&stg_BLACKHOLE_info);
-
- if (cur_bh_info != bh_info) {
- bh_info = cur_bh_info;
- goto retry;
- }
-#else
- SET_INFO(bh,&stg_BLACKHOLE_info);
+ // first we turn it into a WHITEHOLE to claim it, and if
+ // successful we write our TSO and then the BLACKHOLE info pointer.
+ cur_bh_info = (const StgInfoTable *)
+ cas((StgVolatilePtr)&bh->header.info,
+ (StgWord)bh_info,
+ (StgWord)&stg_WHITEHOLE_info);
+
+ if (cur_bh_info != bh_info) {
+ bh_info = cur_bh_info;
+ goto retry;
+ }
#endif
- // We pretend that bh has just been created.
- LDV_RECORD_CREATE(bh);
- }
+ // The payload of the BLACKHOLE points to the TSO
+ ((StgInd *)bh)->indirectee = (StgClosure *)tso;
+ write_barrier();
+ SET_INFO(bh,&stg_BLACKHOLE_info);
+
+ // .. and we need a write barrier, since we just mutated the closure:
+ recordClosureMutated(cap,bh);
+
+ // We pretend that bh has just been created.
+ LDV_RECORD_CREATE(bh);
frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
if (prev_was_update_frame) {
#include "PosixSource.h"
#include "Rts.h"
+#include "Capability.h"
+#include "Updates.h"
#include "Threads.h"
#include "STM.h"
#include "Schedule.h"
#include "Trace.h"
#include "ThreadLabels.h"
+#include "Updates.h"
+#include "Messages.h"
+#include "sm/Storage.h"
/* Next thread ID to allocate.
* LOCK: sched_mutex
tso->what_next = ThreadRunGHC;
tso->why_blocked = NotBlocked;
+ tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
+ tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
tso->flags = 0;
tso->dirty = 1;
Fails fatally if the TSO is not on the queue.
-------------------------------------------------------------------------- */
-void
+rtsBool // returns True if we modified queue
removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
{
StgTSO *t, *prev;
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ return rtsFalse;
} else {
*queue = t->_link;
+ return rtsTrue;
}
- return;
}
}
barf("removeThreadFromQueue: not found");
}
-void
+rtsBool // returns True if we modified head or tail
removeThreadFromDeQueue (Capability *cap,
StgTSO **head, StgTSO **tail, StgTSO *tso)
{
StgTSO *t, *prev;
+ rtsBool flag = rtsFalse;
prev = NULL;
for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
if (t == tso) {
if (prev) {
setTSOLink(cap,prev,t->_link);
+ flag = rtsFalse;
} else {
*head = t->_link;
+ flag = rtsTrue;
}
if (*tail == tso) {
if (prev) {
} else {
*tail = END_TSO_QUEUE;
}
- }
- return;
+ return rtsTrue;
+ } else {
+ return flag;
+ }
}
}
barf("removeThreadFromMVarQueue: not found");
void
removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
{
+ // caller must do the write barrier, because replacing the info
+ // pointer will unlock the MVar.
removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
+ tso->_link = END_TSO_QUEUE;
}
/* ----------------------------------------------------------------------------
return next;
}
+// tryWakeupThread: best-effort wakeup of a blocked TSO.
+// If the TSO belongs to another Capability, delegate by sending that
+// Capability a MSG_TRY_WAKEUP message; otherwise, for blocking reasons
+// where a spurious wakeup is harmless (BlockedOnBlackHole/BlockedOnSTM),
+// put the thread straight back on our run queue. For any other
+// why_blocked value this function deliberately does nothing.
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
+{
+#ifdef THREADED_RTS
+ if (tso->cap != cap)
+ {
+ // not ours: forward the wakeup request to the owning Capability
+ MessageWakeup *msg;
+ msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
+ SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
+ msg->tso = tso;
+ sendMessage(cap, tso->cap, (Message*)msg);
+ return;
+ }
+#endif
+
+ switch (tso->why_blocked)
+ {
+ case BlockedOnBlackHole:
+ case BlockedOnSTM:
+ {
+ // just run the thread now, if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+ break;
+ }
+ default:
+ // otherwise, do nothing
+ break;
+ }
+}
+
/* ----------------------------------------------------------------------------
awakenBlockedQueue
------------------------------------------------------------------------- */
void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
+// wakeBlockingQueue: wake every thread queued on a BLOCKING_QUEUE,
+// then retire the queue by overwriting it with an indirection.
+wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
{
- while (tso != END_TSO_QUEUE) {
- tso = unblockOne(cap,tso);
+ MessageBlackHole *msg;
+ const StgInfoTable *i;
+
+ ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info ||
+ bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info );
+
+ // Walk the chain of MSG_BLACKHOLE messages; a message whose info
+ // pointer has been replaced by stg_IND has been revoked, so skip it.
+ for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
+ msg = msg->link) {
+ i = msg->header.info;
+ if (i != &stg_IND_info) {
+ ASSERT(i == &stg_MSG_BLACKHOLE_info);
+ tryWakeupThread(cap,msg->tso);
+ }
+ }
+
+ // overwrite the BQ with an indirection so it will be
+ // collected at the next GC.
+#if defined(DEBUG) && !defined(THREADED_RTS)
+ // XXX FILL_SLOP, but not if THREADED_RTS because in that case
+ // another thread might be looking at this BLOCKING_QUEUE and
+ // checking the owner field at the same time.
+ bq->bh = 0; bq->queue = 0; bq->owner = 0;
+#endif
+ OVERWRITE_INFO(bq, &stg_IND_info);
+}
+
+// If we update a closure that we know we BLACKHOLE'd, and the closure
+// no longer points to the current TSO as its owner, then there may be
+// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
+// it. We therefore traverse the BLOCKING_QUEUEs attached to the
+// current TSO to see if any can now be woken up.
+void
+checkBlockingQueues (Capability *cap, StgTSO *tso)
+{
+ StgBlockingQueue *bq, *next;
+ StgClosure *p;
+
+ debugTraceCap(DEBUG_sched, cap,
+ "collision occurred; checking blocking queues for thread %ld",
+ (lnat)tso->id);
+
+ for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
+ next = bq->link;
+
+ if (bq->header.info == &stg_IND_info) {
+ // ToDo: could short it out right here, to avoid
+ // traversing this IND multiple times.
+ continue;
+ }
+
+ p = bq->bh;
+
+ // The blackhole this BQ hangs off has been updated, or no longer
+ // points back at this BQ: its queued threads can be retried.
+ if (p->header.info != &stg_BLACKHOLE_info ||
+ ((StgInd *)p)->indirectee != (StgClosure*)bq)
+ {
+ wakeBlockingQueue(cap,bq);
+ }
 }
}
+/* ----------------------------------------------------------------------------
+ updateThunk
+
+ Update a thunk with a value. In order to do this, we need to know
+ which TSO owns (or is evaluating) the thunk, in case we need to
+ awaken any threads that are blocked on it.
+ ------------------------------------------------------------------------- */
+
+void
+updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
+{
+ StgClosure *v;
+ StgTSO *owner;
+ const StgInfoTable *i;
+
+ i = thunk->header.info;
+ // Not one of the blackhole variants: nothing can be blocked on it,
+ // so a plain indirection update is sufficient.
+ if (i != &stg_BLACKHOLE_info &&
+ i != &stg_CAF_BLACKHOLE_info &&
+ i != &stg_WHITEHOLE_info) {
+ updateWithIndirection(cap, thunk, val);
+ return;
+ }
+
+ // Snapshot the indirectee before the update overwrites it; it tells
+ // us who owns (or is queued on) this blackhole.
+ v = ((StgInd*)thunk)->indirectee;
+
+ updateWithIndirection(cap, thunk, val);
+
+ i = v->header.info;
+ if (i == &stg_TSO_info) {
+ // indirectee was a bare TSO (the owner). If it isn't us, some of
+ // our own BLOCKING_QUEUEs may now be orphaned — scan them.
+ owner = deRefTSO((StgTSO*)v);
+ if (owner != tso) {
+ checkBlockingQueues(cap, tso);
+ }
+ return;
+ }
+
+ // Not a TSO and not a BLOCKING_QUEUE either: be conservative and
+ // scan our queues for orphans.
+ if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
+ i != &stg_BLOCKING_QUEUE_DIRTY_info) {
+ checkBlockingQueues(cap, tso);
+ return;
+ }
+
+ owner = deRefTSO(((StgBlockingQueue*)v)->owner);
+
+ if (owner != tso) {
+ // someone else's queue: check ours for orphans instead
+ checkBlockingQueues(cap, tso);
+ } else {
+ // our own queue: wake all the threads blocked on this thunk
+ wakeBlockingQueue(cap, (StgBlockingQueue*)v);
+ }
+}
+
+/* ----------------------------------------------------------------------------
+ * Wake up a thread on a Capability.
+ *
+ * This is used when the current Task is running on a Capability and
+ * wishes to wake up a thread on a different Capability.
+ * ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+// Migrate a TSO to other_cap and send that Capability a MSG_WAKEUP so
+// it will run the thread. Used when the current Task wants to wake a
+// thread belonging to a different Capability.
+void
+wakeupThreadOnCapability (Capability *cap,
+ Capability *other_cap,
+ StgTSO *tso)
+{
+ MessageWakeup *msg;
+
+ // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+ // NOTE(review): the parenthetical is self-referential; confirm
+ // whether the intended precondition is cap->lock or other_cap->lock.
+ if (tso->bound) {
+ // a bound thread's Task must migrate with it
+ ASSERT(tso->bound->task->cap == tso->cap);
+ tso->bound->task->cap = other_cap;
+ }
+ tso->cap = other_cap;
+
+ ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
+ tso->block_info.closure->header.info == &stg_IND_info);
+
+ ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
+
+ msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+ SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
+ msg->tso = tso;
+ tso->block_info.closure = (StgClosure *)msg;
+ dirty_TSO(cap, tso);
+ // ensure block_info is visible before why_blocked is published
+ write_barrier();
+ tso->why_blocked = BlockedOnMsgWakeup;
+
+ sendMessage(cap, other_cap, (Message*)msg);
+}
+
+#endif /* THREADED_RTS */
+
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnBlackHole:
- debugBelch("is blocked on a black hole");
+ debugBelch("is blocked on a black hole %p",
+ ((StgBlockingQueue*)tso->block_info.bh->bh));
break;
case BlockedOnMsgWakeup:
debugBelch("is blocked on a wakeup message");
StgTSO * unblockOne (Capability *cap, StgTSO *tso);
StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
-void awakenBlockedQueue (Capability *cap, StgTSO *tso);
+void checkBlockingQueues (Capability *cap, StgTSO *tso);
+void wakeBlockingQueue (Capability *cap, StgBlockingQueue *bq);
+void tryWakeupThread (Capability *cap, StgTSO *tso);
+
+// Wakes up a thread on a Capability (probably a different Capability
+// from the one held by the current Task).
+//
+#ifdef THREADED_RTS
+void wakeupThreadOnCapability (Capability *cap,
+ Capability *other_cap,
+ StgTSO *tso);
+#endif
+
+void updateThunk (Capability *cap, StgTSO *tso,
+ StgClosure *thunk, StgClosure *val);
void removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso);
-void removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso);
-void removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
+
+rtsBool removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso);
+rtsBool removeThreadFromDeQueue (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
StgBool isThreadBound (StgTSO* tso);
ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTime /
RtsFlags.MiscFlags.tickInterval;
recent_activity = ACTIVITY_INACTIVE;
- blackholes_need_checking = rtsTrue;
- /* hack: re-use the blackholes_need_checking flag */
wakeUpRts();
}
}
#include "Updates.h"
-/* on entry to the update code
- (1) R1 points to the closure being returned
- (2) Sp points to the update frame
-*/
+#if defined(PROFILING)
+#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3
+#else
+#define UPD_FRAME_PARAMS P_ unused1
+#endif
/* The update fragment has been tuned so as to generate good
code with gcc, which accounts for some of the strangeness in the
code), since we don't mind duplicating this jump.
*/
-#define UPD_FRAME_ENTRY_TEMPLATE \
- { \
- W_ updatee; \
- \
- updatee = StgUpdateFrame_updatee(Sp); \
- \
- /* remove the update frame from the stack */ \
- Sp = Sp + SIZEOF_StgUpdateFrame; \
- \
- /* ToDo: it might be a PAP, so we should check... */ \
- TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee))); \
- \
- updateWithIndirection(stg_IND_direct_info, \
- updatee, \
- R1, \
- jump %ENTRY_CODE(Sp(0))); \
- }
-
-#if defined(PROFILING)
-#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3
-#else
-#define UPD_FRAME_PARAMS P_ unused1
-#endif
-
-/* this bitmap indicates that the first word of an update frame is a
- * non-pointer - this is the update frame link. (for profiling,
- * there's a cost-centre-stack in there too).
- */
+/* on entry to the update code
+ (1) R1 points to the closure being returned
+ (2) Sp points to the update frame
+*/
INFO_TABLE_RET( stg_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
-UPD_FRAME_ENTRY_TEMPLATE
+{
+ W_ updatee;
+
+ updatee = StgUpdateFrame_updatee(Sp);
+
+ /* remove the update frame from the stack */
+ Sp = Sp + SIZEOF_StgUpdateFrame;
+
+ /* ToDo: it might be a PAP, so we should check... */
+ TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee)));
+
+ /* overwrite the updatee with an indirection to the value being
+    returned in R1, then return to the frame underneath */
+ updateWithIndirection(updatee,
+ R1,
+ jump %ENTRY_CODE(Sp(0)));
+}
INFO_TABLE_RET( stg_marked_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
-UPD_FRAME_ENTRY_TEMPLATE
+{
+ W_ updatee, v, i, tso, link;
+
+ // we know the closure is a BLACKHOLE
+ updatee = StgUpdateFrame_updatee(Sp);
+ v = StgInd_indirectee(updatee);
+
+ // remove the update frame from the stack
+ Sp = Sp + SIZEOF_StgUpdateFrame;
+
+ // A tagged indirectee means the BLACKHOLE has already been
+ // overwritten with (an indirection to) an evaluated value.
+ if (GETTAG(v) != 0) {
+ // updated by someone else: discard our value and use the
+ // other one to increase sharing, but check the blocking
+ // queues to see if any threads were waiting on this BLACKHOLE.
+ R1 = v;
+ foreign "C" checkBlockingQueues(MyCapability() "ptr",
+ CurrentTSO "ptr") [R1];
+ jump %ENTRY_CODE(Sp(0));
+ }
+
+ // common case: it is still our BLACKHOLE
+ // (the indirectee is still CurrentTSO, so no other thread has
+ // touched it) and a plain indirection update suffices.
+ if (v == CurrentTSO) {
+ updateWithIndirection(updatee,
+ R1,
+ jump %ENTRY_CODE(Sp(0)));
+ }
+
+ // The other cases are all handled by the generic code
+ // (presumably e.g. a blocking queue has been attached to the
+ // BLACKHOLE -- TODO confirm against updateThunk in Threads.c)
+ foreign "C" updateThunk (MyCapability() "ptr", CurrentTSO "ptr",
+ updatee "ptr", R1 "ptr") [R1];
+
+ jump %ENTRY_CODE(Sp(0));
+}
+
+// Special update frame code for CAFs and eager-blackholed thunks: it
+// knows how to update blackholes, but is distinct from
+// stg_marked_upd_frame so that lazy blackholing won't treat it as the
+// high watermark.
+INFO_TABLE_RET (stg_bh_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
+{
+ // entry code is identical to stg_marked_upd_frame; only the
+ // frame's info pointer differs.
+ jump stg_marked_upd_frame_info;
+}
W_ sz; \
W_ i; \
inf = %GET_STD_INFO(p); \
- if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE) \
- && %INFO_TYPE(inf) != HALF_W_(CAF_BLACKHOLE)) { \
+ if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)) { \
if (%INFO_TYPE(inf) == HALF_W_(THUNK_SELECTOR)) { \
sz = BYTES_TO_WDS(SIZEOF_StgSelector_NoThunkHdr); \
} else { \
switch (inf->type) {
case BLACKHOLE:
- case CAF_BLACKHOLE:
goto no_slop;
// we already filled in the slop when we overwrote the thunk
// with BLACKHOLE, and also an evacuated BLACKHOLE is only the
*/
#ifdef CMINUSMINUS
-#define updateWithIndirection(ind_info, p1, p2, and_then) \
+#define updateWithIndirection(p1, p2, and_then) \
 W_ bd; \
 \
 DEBUG_FILL_SLOP(p1); \
 LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); \
 StgInd_indirectee(p1) = p2; \
 prim %write_barrier() []; \
+ /* indirectee is written before the info pointer (barrier above), \
+    so a reader that sees stg_BLACKHOLE_info also sees a valid \
+    indirectee */ \
+ SET_INFO(p1, stg_BLACKHOLE_info); \
+ LDV_RECORD_CREATE(p1); \
 bd = Bdescr(p1); \
 if (bdescr_gen_no(bd) != 0 :: bits16) { \
 recordMutableCap(p1, TO_W_(bdescr_gen_no(bd)), R1); \
- SET_INFO(p1, stg_IND_OLDGEN_info); \
- LDV_RECORD_CREATE(p1); \
 TICK_UPD_OLD_IND(); \
 and_then; \
 } else { \
- SET_INFO(p1, ind_info); \
- LDV_RECORD_CREATE(p1); \
 TICK_UPD_NEW_IND(); \
 and_then; \
 }
#else /* !CMINUSMINUS */
INLINE_HEADER void updateWithIndirection (Capability *cap,
- const StgInfoTable *ind_info,
 StgClosure *p1,
 StgClosure *p2)
{
 LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1);
 ((StgInd *)p1)->indirectee = p2;
 write_barrier();
+ /* the write barrier orders the indirectee store before the info
+    pointer store, so a concurrent reader that sees
+    stg_BLACKHOLE_info also sees a valid indirectee */
+ SET_INFO(p1, &stg_BLACKHOLE_info);
+ LDV_RECORD_CREATE(p1);
 bd = Bdescr((StgPtr)p1);
 if (bd->gen_no != 0) {
 recordMutableCap(p1, cap, bd->gen_no);
- SET_INFO(p1, &stg_IND_OLDGEN_info);
 TICK_UPD_OLD_IND();
 } else {
- SET_INFO(p1, ind_info);
- LDV_RECORD_CREATE(p1);
 TICK_UPD_NEW_IND();
 }
}
#endif /* CMINUSMINUS */
-#define UPD_IND(cap, updclosure, heapptr) \
- updateWithIndirection(cap, &stg_IND_info, \
- updclosure, \
- heapptr)
-
#ifndef CMINUSMINUS
END_RTS_PRIVATE
#endif
(errno == EINVAL && sizeof(void*)==4 && size >= 0xc0000000)) {
// If we request more than 3Gig, then we get EINVAL
// instead of ENOMEM (at least on Linux).
- errorBelch("out of memory (requested %lu bytes)", size);
- stg_exit(EXIT_FAILURE);
+ barf("out of memory (requested %lu bytes)", size);
+// abort();
+// stg_exit(EXIT_FAILURE);
} else {
barf("getMBlock: mmap: %s", strerror(errno));
}
case IND_PERM:
case MUT_VAR_CLEAN:
case MUT_VAR_DIRTY:
- case CAF_BLACKHOLE:
case BLACKHOLE:
+ case BLOCKING_QUEUE:
{
StgPtr end;
// any threads resurrected during this GC
thread((void *)&resurrected_threads);
- // the blackhole queue
- thread((void *)&blackhole_queue);
-
// the task list
{
Task *task;
nat i;
to = alloc_for_copy(size,gen);
- *p = TAG_CLOSURE(tag,(StgClosure*)to);
from = (StgPtr)src;
to[0] = (W_)info;
// if somebody else reads the forwarding pointer, we better make
// sure there's a closure at the end of it.
write_barrier();
+ *p = TAG_CLOSURE(tag,(StgClosure*)to);
src->header.info = (const StgInfoTable *)MK_FORWARDING_PTR(to);
// if (to+size+2 < bd->start + BLOCK_SIZE_W) {
/* Special version of copy() for when we only want to copy the info
* pointer of an object, but reserve some padding after it. This is
- * used to optimise evacuation of BLACKHOLEs.
+ * used to optimise evacuation of TSOs.
*/
static rtsBool
copyPart(StgClosure **p, StgClosure *src, nat size_to_reserve,
#endif
to = alloc_for_copy(size_to_reserve, gen);
- *p = (StgClosure *)to;
from = (StgPtr)src;
to[0] = info;
to[i] = from[i];
}
-#if defined(PARALLEL_GC)
write_barrier();
-#endif
src->header.info = (const StgInfoTable*)MK_FORWARDING_PTR(to);
+ *p = (StgClosure *)to;
#ifdef PROFILING
// We store the size of the just evacuated object in the LDV word so that
tag = GET_CLOSURE_TAG(q);
q = UNTAG_CLOSURE(q);
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(q));
+ ASSERTM(LOOKS_LIKE_CLOSURE_PTR(q), "invalid closure, info=%p", q->header.info);
if (!HEAP_ALLOCED_GC(q)) {
copy_tag_nolock(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
return;
+ case BLACKHOLE:
+ {
+ StgClosure *r;
+ const StgInfoTable *i;
+ r = ((StgInd*)q)->indirectee;
+ if (GET_CLOSURE_TAG(r) == 0) {
+ i = r->header.info;
+ if (IS_FORWARDING_PTR(i)) {
+ r = (StgClosure *)UN_FORWARDING_PTR(i);
+ i = r->header.info;
+ }
+ if (i == &stg_TSO_info
+ || i == &stg_WHITEHOLE_info
+ || i == &stg_BLOCKING_QUEUE_CLEAN_info
+ || i == &stg_BLOCKING_QUEUE_DIRTY_info) {
+ copy(p,info,q,sizeofW(StgInd),gen);
+ return;
+ }
+ ASSERT(i != &stg_IND_info);
+ }
+ q = r;
+ *p = r;
+ goto loop;
+ }
+
+ case BLOCKING_QUEUE:
case WEAK:
case PRIM:
case MUT_PRIM:
- copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
+ copy(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen);
return;
case BCO:
copy(p,info,q,bco_sizeW((StgBCO *)q),gen);
return;
- case CAF_BLACKHOLE:
- case BLACKHOLE:
- copyPart(p,q,BLACKHOLE_sizeW(),sizeofW(StgHeader),gen);
- return;
-
case THUNK_SELECTOR:
eval_thunk_selector(p, (StgSelector *)q, rtsTrue);
return;
prev = NULL;
while (p)
{
-#ifdef THREADED_RTS
ASSERT(p->header.info == &stg_WHITEHOLE_info);
-#else
- ASSERT(p->header.info == &stg_BLACKHOLE_info);
-#endif
// val must be in to-space. Not always: when we recursively
// invoke eval_thunk_selector(), the recursive calls will not
// evacuate the value (because we want to select on the value,
// indirection pointing to itself, and we want the program
// to deadlock if it ever enters this closure, so
// BLACKHOLE is correct.
- SET_INFO(p, &stg_BLACKHOLE_info);
+
+ // XXX we do not have BLACKHOLEs any more; replace with
+ // a THUNK_SELECTOR again. This will go into a loop if it is
+ // entered, and should result in a NonTermination exception.
+ ((StgThunk *)p)->payload[0] = val;
+ write_barrier();
+ SET_INFO(p, &stg_sel_0_upd_info);
} else {
((StgInd *)p)->indirectee = val;
write_barrier();
prev_thunk_selector = NULL;
// this is a chain of THUNK_SELECTORs that we are going to update
// to point to the value of the current THUNK_SELECTOR. Each
- // closure on the chain is a BLACKHOLE, and points to the next in the
+ // closure on the chain is a WHITEHOLE, and points to the next in the
// chain with payload[0].
selector_chain:
}
- // BLACKHOLE the selector thunk, since it is now under evaluation.
+ // WHITEHOLE the selector thunk, since it is now under evaluation.
// This is important to stop us going into an infinite loop if
// this selector thunk eventually refers to itself.
#if defined(THREADED_RTS)
#else
// Save the real info pointer (NOTE: not the same as get_itbl()).
info_ptr = (StgWord)p->header.info;
- SET_INFO(p,&stg_BLACKHOLE_info);
+ SET_INFO(p,&stg_WHITEHOLE_info);
#endif
field = INFO_PTR_TO_STRUCT(info_ptr)->layout.selector_offset;
// the original selector thunk, p.
SET_INFO(p, (StgInfoTable *)info_ptr);
LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)p);
-#if defined(THREADED_RTS)
SET_INFO(p, &stg_WHITEHOLE_info);
-#else
- SET_INFO(p, &stg_BLACKHOLE_info);
-#endif
#endif
// the closure in val is now the "value" of the
selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
goto selector_loop;
+ case BLACKHOLE:
+ {
+ StgClosure *r;
+ const StgInfoTable *i;
+ r = ((StgInd*)selectee)->indirectee;
+
+ // establish whether this BH has been updated, and is now an
+ // indirection, as in evacuate().
+ if (GET_CLOSURE_TAG(r) == 0) {
+ i = r->header.info;
+ if (IS_FORWARDING_PTR(i)) {
+ r = (StgClosure *)UN_FORWARDING_PTR(i);
+ i = r->header.info;
+ }
+ if (i == &stg_TSO_info
+ || i == &stg_WHITEHOLE_info
+ || i == &stg_BLOCKING_QUEUE_CLEAN_info
+ || i == &stg_BLOCKING_QUEUE_DIRTY_info) {
+ goto bale_out;
+ }
+ ASSERT(i != &stg_IND_info);
+ }
+
+ selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
+ goto selector_loop;
+ }
+
case THUNK_SELECTOR:
{
StgClosure *val;
case THUNK_1_1:
case THUNK_0_2:
case THUNK_STATIC:
- case CAF_BLACKHOLE:
- case BLACKHOLE:
// not evaluated yet
goto bale_out;
// The other threads are now stopped. We might recurse back to
// here, but from now on this is the only thread.
- // if any blackholes are alive, make the threads that wait on
- // them alive too.
- if (traverseBlackholeQueue()) {
- inc_running();
- continue;
- }
-
// must be last... invariant is that everything is fully
// scavenged at this point.
if (traverseWeakPtrList()) { // returns rtsTrue if evaced something
}
}
- /* Finally, we can update the blackhole_queue. This queue
- * simply strings together TSOs blocked on black holes, it is
- * not intended to keep anything alive. Hence, we do not follow
- * pointers on the blackhole_queue until now, when we have
- * determined which TSOs are otherwise reachable. We know at
- * this point that all TSOs have been evacuated, however.
- */
- {
- StgTSO **pt;
- for (pt = &blackhole_queue; *pt != END_TSO_QUEUE; pt = &((*pt)->_link)) {
- *pt = (StgTSO *)isAlive((StgClosure *)*pt);
- ASSERT(*pt != NULL);
- }
- }
-
weak_stage = WeakDone; // *now* we're done,
return rtsTrue; // but one more round of scavenging, please
}
}
/* -----------------------------------------------------------------------------
- The blackhole queue
-
- Threads on this list behave like weak pointers during the normal
- phase of garbage collection: if the blackhole is reachable, then
- the thread is reachable too.
- -------------------------------------------------------------------------- */
-rtsBool
-traverseBlackholeQueue (void)
-{
- StgTSO *prev, *t, *tmp;
- rtsBool flag;
- nat type;
-
- flag = rtsFalse;
- prev = NULL;
-
- for (t = blackhole_queue; t != END_TSO_QUEUE; prev=t, t = t->_link) {
- // if the thread is not yet alive...
- if (! (tmp = (StgTSO *)isAlive((StgClosure*)t))) {
- // if the closure it is blocked on is either (a) a
- // reachable BLAKCHOLE or (b) not a BLACKHOLE, then we
- // make the thread alive.
- if (!isAlive(t->block_info.closure)) {
- type = get_itbl(t->block_info.closure)->type;
- if (type == BLACKHOLE || type == CAF_BLACKHOLE) {
- continue;
- }
- }
- evacuate((StgClosure **)&t);
- if (prev) {
- prev->_link = t;
- } else {
- blackhole_queue = t;
- }
- // no write barrier when on the blackhole queue,
- // because we traverse the whole queue on every GC.
- flag = rtsTrue;
- }
- }
- return flag;
-}
-
-/* -----------------------------------------------------------------------------
Evacuate every weak pointer object on the weak_ptr_list, and update
the link fields.
void initWeakForGC ( void );
rtsBool traverseWeakPtrList ( void );
void markWeakPtrList ( void );
-rtsBool traverseBlackholeQueue ( void );
END_RTS_PRIVATE
case IND_OLDGEN:
case IND_OLDGEN_PERM:
case BLACKHOLE:
- case CAF_BLACKHOLE:
case PRIM:
case MUT_PRIM:
case MUT_VAR_CLEAN:
return sizeW_fromITBL(info);
}
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ // NO: the BH might have been updated now
+ // ASSERT(get_itbl(bq->bh)->type == BLACKHOLE);
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh));
+
+ ASSERT(get_itbl(bq->owner)->type == TSO);
+ ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO);
+ ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE ||
+ get_itbl(bq->link)->type == IND ||
+ get_itbl(bq->link)->type == BLOCKING_QUEUE);
+
+ return sizeofW(StgBlockingQueue);
+ }
+
case BCO: {
StgBCO *bco = (StgBCO *)p;
ASSERT(LOOKS_LIKE_CLOSURE_PTR(bco->instrs));
return;
}
+ ASSERT(tso->_link == END_TSO_QUEUE || get_itbl(tso->_link)->type == TSO);
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->bq));
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->blocked_exceptions));
+
ASSERT(stack <= sp && sp < stack_end);
checkStackChunk(sp, stack_end);
if (checkTSOs)
checkTSO(tso);
- while (tso->what_next == ThreadRelocated) {
- tso = tso->_link;
- }
+ tso = deRefTSO(tso);
// If this TSO is dirty and in an old generation, it better
// be on the mutable list.
Scavenge a TSO.
-------------------------------------------------------------------------- */
-STATIC_INLINE void
-scavenge_TSO_link (StgTSO *tso)
-{
- // We don't always chase the link field: TSOs on the blackhole
- // queue are not automatically alive, so the link field is a
- // "weak" pointer in that case.
- if (tso->why_blocked != BlockedOnBlackHole) {
- evacuate((StgClosure **)&tso->_link);
- }
-}
-
static void
scavengeTSO (StgTSO *tso)
{
) {
evacuate(&tso->block_info.closure);
}
+#ifdef THREADED_RTS
+ // in the THREADED_RTS, block_info.closure must always point to a
+ // valid closure, because we assume this in throwTo(). In the
+ // non-threaded RTS it might be a FD (for
+ // BlockedOnRead/BlockedOnWrite) or a time value (BlockedOnDelay)
+ else {
+ tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+ }
+#endif
+
evacuate((StgClosure **)&tso->blocked_exceptions);
+ evacuate((StgClosure **)&tso->bq);
// scavange current transaction record
evacuate((StgClosure **)&tso->trec);
if (gct->failed_to_evac) {
tso->dirty = 1;
- scavenge_TSO_link(tso);
+ evacuate((StgClosure **)&tso->_link);
} else {
tso->dirty = 0;
- scavenge_TSO_link(tso);
+ evacuate((StgClosure **)&tso->_link);
if (gct->failed_to_evac) {
tso->flags |= TSO_LINK_DIRTY;
} else {
}
// fall through
case IND_OLDGEN_PERM:
+ case BLACKHOLE:
evacuate(&((StgInd *)p)->indirectee);
p += sizeofW(StgInd);
break;
p += sizeofW(StgMutVar);
break;
- case CAF_BLACKHOLE:
- case BLACKHOLE:
- p += BLACKHOLE_sizeW();
- break;
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ gct->eager_promotion = rtsFalse;
+ evacuate(&bq->bh);
+ evacuate((StgClosure**)&bq->owner);
+ evacuate((StgClosure**)&bq->queue);
+ evacuate((StgClosure**)&bq->link);
+ gct->eager_promotion = saved_eager_promotion;
+
+ if (gct->failed_to_evac) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ } else {
+ bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ }
+ p += sizeofW(StgBlockingQueue);
+ break;
+ }
case THUNK_SELECTOR:
{
case IND_OLDGEN:
case IND_OLDGEN_PERM:
+ case BLACKHOLE:
evacuate(&((StgInd *)p)->indirectee);
break;
break;
}
- case CAF_BLACKHOLE:
- case BLACKHOLE:
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ gct->eager_promotion = rtsFalse;
+ evacuate(&bq->bh);
+ evacuate((StgClosure**)&bq->owner);
+ evacuate((StgClosure**)&bq->queue);
+ evacuate((StgClosure**)&bq->link);
+ gct->eager_promotion = saved_eager_promotion;
+
+ if (gct->failed_to_evac) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ } else {
+ bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ }
+ break;
+ }
+
case ARR_WORDS:
break;
break;
}
- case CAF_BLACKHOLE:
- case BLACKHOLE:
- break;
-
+ case BLOCKING_QUEUE:
+ {
+ StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+ gct->eager_promotion = rtsFalse;
+ evacuate(&bq->bh);
+ evacuate((StgClosure**)&bq->owner);
+ evacuate((StgClosure**)&bq->queue);
+ evacuate((StgClosure**)&bq->link);
+ gct->eager_promotion = saved_eager_promotion;
+
+ if (gct->failed_to_evac) {
+ bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ } else {
+ bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ }
+ break;
+ }
+
case THUNK_SELECTOR:
{
StgSelector *s = (StgSelector *)p;
// on the large-object list and then gets updated. See #3424.
case IND_OLDGEN:
case IND_OLDGEN_PERM:
+ case BLACKHOLE:
case IND_STATIC:
evacuate(&((StgInd *)p)->indirectee);
#ifdef DEBUG
switch (get_itbl((StgClosure *)p)->type) {
case MUT_VAR_CLEAN:
- barf("MUT_VAR_CLEAN on mutable list");
+ // a MUT_VAR_CLEAN can legitimately appear on the mutable list
+ // due to concurrent writeMutVars; fall through and count it
case MUT_VAR_DIRTY:
mutlist_MUTVARS++; break;
case MUT_ARR_PTRS_CLEAN:
// this assertion would be invalid:
// ASSERT(tso->flags & TSO_LINK_DIRTY);
- scavenge_TSO_link(tso);
+ evacuate((StgClosure **)&tso->_link);
if (gct->failed_to_evac) {
recordMutableGen_GC((StgClosure *)p,gen->no);
gct->failed_to_evac = rtsFalse;
// before GC, but that seems like overkill.
//
// Scavenging this update frame as normal would be disastrous;
- // the updatee would end up pointing to the value. So we turn
- // the indirection into an IND_PERM, so that evacuate will
- // copy the indirection into the old generation instead of
- // discarding it.
+ // the updatee would end up pointing to the value. So we
+ // check whether the value after evacuation is a BLACKHOLE,
+ // and if not, we change the update frame to an stg_enter
+ // frame that simply returns the value. Hence, blackholing is
+ // compulsory (otherwise we would have to check for thunks
+ // too).
//
// Note [upd-black-hole]
// One slight hiccup is that the THUNK_SELECTOR machinery can
// the updatee is never a THUNK_SELECTOR and we're ok.
// NB. this is a new invariant: blackholing is not optional.
{
- nat type;
- const StgInfoTable *i;
- StgClosure *updatee;
-
- updatee = ((StgUpdateFrame *)p)->updatee;
- i = updatee->header.info;
- if (!IS_FORWARDING_PTR(i)) {
- type = get_itbl(updatee)->type;
- if (type == IND) {
- updatee->header.info = &stg_IND_PERM_info;
- } else if (type == IND_OLDGEN) {
- updatee->header.info = &stg_IND_OLDGEN_PERM_info;
- }
+ StgUpdateFrame *frame = (StgUpdateFrame *)p;
+ StgClosure *v;
+
+ evacuate(&frame->updatee);
+ v = frame->updatee;
+ if (GET_CLOSURE_TAG(v) != 0 ||
+ (get_itbl(v)->type != BLACKHOLE)) {
+ // blackholing is compulsory, see above.
+ frame->header.info = (const StgInfoTable*)&stg_enter_checkbh_info;
}
- evacuate(&((StgUpdateFrame *)p)->updatee);
- ASSERT(GET_CLOSURE_TAG(((StgUpdateFrame *)p)->updatee) == 0);
+ ASSERT(v->header.info != &stg_TSO_info);
p += sizeofW(StgUpdateFrame);
continue;
}
* doing something reasonable.
*/
/* We use the NOT_NULL variant or gcc warns that the test is always true */
- ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLACKHOLE_info));
+ ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLOCKING_QUEUE_CLEAN_info));
ASSERT(LOOKS_LIKE_CLOSURE_PTR(&stg_dummy_ret_closure));
ASSERT(!HEAP_ALLOCED(&stg_dummy_ret_closure));
The entry code for every CAF does the following:
- - builds a CAF_BLACKHOLE in the heap
- - pushes an update frame pointing to the CAF_BLACKHOLE
+ - builds a BLACKHOLE in the heap
+ - pushes an update frame pointing to the BLACKHOLE
- invokes UPD_CAF(), which:
- calls newCaf, below
- - updates the CAF with a static indirection to the CAF_BLACKHOLE
+ - updates the CAF with a static indirection to the BLACKHOLE
- Why do we build a BLACKHOLE in the heap rather than just updating
+ Why do we build a BLACKHOLE in the heap rather than just updating
the thunk directly? It's so that we only need one kind of update
frame - otherwise we'd need a static version of the update frame too.
dirty_MUT_VAR(StgRegTable *reg, StgClosure *p)
{
Capability *cap = regTableToCapability(reg);
- bdescr *bd;
if (p->header.info == &stg_MUT_VAR_CLEAN_info) {
p->header.info = &stg_MUT_VAR_DIRTY_info;
- bd = Bdescr((StgPtr)p);
- if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no);
+ recordClosureMutated(cap,p);
}
}
void
setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
{
- bdescr *bd;
if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
tso->flags |= TSO_LINK_DIRTY;
- bd = Bdescr((StgPtr)tso);
- if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no);
+ recordClosureMutated(cap,(StgClosure*)tso);
}
tso->_link = target;
}
void
dirty_TSO (Capability *cap, StgTSO *tso)
{
- bdescr *bd;
if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
- bd = Bdescr((StgPtr)tso);
- if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no);
+ recordClosureMutated(cap,(StgClosure*)tso);
}
tso->dirty = 1;
}
void
dirty_MVAR(StgRegTable *reg, StgClosure *p)
{
- Capability *cap = regTableToCapability(reg);
- bdescr *bd;
- bd = Bdescr((StgPtr)p);
- if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no);
+ recordClosureMutated(regTableToCapability(reg),p);
}
/* -----------------------------------------------------------------------------
-- else:
text "case AP,",
text " AP_STACK,",
- text " CAF_BLACKHOLE,",
text " BLACKHOLE,",
+ text " WHITEHOLE,",
text " THUNK,",
text " THUNK_1_0,",
text " THUNK_0_1,",