Use work-stealing for load-balancing in the GC
[ghc-hetmet.git] / rts / sm / GC.c
index e44a310..45fecc9 100644 (file)
@@ -125,6 +125,8 @@ nat n_gc_threads;
 // For stats:
 long copied;        // *words* copied & scavenged during this GC
 
+rtsBool work_stealing;
+
 DECLARE_GCT
 
 /* -----------------------------------------------------------------------------
@@ -146,7 +148,6 @@ static nat  inc_running             (void);
 static nat  dec_running             (void);
 static void wakeup_gc_threads       (nat n_threads, nat me);
 static void shutdown_gc_threads     (nat n_threads, nat me);
-static void continue_gc_threads     (nat n_threads, nat me);
 
 #if 0 && defined(DEBUG)
 static void gcCAFs                  (void);
@@ -232,6 +233,19 @@ GarbageCollect (rtsBool force_major_gc,
    */
   n = initialise_N(force_major_gc);
 
+#if defined(THREADED_RTS)
+  work_stealing = RtsFlags.ParFlags.parGcLoadBalancing;
+      // It's not always a good idea to do load balancing in parallel
+      // GC.  In particular, for a parallel program we don't want to
+      // lose locality by moving cached data into another CPU's cache
+      // (this effect can be quite significant). 
+      //
+      // We could have a more complex way to deterimine whether to do
+      // work stealing or not, e.g. it might be a good idea to do it
+      // if the heap is big.  For now, we just turn it on or off with
+      // a flag.
+#endif
+
   /* Start threads, so they can be spinning up while we finish initialisation.
    */
   start_gc_threads();
@@ -787,8 +801,6 @@ GarbageCollect (rtsBool force_major_gc,
   }
 #endif
 
-  continue_gc_threads(n_gc_threads, gct->thread_index);
-
   RELEASE_SM_LOCK;
 
   gct = saved_gct;
@@ -882,7 +894,9 @@ alloc_gc_thread (int n)
         ws->gct = t;
         
         ws->todo_bd = NULL;
-        ws->buffer_todo_bd = NULL;
+        ws->todo_q = newWSDeque(128);
+        ws->todo_overflow = NULL;
+        ws->n_todo_overflow = 0;
         
         ws->part_list = NULL;
         ws->n_part_blocks = 0;
@@ -974,9 +988,24 @@ any_work (void)
         }
         ws = &gct->steps[s];
         if (ws->todo_large_objects) return rtsTrue;
-        if (ws->step->todos) return rtsTrue;
+        if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
+        if (ws->todo_overflow) return rtsTrue;
     }
 
+#if defined(THREADED_RTS)
+    if (work_stealing) {
+        nat n;
+        // look for work to steal
+        for (n = 0; n < n_gc_threads; n++) {
+            if (n == gct->thread_index) continue;
+            for (s = total_steps-1; s >= 0; s--) {
+                ws = &gc_threads[n]->steps[s];
+                if (!looksEmptyWSDeque(ws->todo_q)) return rtsTrue;
+            }
+        }
+    }
+#endif
+
     gct->no_work++;
 
     return rtsFalse;
@@ -1004,18 +1033,18 @@ loop:
     r = dec_running();
     
     debugTrace(DEBUG_gc, "GC thread %d idle (%d still running)", 
-              gct->thread_index, r);
-
+               gct->thread_index, r);
+    
     while (gc_running_threads != 0) {
         // usleep(1);
-       if (any_work()) {
-           inc_running();
-           goto loop;
-       }
-       // any_work() does not remove the work from the queue, it
-       // just checks for the presence of work.  If we find any,
-       // then we increment gc_running_threads and go back to 
-       // scavenge_loop() to perform any pending work.
+        if (any_work()) {
+            inc_running();
+            goto loop;
+        }
+        // any_work() does not remove the work from the queue, it
+        // just checks for the presence of work.  If we find any,
+        // then we increment gc_running_threads and go back to 
+        // scavenge_loop() to perform any pending work.
     }
     
     // All threads are now stopped
@@ -1144,14 +1173,17 @@ shutdown_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
 #endif
 }
 
-static void
-continue_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
+void
+releaseGCThreads (Capability *cap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
+    nat n_threads = RtsFlags.ParFlags.nNodes;
+    nat me = cap->no;
     nat i;
     for (i=0; i < n_threads; i++) {
         if (i == me) continue;
-        if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) barf("continue_gc_threads");
+        if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) 
+            barf("releaseGCThreads");
         
         gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
         ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin);
@@ -1207,11 +1239,6 @@ init_collected_gen (nat g, nat n_threads)
        stp->n_words      = 0;
        stp->live_estimate = 0;
 
-       // we don't have any to-be-scavenged blocks yet
-       stp->todos = NULL;
-        stp->todos_last = NULL;
-       stp->n_todos = 0;
-
        // initialise the large object queues.
        stp->scavenged_large_objects = NULL;
        stp->n_scavenged_large_blocks = 0;
@@ -1284,9 +1311,12 @@ init_collected_gen (nat g, nat n_threads)
            // allocate the first to-space block; extra blocks will be
            // chained on as necessary.
            ws->todo_bd = NULL;
-           ws->buffer_todo_bd = NULL;
+            ASSERT(looksEmptyWSDeque(ws->todo_q));
            alloc_todo_block(ws,0);
 
+            ws->todo_overflow = NULL;
+            ws->n_todo_overflow = 0;
+
            ws->scavd_list = NULL;
            ws->n_scavd_blocks = 0;
        }
@@ -1329,7 +1359,7 @@ init_uncollected_gen (nat g, nat threads)
         for (t = 0; t < threads; t++) {
            ws = &gc_threads[t]->steps[g * RtsFlags.GcFlags.steps + s];
            
-           ws->buffer_todo_bd = NULL;
+            ASSERT(looksEmptyWSDeque(ws->todo_q));
            ws->todo_large_objects = NULL;
 
             ws->part_list = NULL;