X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FThreads.c;h=7344134a7d5a0d54e6a1d6bc60aa66c7c8277842;hp=05a13c7f3b262f07b04cc68d7a8118f0604f9974;hb=83d563cb9ede0ba792836e529b1e2929db926355;hpb=848797ebb9b60cf9c8a004c97afd008f5325c75f diff --git a/rts/Threads.c b/rts/Threads.c index 05a13c7..7344134 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -205,83 +205,26 @@ removeThreadFromDeQueue (Capability *cap, barf("removeThreadFromMVarQueue: not found"); } -void -removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso) -{ - // caller must do the write barrier, because replacing the info - // pointer will unlock the MVar. - removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso); - tso->_link = END_TSO_QUEUE; -} - /* ---------------------------------------------------------------------------- - unblockOne() + tryWakeupThread() - unblock a single thread. - ------------------------------------------------------------------------- */ + Attempt to wake up a thread. tryWakeupThread is idempotent: it is + always safe to call it too many times, but it is not safe in + general to omit a call. -StgTSO * -unblockOne (Capability *cap, StgTSO *tso) -{ - return unblockOne_(cap,tso,rtsTrue); // allow migration -} + ------------------------------------------------------------------------- */ -StgTSO * -unblockOne_ (Capability *cap, StgTSO *tso, - rtsBool allow_migrate USED_IF_THREADS) +void +tryWakeupThread (Capability *cap, StgTSO *tso) { - StgTSO *next; - - // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO); - ASSERT(tso->why_blocked != NotBlocked); - ASSERT(tso->why_blocked != BlockedOnMsgWakeup || - tso->block_info.closure->header.info == &stg_IND_info); - - next = tso->_link; - tso->_link = END_TSO_QUEUE; - -#if defined(THREADED_RTS) - if (tso->cap == cap || (!tsoLocked(tso) && - allow_migrate && - RtsFlags.ParFlags.wakeupMigrate)) { - // We are waking up this thread on the current Capability, which - // might involve migrating it from the Capability it was last on. - if (tso->bound) { - ASSERT(tso->bound->task->cap == tso->cap); - tso->bound->task->cap = cap; - } - - tso->cap = cap; - write_barrier(); - tso->why_blocked = NotBlocked; - appendToRunQueue(cap,tso); - - // context-switch soonish so we can migrate the new thread if - // necessary. NB. not contextSwitchCapability(cap), which would - // force a context switch immediately. - cap->context_switch = 1; - } else { - // we'll try to wake it up on the Capability it was last on. - wakeupThreadOnCapability(cap, tso->cap, tso); - } -#else - tso->why_blocked = NotBlocked; - appendToRunQueue(cap,tso); - - // context-switch soonish so we can migrate the new thread if - // necessary. NB. not contextSwitchCapability(cap), which would - // force a context switch immediately. - cap->context_switch = 1; -#endif - - traceEventThreadWakeup (cap, tso, tso->cap->no); - - return next; + tryWakeupThread_(cap, deRefTSO(tso)); } void -tryWakeupThread (Capability *cap, StgTSO *tso) +tryWakeupThread_ (Capability *cap, StgTSO *tso) { + traceEventThreadWakeup (cap, tso, tso->cap->no); + #ifdef THREADED_RTS if (tso->cap != cap) { @@ -298,6 +241,16 @@ tryWakeupThread (Capability *cap, StgTSO *tso) switch (tso->why_blocked) { + case BlockedOnMVar: + { + if (tso->_link == END_TSO_QUEUE) { + tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; + goto unblock; + } else { + return; + } + } + case BlockedOnMsgThrowTo: { const StgInfoTable *i; @@ -307,27 +260,45 @@ tryWakeupThread (Capability *cap, StgTSO *tso) if (i != &stg_MSG_NULL_info) { debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)", (lnat)tso->id, tso->block_info.throwto->header.info); - break; // still blocked + return; } // remove the block frame from the stack ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info); tso->sp += 3; - // fall through... + goto unblock; } + case BlockedOnBlackHole: case BlockedOnSTM: - { - // just run the thread now, if the BH is not really available, - // we'll block again. - tso->why_blocked = NotBlocked; - appendToRunQueue(cap,tso); - break; - } + case ThreadMigrating: + goto unblock; + default: // otherwise, do nothing - break; + return; } + +unblock: + // just run the thread now, if the BH is not really available, + // we'll block again. + tso->why_blocked = NotBlocked; + appendToRunQueue(cap,tso); +} + +/* ---------------------------------------------------------------------------- + migrateThread + ------------------------------------------------------------------------- */ + +void +migrateThread (Capability *from, StgTSO *tso, Capability *to) +{ + traceEventMigrateThread (from, tso, to->no); + // ThreadMigrating tells the target cap that it needs to be added to + // the run queue when it receives the MSG_TRY_WAKEUP. + tso->why_blocked = ThreadMigrating; + tso->cap = to; + tryWakeupThread(from, tso); } /* ---------------------------------------------------------------------------- @@ -417,6 +388,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) i = thunk->header.info; if (i != &stg_BLACKHOLE_info && i != &stg_CAF_BLACKHOLE_info && + i != &__stg_EAGER_BLACKHOLE_info && i != &stg_WHITEHOLE_info) { updateWithIndirection(cap, thunk, val); return; @@ -450,47 +422,6 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) } } -/* ---------------------------------------------------------------------------- - * Wake up a thread on a Capability. - * - * This is used when the current Task is running on a Capability and - * wishes to wake up a thread on a different Capability. - * ------------------------------------------------------------------------- */ - -#ifdef THREADED_RTS - -void -wakeupThreadOnCapability (Capability *cap, - Capability *other_cap, - StgTSO *tso) -{ - MessageWakeup *msg; - - // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) - if (tso->bound) { - ASSERT(tso->bound->task->cap == tso->cap); - tso->bound->task->cap = other_cap; - } - tso->cap = other_cap; - - ASSERT(tso->why_blocked != BlockedOnMsgWakeup || - tso->block_info.closure->header.info == &stg_IND_info); - - ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info); - - msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup)); - SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM); - msg->tso = tso; - tso->block_info.closure = (StgClosure *)msg; - dirty_TSO(cap, tso); - write_barrier(); - tso->why_blocked = BlockedOnMsgWakeup; - - sendMessage(cap, other_cap, (Message*)msg); -} - -#endif /* THREADED_RTS */ - /* --------------------------------------------------------------------------- * rtsSupportsBoundThreads(): is the RTS built to support bound threads? * used by Control.Concurrent for error checking. @@ -549,20 +480,20 @@ printThreadBlockage(StgTSO *tso) debugBelch("is blocked on a black hole %p", ((StgBlockingQueue*)tso->block_info.bh->bh)); break; - case BlockedOnMsgWakeup: - debugBelch("is blocked on a wakeup message"); - break; case BlockedOnMsgThrowTo: debugBelch("is blocked on a throwto message"); break; case NotBlocked: debugBelch("is not blocked"); break; + case ThreadMigrating: + debugBelch("is runnable, but not on the run queue"); + break; case BlockedOnCCall: debugBelch("is blocked on an external call"); break; - case BlockedOnCCall_NoUnblockExc: - debugBelch("is blocked on an external call (exceptions were already blocked)"); + case BlockedOnCCall_Interruptible: + debugBelch("is blocked on an external call (but may be interrupted)"); break; case BlockedOnSTM: debugBelch("is blocked on an STM operation");