barf("removeThreadFromMVarQueue: not found");
}
-void
-removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
-{
- // caller must do the write barrier, because replacing the info
- // pointer will unlock the MVar.
- removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
- tso->_link = END_TSO_QUEUE;
-}
-
/* ----------------------------------------------------------------------------
- unblockOne()
+ tryWakeupThread()
- unblock a single thread.
- ------------------------------------------------------------------------- */
+ Attempt to wake up a thread. tryWakeupThread is idempotent: it is
+ always safe to call it too many times, but it is not safe in
+ general to omit a call.
-StgTSO *
-unblockOne (Capability *cap, StgTSO *tso)
-{
- return unblockOne_(cap,tso,rtsTrue); // allow migration
-}
+ ------------------------------------------------------------------------- */
-StgTSO *
-unblockOne_ (Capability *cap, StgTSO *tso,
- rtsBool allow_migrate USED_IF_THREADS)
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
{
- StgTSO *next;
-
- // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
- ASSERT(tso->why_blocked != NotBlocked);
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- next = tso->_link;
- tso->_link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
- if (tso->cap == cap || (!tsoLocked(tso) &&
- allow_migrate &&
- RtsFlags.ParFlags.wakeupMigrate)) {
- // We are waking up this thread on the current Capability, which
- // might involve migrating it from the Capability it was last on.
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = cap;
- }
-
- tso->cap = cap;
- write_barrier();
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
-
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
- } else {
- // we'll try to wake it up on the Capability it was last on.
- wakeupThreadOnCapability(cap, tso->cap, tso);
- }
-#else
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
-
- // context-switch soonish so we can migrate the new thread if
- // necessary. NB. not contextSwitchCapability(cap), which would
- // force a context switch immediately.
- cap->context_switch = 1;
-#endif
-
- traceEventThreadWakeup (cap, tso, tso->cap->no);
-
- return next;
+ tryWakeupThread_(cap, deRefTSO(tso));
}
void
-tryWakeupThread (Capability *cap, StgTSO *tso)
+tryWakeupThread_ (Capability *cap, StgTSO *tso)
{
+ traceEventThreadWakeup (cap, tso, tso->cap->no);
+
#ifdef THREADED_RTS
if (tso->cap != cap)
{
switch (tso->why_blocked)
{
+ case BlockedOnMVar:
+ {
+ if (tso->_link == END_TSO_QUEUE) {
+ tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
+ goto unblock;
+ } else {
+ return;
+ }
+ }
+
case BlockedOnMsgThrowTo:
{
const StgInfoTable *i;
if (i != &stg_MSG_NULL_info) {
debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
(lnat)tso->id, tso->block_info.throwto->header.info);
- break; // still blocked
+ return;
}
// remove the block frame from the stack
ASSERT(tso->sp[0] == (StgWord)&stg_block_throwto_info);
tso->sp += 3;
- // fall through...
+ goto unblock;
}
+
case BlockedOnBlackHole:
case BlockedOnSTM:
- {
- // just run the thread now, if the BH is not really available,
- // we'll block again.
- tso->why_blocked = NotBlocked;
- appendToRunQueue(cap,tso);
- break;
- }
+ case ThreadMigrating:
+ goto unblock;
+
default:
// otherwise, do nothing
- break;
+ return;
}
+
+unblock:
+ // just run the thread now, if the BH is not really available,
+ // we'll block again.
+ tso->why_blocked = NotBlocked;
+ appendToRunQueue(cap,tso);
+}
+
+/* ----------------------------------------------------------------------------
+ migrateThread
+ ------------------------------------------------------------------------- */
+
+void
+migrateThread (Capability *from, StgTSO *tso, Capability *to)
+{
+ traceEventMigrateThread (from, tso, to->no);
+ // ThreadMigrating tells the target cap that it needs to be added to
+ // the run queue when it receives the MSG_TRY_WAKEUP.
+ tso->why_blocked = ThreadMigrating;
+ tso->cap = to;
+ tryWakeupThread(from, tso);
}
/* ----------------------------------------------------------------------------
}
}
-/* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-#ifdef THREADED_RTS
-
-void
-wakeupThreadOnCapability (Capability *cap,
- Capability *other_cap,
- StgTSO *tso)
-{
- MessageWakeup *msg;
-
- // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
- if (tso->bound) {
- ASSERT(tso->bound->task->cap == tso->cap);
- tso->bound->task->cap = other_cap;
- }
- tso->cap = other_cap;
-
- ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
- tso->block_info.closure->header.info == &stg_IND_info);
-
- ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
- msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
- SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
- msg->tso = tso;
- tso->block_info.closure = (StgClosure *)msg;
- dirty_TSO(cap, tso);
- write_barrier();
- tso->why_blocked = BlockedOnMsgWakeup;
-
- sendMessage(cap, other_cap, (Message*)msg);
-}
-
-#endif /* THREADED_RTS */
-
/* ---------------------------------------------------------------------------
* rtsSupportsBoundThreads(): is the RTS built to support bound threads?
* used by Control.Concurrent for error checking.
debugBelch("is blocked on a black hole %p",
((StgBlockingQueue*)tso->block_info.bh->bh));
break;
- case BlockedOnMsgWakeup:
- debugBelch("is blocked on a wakeup message");
- break;
case BlockedOnMsgThrowTo:
debugBelch("is blocked on a throwto message");
break;
case NotBlocked:
debugBelch("is not blocked");
break;
+ case ThreadMigrating:
+ debugBelch("is runnable, but not on the run queue");
+ break;
case BlockedOnCCall:
debugBelch("is blocked on an external call");
break;