X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=51a42ef468470d9dc750606b3a90c7c54a126794;hb=28a464a75e14cece5db40f2765a29348273ff2d2;hp=8c40b639927b3af1202bdc36d0b130a9ddece7c3;hpb=2f861d149686a96d7783ce984afa7c263a39c355;p=ghc-hetmet.git diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 8c40b63..51a42ef 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -44,7 +44,7 @@ STATIC_INLINE rtsBool globalWorkToDo (void) { return blackholes_need_checking - || interrupted + || sched_state >= SCHED_INTERRUPTING ; } #endif @@ -67,7 +67,9 @@ anyWorkForMe( Capability *cap, Task *task ) // other global condition to check, such as threads blocked on // blackholes). if (emptyRunQueue(cap)) { - return !emptySparkPoolCap(cap) || globalWorkToDo(); + return !emptySparkPoolCap(cap) + || !emptyWakeupQueue(cap) + || globalWorkToDo(); } else return cap->run_queue_hd->bound == NULL; } @@ -135,6 +137,8 @@ initCapability( Capability *cap, nat i ) cap->suspended_ccalling_tasks = NULL; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; + cap->wakeup_queue_hd = END_TSO_QUEUE; + cap->wakeup_queue_tl = END_TSO_QUEUE; #endif cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; @@ -286,7 +290,7 @@ releaseCapability_ (Capability* cap) // is interrupted, we only create a worker task if there // are threads that need to be completed. If the system is // shutting down, we never create a new worker. - if (!shutting_down_scheduler) { + if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { IF_DEBUG(scheduler, sched_belch("starting new worker on capability %d", cap->no)); startWorkerTask(cap, workerStart); @@ -296,7 +300,8 @@ releaseCapability_ (Capability* cap) // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. - if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { + if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap) + || !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); // The worker Task pops itself from the queue; @@ -502,6 +507,37 @@ yieldCapability (Capability** pCap, Task *task) } /* ---------------------------------------------------------------------------- + * Wake up a thread on a Capability. + * + * This is used when the current Task is running on a Capability and + * wishes to wake up a thread on a different Capability. + * ------------------------------------------------------------------------- */ + +void +wakeupThreadOnCapability (Capability *cap, StgTSO *tso) +{ + ASSERT(tso->cap == cap); + ASSERT(tso->bound ? tso->bound->cap == cap : 1); + + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + // nobody is running this Capability, we can add our thread + // directly onto the run queue and start up a Task to run it. + appendToRunQueue(cap,tso); + + // start it up + cap->running_task = myTask(); // precond for releaseCapability_() + releaseCapability_(cap); + } else { + appendToWakeupQueue(cap,tso); + // someone is running on this Capability, so it cannot be + // freed without first checking the wakeup queue (see + // releaseCapability_). + } + RELEASE_LOCK(&cap->lock); +} + +/* ---------------------------------------------------------------------------- * prodCapabilities * * Used to indicate that the interrupted flag is now set, or some @@ -575,7 +611,7 @@ shutdownCapability (Capability *cap, Task *task) { nat i; - ASSERT(interrupted && shutting_down_scheduler); + ASSERT(sched_state == SCHED_SHUTTING_DOWN); task->cap = cap;