X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;ds=sidebyside;f=ghc%2Frts%2FSchedule.c;h=d49d4ed8e534bd9060e6fa53b779880c4a92a62b;hb=a1b4e3b88a6987deed7bb7f1bd870b30eef1b475;hp=bbc6a8b5a8c632960cbdf6e3e143c467cfd78180;hpb=c137ecd7e6e83d0f9c39b15ccdb9f2355f243c91;p=ghc-hetmet.git

diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index bbc6a8b..d49d4ed 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -482,6 +482,21 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        if (emptyRunQueue(cap)) {
+            cap->run_queue_hd = cap->wakeup_queue_hd;
+            cap->run_queue_tl = cap->wakeup_queue_tl;
+        } else {
+            cap->run_queue_tl->link = cap->wakeup_queue_hd;
+            cap->run_queue_tl = cap->wakeup_queue_tl;
+        }
+        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+        RELEASE_LOCK(&cap->lock);
+    }
+
     scheduleCheckBlockedThreads(cap);
 
     scheduleDetectDeadlock(cap,task);
@@ -604,6 +619,7 @@ run_thread:
 
     // Run the current thread
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     prev_what_next = t->what_next;
@@ -674,6 +690,7 @@ run_thread:
 #endif
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);
 
     // ----------------------------------------------------------------------
@@ -772,6 +789,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
+    // migration can be turned off with +RTS -qg
+    if (!RtsFlags.ParFlags.migrate) return;
+
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
@@ -834,6 +854,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                 IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                 appendToRunQueue(free_caps[i],t);
                 if (t->bound) { t->bound->cap = free_caps[i]; }
+                t->cap = free_caps[i];
                 i++;
             }
         }
@@ -2149,11 +2170,15 @@ forkProcess(HsStablePtr *entry
         // now gone.
 
         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
-            next = t->global_link;
-            // don't allow threads to catch the ThreadKilled
-            // exception, but we do want to raiseAsync() because these
-            // threads may be evaluating thunks that we need later.
-            deleteThread_(cap,t);
+            if (t->what_next == ThreadRelocated) {
+                next = t->link;
+            } else {
+                next = t->global_link;
+                // don't allow threads to catch the ThreadKilled
+                // exception, but we do want to raiseAsync() because these
+                // threads may be evaluating thunks that we need later.
+                deleteThread_(cap,t);
+            }
         }
 
         // Empty the run queue.  It seems tempting to let all the
@@ -2487,6 +2512,7 @@ createThread(Capability *cap, nat size)
 
     tso->saved_errno = 0;
     tso->bound = NULL;
+    tso->cap = cap;
 
     tso->stack_size     = stack_size;
     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
@@ -2694,6 +2720,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     // This TSO is now a bound thread; make the Task and TSO
     // point to each other.
     tso->bound = task;
+    tso->cap = cap;
 
     task->tso = tso;
     task->ret = ret;
@@ -2901,16 +2928,21 @@ GetRoots( evac_fn evac )
 
     for (i = 0; i < n_capabilities; i++) {
         cap = &capabilities[i];
-        evac((StgClosure **)&cap->run_queue_hd);
-        evac((StgClosure **)&cap->run_queue_tl);
-
+        evac((StgClosure **)(void *)&cap->run_queue_hd);
+        evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+        evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+        evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
         for (task = cap->suspended_ccalling_tasks; task != NULL;
              task=task->next) {
             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
-            evac((StgClosure **)&task->suspended_tso);
+            evac((StgClosure **)(void *)&task->suspended_tso);
         }
+    }
+
 #if !defined(THREADED_RTS)
     evac((StgClosure **)(void *)&blocked_queue_hd);
     evac((StgClosure **)(void *)&blocked_queue_tl);
@@ -3207,21 +3239,29 @@ unblockOne(Capability *cap, StgTSO *tso)
     ASSERT(get_itbl(tso)->type == TSO);
     ASSERT(tso->why_blocked != NotBlocked);
+    tso->why_blocked = NotBlocked;
     next = tso->link;
     tso->link = END_TSO_QUEUE;
 
-    // We might have just migrated this TSO to our Capability:
-    if (tso->bound) {
-        tso->bound->cap = cap;
+    if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+        // We are waking up this thread on the current Capability, which
+        // might involve migrating it from the Capability it was last on.
+        if (tso->bound) {
+            ASSERT(tso->bound->cap == tso->cap);
+            tso->bound->cap = cap;
+        }
+        tso->cap = cap;
+        appendToRunQueue(cap,tso);
+        // we're holding a newly woken thread, make sure we context switch
+        // quickly so we can migrate it if necessary.
+        context_switch = 1;
+    } else {
+        // we'll try to wake it up on the Capability it was last on.
+        wakeupThreadOnCapability(tso->cap, tso);
     }
-    appendToRunQueue(cap,tso);
-
-    // we're holding a newly woken thread, make sure we context switch
-    // quickly so we can migrate it if necessary.
-    context_switch = 1;
-
-    IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+
+    IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
     return next;
 }
@@ -3671,6 +3711,7 @@ unblockThread(Capability *cap, StgTSO *tso)
     if (tso->bound) {
         tso->bound->cap = cap;
     }
+    tso->cap = cap;
 }
 #endif
@@ -4167,13 +4208,8 @@ resurrectThreads (StgTSO *threads)
         all_threads = tso;
         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
 
-        // Wake up the thread on the Capability it was last on for a
-        // bound thread, or last_free_capability otherwise.
-        if (tso->bound) {
-            cap = tso->bound->cap;
-        } else {
-            cap = last_free_capability;
-        }
+        // Wake up the thread on the Capability it was last on
+        cap = tso->cap;
 
         switch (tso->why_blocked) {
         case BlockedOnMVar:
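
The core of this commit is the per-Capability wakeup queue: a Capability that wakes a thread belonging to another Capability enqueues it on the owner's wakeup_queue, and the owner splices that queue onto its own run queue at the top of the scheduler loop (the first hunk above). Below is a minimal, self-contained sketch of that O(1) splice; TSO, Cap, and splice_wakeup_queue are simplified stand-ins for the RTS's StgTSO, Capability, and inline scheduler code (END_TSO_QUEUE here is just NULL, not the real sentinel closure), and the cap->lock acquire/release is elided.

/* Minimal sketch of the wakeup-queue splice from the first hunk.
 * Stand-in types, not the RTS's own; locking elided. */

#include <assert.h>
#include <stddef.h>
#include <stdio.h>

typedef struct TSO_ {
    int id;
    struct TSO_ *link;          /* next thread in whatever queue it is on */
} TSO;

#define END_TSO_QUEUE ((TSO *)NULL)   /* stand-in for the RTS sentinel */

typedef struct {
    TSO *run_queue_hd, *run_queue_tl;        /* threads ready to run */
    TSO *wakeup_queue_hd, *wakeup_queue_tl;  /* threads woken by other caps */
} Cap;

static int empty(TSO *hd) { return hd == END_TSO_QUEUE; }

/* Append the whole wakeup queue to the run queue in O(1), as the
 * scheduler does each time around the loop (under cap->lock in the RTS). */
static void splice_wakeup_queue(Cap *cap)
{
    if (empty(cap->wakeup_queue_hd)) return;

    if (empty(cap->run_queue_hd)) {
        /* run queue empty: the wakeup queue becomes the run queue */
        cap->run_queue_hd = cap->wakeup_queue_hd;
        cap->run_queue_tl = cap->wakeup_queue_tl;
    } else {
        /* otherwise link it on after the current tail */
        cap->run_queue_tl->link = cap->wakeup_queue_hd;
        cap->run_queue_tl = cap->wakeup_queue_tl;
    }
    cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
}

int main(void)
{
    TSO a = {1, END_TSO_QUEUE}, b = {2, END_TSO_QUEUE}, c = {3, END_TSO_QUEUE};
    Cap cap = { &a, &a, &b, &c };   /* run: [1], wakeup: [2,3] */
    b.link = &c;

    splice_wakeup_queue(&cap);

    for (TSO *t = cap.run_queue_hd; !empty(t); t = t->link)
        printf("runnable thread %d\n", t->id);   /* prints 1, 2, 3 */
    assert(cap.run_queue_tl == &c && empty(cap.wakeup_queue_hd));
    return 0;
}

The tail pointer is what makes the splice constant-time: only run_queue_tl->link and the head/tail fields are touched, however long the wakeup queue has grown.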
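The unblockOne() hunk is the other half of the scheme: when RtsFlags.ParFlags.wakeupMigrate is set, or when the thread already belongs to the current Capability (tso->cap == cap), the thread goes straight onto the local run queue, with tso->cap (and the bound Task's cap, if any) updated to match; otherwise it is handed back to the Capability it last ran on via wakeupThreadOnCapability(). Here is a condensed model of that branch, assuming stand-in names (wakeup_migrate, wake_remote, append_to_run_queue) for the RTS flag and functions, with the context_switch bookkeeping omitted.

/* Condensed model of the wakeup decision in unblockOne().
 * Stand-in types and helpers, not the RTS API. */

#include <stdbool.h>
#include <stdio.h>

typedef struct Cap_ Cap;
typedef struct Task_ { Cap *cap; } Task;
typedef struct TSO_ {
    long id;
    Cap  *cap;      /* Capability this thread was last on */
    Task *bound;    /* non-NULL for bound threads */
} TSO;
struct Cap_ { int no; };

static bool wakeup_migrate = false;   /* stand-in for the +RTS wakeup-migrate flag */

static void append_to_run_queue(Cap *cap, TSO *tso)
{ printf("thread %ld runnable on cap %d\n", tso->id, cap->no); }

static void wake_remote(Cap *cap, TSO *tso)   /* stand-in for wakeupThreadOnCapability */
{ printf("thread %ld handed to cap %d\n", tso->id, cap->no); }

static void unblock_one(Cap *cap, TSO *tso)
{
    if (wakeup_migrate || tso->cap == cap) {
        /* wake it here, migrating it (and its bound Task) to this cap */
        if (tso->bound) tso->bound->cap = cap;
        tso->cap = cap;
        append_to_run_queue(cap, tso);
    } else {
        /* leave it where it was: let its last Capability do the waking */
        wake_remote(tso->cap, tso);
    }
}

int main(void)
{
    Cap c0 = {0}, c1 = {1};
    TSO local  = {7, &c0, NULL};   /* last ran here: wake locally   */
    TSO remote = {8, &c1, NULL};   /* last ran elsewhere: hand off  */
    unblock_one(&c0, &local);
    unblock_one(&c0, &remote);
    return 0;
}

The intent, presumably, is locality: by default a thread is woken on the Capability where it last ran, and migrating it on wakeup is an opt-in flag rather than the old unconditional grab onto the waker's run queue.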