Keep the remembered sets local to each thread during parallel GC
[ghc-hetmet.git] / rts / sm / GC.c
index ef0c79a..2af5fa1 100644 (file)
@@ -125,10 +125,6 @@ nat n_gc_threads;
 // For stats:
 long copied;        // *words* copied & scavenged during this GC
 
-#ifdef THREADED_RTS
-SpinLock recordMutableGen_sync;
-#endif
-
 DECLARE_GCT
 
 /* -----------------------------------------------------------------------------
@@ -138,7 +134,6 @@ DECLARE_GCT
 static void mark_root               (void *user, StgClosure **root);
 static void zero_static_object_list (StgClosure* first_static);
 static nat  initialise_N            (rtsBool force_major_gc);
-static void alloc_gc_threads        (void);
 static void init_collected_gen      (nat g, nat threads);
 static void init_uncollected_gen    (nat g, nat threads);
 static void init_gc_thread          (gc_thread *t);
@@ -149,8 +144,9 @@ static void start_gc_threads        (void);
 static void scavenge_until_all_done (void);
 static nat  inc_running             (void);
 static nat  dec_running             (void);
-static void wakeup_gc_threads       (nat n_threads);
-static void shutdown_gc_threads     (nat n_threads);
+static void wakeup_gc_threads       (nat n_threads, nat me);
+static void shutdown_gc_threads     (nat n_threads, nat me);
+static void continue_gc_threads     (nat n_threads, nat me);
 
 #if 0 && defined(DEBUG)
 static void gcCAFs                  (void);
@@ -180,7 +176,9 @@ StgPtr  oldgen_scan;
    -------------------------------------------------------------------------- */
 
 void
-GarbageCollect ( rtsBool force_major_gc )
+GarbageCollect (rtsBool force_major_gc, 
+                nat gc_type USED_IF_THREADS,
+                Capability *cap USED_IF_THREADS)
 {
   bdescr *bd;
   step *stp;
@@ -234,26 +232,24 @@ GarbageCollect ( rtsBool force_major_gc )
    */
   n = initialise_N(force_major_gc);
 
-  /* Allocate + initialise the gc_thread structures.
-   */
-  alloc_gc_threads();
-
   /* Start threads, so they can be spinning up while we finish initialisation.
    */
   start_gc_threads();
 
+#if defined(THREADED_RTS)
   /* How many threads will be participating in this GC?
-   * We don't try to parallelise minor GC, or mark/compact/sweep GC.
+   * We don't try to parallelise minor GCs (unless the user asks for
+   * it with +RTS -gn0), or mark/compact/sweep GC.
    */
-#if defined(THREADED_RTS)
-  if (n < (4*1024*1024 / BLOCK_SIZE) || oldest_gen->steps[0].mark) {
-      n_gc_threads = 1;
+  if (gc_type == PENDING_GC_PAR) {
+      n_gc_threads = RtsFlags.ParFlags.nNodes;
   } else {
-      n_gc_threads = RtsFlags.ParFlags.gcThreads;
+      n_gc_threads = 1;
   }
 #else
   n_gc_threads = 1;
 #endif
+
   trace(TRACE_gc|DEBUG_gc, "GC (gen %d): %d KB to collect, %ld MB in use, using %d thread(s)",
         N, n * (BLOCK_SIZE / 1024), mblocks_allocated, n_gc_threads);
 
@@ -289,7 +285,7 @@ GarbageCollect ( rtsBool force_major_gc )
 
   /* Allocate a mark stack if we're doing a major collection.
    */
-  if (major_gc) {
+  if (major_gc && oldest_gen->steps[0].mark) {
       nat mark_stack_blocks;
       mark_stack_blocks = stg_max(MARK_STACK_BLOCKS, 
                                   oldest_gen->steps[0].n_old_blocks / 100);
@@ -302,31 +298,50 @@ GarbageCollect ( rtsBool force_major_gc )
   }
 
   // this is the main thread
+#ifdef THREADED_RTS
+  if (n_gc_threads == 1) {
+      gct = gc_threads[0];
+  } else {
+      gct = gc_threads[cap->no];
+  }
+#else
   gct = gc_threads[0];
+#endif
 
   /* -----------------------------------------------------------------------
    * follow all the roots that we know about:
-   *   - mutable lists from each generation > N
-   * we want to *scavenge* these roots, not evacuate them: they're not
-   * going to move in this GC.
-   * Also do them in reverse generation order, for the usual reason:
-   * namely to reduce the likelihood of spurious old->new pointers.
    */
-  for (g = RtsFlags.GcFlags.generations-1; g > N; g--) {
-      generations[g].saved_mut_list = generations[g].mut_list;
-      generations[g].mut_list = allocBlock(); 
-      // mut_list always has at least one block.
-  }
 
   // the main thread is running: this prevents any other threads from
   // exiting prematurely, so we can start them now.
   // NB. do this after the mutable lists have been saved above, otherwise
   // the other GC threads will be writing into the old mutable lists.
   inc_running();
-  wakeup_gc_threads(n_gc_threads);
-
+  wakeup_gc_threads(n_gc_threads, gct->thread_index);
+
+  // Mutable lists from each generation > N
+  // we want to *scavenge* these roots, not evacuate them: they're not
+  // going to move in this GC.
+  // Also do them in reverse generation order, for the usual reason:
+  // namely to reduce the likelihood of spurious old->new pointers.
+  //
   for (g = RtsFlags.GcFlags.generations-1; g > N; g--) {
-      scavenge_mutable_list(&generations[g]);
+      scavenge_mutable_list(generations[g].saved_mut_list, &generations[g]);
+      freeChain_sync(generations[g].saved_mut_list);
+      generations[g].saved_mut_list = NULL;
+
+  }
+
+  // scavenge the capability-private mutable lists.  This isn't part
+  // of markSomeCapabilities() because markSomeCapabilities() can only
+  // call back into the GC via mark_root() (due to the gct register
+  // variable).
+  if (n_gc_threads == 1) {
+      for (n = 0; n < n_capabilities; n++) {
+          scavenge_capability_mut_lists(&capabilities[n]);
+      }
+  } else {
+      scavenge_capability_mut_lists(&capabilities[gct->thread_index]);
   }
 
   // follow roots from the CAF list (used by GHCi)
@@ -335,7 +350,8 @@ GarbageCollect ( rtsBool force_major_gc )
 
   // follow all the roots that the application knows about.
   gct->evac_step = 0;
-  markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads);
+  markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads,
+                       rtsTrue/*prune sparks*/);
 
 #if defined(RTS_USER_SIGNALS)
   // mark the signal handlers (signals should be already blocked)
@@ -377,7 +393,7 @@ GarbageCollect ( rtsBool force_major_gc )
       break;
   }
 
-  shutdown_gc_threads(n_gc_threads);
+  shutdown_gc_threads(n_gc_threads, gct->thread_index);
 
   // Update pointers from the Task list
   update_task_list();
@@ -536,6 +552,12 @@ GarbageCollect ( rtsBool force_major_gc )
        for (bd = generations[g].mut_list; bd != NULL; bd = bd->link) {
            mut_list_size += bd->free - bd->start;
        }
+        for (n = 0; n < n_capabilities; n++) {
+            for (bd = capabilities[n].mut_lists[g]; 
+                 bd != NULL; bd = bd->link) {
+                mut_list_size += bd->free - bd->start;
+            }
+        }
        copied +=  mut_list_size;
 
        debugTrace(DEBUG_gc,
@@ -755,6 +777,9 @@ GarbageCollect ( rtsBool force_major_gc )
   slop = calcLiveBlocks() * BLOCK_SIZE_W - live;
   stat_endGC(allocated, live, copied, N, max_copied, avg_copied, slop);
 
+  // Guess which generation we'll collect *next* time
+  initialise_N(force_major_gc);
+
 #if defined(RTS_USER_SIGNALS)
   if (RtsFlags.MiscFlags.install_signal_handlers) {
     // unblock signals again
@@ -762,6 +787,8 @@ GarbageCollect ( rtsBool force_major_gc )
   }
 #endif
 
+  continue_gc_threads(n_gc_threads, gct->thread_index);
+
   RELEASE_SM_LOCK;
 
   gct = saved_gct;
@@ -813,6 +840,11 @@ initialise_N (rtsBool force_major_gc)
    Initialise the gc_thread structures.
    -------------------------------------------------------------------------- */
 
+#define GC_THREAD_INACTIVE             0
+#define GC_THREAD_STANDING_BY          1
+#define GC_THREAD_RUNNING              2
+#define GC_THREAD_WAITING_TO_CONTINUE  3
+
 static gc_thread *
 alloc_gc_thread (int n)
 {
@@ -825,11 +857,11 @@ alloc_gc_thread (int n)
 
 #ifdef THREADED_RTS
     t->id = 0;
-    initCondition(&t->wake_cond);
-    initMutex(&t->wake_mutex);
-    t->wakeup = rtsTrue;  // starts true, so we can wait for the
+    initSpinLock(&t->gc_spin);
+    initSpinLock(&t->mut_spin);
+    ACQUIRE_SPIN_LOCK(&t->gc_spin);
+    t->wakeup = GC_THREAD_INACTIVE;  // starts true, so we can wait for the
                           // thread to start up, see wakeup_gc_threads
-    t->exit   = rtsFalse;
 #endif
 
     t->thread_index = n;
@@ -863,17 +895,17 @@ alloc_gc_thread (int n)
 }
 
 
-static void
-alloc_gc_threads (void)
+void
+initGcThreads (void)
 {
     if (gc_threads == NULL) {
 #if defined(THREADED_RTS)
         nat i;
-       gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads * 
+       gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes * 
                                     sizeof(gc_thread*), 
                                     "alloc_gc_threads");
 
-       for (i = 0; i < RtsFlags.ParFlags.gcThreads; i++) {
+       for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
            gc_threads[i] = alloc_gc_thread(i);
        }
 #else
@@ -991,112 +1023,108 @@ loop:
 }
 
 #if defined(THREADED_RTS)
-//
-// gc_thread_work(): Scavenge until there's no work left to do and all
-// the running threads are idle.
-//
-static void
-gc_thread_work (void)
+
+void
+gcWorkerThread (Capability *cap)
 {
-    // gc_running_threads has already been incremented for us; this is
-    // a worker thread and the main thread bumped gc_running_threads
-    // before waking us up.
+    cap->in_gc = rtsTrue;
+
+    gct = gc_threads[cap->no];
+    gct->id = osThreadId();
 
+    // Wait until we're told to wake up
+    RELEASE_SPIN_LOCK(&gct->mut_spin);
+    gct->wakeup = GC_THREAD_STANDING_BY;
+    debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index);
+    ACQUIRE_SPIN_LOCK(&gct->gc_spin);
+    
+#ifdef USE_PAPI
+    // start performance counters in this thread...
+    if (gct->papi_events == -1) {
+        papi_init_eventset(&gct->papi_events);
+    }
+    papi_thread_start_gc1_count(gct->papi_events);
+#endif
+    
     // Every thread evacuates some roots.
     gct->evac_step = 0;
-    markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads);
+    markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads,
+                         rtsTrue/*prune sparks*/);
+    scavenge_capability_mut_lists(&capabilities[gct->thread_index]);
 
     scavenge_until_all_done();
-}
-
-
-static void
-gc_thread_mainloop (void)
-{
-    while (!gct->exit) {
-
-       // Wait until we're told to wake up
-       ACQUIRE_LOCK(&gct->wake_mutex);
-       gct->wakeup = rtsFalse;
-       while (!gct->wakeup) {
-           debugTrace(DEBUG_gc, "GC thread %d standing by...", 
-                      gct->thread_index);
-           waitCondition(&gct->wake_cond, &gct->wake_mutex);
-       }
-       RELEASE_LOCK(&gct->wake_mutex);
-       if (gct->exit) break;
-
+    
 #ifdef USE_PAPI
-        // start performance counters in this thread...
-        if (gct->papi_events == -1) {
-            papi_init_eventset(&gct->papi_events);
-        }
-        papi_thread_start_gc1_count(gct->papi_events);
+    // count events in this thread towards the GC totals
+    papi_thread_stop_gc1_count(gct->papi_events);
 #endif
 
-       gc_thread_work();
+    // Wait until we're told to continue
+    RELEASE_SPIN_LOCK(&gct->gc_spin);
+    gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE;
+    debugTrace(DEBUG_gc, "GC thread %d waiting to continue...", 
+               gct->thread_index);
+    ACQUIRE_SPIN_LOCK(&gct->mut_spin);
+    debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index);
+}
 
-#ifdef USE_PAPI
-        // count events in this thread towards the GC totals
-        papi_thread_stop_gc1_count(gct->papi_events);
-#endif
-    }
-}      
 #endif
 
-#if defined(THREADED_RTS)
-static void
-gc_thread_entry (gc_thread *my_gct)
+void
+waitForGcThreads (Capability *cap USED_IF_THREADS)
 {
-    gct = my_gct;
-    debugTrace(DEBUG_gc, "GC thread %d starting...", gct->thread_index);
-    gct->id = osThreadId();
-    gc_thread_mainloop();
-}
+#if defined(THREADED_RTS)
+    nat n_threads = RtsFlags.ParFlags.nNodes;
+    nat me = cap->no;
+    nat i, j;
+    rtsBool retry = rtsTrue;
+
+    while(retry) {
+        for (i=0; i < n_threads; i++) {
+            if (i == me) continue;
+            if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+                prodCapability(&capabilities[i], cap->running_task);
+            }
+        }
+        for (j=0; j < 10000000; j++) {
+            retry = rtsFalse;
+            for (i=0; i < n_threads; i++) {
+                if (i == me) continue;
+                write_barrier();
+                setContextSwitches();
+                if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+                    retry = rtsTrue;
+                }
+            }
+            if (!retry) break;
+        }
+    }
 #endif
+}
 
 static void
 start_gc_threads (void)
 {
 #if defined(THREADED_RTS)
-    nat i;
-    OSThreadId id;
-    static rtsBool done = rtsFalse;
-
     gc_running_threads = 0;
     initMutex(&gc_running_mutex);
-
-    if (!done) {
-       // Start from 1: the main thread is 0
-       for (i = 1; i < RtsFlags.ParFlags.gcThreads; i++) {
-           createOSThread(&id, (OSThreadProc*)&gc_thread_entry, 
-                          gc_threads[i]);
-       }
-       done = rtsTrue;
-    }
 #endif
 }
 
 static void
-wakeup_gc_threads (nat n_threads USED_IF_THREADS)
+wakeup_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     nat i;
-    for (i=1; i < n_threads; i++) {
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
        inc_running();
         debugTrace(DEBUG_gc, "waking up gc thread %d", i);
-        do {
-            ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
-            if (gc_threads[i]->wakeup) {
-                RELEASE_LOCK(&gc_threads[i]->wake_mutex);
-                continue;
-            } else {
-                break;
-            }
-        } while (1);
-       gc_threads[i]->wakeup = rtsTrue;
-       signalCondition(&gc_threads[i]->wake_cond);
-       RELEASE_LOCK(&gc_threads[i]->wake_mutex);
+        if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) barf("wakeup_gc_threads");
+
+       gc_threads[i]->wakeup = GC_THREAD_RUNNING;
+        ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin);
+        RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin);
     }
 #endif
 }
@@ -1105,18 +1133,29 @@ wakeup_gc_threads (nat n_threads USED_IF_THREADS)
 // standby state, otherwise they may still be executing inside
 // any_work(), and may even remain awake until the next GC starts.
 static void
-shutdown_gc_threads (nat n_threads USED_IF_THREADS)
+shutdown_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    nat i;
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
+        while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) { write_barrier(); }
+    }
+#endif
+}
+
+static void
+continue_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     nat i;
-    rtsBool wakeup;
-    for (i=1; i < n_threads; i++) {
-        do {
-            ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
-            wakeup = gc_threads[i]->wakeup;
-            // wakeup is false while the thread is waiting
-            RELEASE_LOCK(&gc_threads[i]->wake_mutex);
-        } while (wakeup);
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
+        if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) barf("continue_gc_threads");
+        
+        gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
+        ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin);
+        RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin);
     }
 #endif
 }
@@ -1262,11 +1301,21 @@ init_collected_gen (nat g, nat n_threads)
 static void
 init_uncollected_gen (nat g, nat threads)
 {
-    nat s, t, i;
+    nat s, t, n;
     step_workspace *ws;
     step *stp;
     bdescr *bd;
 
+    // save the current mutable lists for this generation, and
+    // allocate a fresh block for each one.  We'll traverse these
+    // mutable lists as roots early on in the GC.
+    generations[g].saved_mut_list = generations[g].mut_list;
+    generations[g].mut_list = allocBlock(); 
+    for (n = 0; n < n_capabilities; n++) {
+        capabilities[n].saved_mut_lists[g] = capabilities[n].mut_lists[g];
+        capabilities[n].mut_lists[g] = allocBlock();
+    }
+
     for (s = 0; s < generations[g].n_steps; s++) {
        stp = &generations[g].steps[s];
        stp->scavenged_large_objects = NULL;
@@ -1327,19 +1376,6 @@ init_uncollected_gen (nat g, nat threads)
             if (t == n_gc_threads) t = 0;
         }
     }
-
-
-    // Move the private mutable lists from each capability onto the
-    // main mutable list for the generation.
-    for (i = 0; i < n_capabilities; i++) {
-       for (bd = capabilities[i].mut_lists[g]; 
-            bd->link != NULL; bd = bd->link) {
-           /* nothing */
-       }
-       bd->link = generations[g].mut_list;
-       generations[g].mut_list = capabilities[i].mut_lists[g];
-       capabilities[i].mut_lists[g] = allocBlock();
-    }
 }
 
 /* -----------------------------------------------------------------------------
@@ -1352,6 +1388,7 @@ init_gc_thread (gc_thread *t)
     t->static_objects = END_OF_STATIC_LIST;
     t->scavenged_static_objects = END_OF_STATIC_LIST;
     t->scan_bd = NULL;
+    t->mut_lists = capabilities[t->thread_index].mut_lists;
     t->evac_step = 0;
     t->failed_to_evac = rtsFalse;
     t->eager_promotion = rtsTrue;