X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FCapability.c;h=5f54ecae4d034785344e1e74166115da93e693ba;hb=79957d77c1bff767f1041d3fabdeb94d92a52878;hp=ce6ecebd7214fdf7f4884b14208d0d5cb1a2ed72;hpb=7effbbbbdfe7eb05c6402fa9337e358e7e9fadde;p=ghc-hetmet.git diff --git a/rts/Capability.c b/rts/Capability.c index ce6eceb..5f54eca 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i ) cap->suspended_ccalls = NULL; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; - cap->wakeup_queue_hd = END_TSO_QUEUE; - cap->wakeup_queue_tl = END_TSO_QUEUE; + cap->inbox = (Message*)END_TSO_QUEUE; cap->sparks_created = 0; cap->sparks_converted = 0; cap->sparks_pruned = 0; @@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap, // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. if (always_wakeup || - !emptyRunQueue(cap) || !emptyWakeupQueue(cap) || + !emptyRunQueue(cap) || !emptyInbox(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); @@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task) * ------------------------------------------------------------------------- */ void -wakeupThreadOnCapability (Capability *my_cap, +wakeupThreadOnCapability (Capability *cap, Capability *other_cap, StgTSO *tso) { - ACQUIRE_LOCK(&other_cap->lock); + MessageWakeup *msg; // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability) if (tso->bound) { @@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap, } tso->cap = other_cap; - ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1); + ASSERT(tso->why_blocked != BlockedOnMsgWakeup || + tso->block_info.closure->header.info == &stg_IND_info); - if (other_cap->running_task == NULL) { - // nobody is running this Capability, we can add our thread - // directly onto the run queue and start up a Task to run it. + ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info); - other_cap->running_task = myTask(); - // precond for releaseCapability_() and appendToRunQueue() + msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup)); + msg->header.info = &stg_MSG_WAKEUP_info; + msg->tso = tso; + tso->block_info.closure = (StgClosure *)msg; + dirty_TSO(cap, tso); + write_barrier(); + tso->why_blocked = BlockedOnMsgWakeup; - appendToRunQueue(other_cap,tso); - - releaseCapability_(other_cap,rtsFalse); - } else { - appendToWakeupQueue(my_cap,other_cap,tso); - other_cap->context_switch = 1; - // someone is running on this Capability, so it cannot be - // freed without first checking the wakeup queue (see - // releaseCapability_). - } - - RELEASE_LOCK(&other_cap->lock); + sendMessage(other_cap, (Message*)msg); } /* ---------------------------------------------------------------------------- @@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta, evac(user, (StgClosure **)(void *)&cap->run_queue_hd); evac(user, (StgClosure **)(void *)&cap->run_queue_tl); #if defined(THREADED_RTS) - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd); - evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl); + evac(user, (StgClosure **)(void *)&cap->inbox); #endif for (incall = cap->suspended_ccalls; incall != NULL; incall=incall->next) { @@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user) { markSomeCapabilities(evac, user, 0, 1, rtsFalse); } + +/* ----------------------------------------------------------------------------- + Messages + -------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS + +void sendMessage(Capability *cap, Message *msg) +{ + ACQUIRE_LOCK(&cap->lock); + + msg->link = cap->inbox; + cap->inbox = msg; + + if (cap->running_task == NULL) { + cap->running_task = myTask(); + // precond for releaseCapability_() + releaseCapability_(cap,rtsFalse); + } else { + contextSwitchCapability(cap); + } + + RELEASE_LOCK(&cap->lock); +} + +#endif // THREADED_RTS