From: Simon Marlow
Date: Fri, 24 Mar 2006 16:45:32 +0000 (+0000)
Subject: Add some more flexibility to the multiproc scheduler
X-Git-Url: http://git.megacz.com/?a=commitdiff_plain;h=4368121d93643f6f108c1dc79ac212bf02e56f98;p=ghc-hetmet.git

Add some more flexibility to the multiproc scheduler

There are two new options in the -threaded RTS:

  -qm       Don't automatically migrate threads between CPUs
  -qw       Migrate a thread to the current CPU when it is woken up

Previously neither of these was selectable: threads were migrated
between CPUs willy-nilly, and threads were always migrated to the
current CPU when woken up.  This is the first step in tweaking the
scheduling for more effective work balancing; there will no doubt be
more to come.
---
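As a usage sketch (not part of the commit; the program name "myprog"
is hypothetical), the new flags combine with the existing -N option
for a program built with -threaded:

    ./myprog +RTS -N2 -qm -RTS    # two Capabilities, no automatic migration
    ./myprog +RTS -N2 -qw -RTS    # migrate a woken thread to the waker's CPU
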
diff --git a/ghc/includes/RtsFlags.h b/ghc/includes/RtsFlags.h
index c997b22..17d2363 100644
--- a/ghc/includes/RtsFlags.h
+++ b/ghc/includes/RtsFlags.h
@@ -164,6 +164,8 @@ struct PAR_FLAGS {
 #ifdef THREADED_RTS
 struct PAR_FLAGS {
   nat            nNodes;         /* number of threads to run simultaneously */
+  rtsBool        migrate;        /* migrate threads between capabilities */
+  rtsBool        wakeupMigrate;  /* migrate a thread on wakeup */
   unsigned int   maxLocalSparks;
 };
 #endif /* THREADED_RTS */
diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h
index 82ecacb..14c47ab 100644
--- a/ghc/includes/TSO.h
+++ b/ghc/includes/TSO.h
@@ -93,6 +93,8 @@ typedef StgWord32 StgThreadID;
  */
 #define TSO_DIRTY 1

+#define tsoDirty(tso)  ((tso)->flags & TSO_DIRTY)
+
 /*
  * Type returned after running a thread.  Values of this type
  * include HeapOverflow, StackOverflow etc.  See Constants.h for the
@@ -134,43 +136,44 @@ typedef union {
  */
 typedef struct StgTSO_ {
-  StgHeader          header;
-
-  struct StgTSO_*    link;           /* Links threads onto blocking queues */
-  struct StgTSO_*    global_link;    /* Links all threads together */
-
-  StgWord16          what_next;      /* Values defined in Constants.h */
-  StgWord16          why_blocked;    /* Values defined in Constants.h */
-  StgWord32          flags;
-  StgTSOBlockInfo    block_info;
-  struct StgTSO_*    blocked_exceptions;
-  StgThreadID        id;
-  int                saved_errno;
-  struct Task_*      bound;          // non-NULL for a bound thread
-  struct StgTRecHeader_ *trec;       /* STM transaction record */
-
+    StgHeader               header;
+
+    struct StgTSO_*         link;           /* Links threads onto blocking queues */
+    struct StgTSO_*         global_link;    /* Links all threads together */
+
+    StgWord16               what_next;      /* Values defined in Constants.h */
+    StgWord16               why_blocked;    /* Values defined in Constants.h */
+    StgWord32               flags;
+    StgTSOBlockInfo         block_info;
+    struct StgTSO_*         blocked_exceptions;
+    StgThreadID             id;
+    int                     saved_errno;
+    struct Task_*           bound;
+    struct Capability_*     cap;
+    struct StgTRecHeader_ * trec;           /* STM transaction record */
+
 #ifdef TICKY_TICKY
-  /* TICKY-specific stuff would go here. */
+    /* TICKY-specific stuff would go here. */
 #endif
 #ifdef PROFILING
-  StgTSOProfInfo prof;
+    StgTSOProfInfo prof;
 #endif
 #ifdef PAR
-  StgTSOParInfo par;
+    StgTSOParInfo par;
 #endif
 #ifdef GRAN
-  StgTSOGranInfo gran;
+    StgTSOGranInfo gran;
 #endif
 #ifdef DIST
-  StgTSODistInfo dist;
+    StgTSODistInfo dist;
 #endif
-  /* The thread stack... */
-  StgWord            stack_size;     /* stack size in *words* */
-  StgWord            max_stack_size; /* maximum stack size in *words* */
-  StgPtr             sp;
-
-  StgWord            stack[FLEXIBLE_ARRAY];
+    /* The thread stack... */
+    StgWord32          stack_size;     /* stack size in *words* */
+    StgWord32          max_stack_size; /* maximum stack size in *words* */
+    StgPtr             sp;
+
+    StgWord            stack[FLEXIBLE_ARRAY];
 } StgTSO;

 /* -----------------------------------------------------------------------------
diff --git a/ghc/rts/Apply.cmm b/ghc/rts/Apply.cmm
index 6678a63..e0ca039 100644
--- a/ghc/rts/Apply.cmm
+++ b/ghc/rts/Apply.cmm
@@ -32,7 +32,7 @@ stg_ap_0_fast
     IF_DEBUG(sanity,
         foreign "C" checkStackChunk(Sp "ptr",
                                     CurrentTSO + TSO_OFFSET_StgTSO_stack +
-                                    WDS(StgTSO_stack_size(CurrentTSO)) "ptr") [R1]);
+                                    WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) "ptr") [R1]);
     ENTER();
 }
diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c
index 143eefe..51a42ef 100644
--- a/ghc/rts/Capability.c
+++ b/ghc/rts/Capability.c
@@ -67,7 +67,9 @@ anyWorkForMe( Capability *cap, Task *task )
     // other global condition to check, such as threads blocked on
     // blackholes).
     if (emptyRunQueue(cap)) {
-        return !emptySparkPoolCap(cap) || globalWorkToDo();
+        return !emptySparkPoolCap(cap)
+            || !emptyWakeupQueue(cap)
+            || globalWorkToDo();
     } else
         return cap->run_queue_hd->bound == NULL;
 }
@@ -135,6 +137,8 @@ initCapability( Capability *cap, nat i )
     cap->suspended_ccalling_tasks = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
+    cap->wakeup_queue_hd    = END_TSO_QUEUE;
+    cap->wakeup_queue_tl    = END_TSO_QUEUE;
 #endif

     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
@@ -296,7 +300,8 @@ releaseCapability_ (Capability* cap)

     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
-    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+    if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
+        || !emptySparkPoolCap(cap) || globalWorkToDo()) {
         if (cap->spare_workers) {
             giveCapabilityToTask(cap,cap->spare_workers);
             // The worker Task pops itself from the queue;
@@ -502,6 +507,37 @@ yieldCapability (Capability** pCap, Task *task)
 }

 /* ----------------------------------------------------------------------------
+ * Wake up a thread on a Capability.
+ *
+ * This is used when the current Task is running on a Capability and
+ * wishes to wake up a thread on a different Capability.
+ * ------------------------------------------------------------------------- */
+
+void
+wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
+{
+    ASSERT(tso->cap == cap);
+    ASSERT(tso->bound ? tso->bound->cap == cap : 1);
+
+    ACQUIRE_LOCK(&cap->lock);
+    if (cap->running_task == NULL) {
+        // nobody is running this Capability, we can add our thread
+        // directly onto the run queue and start up a Task to run it.
+        appendToRunQueue(cap,tso);
+
+        // start it up
+        cap->running_task = myTask(); // precond for releaseCapability_()
+        releaseCapability_(cap);
+    } else {
+        appendToWakeupQueue(cap,tso);
+        // someone is running on this Capability, so it cannot be
+        // freed without first checking the wakeup queue (see
+        // releaseCapability_).
+    }
+    RELEASE_LOCK(&cap->lock);
+}
+
+/* ----------------------------------------------------------------------------
  * prodCapabilities
  *
  * Used to indicate that the interrupted flag is now set, or some
diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h
index 50e0b94..a2551d0 100644
--- a/ghc/rts/Capability.h
+++ b/ghc/rts/Capability.h
@@ -70,7 +70,7 @@ struct Capability_ {
     // Worker Tasks waiting in the wings.  Singly-linked.
     Task *spare_workers;

-    // This lock protects running_task and returning_tasks_{hd,tl}.
+    // This lock protects running_task, returning_tasks_{hd,tl}, wakeup_queue.
     Mutex lock;

     // Tasks waiting to return from a foreign call, or waiting to make
@@ -80,6 +80,12 @@ struct Capability_ {
     // check whether it is NULL without taking the lock, however.
     Task *returning_tasks_hd; // Singly-linked, with head/tail
     Task *returning_tasks_tl;
+
+    // A list of threads to append to this Capability's run queue at
+    // the earliest opportunity.  These are threads that have been
+    // woken up by another Capability.
+    StgTSO *wakeup_queue_hd;
+    StgTSO *wakeup_queue_tl;
 #endif

     // Per-capability STM-related data
@@ -189,6 +195,11 @@ void yieldCapability (Capability** pCap, Task *task);
 //
 void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);

+// Wakes up a thread on a Capability (probably a different Capability
+// from the one held by the current Task).
+//
+void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
+
 // Wakes up a worker thread on just one Capability, used when we
 // need to service some global event.
 //
diff --git a/ghc/rts/Exception.cmm b/ghc/rts/Exception.cmm
index 4bb9e48..b5c2962 100644
--- a/ghc/rts/Exception.cmm
+++ b/ghc/rts/Exception.cmm
@@ -380,7 +380,7 @@ retry_pop_stack:
      * entry code in StgStartup.cmm.
      */
     Sp = CurrentTSO + TSO_OFFSET_StgTSO_stack
-        + WDS(StgTSO_stack_size(CurrentTSO)) - WDS(2);
+        + WDS(TO_W_(StgTSO_stack_size(CurrentTSO))) - WDS(2);
     Sp(1) = R1;             /* save the exception */
     Sp(0) = stg_enter_info; /* so that GC can traverse this stack */
     StgTSO_what_next(CurrentTSO) = ThreadKilled::I16;
diff --git a/ghc/rts/RtsFlags.c b/ghc/rts/RtsFlags.c
index f24912f..0f83b33 100644
--- a/ghc/rts/RtsFlags.c
+++ b/ghc/rts/RtsFlags.c
@@ -219,6 +219,8 @@ void initRtsFlagsDefaults(void)

 #ifdef THREADED_RTS
     RtsFlags.ParFlags.nNodes            = 1;
+    RtsFlags.ParFlags.migrate           = rtsTrue;
+    RtsFlags.ParFlags.wakeupMigrate     = rtsFalse;
 #endif

 #ifdef PAR
@@ -437,6 +439,8 @@ usage_text[] = {
 #endif /* DEBUG */
 #if defined(THREADED_RTS)
 "  -N<n>     Use <n> OS threads (default: 1)",
+"  -qm       Don't automatically migrate threads between CPUs",
+"  -qw       Migrate a thread to the current CPU when it is woken up",
 #endif
 #if defined(THREADED_RTS) || defined(PAR)
 "  -e<n>     Size of spark pools (default 100)",
@@ -1049,6 +1053,25 @@
                         error = rtsTrue;
                       }
                       }
                       ) break;
+
+              case 'q':
+                    switch (rts_argv[arg][2]) {
+                    case '\0':
+                        errorBelch("incomplete RTS option: %s",rts_argv[arg]);
+                        error = rtsTrue;
+                        break;
+                    case 'm':
+                        RtsFlags.ParFlags.migrate = rtsFalse;
+                        break;
+                    case 'w':
+                        RtsFlags.ParFlags.wakeupMigrate = rtsTrue;
+                        break;
+                    default:
+                        errorBelch("unknown RTS option: %s",rts_argv[arg]);
+                        error = rtsTrue;
+                        break;
+                    }
+                    break;
 #endif
 /* =========== PARALLEL =========================== */
               case 'e':
@@ -1063,10 +1086,12 @@
                         error = rtsTrue;
                     }
                 ) break;

+#ifdef PAR
               case 'q':
                 PAR_BUILD_ONLY(
                 process_par_option(arg, rts_argc, rts_argv, &error);
                 ) break;
+#endif

 /* =========== GRAN =============================== */
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 39aa5e2..d49d4ed 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -482,6 +482,21 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }

+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        if (emptyRunQueue(cap)) {
+            cap->run_queue_hd = cap->wakeup_queue_hd;
+            cap->run_queue_tl = cap->wakeup_queue_tl;
+        } else {
+            cap->run_queue_tl->link = cap->wakeup_queue_hd;
+            cap->run_queue_tl = cap->wakeup_queue_tl;
+        }
+        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+        RELEASE_LOCK(&cap->lock);
+    }
+
     scheduleCheckBlockedThreads(cap);

     scheduleDetectDeadlock(cap,task);
@@ -604,6 +619,7 @@ run_thread:
     // Run the current thread

     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);

     prev_what_next = t->what_next;
@@ -674,6 +690,7 @@ run_thread:
 #endif

     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    ASSERT(t->cap == cap);

     // ----------------------------------------------------------------------
@@ -772,6 +789,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;

+    // migration can be turned off with +RTS -qm
+    if (!RtsFlags.ParFlags.migrate) return;
+
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
@@ -834,6 +854,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
                     appendToRunQueue(free_caps[i],t);
                     if (t->bound) { t->bound->cap = free_caps[i]; }
+                    t->cap = free_caps[i];
                     i++;
                 }
             }
@@ -2491,6 +2512,7 @@ createThread(Capability *cap, nat size)

     tso->saved_errno = 0;
     tso->bound = NULL;
+    tso->cap = cap;

     tso->stack_size     = stack_size;
     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
@@ -2698,6 +2720,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     // This TSO is now a bound thread; make the Task and TSO
     // point to each other.
     tso->bound = task;
+    tso->cap = cap;

     task->tso = tso;
     task->ret = ret;
@@ -2905,16 +2928,21 @@ GetRoots( evac_fn evac )

     for (i = 0; i < n_capabilities; i++) {
         cap = &capabilities[i];
-        evac((StgClosure **)&cap->run_queue_hd);
-        evac((StgClosure **)&cap->run_queue_tl);
-
+        evac((StgClosure **)(void *)&cap->run_queue_hd);
+        evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+        evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+        evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
         for (task = cap->suspended_ccalling_tasks; task != NULL;
              task=task->next) {
             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
-            evac((StgClosure **)&task->suspended_tso);
+            evac((StgClosure **)(void *)&task->suspended_tso);
         }
+
     }
+
 #if !defined(THREADED_RTS)
     evac((StgClosure **)(void *)&blocked_queue_hd);
     evac((StgClosure **)(void *)&blocked_queue_tl);
@@ -3211,21 +3239,29 @@ unblockOne(Capability *cap, StgTSO *tso)
     ASSERT(get_itbl(tso)->type == TSO);
     ASSERT(tso->why_blocked != NotBlocked);
+
     tso->why_blocked = NotBlocked;
     next = tso->link;
     tso->link = END_TSO_QUEUE;

-    // We might have just migrated this TSO to our Capability:
-    if (tso->bound) {
-        tso->bound->cap = cap;
+    if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+        // We are waking up this thread on the current Capability, which
+        // might involve migrating it from the Capability it was last on.
+        if (tso->bound) {
+            ASSERT(tso->bound->cap == tso->cap);
+            tso->bound->cap = cap;
+        }
+        tso->cap = cap;
+        appendToRunQueue(cap,tso);
+        // we're holding a newly woken thread, make sure we context switch
+        // quickly so we can migrate it if necessary.
+        context_switch = 1;
+    } else {
+        // we'll try to wake it up on the Capability it was last on.
+        wakeupThreadOnCapability(tso->cap, tso);
     }
-    appendToRunQueue(cap,tso);
-
-    // we're holding a newly woken thread, make sure we context switch
-    // quickly so we can migrate it if necessary.
-    context_switch = 1;
-
-    IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+
+    IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
     return next;
 }
@@ -3675,6 +3711,7 @@ unblockThread(Capability *cap, StgTSO *tso)
   if (tso->bound) {
       tso->bound->cap = cap;
   }
+  tso->cap = cap;
 }
 #endif
@@ -4171,13 +4208,8 @@ resurrectThreads (StgTSO *threads)
         all_threads = tso;
         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));

-        // Wake up the thread on the Capability it was last on for a
-        // bound thread, or last_free_capability otherwise.
-        if (tso->bound) {
-            cap = tso->bound->cap;
-        } else {
-            cap = last_free_capability;
-        }
+        // Wake up the thread on the Capability it was last on
+        cap = tso->cap;

         switch (tso->why_blocked) {
         case BlockedOnMVar:
diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h
index 63dfeb7..d11162e 100644
--- a/ghc/rts/Schedule.h
+++ b/ghc/rts/Schedule.h
@@ -259,6 +259,20 @@ appendToBlockedQueue(StgTSO *tso)
 }
 #endif

+#if defined(THREADED_RTS)
+STATIC_INLINE void
+appendToWakeupQueue (Capability *cap, StgTSO *tso)
+{
+    ASSERT(tso->link == END_TSO_QUEUE);
+    if (cap->wakeup_queue_hd == END_TSO_QUEUE) {
+        cap->wakeup_queue_hd = tso;
+    } else {
+        cap->wakeup_queue_tl->link = tso;
+    }
+    cap->wakeup_queue_tl = tso;
+}
+#endif
+
 /* Check whether various thread queues are empty
  */
 STATIC_INLINE rtsBool
@@ -273,6 +287,14 @@ emptyRunQueue(Capability *cap)
     return emptyQueue(cap->run_queue_hd);
 }

+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+emptyWakeupQueue(Capability *cap)
+{
+    return emptyQueue(cap->wakeup_queue_hd);
+}
+#endif
+
 #if !defined(THREADED_RTS)
 #define EMPTY_BLOCKED_QUEUE()  (emptyQueue(blocked_queue_hd))
 #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
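
As an illustration of the behaviour these flags control (a minimal,
hypothetical test program, not part of the patch): the forked thread
below blocks repeatedly on an MVar and is woken each time by the main
thread.  Built with -threaded and run with +RTS -N2, the waker and the
sleeper usually sit on different Capabilities, so each wakeup either
goes through the new wakeup queue (the default) or migrates the woken
thread to the waker's CPU (with -qw):

    import Control.Concurrent
    import Control.Monad (replicateM_)

    main :: IO ()
    main = do
      mv   <- newEmptyMVar
      done <- newEmptyMVar
      _ <- forkIO $ do
             -- sleeper: blocks on the MVar 10000 times
             replicateM_ 10000 (takeMVar mv)
             putMVar done ()
      -- waker: each putMVar unblocks the sleeper via unblockOne()
      replicateM_ 10000 (putMVar mv ())
      takeMVar done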