From 4e3542263207ae49963811aeb84927027e7bb61d Mon Sep 17 00:00:00 2001
From: Simon Marlow
Date: Fri, 13 Mar 2009 13:51:16 +0000
Subject: [PATCH] Use work-stealing for load-balancing in the GC

New flag: "+RTS -qb" disables load-balancing in the parallel GC
(though this is subject to change; we will probably want to do
something more automatic before releasing this).

To get the "PARGC3" configuration described in the "Runtime support
for Multicore Haskell" paper, use "+RTS -qg0 -qb -RTS".

The main advantage of this is that it allows us to easily disable
load-balancing altogether, which turns out to be important in
parallel programs.  Maintaining locality is sometimes more important
than spreading the work out in parallel GC.  There is a side benefit
in that the parallel GC should have improved locality even when
load-balancing, because each processor prefers to take work from its
own queue before stealing from others.
---
 includes/RtsFlags.h    |  1 +
 includes/Storage.h     |  5 -----
 rts/RtsFlags.c         |  4 ++++
 rts/Stats.c            |  1 -
 rts/parallel/WSDeque.c |  4 ----
 rts/parallel/WSDeque.h |  4 ----
 rts/sm/GC.c            | 68 +++++++++++++++++++++++++++++++------------
 rts/sm/GC.h            |  2 ++
 rts/sm/GCThread.h      | 10 ++++---
 rts/sm/GCUtils.c       | 75 +++++++++++++++++++++++++++---------------------
 rts/sm/GCUtils.h       |  4 ++-
 rts/sm/Scav.c          | 25 +++++++++++++++-
 rts/sm/Storage.c       |  1 -
 13 files changed, 132 insertions(+), 72 deletions(-)

diff --git a/includes/RtsFlags.h b/includes/RtsFlags.h
index db196a9..5c72bbb 100644
--- a/includes/RtsFlags.h
+++ b/includes/RtsFlags.h
@@ -183,6 +183,7 @@ struct PAR_FLAGS {
     rtsBool        parGcEnabled;         /* enable parallel GC */
     rtsBool        parGcGen;             /* do parallel GC in this generation
                                           * and higher only */
+    rtsBool        parGcLoadBalancing;   /* do load-balancing in parallel GC */
 };

 #endif /* THREADED_RTS */
diff --git a/includes/Storage.h b/includes/Storage.h
index f43eb79..e541082 100644
--- a/includes/Storage.h
+++ b/includes/Storage.h
@@ -81,7 +81,6 @@ typedef struct step_ {
 #if defined(THREADED_RTS)
     char pad[128];                      // make sure the following is
                                         // on a separate cache line.
-    SpinLock  sync_todo;                // lock for todos
     SpinLock  sync_large_objects;       // lock for large_objects
                                         //    and scavenged_large_objects
 #endif

@@ -93,10 +92,6 @@ typedef struct step_ {
     unsigned int n_old_blocks;          // number of blocks in from-space
     unsigned int live_estimate;         // for sweeping: estimate of live data

-    bdescr *     todos;                 // blocks waiting to be scavenged
-    bdescr *     todos_last;
-    unsigned int n_todos;               // count of above
-
     bdescr *     part_blocks;           // partially-full scanned blocks
     unsigned int n_part_blocks;         // count of above
diff --git a/rts/RtsFlags.c b/rts/RtsFlags.c
index cbc2bb5..2af67e7 100644
--- a/rts/RtsFlags.c
+++ b/rts/RtsFlags.c
@@ -226,6 +226,7 @@ void initRtsFlagsDefaults(void)
     RtsFlags.ParFlags.wakeupMigrate = rtsFalse;
     RtsFlags.ParFlags.parGcEnabled = 1;
     RtsFlags.ParFlags.parGcGen = 1;
+    RtsFlags.ParFlags.parGcLoadBalancing = 1;
 #endif

 #ifdef PAR
@@ -1211,6 +1212,9 @@ error = rtsTrue;
                       error = rtsTrue;
                   }
                   break;
+              case 'b':
+                  RtsFlags.ParFlags.parGcLoadBalancing = rtsFalse;
+                  break;
               case 'm':
                   RtsFlags.ParFlags.migrate = rtsFalse;
                   break;
diff --git a/rts/Stats.c b/rts/Stats.c
index c43806f..7c8042a 100644
--- a/rts/Stats.c
+++ b/rts/Stats.c
@@ -716,7 +716,6 @@ stat_exit(int alloc)
     statsPrintf("whitehole_spin: %"FMT_Word64"\n", whitehole_spin);
     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
         for (s = 0; s < generations[g].n_steps; s++) {
-            statsPrintf("gen[%d].steps[%d].sync_todo: %"FMT_Word64"\n", g, s, generations[g].steps[s].sync_todo.spin);
             statsPrintf("gen[%d].steps[%d].sync_large_objects: %"FMT_Word64"\n", g, s, generations[g].steps[s].sync_large_objects.spin);
         }
     }
diff --git a/rts/parallel/WSDeque.c b/rts/parallel/WSDeque.c
index ced6c95..acecb85 100644
--- a/rts/parallel/WSDeque.c
+++ b/rts/parallel/WSDeque.c
@@ -43,8 +43,6 @@
 #include "WSDeque.h"
 #include "SMP.h" // for cas

-#if defined(THREADED_RTS)
-
 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))

 /* -----------------------------------------------------------------------------
@@ -285,5 +283,3 @@ pushWSDeque (WSDeque* q, void * elem)
     ASSERT_WSDEQUE_INVARIANTS(q);
     return rtsTrue;
 }
-
-#endif
diff --git a/rts/parallel/WSDeque.h b/rts/parallel/WSDeque.h
index c254671..d85567c 100644
--- a/rts/parallel/WSDeque.h
+++ b/rts/parallel/WSDeque.h
@@ -9,8 +9,6 @@
 #ifndef WSDEQUE_H
 #define WSDEQUE_H

-#if defined(THREADED_RTS)
-
 typedef struct WSDeque_ {
     // Size of elements array. Used for modulo calculation: we round up
     // to powers of 2 and use the dyadic log (modulo == bitwise &)
@@ -125,6 +123,4 @@ discardElements (WSDeque *q)
     // pool->topBound = pool->top;
 }

-#endif // THREADED_RTS
-
 #endif // WSDEQUE_H
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index ec9cd07..45fecc9 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -125,6 +125,8 @@ nat n_gc_threads;
 // For stats:
 long copied;        // *words* copied & scavenged during this GC

+rtsBool work_stealing;
+
 DECLARE_GCT

 /* -----------------------------------------------------------------------------
@@ -231,6 +233,19 @@ GarbageCollect (rtsBool force_major_gc,
   */
   n = initialise_N(force_major_gc);

+#if defined(THREADED_RTS)
+  work_stealing = RtsFlags.ParFlags.parGcLoadBalancing;
+      // It's not always a good idea to do load balancing in parallel
+      // GC.  In particular, for a parallel program we don't want to
+      // lose locality by moving cached data into another CPU's cache
+      // (this effect can be quite significant).
+      //
+      // We could have a more complex way to determine whether to do
+      // work stealing or not, e.g. it might be a good idea to do it
+      // if the heap is big.
+      // For now, we just turn it on or off with a flag.
+#endif
+
   /* Start threads, so they can be spinning up while we finish
    * initialisation.
    */
  start_gc_threads();
@@ -879,7 +894,9 @@ alloc_gc_thread (int n)
     ws->gct = t;

     ws->todo_bd = NULL;
-    ws->buffer_todo_bd = NULL;
+    ws->todo_q = newWSDeque(128);
+    ws->todo_overflow = NULL;
+    ws->n_todo_overflow = 0;

     ws->part_list = NULL;
     ws->n_part_blocks = 0;
@@ -971,8 +988,23 @@ any_work (void)
         }
         ws = &gct->steps[s];
         if (ws->todo_large_objects) return rtsTrue;
-        if (ws->step->todos) return rtsTrue;
+        if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
+        if (ws->todo_overflow) return rtsTrue;
+    }
+
+#if defined(THREADED_RTS)
+    if (work_stealing) {
+        nat n;
+        // look for work to steal
+        for (n = 0; n < n_gc_threads; n++) {
+            if (n == gct->thread_index) continue;
+            for (s = total_steps-1; s >= 0; s--) {
+                ws = &gc_threads[n]->steps[s];
+                if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
+            }
+        }
     }
+#endif

     gct->no_work++;
@@ -1001,18 +1033,18 @@ loop:
     r = dec_running();

     debugTrace(DEBUG_gc, "GC thread %d idle (%d still running)",
-              gct->thread_index, r);
-
+               gct->thread_index, r);
+
     while (gc_running_threads != 0) {
         // usleep(1);
-       if (any_work()) {
-           inc_running();
-           goto loop;
-       }
-       // any_work() does not remove the work from the queue, it
-       // just checks for the presence of work.  If we find any,
-       // then we increment gc_running_threads and go back to
-       // scavenge_loop() to perform any pending work.
+        if (any_work()) {
+            inc_running();
+            goto loop;
+        }
+        // any_work() does not remove the work from the queue, it
+        // just checks for the presence of work.  If we find any,
+        // then we increment gc_running_threads and go back to
+        // scavenge_loop() to perform any pending work.
     }

     // All threads are now stopped
@@ -1207,11 +1239,6 @@ init_collected_gen (nat g, nat n_threads)
         stp->n_words = 0;
         stp->live_estimate = 0;

-        // we don't have any to-be-scavenged blocks yet
-        stp->todos = NULL;
-        stp->todos_last = NULL;
-        stp->n_todos = 0;
-
         // initialise the large object queues.
         stp->scavenged_large_objects = NULL;
         stp->n_scavenged_large_blocks = 0;
@@ -1284,9 +1311,12 @@ init_collected_gen (nat g, nat n_threads)

             // allocate the first to-space block; extra blocks will be
             // chained on as necessary.
             ws->todo_bd = NULL;
-            ws->buffer_todo_bd = NULL;
+            ASSERT(looksEmptyWSDeque(ws->todo_q));
             alloc_todo_block(ws,0);

+            ws->todo_overflow = NULL;
+            ws->n_todo_overflow = 0;
+
             ws->scavd_list = NULL;
             ws->n_scavd_blocks = 0;
         }
@@ -1329,7 +1359,7 @@ init_uncollected_gen (nat g, nat threads)
         for (t = 0; t < threads; t++) {
             ws = &gc_threads[t]->steps[g * RtsFlags.GcFlags.steps + s];

-            ws->buffer_todo_bd = NULL;
+            ASSERT(looksEmptyWSDeque(ws->todo_q));
             ws->todo_large_objects = NULL;

             ws->part_list = NULL;
diff --git a/rts/sm/GC.h b/rts/sm/GC.h
index 366125d..fb6e385 100644
--- a/rts/sm/GC.h
+++ b/rts/sm/GC.h
@@ -28,6 +28,8 @@ extern StgPtr  oldgen_scan;

 extern long copied;

+extern rtsBool work_stealing;
+
 #ifdef DEBUG
 extern nat mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS, mutlist_OTHERS;
 #endif
diff --git a/rts/sm/GCThread.h b/rts/sm/GCThread.h
index 3ee2757..aacef82 100644
--- a/rts/sm/GCThread.h
+++ b/rts/sm/GCThread.h
@@ -15,6 +15,7 @@
 #define GCTHREAD_H

 #include "OSThreads.h"
+#include "WSDeque.h"

 /* -----------------------------------------------------------------------------
    General scheme
@@ -81,13 +82,14 @@ typedef struct step_workspace_ {
     StgPtr       todo_free;            // free ptr for todo_bd
     StgPtr       todo_lim;             // lim for todo_bd

-    bdescr *     buffer_todo_bd;       // buffer to reduce contention
-                                       // on the step's todos list
+    WSDeque *    todo_q;
+    bdescr *     todo_overflow;
+    nat          n_todo_overflow;

     // where large objects to be scavenged go
     bdescr *     todo_large_objects;

-    // Objects that have already been, scavenged.
+    // Objects that have already been scavenged.
     bdescr *     scavd_list;
     nat          n_scavd_blocks;       // count of blocks in this list

@@ -95,7 +97,7 @@ typedef struct step_workspace_ {
     bdescr *     part_list;
     unsigned int n_part_blocks;        // count of above

-    StgWord pad[5];
+    StgWord pad[3];

 } step_workspace ATTRIBUTE_ALIGNED(64);
 // align so that computing gct->steps[n] is a shift, not a multiply
diff --git a/rts/sm/GCUtils.c b/rts/sm/GCUtils.c
index 4432ad6..84b7564 100644
--- a/rts/sm/GCUtils.c
+++ b/rts/sm/GCUtils.c
@@ -19,6 +19,9 @@
 #include "GCUtils.h"
 #include "Printer.h"
 #include "Trace.h"
+#ifdef THREADED_RTS
+#include "WSDeque.h"
+#endif

 #ifdef THREADED_RTS
 SpinLock gc_alloc_block_sync;
@@ -72,34 +75,47 @@ freeChain_sync(bdescr *bd)
    -------------------------------------------------------------------------- */

 bdescr *
-grab_todo_block (step_workspace *ws)
+grab_local_todo_block (step_workspace *ws)
 {
     bdescr *bd;
     step *stp;

     stp = ws->step;

-    bd = NULL;
-    if (ws->buffer_todo_bd)
+    bd = ws->todo_overflow;
+    if (bd != NULL)
+    {
+        ws->todo_overflow = bd->link;
+        bd->link = NULL;
+        ws->n_todo_overflow--;
+        return bd;
+    }
+
+    bd = popWSDeque(ws->todo_q);
+    if (bd != NULL)
     {
-        bd = ws->buffer_todo_bd;
         ASSERT(bd->link == NULL);
-        ws->buffer_todo_bd = NULL;
         return bd;
     }

-    ACQUIRE_SPIN_LOCK(&stp->sync_todo);
-    if (stp->todos) {
-        bd = stp->todos;
-        if (stp->todos == stp->todos_last) {
-            stp->todos_last = NULL;
+    return NULL;
+}
+
+bdescr *
+steal_todo_block (nat s)
+{
+    nat n;
+    bdescr *bd;
+
+    // look for work to steal
+    for (n = 0; n < n_gc_threads; n++) {
+        if (n == gct->thread_index) continue;
+        bd = stealWSDeque(gc_threads[n]->steps[s].todo_q);
+        if (bd) {
+            return bd;
         }
-        stp->todos = bd->link;
-        stp->n_todos--;
-        bd->link = NULL;
-    }
-    RELEASE_SPIN_LOCK(&stp->sync_todo);
-    return bd;
+    }
+    return NULL;
 }

 void
@@ -145,7 +161,7 @@ todo_block_full (nat size, step_workspace *ws)
     // this block to push, and there's enough room in
     // this block to evacuate the current object, then just increase
     // the limit.
-    if (ws->step->todos != NULL ||
+    if (!looksEmptyWSDeque(ws->todo_q) ||
         (ws->todo_free - bd->u.scan < WORK_UNIT_WORDS / 2)) {
         if (ws->todo_free + size < bd->start + BLOCK_SIZE_W) {
             ws->todo_lim = stg_min(bd->start + BLOCK_SIZE_W,
@@ -178,20 +194,15 @@ todo_block_full (nat size, step_workspace *ws)
     {
         step *stp;
         stp = ws->step;
-        trace(TRACE_gc|DEBUG_gc, "push todo block %p (%ld words), step %d, n_todos: %d",
+        trace(TRACE_gc|DEBUG_gc, "push todo block %p (%ld words), step %d, todo_q: %ld",
               bd->start, (unsigned long)(bd->free - bd->u.scan),
-              stp->abs_no, stp->n_todos);
-        // ToDo: use buffer_todo
-        ACQUIRE_SPIN_LOCK(&stp->sync_todo);
-        if (stp->todos_last == NULL) {
-            stp->todos_last = bd;
-            stp->todos = bd;
-        } else {
-            stp->todos_last->link = bd;
-            stp->todos_last = bd;
+              stp->abs_no, dequeElements(ws->todo_q));
+
+        if (!pushWSDeque(ws->todo_q, bd)) {
+            bd->link = ws->todo_overflow;
+            ws->todo_overflow = bd;
+            ws->n_todo_overflow++;
         }
-        stp->n_todos++;
-        RELEASE_SPIN_LOCK(&stp->sync_todo);
     }
 }
@@ -207,7 +218,7 @@ todo_block_full (nat size, step_workspace *ws)
 StgPtr
 alloc_todo_block (step_workspace *ws, nat size)
 {
-    bdescr *bd/*, *hd, *tl*/;
+    bdescr *bd/*, *hd, *tl */;

     // Grab a part block if we have one, and it has enough room
     if (ws->part_list != NULL &&
@@ -221,12 +232,12 @@ alloc_todo_block (step_workspace *ws, nat size)
     {
         // blocks in to-space get the BF_EVACUATED flag.

-//        allocBlocks_sync(4, &hd, &tl,
+//        allocBlocks_sync(16, &hd, &tl,
 //                         ws->step->gen_no, ws->step, BF_EVACUATED);
 //
 //        tl->link = ws->part_list;
 //        ws->part_list = hd->link;
-//        ws->n_part_blocks += 3;
+//        ws->n_part_blocks += 15;
 //
 //        bd = hd;
diff --git a/rts/sm/GCUtils.h b/rts/sm/GCUtils.h
index 6948b2f..e71f437 100644
--- a/rts/sm/GCUtils.h
+++ b/rts/sm/GCUtils.h
@@ -17,10 +17,12 @@ bdescr *allocBlock_sync(void);
 void    freeChain_sync(bdescr *bd);

 void    push_scanned_block   (bdescr *bd, step_workspace *ws);
-bdescr *grab_todo_block      (step_workspace *ws);
 StgPtr  todo_block_full      (nat size, step_workspace *ws);
 StgPtr  alloc_todo_block     (step_workspace *ws, nat size);

+bdescr *grab_local_todo_block (step_workspace *ws);
+bdescr *steal_todo_block     (nat s);
+
 // Returns true if a block is partially full.  This predicate is used to try
 // to re-use partial blocks wherever possible, and to reduce wastage.
 // We might need to tweak the actual value.
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 34096d4..d5e9b12 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -335,6 +335,7 @@ scavenge_block (bdescr *bd)
   // time around the loop.
  while (p < bd->free || (bd == ws->todo_bd && p < ws->todo_free)) {

+      ASSERT(bd->link == NULL);
       ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));

       info = get_itbl((StgClosure *)p);
@@ -1915,7 +1916,7 @@ loop:
             break;
         }

-        if ((bd = grab_todo_block(ws)) != NULL) {
+        if ((bd = grab_local_todo_block(ws)) != NULL) {
             scavenge_block(bd);
             did_something = rtsTrue;
             break;
@@ -1926,6 +1927,28 @@ loop:
         did_anything = rtsTrue;
         goto loop;
     }
+
+#if defined(THREADED_RTS)
+    if (work_stealing) {
+        // look for work to steal
+        for (s = total_steps-1; s >= 0; s--) {
+            if (s == 0 && RtsFlags.GcFlags.generations > 1) {
+                continue;
+            }
+            if ((bd = steal_todo_block(s)) != NULL) {
+                scavenge_block(bd);
+                did_something = rtsTrue;
+                break;
+            }
+        }
+
+        if (did_something) {
+            did_anything = rtsTrue;
+            goto loop;
+        }
+    }
+#endif
+
     // only return when there is no more work to do
     return did_anything;
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index f38842b..cfe649a 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -104,7 +104,6 @@ initStep (step *stp, int g, int s)
   stp->compact = 0;
   stp->bitmap = NULL;
 #ifdef THREADED_RTS
-  initSpinLock(&stp->sync_todo);
   initSpinLock(&stp->sync_large_objects);
 #endif
   stp->threads = END_TSO_QUEUE;
--
1.7.10.4
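
As an illustration of the scheme the commit message describes (this sketch is not part of the patch): each GC thread pushes and pops to-do blocks at the private end of its own todo_q, spills blocks to a per-workspace overflow list when the deque is full, and only looks at other threads' deques once it has no local work left, and then only if load-balancing has not been disabled with -qb. The toy C code below restates that policy; the names toy_deque, gc_get_work, N_WORKERS and DEQUE_CAP are invented for the example, and the mutex stands in for the lock-free CAS protocol of the real WSDeque in rts/parallel/WSDeque.c.

/* Toy sketch only; not RTS code.  Call init_queues() once before use. */
#include <pthread.h>
#include <stddef.h>

#define DEQUE_CAP 128
#define N_WORKERS 4

typedef struct {
    void           *elems[DEQUE_CAP];
    int             bot, top;      /* owner operates at bot, thieves at top */
    pthread_mutex_t lock;          /* stand-in for the lock-free CAS protocol */
} toy_deque;

static toy_deque work_q[N_WORKERS];

static void init_queues(void)
{
    int i;
    for (i = 0; i < N_WORKERS; i++) {
        pthread_mutex_init(&work_q[i].lock, NULL);
    }
}

/* Owner pushes at the bottom; a full deque reports failure so the caller can
 * keep the block on a local overflow list, as todo_block_full() does. */
static int push_bottom(toy_deque *q, void *w)
{
    int ok = 0;
    pthread_mutex_lock(&q->lock);
    if (q->bot - q->top < DEQUE_CAP) {
        q->elems[q->bot++ % DEQUE_CAP] = w;
        ok = 1;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Owner pops at the bottom, taking back its most recently pushed block,
 * which is the block most likely to still be in its cache. */
static void *pop_bottom(toy_deque *q)
{
    void *w = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->bot > q->top) {
        w = q->elems[--q->bot % DEQUE_CAP];
    }
    pthread_mutex_unlock(&q->lock);
    return w;
}

/* Thieves steal from the top, i.e. the oldest queued block. */
static void *steal_top(toy_deque *q)
{
    void *w = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->bot > q->top) {
        w = q->elems[q->top++ % DEQUE_CAP];
    }
    pthread_mutex_unlock(&q->lock);
    return w;
}

/* The policy from the patch: local work first; steal only when the local
 * deque is empty and load-balancing is enabled (cf. +RTS -qb). */
static void *gc_get_work(int me, int load_balancing)
{
    int   n;
    void *w = pop_bottom(&work_q[me]);
    if (w != NULL || !load_balancing) {
        return w;
    }
    for (n = 0; n < N_WORKERS; n++) {
        if (n == me) continue;
        if ((w = steal_top(&work_q[n])) != NULL) {
            return w;
        }
    }
    return NULL;
}

The asymmetry is the point of the design: the owner reuses the block it pushed most recently, which is still hot in its cache, while thieves take the oldest block, the one the owner is least likely to touch again soon.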