Use mutator threads to do GC, instead of having a separate pool of GC threads
[ghc-hetmet.git] / rts / sm / GC.c
index aff3320..bf2464b 100644 (file)
@@ -138,7 +138,6 @@ DECLARE_GCT
 static void mark_root               (void *user, StgClosure **root);
 static void zero_static_object_list (StgClosure* first_static);
 static nat  initialise_N            (rtsBool force_major_gc);
-static void alloc_gc_threads        (void);
 static void init_collected_gen      (nat g, nat threads);
 static void init_uncollected_gen    (nat g, nat threads);
 static void init_gc_thread          (gc_thread *t);
@@ -149,8 +148,9 @@ static void start_gc_threads        (void);
 static void scavenge_until_all_done (void);
 static nat  inc_running             (void);
 static nat  dec_running             (void);
-static void wakeup_gc_threads       (nat n_threads);
-static void shutdown_gc_threads     (nat n_threads);
+static void wakeup_gc_threads       (nat n_threads, nat me);
+static void shutdown_gc_threads     (nat n_threads, nat me);
+static void continue_gc_threads     (nat n_threads, nat me);
 
 #if 0 && defined(DEBUG)
 static void gcCAFs                  (void);
@@ -180,7 +180,9 @@ StgPtr  oldgen_scan;
    -------------------------------------------------------------------------- */
 
 void
-GarbageCollect ( rtsBool force_major_gc )
+GarbageCollect (rtsBool force_major_gc, 
+                nat gc_type USED_IF_THREADS,
+                Capability *cap USED_IF_THREADS)
 {
   bdescr *bd;
   step *stp;
@@ -234,26 +236,24 @@ GarbageCollect ( rtsBool force_major_gc )
    */
   n = initialise_N(force_major_gc);
 
-  /* Allocate + initialise the gc_thread structures.
-   */
-  alloc_gc_threads();
-
   /* Start threads, so they can be spinning up while we finish initialisation.
    */
   start_gc_threads();
 
+#if defined(THREADED_RTS)
   /* How many threads will be participating in this GC?
-   * We don't try to parallelise minor GC, or mark/compact/sweep GC.
+   * We don't try to parallelise minor GCs (unless the user asks for
+   * it with +RTS -gn0), or mark/compact/sweep GC.
    */
-#if defined(THREADED_RTS)
-  if (n < (4*1024*1024 / BLOCK_SIZE) || oldest_gen->steps[0].mark) {
-      n_gc_threads = 1;
+  if (gc_type == PENDING_GC_PAR) {
+      n_gc_threads = RtsFlags.ParFlags.nNodes;
   } else {
-      n_gc_threads = RtsFlags.ParFlags.gcThreads;
+      n_gc_threads = 1;
   }
 #else
   n_gc_threads = 1;
 #endif
+
   trace(TRACE_gc|DEBUG_gc, "GC (gen %d): %d KB to collect, %ld MB in use, using %d thread(s)",
         N, n * (BLOCK_SIZE / 1024), mblocks_allocated, n_gc_threads);
 
@@ -302,7 +302,15 @@ GarbageCollect ( rtsBool force_major_gc )
   }
 
   // this is the main thread
+#ifdef THREADED_RTS
+  if (n_gc_threads == 1) {
+      gct = gc_threads[0];
+  } else {
+      gct = gc_threads[cap->no];
+  }
+#else
   gct = gc_threads[0];
+#endif
 
   /* -----------------------------------------------------------------------
    * follow all the roots that we know about:
@@ -323,7 +331,7 @@ GarbageCollect ( rtsBool force_major_gc )
   // NB. do this after the mutable lists have been saved above, otherwise
   // the other GC threads will be writing into the old mutable lists.
   inc_running();
-  wakeup_gc_threads(n_gc_threads);
+  wakeup_gc_threads(n_gc_threads, gct->thread_index);
 
   for (g = RtsFlags.GcFlags.generations-1; g > N; g--) {
       scavenge_mutable_list(&generations[g]);
@@ -378,7 +386,7 @@ GarbageCollect ( rtsBool force_major_gc )
       break;
   }
 
-  shutdown_gc_threads(n_gc_threads);
+  shutdown_gc_threads(n_gc_threads, gct->thread_index);
 
   // Update pointers from the Task list
   update_task_list();
@@ -756,6 +764,9 @@ GarbageCollect ( rtsBool force_major_gc )
   slop = calcLiveBlocks() * BLOCK_SIZE_W - live;
   stat_endGC(allocated, live, copied, N, max_copied, avg_copied, slop);
 
+  // Guess which generation we'll collect *next* time
+  initialise_N(force_major_gc);
+
 #if defined(RTS_USER_SIGNALS)
   if (RtsFlags.MiscFlags.install_signal_handlers) {
     // unblock signals again
@@ -763,6 +774,8 @@ GarbageCollect ( rtsBool force_major_gc )
   }
 #endif
 
+  continue_gc_threads(n_gc_threads, gct->thread_index);
+
   RELEASE_SM_LOCK;
 
   gct = saved_gct;
@@ -814,6 +827,11 @@ initialise_N (rtsBool force_major_gc)
    Initialise the gc_thread structures.
    -------------------------------------------------------------------------- */
 
+#define GC_THREAD_INACTIVE             0
+#define GC_THREAD_STANDING_BY          1
+#define GC_THREAD_RUNNING              2
+#define GC_THREAD_WAITING_TO_CONTINUE  3
+
 static gc_thread *
 alloc_gc_thread (int n)
 {
@@ -826,11 +844,11 @@ alloc_gc_thread (int n)
 
 #ifdef THREADED_RTS
     t->id = 0;
-    initCondition(&t->wake_cond);
-    initMutex(&t->wake_mutex);
-    t->wakeup = rtsTrue;  // starts true, so we can wait for the
+    initSpinLock(&t->gc_spin);
+    initSpinLock(&t->mut_spin);
+    ACQUIRE_SPIN_LOCK(&t->gc_spin);
+    t->wakeup = GC_THREAD_INACTIVE;  // starts true, so we can wait for the
                           // thread to start up, see wakeup_gc_threads
-    t->exit   = rtsFalse;
 #endif
 
     t->thread_index = n;
@@ -864,17 +882,17 @@ alloc_gc_thread (int n)
 }
 
 
-static void
-alloc_gc_threads (void)
+void
+initGcThreads (void)
 {
     if (gc_threads == NULL) {
 #if defined(THREADED_RTS)
         nat i;
-       gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads * 
+       gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes * 
                                     sizeof(gc_thread*), 
                                     "alloc_gc_threads");
 
-       for (i = 0; i < RtsFlags.ParFlags.gcThreads; i++) {
+       for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
            gc_threads[i] = alloc_gc_thread(i);
        }
 #else
@@ -992,113 +1010,107 @@ loop:
 }
 
 #if defined(THREADED_RTS)
-//
-// gc_thread_work(): Scavenge until there's no work left to do and all
-// the running threads are idle.
-//
-static void
-gc_thread_work (void)
+
+void
+gcWorkerThread (Capability *cap)
 {
-    // gc_running_threads has already been incremented for us; this is
-    // a worker thread and the main thread bumped gc_running_threads
-    // before waking us up.
+    cap->in_gc = rtsTrue;
+
+    gct = gc_threads[cap->no];
+    gct->id = osThreadId();
 
+    // Wait until we're told to wake up
+    RELEASE_SPIN_LOCK(&gct->mut_spin);
+    gct->wakeup = GC_THREAD_STANDING_BY;
+    debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index);
+    ACQUIRE_SPIN_LOCK(&gct->gc_spin);
+    
+#ifdef USE_PAPI
+    // start performance counters in this thread...
+    if (gct->papi_events == -1) {
+        papi_init_eventset(&gct->papi_events);
+    }
+    papi_thread_start_gc1_count(gct->papi_events);
+#endif
+    
     // Every thread evacuates some roots.
     gct->evac_step = 0;
     markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads,
                          rtsTrue/*prune sparks*/);
 
     scavenge_until_all_done();
-}
-
-
-static void
-gc_thread_mainloop (void)
-{
-    while (!gct->exit) {
-
-       // Wait until we're told to wake up
-       ACQUIRE_LOCK(&gct->wake_mutex);
-       gct->wakeup = rtsFalse;
-       while (!gct->wakeup) {
-           debugTrace(DEBUG_gc, "GC thread %d standing by...", 
-                      gct->thread_index);
-           waitCondition(&gct->wake_cond, &gct->wake_mutex);
-       }
-       RELEASE_LOCK(&gct->wake_mutex);
-       if (gct->exit) break;
-
+    
 #ifdef USE_PAPI
-        // start performance counters in this thread...
-        if (gct->papi_events == -1) {
-            papi_init_eventset(&gct->papi_events);
-        }
-        papi_thread_start_gc1_count(gct->papi_events);
+    // count events in this thread towards the GC totals
+    papi_thread_stop_gc1_count(gct->papi_events);
 #endif
 
-       gc_thread_work();
+    // Wait until we're told to continue
+    RELEASE_SPIN_LOCK(&gct->gc_spin);
+    gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE;
+    debugTrace(DEBUG_gc, "GC thread %d waiting to continue...", 
+               gct->thread_index);
+    ACQUIRE_SPIN_LOCK(&gct->mut_spin);
+    debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index);
+}
 
-#ifdef USE_PAPI
-        // count events in this thread towards the GC totals
-        papi_thread_stop_gc1_count(gct->papi_events);
-#endif
-    }
-}      
 #endif
 
-#if defined(THREADED_RTS)
-static void
-gc_thread_entry (gc_thread *my_gct)
+void
+waitForGcThreads (Capability *cap USED_IF_THREADS)
 {
-    gct = my_gct;
-    debugTrace(DEBUG_gc, "GC thread %d starting...", gct->thread_index);
-    gct->id = osThreadId();
-    gc_thread_mainloop();
-}
+#if defined(THREADED_RTS)
+    nat n_threads = RtsFlags.ParFlags.nNodes;
+    nat me = cap->no;
+    nat i, j;
+    rtsBool retry = rtsTrue;
+
+    while(retry) {
+        for (i=0; i < n_threads; i++) {
+            if (i == me) continue;
+            if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+                prodCapability(&capabilities[i], cap->running_task);
+            }
+        }
+        for (j=0; j < 10000000; j++) {
+            retry = rtsFalse;
+            for (i=0; i < n_threads; i++) {
+                if (i == me) continue;
+                write_barrier();
+                setContextSwitches();
+                if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+                    retry = rtsTrue;
+                }
+            }
+            if (!retry) break;
+        }
+    }
 #endif
+}
 
 static void
 start_gc_threads (void)
 {
 #if defined(THREADED_RTS)
-    nat i;
-    OSThreadId id;
-    static rtsBool done = rtsFalse;
-
     gc_running_threads = 0;
     initMutex(&gc_running_mutex);
-
-    if (!done) {
-       // Start from 1: the main thread is 0
-       for (i = 1; i < RtsFlags.ParFlags.gcThreads; i++) {
-           createOSThread(&id, (OSThreadProc*)&gc_thread_entry, 
-                          gc_threads[i]);
-       }
-       done = rtsTrue;
-    }
 #endif
 }
 
 static void
-wakeup_gc_threads (nat n_threads USED_IF_THREADS)
+wakeup_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     nat i;
-    for (i=1; i < n_threads; i++) {
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
        inc_running();
         debugTrace(DEBUG_gc, "waking up gc thread %d", i);
-        do {
-            ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
-            if (gc_threads[i]->wakeup) {
-                RELEASE_LOCK(&gc_threads[i]->wake_mutex);
-                continue;
-            } else {
-                break;
-            }
-        } while (1);
-       gc_threads[i]->wakeup = rtsTrue;
-       signalCondition(&gc_threads[i]->wake_cond);
-       RELEASE_LOCK(&gc_threads[i]->wake_mutex);
+        if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) barf("wakeup_gc_threads");
+
+       gc_threads[i]->wakeup = GC_THREAD_RUNNING;
+        ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin);
+        RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin);
     }
 #endif
 }
@@ -1107,18 +1119,29 @@ wakeup_gc_threads (nat n_threads USED_IF_THREADS)
 // standby state, otherwise they may still be executing inside
 // any_work(), and may even remain awake until the next GC starts.
 static void
-shutdown_gc_threads (nat n_threads USED_IF_THREADS)
+shutdown_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     nat i;
-    rtsBool wakeup;
-    for (i=1; i < n_threads; i++) {
-        do {
-            ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
-            wakeup = gc_threads[i]->wakeup;
-            // wakeup is false while the thread is waiting
-            RELEASE_LOCK(&gc_threads[i]->wake_mutex);
-        } while (wakeup);
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
+        while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) { write_barrier(); }
+    }
+#endif
+}
+
+static void
+continue_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    nat i;
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
+        if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) barf("continue_gc_threads");
+        
+        gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
+        ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin);
+        RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin);
     }
 #endif
 }