cap->suspended_ccalls = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
- cap->wakeup_queue_hd = END_TSO_QUEUE;
- cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->inbox = (Message*)END_TSO_QUEUE;
cap->sparks_created = 0;
cap->sparks_converted = 0;
cap->sparks_pruned = 0;
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
- !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptyRunQueue(cap) || !emptyInbox(cap) ||
!emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
* ------------------------------------------------------------------------- */
void
-wakeupThreadOnCapability (Capability *my_cap,
+wakeupThreadOnCapability (Capability *cap,
Capability *other_cap,
StgTSO *tso)
{
- ACQUIRE_LOCK(&other_cap->lock);
+ MessageWakeup *msg;
// ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
if (tso->bound) {
}
tso->cap = other_cap;
- ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
+ ASSERT(tso->why_blocked != BlockedOnMsgWakeup ||
+ tso->block_info.closure->header.info == &stg_IND_info);
- if (other_cap->running_task == NULL) {
- // nobody is running this Capability, we can add our thread
- // directly onto the run queue and start up a Task to run it.
+ ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
- other_cap->running_task = myTask();
- // precond for releaseCapability_() and appendToRunQueue()
+ msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+ msg->header.info = &stg_MSG_WAKEUP_info;
+ msg->tso = tso;
+ tso->block_info.closure = (StgClosure *)msg;
+ dirty_TSO(cap, tso);
+ write_barrier();
+ tso->why_blocked = BlockedOnMsgWakeup;
- appendToRunQueue(other_cap,tso);
-
- releaseCapability_(other_cap,rtsFalse);
- } else {
- appendToWakeupQueue(my_cap,other_cap,tso);
- other_cap->context_switch = 1;
- // someone is running on this Capability, so it cannot be
- // freed without first checking the wakeup queue (see
- // releaseCapability_).
- }
-
- RELEASE_LOCK(&other_cap->lock);
+ sendMessage(other_cap, (Message*)msg);
}
/* ----------------------------------------------------------------------------
evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
#if defined(THREADED_RTS)
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
- evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+ evac(user, (StgClosure **)(void *)&cap->inbox);
#endif
for (incall = cap->suspended_ccalls; incall != NULL;
incall=incall->next) {
{
markSomeCapabilities(evac, user, 0, 1, rtsFalse);
}
+
+/* -----------------------------------------------------------------------------
+ Messages
+ -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+ ACQUIRE_LOCK(&cap->lock);
+
+ msg->link = cap->inbox;
+ cap->inbox = msg;
+
+ if (cap->running_task == NULL) {
+ cap->running_task = myTask();
+ // precond for releaseCapability_()
+ releaseCapability_(cap,rtsFalse);
+ } else {
+ contextSwitchCapability(cap);
+ }
+
+ RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS