X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=52fd4d5df655f76668314dc399cb0fca1111d88c;hb=c520a3a2752ffcec5710a88a8a2e219c20edfc8a;hp=d49d4ed8e534bd9060e6fa53b779880c4a92a62b;hpb=4368121d93643f6f108c1dc79ac212bf02e56f98;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index d49d4ed..52fd4d5 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -204,6 +204,7 @@ static void schedulePushWork(Capability *cap, Task *task); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); +static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); #if defined(GRAN) @@ -482,20 +483,7 @@ schedule (Capability *initialCapability, Task *task) // list each time around the scheduler. if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - // Any threads that were woken up by other Capabilities get - // appended to our run queue. - if (!emptyWakeupQueue(cap)) { - ACQUIRE_LOCK(&cap->lock); - if (emptyRunQueue(cap)) { - cap->run_queue_hd = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } else { - cap->run_queue_tl->link = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } - cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; - RELEASE_LOCK(&cap->lock); - } + scheduleCheckWakeupThreads(cap); scheduleCheckBlockedThreads(cap); @@ -841,7 +829,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS, next = t->link; t->link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated - || t->bound == task) { // don't move my bound thread + || t->bound == task // don't move my bound thread + || tsoLocked(t)) { // don't move a locked thread prev->link = t; prev = t; } else if (i == n_free_caps) { @@ -928,6 +917,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) /* ---------------------------------------------------------------------------- + * Check for threads woken up by other Capabilities + * ------------------------------------------------------------------------- */ + +static void +scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + // Any threads that were woken up by other Capabilities get + // appended to our run queue. + if (!emptyWakeupQueue(cap)) { + ACQUIRE_LOCK(&cap->lock); + if (emptyRunQueue(cap)) { + cap->run_queue_hd = cap->wakeup_queue_hd; + cap->run_queue_tl = cap->wakeup_queue_tl; + } else { + cap->run_queue_tl->link = cap->wakeup_queue_hd; + cap->run_queue_tl = cap->wakeup_queue_tl; + } + cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; + RELEASE_LOCK(&cap->lock); + } +#endif +} + +/* ---------------------------------------------------------------------------- * Check for threads blocked on BLACKHOLEs that can be woken up * ------------------------------------------------------------------------- */ static void @@ -2709,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso) appendToRunQueue(cap,tso); } +void +scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) +{ +#if defined(THREADED_RTS) + tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't + // move this thread from now on. + cpu %= RtsFlags.ParFlags.nNodes; + if (cpu == cap->no) { + appendToRunQueue(cap,tso); + } else { + Capability *target_cap = &capabilities[cpu]; + if (tso->bound) { + tso->bound->cap = target_cap; + } + tso->cap = target_cap; + wakeupThreadOnCapability(target_cap,tso); + } +#else + appendToRunQueue(cap,tso); +#endif +} + Capability * scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) { @@ -3244,7 +3280,8 @@ unblockOne(Capability *cap, StgTSO *tso) next = tso->link; tso->link = END_TSO_QUEUE; - if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) { +#if defined(THREADED_RTS) + if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) { // We are waking up this thread on the current Capability, which // might involve migrating it from the Capability it was last on. if (tso->bound) { @@ -3260,6 +3297,10 @@ unblockOne(Capability *cap, StgTSO *tso) // we'll try to wake it up on the Capability it was last on. wakeupThreadOnCapability(tso->cap, tso); } +#else + appendToRunQueue(cap,tso); + context_switch = 1; +#endif IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no)); return next;