Use work-stealing for load-balancing in the GC
authorSimon Marlow <marlowsd@gmail.com>
Fri, 13 Mar 2009 13:51:16 +0000 (13:51 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Fri, 13 Mar 2009 13:51:16 +0000 (13:51 +0000)
New flag: "+RTS -qb" disables load-balancing in the parallel GC
(though this is subject to change, I think we will probably want to do
something more automatic before releasing this).

To get the "PARGC3" configuration described in the "Runtime support
for Multicore Haskell" paper, use "+RTS -qg0 -qb -RTS".

The main advantage of this is that it allows us to easily disable
load-balancing altogether, which turns out to be important in parallel
programs.  Maintaining locality is sometimes more important that
spreading the work out in parallel GC.  There is a side benefit in
that the parallel GC should have improved locality even when
load-balancing, because each processor prefers to take work from its
own queue before stealing from others.

13 files changed:
includes/RtsFlags.h
includes/Storage.h
rts/RtsFlags.c
rts/Stats.c
rts/parallel/WSDeque.c
rts/parallel/WSDeque.h
rts/sm/GC.c
rts/sm/GC.h
rts/sm/GCThread.h
rts/sm/GCUtils.c
rts/sm/GCUtils.h
rts/sm/Scav.c
rts/sm/Storage.c

index db196a9..5c72bbb 100644 (file)
@@ -183,6 +183,7 @@ struct PAR_FLAGS {
   rtsBool        parGcEnabled;   /* enable parallel GC */
   rtsBool        parGcGen;       /* do parallel GC in this generation
                                   * and higher only */
+  rtsBool        parGcLoadBalancing; /* do load-balancing in parallel GC */
 };
 #endif /* THREADED_RTS */
 
index f43eb79..e541082 100644 (file)
@@ -81,7 +81,6 @@ typedef struct step_ {
 #if defined(THREADED_RTS)
     char pad[128];                      // make sure the following is
                                         // on a separate cache line.
-    SpinLock     sync_todo;             // lock for todos
     SpinLock     sync_large_objects;    // lock for large_objects
                                         //    and scavenged_large_objects
 #endif
@@ -93,10 +92,6 @@ typedef struct step_ {
     unsigned int n_old_blocks;         // number of blocks in from-space
     unsigned int live_estimate;         // for sweeping: estimate of live data
     
-    bdescr *     todos;                        // blocks waiting to be scavenged
-    bdescr *     todos_last;
-    unsigned int n_todos;               // count of above
-
     bdescr *     part_blocks;           // partially-full scanned blocks
     unsigned int n_part_blocks;         // count of above
 
index cbc2bb5..2af67e7 100644 (file)
@@ -226,6 +226,7 @@ void initRtsFlagsDefaults(void)
     RtsFlags.ParFlags.wakeupMigrate     = rtsFalse;
     RtsFlags.ParFlags.parGcEnabled      = 1;
     RtsFlags.ParFlags.parGcGen          = 1;
+    RtsFlags.ParFlags.parGcLoadBalancing = 1;
 #endif
 
 #ifdef PAR
@@ -1211,6 +1212,9 @@ error = rtsTrue;
                             error = rtsTrue;
                         }
                         break;
+                   case 'b':
+                       RtsFlags.ParFlags.parGcLoadBalancing = rtsFalse;
+                       break;
                    case 'm':
                        RtsFlags.ParFlags.migrate = rtsFalse;
                        break;
index c43806f..7c8042a 100644 (file)
@@ -716,7 +716,6 @@ stat_exit(int alloc)
                 statsPrintf("whitehole_spin: %"FMT_Word64"\n", whitehole_spin);
                 for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
                     for (s = 0; s < generations[g].n_steps; s++) {
-                        statsPrintf("gen[%d].steps[%d].sync_todo: %"FMT_Word64"\n", g, s, generations[g].steps[s].sync_todo.spin);
                         statsPrintf("gen[%d].steps[%d].sync_large_objects: %"FMT_Word64"\n", g, s, generations[g].steps[s].sync_large_objects.spin);
                     }
                 }
index ced6c95..acecb85 100644 (file)
@@ -43,8 +43,6 @@
 #include "WSDeque.h"
 #include "SMP.h" // for cas
 
-#if defined(THREADED_RTS)
-
 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
 
 /* -----------------------------------------------------------------------------
@@ -285,5 +283,3 @@ pushWSDeque (WSDeque* q, void * elem)
     ASSERT_WSDEQUE_INVARIANTS(q); 
     return rtsTrue;
 }
-
-#endif
index c254671..d85567c 100644 (file)
@@ -9,8 +9,6 @@
 #ifndef WSDEQUE_H
 #define WSDEQUE_H
 
-#if defined(THREADED_RTS)
-
 typedef struct WSDeque_ {
     // Size of elements array. Used for modulo calculation: we round up
     // to powers of 2 and use the dyadic log (modulo == bitwise &) 
@@ -125,6 +123,4 @@ discardElements (WSDeque *q)
 //    pool->topBound = pool->top;
 }
 
-#endif // THREADED_RTS
-
 #endif // WSDEQUE_H
index ec9cd07..45fecc9 100644 (file)
@@ -125,6 +125,8 @@ nat n_gc_threads;
 // For stats:
 long copied;        // *words* copied & scavenged during this GC
 
+rtsBool work_stealing;
+
 DECLARE_GCT
 
 /* -----------------------------------------------------------------------------
@@ -231,6 +233,19 @@ GarbageCollect (rtsBool force_major_gc,
    */
   n = initialise_N(force_major_gc);
 
+#if defined(THREADED_RTS)
+  work_stealing = RtsFlags.ParFlags.parGcLoadBalancing;
+      // It's not always a good idea to do load balancing in parallel
+      // GC.  In particular, for a parallel program we don't want to
+      // lose locality by moving cached data into another CPU's cache
+      // (this effect can be quite significant). 
+      //
+      // We could have a more complex way to deterimine whether to do
+      // work stealing or not, e.g. it might be a good idea to do it
+      // if the heap is big.  For now, we just turn it on or off with
+      // a flag.
+#endif
+
   /* Start threads, so they can be spinning up while we finish initialisation.
    */
   start_gc_threads();
@@ -879,7 +894,9 @@ alloc_gc_thread (int n)
         ws->gct = t;
         
         ws->todo_bd = NULL;
-        ws->buffer_todo_bd = NULL;
+        ws->todo_q = newWSDeque(128);
+        ws->todo_overflow = NULL;
+        ws->n_todo_overflow = 0;
         
         ws->part_list = NULL;
         ws->n_part_blocks = 0;
@@ -971,8 +988,23 @@ any_work (void)
         }
         ws = &gct->steps[s];
         if (ws->todo_large_objects) return rtsTrue;
-        if (ws->step->todos) return rtsTrue;
+        if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
+        if (ws->todo_overflow) return rtsTrue;
+    }
+
+#if defined(THREADED_RTS)
+    if (work_stealing) {
+        nat n;
+        // look for work to steal
+        for (n = 0; n < n_gc_threads; n++) {
+            if (n == gct->thread_index) continue;
+            for (s = total_steps-1; s >= 0; s--) {
+                ws = &gc_threads[n]->steps[s];
+                if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
+            }
+        }
     }
+#endif
 
     gct->no_work++;
 
@@ -1001,18 +1033,18 @@ loop:
     r = dec_running();
     
     debugTrace(DEBUG_gc, "GC thread %d idle (%d still running)", 
-              gct->thread_index, r);
-
+               gct->thread_index, r);
+    
     while (gc_running_threads != 0) {
         // usleep(1);
-       if (any_work()) {
-           inc_running();
-           goto loop;
-       }
-       // any_work() does not remove the work from the queue, it
-       // just checks for the presence of work.  If we find any,
-       // then we increment gc_running_threads and go back to 
-       // scavenge_loop() to perform any pending work.
+        if (any_work()) {
+            inc_running();
+            goto loop;
+        }
+        // any_work() does not remove the work from the queue, it
+        // just checks for the presence of work.  If we find any,
+        // then we increment gc_running_threads and go back to 
+        // scavenge_loop() to perform any pending work.
     }
     
     // All threads are now stopped
@@ -1207,11 +1239,6 @@ init_collected_gen (nat g, nat n_threads)
        stp->n_words      = 0;
        stp->live_estimate = 0;
 
-       // we don't have any to-be-scavenged blocks yet
-       stp->todos = NULL;
-        stp->todos_last = NULL;
-       stp->n_todos = 0;
-
        // initialise the large object queues.
        stp->scavenged_large_objects = NULL;
        stp->n_scavenged_large_blocks = 0;
@@ -1284,9 +1311,12 @@ init_collected_gen (nat g, nat n_threads)
            // allocate the first to-space block; extra blocks will be
            // chained on as necessary.
            ws->todo_bd = NULL;
-           ws->buffer_todo_bd = NULL;
+            ASSERT(looksEmptyWSDeque(ws->todo_q));
            alloc_todo_block(ws,0);
 
+            ws->todo_overflow = NULL;
+            ws->n_todo_overflow = 0;
+
            ws->scavd_list = NULL;
            ws->n_scavd_blocks = 0;
        }
@@ -1329,7 +1359,7 @@ init_uncollected_gen (nat g, nat threads)
         for (t = 0; t < threads; t++) {
            ws = &gc_threads[t]->steps[g * RtsFlags.GcFlags.steps + s];
            
-           ws->buffer_todo_bd = NULL;
+            ASSERT(looksEmptyWSDeque(ws->todo_q));
            ws->todo_large_objects = NULL;
 
             ws->part_list = NULL;
index 366125d..fb6e385 100644 (file)
@@ -28,6 +28,8 @@ extern StgPtr  oldgen_scan;
 
 extern long copied;
 
+extern rtsBool work_stealing;
+
 #ifdef DEBUG
 extern nat mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS, mutlist_OTHERS;
 #endif
index 3ee2757..aacef82 100644 (file)
@@ -15,6 +15,7 @@
 #define GCTHREAD_H
 
 #include "OSThreads.h"
+#include "WSDeque.h"
 
 /* -----------------------------------------------------------------------------
    General scheme
@@ -81,13 +82,14 @@ typedef struct step_workspace_ {
     StgPtr       todo_free;            // free ptr for todo_bd
     StgPtr       todo_lim;             // lim for todo_bd
 
-    bdescr *     buffer_todo_bd;     // buffer to reduce contention
-                                     // on the step's todos list
+    WSDeque *    todo_q;
+    bdescr *     todo_overflow;
+    nat          n_todo_overflow;
 
     // where large objects to be scavenged go
     bdescr *     todo_large_objects;
 
-    // Objects that have already been, scavenged.
+    // Objects that have already been scavenged.
     bdescr *     scavd_list;
     nat          n_scavd_blocks;     // count of blocks in this list
 
@@ -95,7 +97,7 @@ typedef struct step_workspace_ {
     bdescr *     part_list;
     unsigned int n_part_blocks;      // count of above
 
-    StgWord pad[5];
+    StgWord pad[3];
 
 } step_workspace ATTRIBUTE_ALIGNED(64);
 // align so that computing gct->steps[n] is a shift, not a multiply
index 4432ad6..84b7564 100644 (file)
@@ -19,6 +19,9 @@
 #include "GCUtils.h"
 #include "Printer.h"
 #include "Trace.h"
+#ifdef THREADED_RTS
+#include "WSDeque.h"
+#endif
 
 #ifdef THREADED_RTS
 SpinLock gc_alloc_block_sync;
@@ -72,34 +75,47 @@ freeChain_sync(bdescr *bd)
    -------------------------------------------------------------------------- */
 
 bdescr *
-grab_todo_block (step_workspace *ws)
+grab_local_todo_block (step_workspace *ws)
 {
     bdescr *bd;
     step *stp;
 
     stp = ws->step;
-    bd = NULL;
 
-    if (ws->buffer_todo_bd)
+    bd = ws->todo_overflow;
+    if (bd != NULL)
+    {
+        ws->todo_overflow = bd->link;
+        bd->link = NULL;
+        ws->n_todo_overflow--;
+       return bd;
+    }
+
+    bd = popWSDeque(ws->todo_q);
+    if (bd != NULL)
     {
-       bd = ws->buffer_todo_bd;
        ASSERT(bd->link == NULL);
-       ws->buffer_todo_bd = NULL;
        return bd;
     }
 
-    ACQUIRE_SPIN_LOCK(&stp->sync_todo);
-    if (stp->todos) {
-       bd = stp->todos;
-        if (stp->todos == stp->todos_last) {
-            stp->todos_last = NULL;
+    return NULL;
+}
+
+bdescr *
+steal_todo_block (nat s)
+{
+    nat n;
+    bdescr *bd;
+
+    // look for work to steal
+    for (n = 0; n < n_gc_threads; n++) {
+        if (n == gct->thread_index) continue;
+        bd = stealWSDeque(gc_threads[n]->steps[s].todo_q);
+        if (bd) {
+            return bd;
         }
-       stp->todos = bd->link;
-        stp->n_todos--;
-       bd->link = NULL;
-    }  
-    RELEASE_SPIN_LOCK(&stp->sync_todo);
-    return bd;
+    }
+    return NULL;
 }
 
 void
@@ -145,7 +161,7 @@ todo_block_full (nat size, step_workspace *ws)
     // this block to push, and there's enough room in
     // this block to evacuate the current object, then just increase
     // the limit.
-    if (ws->step->todos != NULL || 
+    if (!looksEmptyWSDeque(ws->todo_q) || 
         (ws->todo_free - bd->u.scan < WORK_UNIT_WORDS / 2)) {
         if (ws->todo_free + size < bd->start + BLOCK_SIZE_W) {
             ws->todo_lim = stg_min(bd->start + BLOCK_SIZE_W,
@@ -178,20 +194,15 @@ todo_block_full (nat size, step_workspace *ws)
         {
             step *stp;
             stp = ws->step;
-            trace(TRACE_gc|DEBUG_gc, "push todo block %p (%ld words), step %d, n_todos: %d", 
+            trace(TRACE_gc|DEBUG_gc, "push todo block %p (%ld words), step %d, todo_q: %ld", 
                   bd->start, (unsigned long)(bd->free - bd->u.scan),
-                  stp->abs_no, stp->n_todos);
-            // ToDo: use buffer_todo
-            ACQUIRE_SPIN_LOCK(&stp->sync_todo);
-            if (stp->todos_last == NULL) {
-                stp->todos_last = bd;
-                stp->todos = bd;
-            } else {
-                stp->todos_last->link = bd;
-                stp->todos_last = bd;
+                  stp->abs_no, dequeElements(ws->todo_q));
+
+            if (!pushWSDeque(ws->todo_q, bd)) {
+                bd->link = ws->todo_overflow;
+                ws->todo_overflow = bd;
+                ws->n_todo_overflow++;
             }
-            stp->n_todos++;
-            RELEASE_SPIN_LOCK(&stp->sync_todo);
         }
     }
 
@@ -207,7 +218,7 @@ todo_block_full (nat size, step_workspace *ws)
 StgPtr
 alloc_todo_block (step_workspace *ws, nat size)
 {
-    bdescr *bd/*, *hd, *tl*/;
+    bdescr *bd/*, *hd, *tl */;
 
     // Grab a part block if we have one, and it has enough room
     if (ws->part_list != NULL && 
@@ -221,12 +232,12 @@ alloc_todo_block (step_workspace *ws, nat size)
     {
         // blocks in to-space get the BF_EVACUATED flag.
 
-//        allocBlocks_sync(4, &hd, &tl, 
+//        allocBlocks_sync(16, &hd, &tl, 
 //                         ws->step->gen_no, ws->step, BF_EVACUATED);
 //
 //        tl->link = ws->part_list;
 //        ws->part_list = hd->link;
-//        ws->n_part_blocks += 3;
+//        ws->n_part_blocks += 15;
 //
 //        bd = hd;
 
index 6948b2f..e71f437 100644 (file)
@@ -17,10 +17,12 @@ bdescr *allocBlock_sync(void);
 void    freeChain_sync(bdescr *bd);
 
 void    push_scanned_block   (bdescr *bd, step_workspace *ws);
-bdescr *grab_todo_block      (step_workspace *ws);
 StgPtr  todo_block_full      (nat size, step_workspace *ws);
 StgPtr  alloc_todo_block     (step_workspace *ws, nat size);
 
+bdescr *grab_local_todo_block  (step_workspace *ws);
+bdescr *steal_todo_block       (nat s);
+
 // Returns true if a block is partially full.  This predicate is used to try
 // to re-use partial blocks wherever possible, and to reduce wastage.
 // We might need to tweak the actual value.
index 34096d4..d5e9b12 100644 (file)
@@ -335,6 +335,7 @@ scavenge_block (bdescr *bd)
   // time around the loop.
   while (p < bd->free || (bd == ws->todo_bd && p < ws->todo_free)) {
 
+      ASSERT(bd->link == NULL);
     ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));
     info = get_itbl((StgClosure *)p);
     
@@ -1915,7 +1916,7 @@ loop:
             break;
         }
 
-        if ((bd = grab_todo_block(ws)) != NULL) {
+        if ((bd = grab_local_todo_block(ws)) != NULL) {
             scavenge_block(bd);
             did_something = rtsTrue;
             break;
@@ -1926,6 +1927,28 @@ loop:
         did_anything = rtsTrue;
         goto loop;
     }
+
+#if defined(THREADED_RTS)
+    if (work_stealing) {
+        // look for work to steal
+        for (s = total_steps-1; s >= 0; s--) {
+            if (s == 0 && RtsFlags.GcFlags.generations > 1) { 
+                continue; 
+            }
+            if ((bd = steal_todo_block(s)) != NULL) {
+                scavenge_block(bd);
+                did_something = rtsTrue;
+                break;
+            }
+        }
+
+        if (did_something) {
+            did_anything = rtsTrue;
+            goto loop;
+        }
+    }
+#endif
+
     // only return when there is no more work to do
 
     return did_anything;
index f38842b..cfe649a 100644 (file)
@@ -104,7 +104,6 @@ initStep (step *stp, int g, int s)
     stp->compact = 0;
     stp->bitmap = NULL;
 #ifdef THREADED_RTS
-    initSpinLock(&stp->sync_todo);
     initSpinLock(&stp->sync_large_objects);
 #endif
     stp->threads = END_TSO_QUEUE;