Use mutator threads to do GC, instead of having a separate pool of GC threads
author Simon Marlow <marlowsd@gmail.com>
Fri, 21 Nov 2008 15:12:33 +0000 (15:12 +0000)
committer Simon Marlow <marlowsd@gmail.com>
Fri, 21 Nov 2008 15:12:33 +0000 (15:12 +0000)
Previously, the GC had its own pool of threads to use as workers when
doing parallel GC.  There was a "leader", which was the mutator thread
that initiated the GC, and the other threads were taken from the pool.

This was simple and worked fine for sequential programs, where we did
most of the benchmarking for the parallel GC, but falls down for
parallel programs.  When we have N mutator threads and N cores, at GC
time we would have to stop N-1 mutator threads and start up N-1 GC
threads, and hope that the OS schedules them all onto separate cores.
In practice it doesn't, as you might expect.

Now we use the mutator threads to do GC.  This works quite nicely,
particularly for parallel programs, where each mutator thread scans
its own spark pool, which is probably in its cache anyway.
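
The synchronisation between the GC-initiating capability and the other
capabilities is a spin-lock handshake: each worker announces
GC_THREAD_STANDING_BY, is released to scavenge, then parks in
GC_THREAD_WAITING_TO_CONTINUE until the GC is over (see
gcWorkerThread(), waitForGcThreads(), wakeup_gc_threads() and
continue_gc_threads() in the rts/sm/GC.c hunks below).  Here is a
standalone sketch of that handshake using pthreads and a C11 atomic in
place of the RTS SpinLock; the names (N_CAPS, spin_acquire, gc_worker)
are invented for the illustration, and this is not RTS code:

    /* Sketch of the parallel-GC handshake; compile with: cc -pthread sketch.c */
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    #define N_CAPS 4
    enum { INACTIVE, STANDING_BY, RUNNING, WAITING_TO_CONTINUE };

    typedef atomic_int SpinLock;            /* 1 = free, 0 = held */
    static void spin_acquire(SpinLock *l) {
        int free_;
        do { free_ = 1; } while (!atomic_compare_exchange_weak(l, &free_, 0));
    }
    static void spin_release(SpinLock *l) { atomic_store(l, 1); }

    typedef struct {
        SpinLock gc_spin, mut_spin;
        atomic_int wakeup;
        int no;
    } gc_thread;
    static gc_thread gc_threads[N_CAPS];

    /* What an idle capability does when it sees a pending parallel GC. */
    static void *gc_worker(void *arg) {
        gc_thread *gct = arg;
        spin_release(&gct->mut_spin);
        atomic_store(&gct->wakeup, STANDING_BY);   /* report for duty */
        spin_acquire(&gct->gc_spin);               /* sleep until woken */
        printf("cap %d: evacuating roots and scavenging\n", gct->no);
        spin_release(&gct->gc_spin);
        atomic_store(&gct->wakeup, WAITING_TO_CONTINUE);
        spin_acquire(&gct->mut_spin);              /* sleep until GC is over */
        printf("cap %d: resuming mutation\n", gct->no);
        return NULL;
    }

    int main(void) {                               /* cap 0 initiates the GC */
        pthread_t tid[N_CAPS];
        int i;
        for (i = 1; i < N_CAPS; i++) {
            gc_threads[i].no = i;
            atomic_init(&gc_threads[i].gc_spin, 0);   /* starts held */
            atomic_init(&gc_threads[i].mut_spin, 0);
            atomic_init(&gc_threads[i].wakeup, INACTIVE);
            pthread_create(&tid[i], NULL, gc_worker, &gc_threads[i]);
        }
        for (i = 1; i < N_CAPS; i++)               /* waitForGcThreads */
            while (atomic_load(&gc_threads[i].wakeup) != STANDING_BY) ;
        for (i = 1; i < N_CAPS; i++) {             /* wakeup_gc_threads */
            atomic_store(&gc_threads[i].wakeup, RUNNING);
            spin_acquire(&gc_threads[i].mut_spin);
            spin_release(&gc_threads[i].gc_spin);
        }
        printf("cap 0: doing its own share of the GC\n");
        for (i = 1; i < N_CAPS; i++)               /* shutdown_gc_threads */
            while (atomic_load(&gc_threads[i].wakeup) != WAITING_TO_CONTINUE) ;
        for (i = 1; i < N_CAPS; i++) {             /* continue_gc_threads */
            atomic_store(&gc_threads[i].wakeup, INACTIVE);
            spin_acquire(&gc_threads[i].gc_spin);
            spin_release(&gc_threads[i].mut_spin);
        }
        for (i = 1; i < N_CAPS; i++) pthread_join(tid[i], NULL);
        return 0;
    }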

There are some flag changes:

  -g<n> is removed (-g1 is still accepted for backwards compat).
  There's now no way to have a different number of GC threads from
  the number of mutator threads.

  -q1       Use one OS thread for GC (turns off parallel GC)
  -qg<n>    Use parallel GC for generations >= <n> (default: 1)

Using parallel GC only for generations >=1 works well for sequential
programs.  Compiling an ordinary sequential program with -threaded and
running it with -N2 or more should help if you do a lot of GC.  I've
found that adding -qg0 (do parallel GC for generation 0 too) speeds up
some parallel programs, but slows down some sequential programs.
Being conservative, I left the threshold at 1.
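
For example, with a hypothetical program Main.hs, the new flags are
used like this:

  $ ghc -threaded --make Main.hs
  $ ./Main +RTS -N2 -RTS          # parallel GC for generations >= 1 (default)
  $ ./Main +RTS -N2 -qg0 -RTS     # parallel GC for all generations
  $ ./Main +RTS -N2 -q1 -RTS      # single-threaded GC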

ToDo: document the new options.

includes/RtsFlags.h
includes/Storage.h
rts/Capability.c
rts/Capability.h
rts/RtsFlags.c
rts/Schedule.c
rts/Stats.c
rts/sm/GC.c
rts/sm/GC.h
rts/sm/GCThread.h
rts/sm/Storage.c

diff --git a/includes/RtsFlags.h b/includes/RtsFlags.h
index 55b00bb..e14c940 100644
@@ -179,7 +179,9 @@ struct PAR_FLAGS {
   rtsBool        migrate;        /* migrate threads between capabilities */
   rtsBool        wakeupMigrate;  /* migrate a thread on wakeup */
   unsigned int  maxLocalSparks;
-  nat            gcThreads;      /* number of threads for parallel GC */
+  rtsBool        parGcEnabled;   /* enable parallel GC */
+  nat            parGcGen;       /* do parallel GC in this generation
+                                  * and higher only */
 };
 #endif /* THREADED_RTS */
 
diff --git a/includes/Storage.h b/includes/Storage.h
index d431298..0a7aae6 100644
@@ -220,7 +220,7 @@ extern bdescr * splitLargeBlock (bdescr *bd, nat blocks);
 
    -------------------------------------------------------------------------- */
 
-extern void GarbageCollect(rtsBool force_major_gc);
+extern void GarbageCollect(rtsBool force_major_gc, nat gc_type, Capability *cap);
 
 /* -----------------------------------------------------------------------------
    Generational garbage collection support
diff --git a/rts/Capability.c b/rts/Capability.c
index 8dddbc5..7c6ceb5 100644
@@ -26,6 +26,7 @@
 #include "Schedule.h"
 #include "Sparks.h"
 #include "Trace.h"
+#include "GC.h"
 
 // one global capability, this is the Capability for non-threaded
 // builds, and for +RTS -N1
@@ -190,6 +191,7 @@ initCapability( Capability *cap, nat i )
 
     cap->no = i;
     cap->in_haskell        = rtsFalse;
+    cap->in_gc             = rtsFalse;
 
     cap->run_queue_hd      = END_TSO_QUEUE;
     cap->run_queue_tl      = END_TSO_QUEUE;
@@ -358,14 +360,7 @@ releaseCapability_ (Capability* cap,
        return;
     }
 
-    /* if waiting_for_gc was the reason to release the cap: thread
-       comes from yieldCap->releaseAndQueueWorker. Unconditionally set
-       cap. free and return (see default after the if-protected other
-       special cases). Thread will wait on cond.var and re-acquire the
-       same cap after GC (GC-triggering cap. calls releaseCap and
-       enters the spare_workers case)
-    */
-    if (waiting_for_gc) {
+    if (waiting_for_gc == PENDING_GC_SEQ) {
       last_free_capability = cap; // needed?
       trace(TRACE_sched | DEBUG_sched, 
            "GC pending, set capability %d free", cap->no);
@@ -557,6 +552,12 @@ yieldCapability (Capability** pCap, Task *task)
 {
     Capability *cap = *pCap;
 
+    if (waiting_for_gc == PENDING_GC_PAR) {
+       debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
+        gcWorkerThread(cap);
+        return;
+    }
+
        debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
 
        // We must now release the capability and wait to be woken up
@@ -655,58 +656,21 @@ wakeupThreadOnCapability (Capability *my_cap,
 }
 
 /* ----------------------------------------------------------------------------
- * prodCapabilities
+ * prodCapability
  *
- * Used to indicate that the interrupted flag is now set, or some
- * other global condition that might require waking up a Task on each
- * Capability.
- * ------------------------------------------------------------------------- */
-
-static void
-prodCapabilities(rtsBool all)
-{
-    nat i;
-    Capability *cap;
-    Task *task;
-
-    for (i=0; i < n_capabilities; i++) {
-       cap = &capabilities[i];
-       ACQUIRE_LOCK(&cap->lock);
-       if (!cap->running_task) {
-           if (cap->spare_workers) {
-               trace(TRACE_sched, "resuming capability %d", cap->no);
-               task = cap->spare_workers;
-               ASSERT(!task->stopped);
-               giveCapabilityToTask(cap,task);
-               if (!all) {
-                   RELEASE_LOCK(&cap->lock);
-                   return;
-               }
-           }
-       }
-       RELEASE_LOCK(&cap->lock);
-    }
-    return;
-}
-
-void
-prodAllCapabilities (void)
-{
-    prodCapabilities(rtsTrue);
-}
-
-/* ----------------------------------------------------------------------------
- * prodOneCapability
- *
- * Like prodAllCapabilities, but we only require a single Task to wake
- * up in order to service some global event, such as checking for
- * deadlock after some idle time has passed.
+ * If a Capability is currently idle, wake up a Task on it.  Used to 
+ * get every Capability into the GC.
  * ------------------------------------------------------------------------- */
 
 void
-prodOneCapability (void)
+prodCapability (Capability *cap, Task *task)
 {
-    prodCapabilities(rtsFalse);
+    ACQUIRE_LOCK(&cap->lock);
+    if (!cap->running_task) {
+        cap->running_task = task;
+        releaseCapability_(cap,rtsTrue);
+    }
+    RELEASE_LOCK(&cap->lock);
 }
 
 /* ----------------------------------------------------------------------------
diff --git a/rts/Capability.h b/rts/Capability.h
index 8954578..478b0f1 100644
@@ -50,6 +50,9 @@ struct Capability_ {
     // catching unsafe call-ins.
     rtsBool in_haskell;
 
+    // true if this Capability is currently in the GC
+    rtsBool in_gc;
+
     // The run queue.  The Task owning this Capability has exclusive
     // access to its run queue, so can wake up threads without
     // taking a lock, and the common path through the scheduler is
@@ -191,6 +194,8 @@ extern Capability *capabilities;
 extern Capability *last_free_capability;
 
 // GC indicator, in scope for the scheduler
+#define PENDING_GC_SEQ 1
+#define PENDING_GC_PAR 2
 extern volatile StgWord waiting_for_gc;
 
 // Acquires a capability at a return point.  If *cap is non-NULL, then
@@ -237,6 +242,7 @@ void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
 // need to service some global event.
 //
 void prodOneCapability (void);
+void prodCapability (Capability *cap, Task *task);
 
 // Similar to prodOneCapability(), but prods all of them.
 //
diff --git a/rts/RtsFlags.c b/rts/RtsFlags.c
index 1cbd569..cce2b28 100644
@@ -214,7 +214,8 @@ void initRtsFlagsDefaults(void)
     RtsFlags.ParFlags.nNodes           = 1;
     RtsFlags.ParFlags.migrate           = rtsTrue;
     RtsFlags.ParFlags.wakeupMigrate     = rtsFalse;
-    RtsFlags.ParFlags.gcThreads         = 1;
+    RtsFlags.ParFlags.parGcEnabled      = 1;
+    RtsFlags.ParFlags.parGcGen          = 1;
 #endif
 
 #ifdef PAR
@@ -450,8 +451,9 @@ usage_text[] = {
 "",
 #endif /* DEBUG */
 #if defined(THREADED_RTS) && !defined(NOSMP)
-"  -N<n>     Use <n> OS threads (default: 1) (also sets -g)",
-"  -g<n>     Use <n> OS threads for GC (default: 1)",
+"  -N<n>     Use <n> OS threads (default: 1)",
+"  -q1       Use one OS thread for GC (turns off parallel GC)",
+"  -qg<n>    Use parallel GC only for generations >= <n> (default: 1)",
 "  -qm       Don't automatically migrate threads between CPUs",
 "  -qw       Migrate a thread to the current CPU when it is woken up",
 #endif
@@ -1132,8 +1134,6 @@ error = rtsTrue;
                if (rts_argv[arg][2] != '\0') {
                    RtsFlags.ParFlags.nNodes
                      = strtol(rts_argv[arg]+2, (char **) NULL, 10);
-                    // set -g at the same time as -N by default
-                   RtsFlags.ParFlags.gcThreads = RtsFlags.ParFlags.nNodes;
                    if (RtsFlags.ParFlags.nNodes <= 0) {
                      errorBelch("bad value for -N");
                      error = rtsTrue;
@@ -1149,15 +1149,17 @@ error = rtsTrue;
 
              case 'g':
                THREADED_BUILD_ONLY(
-               if (rts_argv[arg][2] != '\0') {
-                   RtsFlags.ParFlags.gcThreads
-                     = strtol(rts_argv[arg]+2, (char **) NULL, 10);
-                   if (RtsFlags.ParFlags.gcThreads <= 0) {
-                     errorBelch("bad value for -g");
-                     error = rtsTrue;
-                   }
-               }
-               ) break;
+                   switch (rts_argv[arg][2]) {
+                    case '1':
+                        // backwards compat only
+                        RtsFlags.ParFlags.parGcEnabled = rtsFalse;
+                        break;
+                   default:
+                       errorBelch("unknown RTS option: %s",rts_argv[arg]);
+                       error = rtsTrue;
+                       break;
+                    }
+                    ) break;
 
              case 'q':
                    switch (rts_argv[arg][2]) {
@@ -1165,6 +1167,18 @@ error = rtsTrue;
                        errorBelch("incomplete RTS option: %s",rts_argv[arg]);
                        error = rtsTrue;
                        break;
+                    case '1':
+                        RtsFlags.ParFlags.parGcEnabled = rtsFalse;
+                        break;
+                    case 'g':
+                        if (rts_argv[arg][3] != '\0') {
+                            RtsFlags.ParFlags.parGcGen
+                                = strtol(rts_argv[arg]+3, (char **) NULL, 10);
+                        } else {
+                            errorBelch("bad value for -qg");
+                            error = rtsTrue;
+                        }
+                        break;
                    case 'm':
                        RtsFlags.ParFlags.migrate = rtsFalse;
                        break;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 7dd0634..31a4875 100644
@@ -31,6 +31,7 @@
 #include "Updates.h"
 #include "Proftimer.h"
 #include "ProfHeap.h"
+#include "GC.h"
 
 /* PARALLEL_HASKELL includes go here */
 
@@ -1478,7 +1479,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
 #ifdef THREADED_RTS
     /* extern static volatile StgWord waiting_for_gc; 
        lives inside capability.c */
-    rtsBool was_waiting;
+    rtsBool gc_type, prev_pending_gc;
     nat i;
 #endif
 
@@ -1490,6 +1491,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     }
 
 #ifdef THREADED_RTS
+    if (sched_state < SCHED_INTERRUPTING
+        && RtsFlags.ParFlags.parGcEnabled
+        && N >= RtsFlags.ParFlags.parGcGen
+        && ! oldest_gen->steps[0].mark)
+    {
+        gc_type = PENDING_GC_PAR;
+    } else {
+        gc_type = PENDING_GC_SEQ;
+    }
+
     // In order to GC, there must be no threads running Haskell code.
     // Therefore, the GC thread needs to hold *all* the capabilities,
     // and release them after the GC has completed.  
@@ -1500,39 +1511,55 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     // actually did the GC.  But it's quite hard to arrange for all
     // the other tasks to sleep and stay asleep.
     //
-       
+
     /*  Other capabilities are prevented from running yet more Haskell
        threads if waiting_for_gc is set. Tested inside
        yieldCapability() and releaseCapability() in Capability.c */
 
-    was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) {
+    prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
+    if (prev_pending_gc) {
        do {
-           debugTrace(DEBUG_sched, "someone else is trying to GC...");
-           if (cap) yieldCapability(&cap,task);
+           debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
+                       prev_pending_gc);
+            ASSERT(cap);
+            yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return cap;  // NOTE: task->cap might have changed here
     }
 
     setContextSwitches();
-    for (i=0; i < n_capabilities; i++) {
-       debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
-       if (cap != &capabilities[i]) {
-           Capability *pcap = &capabilities[i];
-           // we better hope this task doesn't get migrated to
-           // another Capability while we're waiting for this one.
-           // It won't, because load balancing happens while we have
-           // all the Capabilities, but even so it's a slightly
-           // unsavoury invariant.
-           task->cap = pcap;
-           waitForReturnCapability(&pcap, task);
-           if (pcap != &capabilities[i]) {
-               barf("scheduleDoGC: got the wrong capability");
-           }
-       }
+
+    // The final shutdown GC is always single-threaded, because it's
+    // possible that some of the Capabilities have no worker threads.
+    
+    if (gc_type == PENDING_GC_SEQ)
+    {
+        // single-threaded GC: grab all the capabilities
+        for (i=0; i < n_capabilities; i++) {
+            debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
+            if (cap != &capabilities[i]) {
+                Capability *pcap = &capabilities[i];
+                // we better hope this task doesn't get migrated to
+                // another Capability while we're waiting for this one.
+                // It won't, because load balancing happens while we have
+                // all the Capabilities, but even so it's a slightly
+                // unsavoury invariant.
+                task->cap = pcap;
+                waitForReturnCapability(&pcap, task);
+                if (pcap != &capabilities[i]) {
+                    barf("scheduleDoGC: got the wrong capability");
+                }
+            }
+        }
     }
+    else
+    {
+        // multi-threaded GC: make sure all the Capabilities donate one
+        // GC thread each.
+        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
 
-    waiting_for_gc = rtsFalse;
+        waitForGcThreads(cap);
+    }
 #endif
 
     // so this happens periodically:
@@ -1545,23 +1572,23 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
      * state, then we should take the opportunity to delete all the
      * threads in the system.
      */
-    if (sched_state >= SCHED_INTERRUPTING) {
-       deleteAllThreads(&capabilities[0]);
+    if (sched_state == SCHED_INTERRUPTING) {
+       deleteAllThreads(cap);
        sched_state = SCHED_SHUTTING_DOWN;
     }
     
     heap_census = scheduleNeedHeapProfile(rtsTrue);
 
-    /* everybody back, start the GC.
-     * Could do it in this thread, or signal a condition var
-     * to do it in another thread.  Either way, we need to
-     * broadcast on gc_pending_cond afterward.
-     */
 #if defined(THREADED_RTS)
     debugTrace(DEBUG_sched, "doing GC");
+    // reset waiting_for_gc *before* GC, so that when the GC threads
+    // emerge they don't immediately re-enter the GC.
+    waiting_for_gc = 0;
+    GarbageCollect(force_major || heap_census, gc_type, cap);
+#else
+    GarbageCollect(force_major || heap_census, 0, cap);
 #endif
-    GarbageCollect(force_major || heap_census);
-    
+
     if (heap_census) {
         debugTrace(DEBUG_sched, "performing heap census");
         heapCensus();
@@ -1587,12 +1614,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     }
 
 #if defined(THREADED_RTS)
-    // release our stash of capabilities.
-    for (i = 0; i < n_capabilities; i++) {
-       if (cap != &capabilities[i]) {
-           task->cap = &capabilities[i];
-           releaseCapability(&capabilities[i]);
-       }
+    if (gc_type == PENDING_GC_SEQ) {
+        // release our stash of capabilities.
+        for (i = 0; i < n_capabilities; i++) {
+            if (cap != &capabilities[i]) {
+                task->cap = &capabilities[i];
+                releaseCapability(&capabilities[i]);
+            }
+        }
     }
     if (cap) {
        task->cap = cap;
@@ -2131,7 +2160,13 @@ exitScheduler(
     // If we haven't killed all the threads yet, do it now.
     if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
-       scheduleDoGC(NULL,task,rtsFalse);    
+#if defined(THREADED_RTS)
+        waitForReturnCapability(&task->cap,task);
+       scheduleDoGC(task->cap,task,rtsFalse);    
+        releaseCapability(task->cap);
+#else
+       scheduleDoGC(&MainCapability,task,rtsFalse);    
+#endif
     }
     sched_state = SCHED_SHUTTING_DOWN;
 
@@ -2184,13 +2219,17 @@ static void
 performGC_(rtsBool force_major)
 {
     Task *task;
+
     // We must grab a new Task here, because the existing Task may be
     // associated with a particular Capability, and chained onto the 
     // suspended_ccalling_tasks queue.
     ACQUIRE_LOCK(&sched_mutex);
     task = newBoundTask();
     RELEASE_LOCK(&sched_mutex);
-    scheduleDoGC(NULL,task,force_major);
+
+    waitForReturnCapability(&task->cap,task);
+    scheduleDoGC(task->cap,task,force_major);
+    releaseCapability(task->cap);
     boundTaskExiting(task);
 }
 
diff --git a/rts/Stats.c b/rts/Stats.c
index 228f0c0..9c17856 100644
@@ -613,11 +613,11 @@ stat_exit(int alloc)
            }
 
 #if defined(THREADED_RTS)
-            if (RtsFlags.ParFlags.gcThreads > 1) {
+            if (RtsFlags.ParFlags.parGcEnabled) {
                 statsPrintf("\n  Parallel GC work balance: %.2f (%ld / %ld, ideal %d)\n", 
                             (double)GC_par_avg_copied / (double)GC_par_max_copied,
                             (lnat)GC_par_avg_copied, (lnat)GC_par_max_copied,
-                            RtsFlags.ParFlags.gcThreads
+                            RtsFlags.ParFlags.nNodes
                     );
             }
 #endif
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index aff3320..bf2464b 100644
@@ -138,7 +138,6 @@ DECLARE_GCT
 static void mark_root               (void *user, StgClosure **root);
 static void zero_static_object_list (StgClosure* first_static);
 static nat  initialise_N            (rtsBool force_major_gc);
-static void alloc_gc_threads        (void);
 static void init_collected_gen      (nat g, nat threads);
 static void init_uncollected_gen    (nat g, nat threads);
 static void init_gc_thread          (gc_thread *t);
@@ -149,8 +148,9 @@ static void start_gc_threads        (void);
 static void scavenge_until_all_done (void);
 static nat  inc_running             (void);
 static nat  dec_running             (void);
-static void wakeup_gc_threads       (nat n_threads);
-static void shutdown_gc_threads     (nat n_threads);
+static void wakeup_gc_threads       (nat n_threads, nat me);
+static void shutdown_gc_threads     (nat n_threads, nat me);
+static void continue_gc_threads     (nat n_threads, nat me);
 
 #if 0 && defined(DEBUG)
 static void gcCAFs                  (void);
@@ -180,7 +180,9 @@ StgPtr  oldgen_scan;
    -------------------------------------------------------------------------- */
 
 void
-GarbageCollect ( rtsBool force_major_gc )
+GarbageCollect (rtsBool force_major_gc, 
+                nat gc_type USED_IF_THREADS,
+                Capability *cap USED_IF_THREADS)
 {
   bdescr *bd;
   step *stp;
@@ -234,26 +236,24 @@ GarbageCollect ( rtsBool force_major_gc )
    */
   n = initialise_N(force_major_gc);
 
-  /* Allocate + initialise the gc_thread structures.
-   */
-  alloc_gc_threads();
-
   /* Start threads, so they can be spinning up while we finish initialisation.
    */
   start_gc_threads();
 
+#if defined(THREADED_RTS)
   /* How many threads will be participating in this GC?
-   * We don't try to parallelise minor GC, or mark/compact/sweep GC.
+   * We don't try to parallelise minor GCs (unless the user asks for
+   * it with +RTS -qg0), or mark/compact/sweep GC.
    */
-#if defined(THREADED_RTS)
-  if (n < (4*1024*1024 / BLOCK_SIZE) || oldest_gen->steps[0].mark) {
-      n_gc_threads = 1;
+  if (gc_type == PENDING_GC_PAR) {
+      n_gc_threads = RtsFlags.ParFlags.nNodes;
   } else {
-      n_gc_threads = RtsFlags.ParFlags.gcThreads;
+      n_gc_threads = 1;
   }
 #else
   n_gc_threads = 1;
 #endif
+
   trace(TRACE_gc|DEBUG_gc, "GC (gen %d): %d KB to collect, %ld MB in use, using %d thread(s)",
         N, n * (BLOCK_SIZE / 1024), mblocks_allocated, n_gc_threads);
 
@@ -302,7 +302,15 @@ GarbageCollect ( rtsBool force_major_gc )
   }
 
   // this is the main thread
+#ifdef THREADED_RTS
+  if (n_gc_threads == 1) {
+      gct = gc_threads[0];
+  } else {
+      gct = gc_threads[cap->no];
+  }
+#else
   gct = gc_threads[0];
+#endif
 
   /* -----------------------------------------------------------------------
    * follow all the roots that we know about:
@@ -323,7 +331,7 @@ GarbageCollect ( rtsBool force_major_gc )
   // NB. do this after the mutable lists have been saved above, otherwise
   // the other GC threads will be writing into the old mutable lists.
   inc_running();
-  wakeup_gc_threads(n_gc_threads);
+  wakeup_gc_threads(n_gc_threads, gct->thread_index);
 
   for (g = RtsFlags.GcFlags.generations-1; g > N; g--) {
       scavenge_mutable_list(&generations[g]);
@@ -378,7 +386,7 @@ GarbageCollect ( rtsBool force_major_gc )
       break;
   }
 
-  shutdown_gc_threads(n_gc_threads);
+  shutdown_gc_threads(n_gc_threads, gct->thread_index);
 
   // Update pointers from the Task list
   update_task_list();
@@ -756,6 +764,9 @@ GarbageCollect ( rtsBool force_major_gc )
   slop = calcLiveBlocks() * BLOCK_SIZE_W - live;
   stat_endGC(allocated, live, copied, N, max_copied, avg_copied, slop);
 
+  // Guess which generation we'll collect *next* time
+  initialise_N(force_major_gc);
+
 #if defined(RTS_USER_SIGNALS)
   if (RtsFlags.MiscFlags.install_signal_handlers) {
     // unblock signals again
@@ -763,6 +774,8 @@ GarbageCollect ( rtsBool force_major_gc )
   }
 #endif
 
+  continue_gc_threads(n_gc_threads, gct->thread_index);
+
   RELEASE_SM_LOCK;
 
   gct = saved_gct;
@@ -814,6 +827,11 @@ initialise_N (rtsBool force_major_gc)
    Initialise the gc_thread structures.
    -------------------------------------------------------------------------- */
 
+#define GC_THREAD_INACTIVE             0
+#define GC_THREAD_STANDING_BY          1
+#define GC_THREAD_RUNNING              2
+#define GC_THREAD_WAITING_TO_CONTINUE  3
+
 static gc_thread *
 alloc_gc_thread (int n)
 {
@@ -826,11 +844,11 @@ alloc_gc_thread (int n)
 
 #ifdef THREADED_RTS
     t->id = 0;
-    initCondition(&t->wake_cond);
-    initMutex(&t->wake_mutex);
-    t->wakeup = rtsTrue;  // starts true, so we can wait for the
+    initSpinLock(&t->gc_spin);
+    initSpinLock(&t->mut_spin);
+    ACQUIRE_SPIN_LOCK(&t->gc_spin);
+    t->wakeup = GC_THREAD_INACTIVE;  // starts inactive; we wait for the
                           // thread to start up, see wakeup_gc_threads
-    t->exit   = rtsFalse;
 #endif
 
     t->thread_index = n;
@@ -864,17 +882,17 @@ alloc_gc_thread (int n)
 }
 
 
-static void
-alloc_gc_threads (void)
+void
+initGcThreads (void)
 {
     if (gc_threads == NULL) {
 #if defined(THREADED_RTS)
         nat i;
-       gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads * 
+       gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes * 
                                     sizeof(gc_thread*), 
                                     "alloc_gc_threads");
 
-       for (i = 0; i < RtsFlags.ParFlags.gcThreads; i++) {
+       for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
            gc_threads[i] = alloc_gc_thread(i);
        }
 #else
@@ -992,113 +1010,107 @@ loop:
 }
 
 #if defined(THREADED_RTS)
-//
-// gc_thread_work(): Scavenge until there's no work left to do and all
-// the running threads are idle.
-//
-static void
-gc_thread_work (void)
+
+void
+gcWorkerThread (Capability *cap)
 {
-    // gc_running_threads has already been incremented for us; this is
-    // a worker thread and the main thread bumped gc_running_threads
-    // before waking us up.
+    cap->in_gc = rtsTrue;
+
+    gct = gc_threads[cap->no];
+    gct->id = osThreadId();
 
+    // Wait until we're told to wake up
+    RELEASE_SPIN_LOCK(&gct->mut_spin);
+    gct->wakeup = GC_THREAD_STANDING_BY;
+    debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index);
+    ACQUIRE_SPIN_LOCK(&gct->gc_spin);
+    
+#ifdef USE_PAPI
+    // start performance counters in this thread...
+    if (gct->papi_events == -1) {
+        papi_init_eventset(&gct->papi_events);
+    }
+    papi_thread_start_gc1_count(gct->papi_events);
+#endif
+    
     // Every thread evacuates some roots.
     gct->evac_step = 0;
     markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads,
                          rtsTrue/*prune sparks*/);
 
     scavenge_until_all_done();
-}
-
-
-static void
-gc_thread_mainloop (void)
-{
-    while (!gct->exit) {
-
-       // Wait until we're told to wake up
-       ACQUIRE_LOCK(&gct->wake_mutex);
-       gct->wakeup = rtsFalse;
-       while (!gct->wakeup) {
-           debugTrace(DEBUG_gc, "GC thread %d standing by...", 
-                      gct->thread_index);
-           waitCondition(&gct->wake_cond, &gct->wake_mutex);
-       }
-       RELEASE_LOCK(&gct->wake_mutex);
-       if (gct->exit) break;
-
+    
 #ifdef USE_PAPI
-        // start performance counters in this thread...
-        if (gct->papi_events == -1) {
-            papi_init_eventset(&gct->papi_events);
-        }
-        papi_thread_start_gc1_count(gct->papi_events);
+    // count events in this thread towards the GC totals
+    papi_thread_stop_gc1_count(gct->papi_events);
 #endif
 
-       gc_thread_work();
+    // Wait until we're told to continue
+    RELEASE_SPIN_LOCK(&gct->gc_spin);
+    gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE;
+    debugTrace(DEBUG_gc, "GC thread %d waiting to continue...", 
+               gct->thread_index);
+    ACQUIRE_SPIN_LOCK(&gct->mut_spin);
+    debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index);
+}
 
-#ifdef USE_PAPI
-        // count events in this thread towards the GC totals
-        papi_thread_stop_gc1_count(gct->papi_events);
-#endif
-    }
-}      
 #endif
 
-#if defined(THREADED_RTS)
-static void
-gc_thread_entry (gc_thread *my_gct)
+void
+waitForGcThreads (Capability *cap USED_IF_THREADS)
 {
-    gct = my_gct;
-    debugTrace(DEBUG_gc, "GC thread %d starting...", gct->thread_index);
-    gct->id = osThreadId();
-    gc_thread_mainloop();
-}
+#if defined(THREADED_RTS)
+    nat n_threads = RtsFlags.ParFlags.nNodes;
+    nat me = cap->no;
+    nat i, j;
+    rtsBool retry = rtsTrue;
+
+    while(retry) {
+        for (i=0; i < n_threads; i++) {
+            if (i == me) continue;
+            if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+                prodCapability(&capabilities[i], cap->running_task);
+            }
+        }
+        for (j=0; j < 10000000; j++) {
+            retry = rtsFalse;
+            for (i=0; i < n_threads; i++) {
+                if (i == me) continue;
+                write_barrier();
+                setContextSwitches();
+                if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+                    retry = rtsTrue;
+                }
+            }
+            if (!retry) break;
+        }
+    }
 #endif
+}
 
 static void
 start_gc_threads (void)
 {
 #if defined(THREADED_RTS)
-    nat i;
-    OSThreadId id;
-    static rtsBool done = rtsFalse;
-
     gc_running_threads = 0;
     initMutex(&gc_running_mutex);
-
-    if (!done) {
-       // Start from 1: the main thread is 0
-       for (i = 1; i < RtsFlags.ParFlags.gcThreads; i++) {
-           createOSThread(&id, (OSThreadProc*)&gc_thread_entry, 
-                          gc_threads[i]);
-       }
-       done = rtsTrue;
-    }
 #endif
 }
 
 static void
-wakeup_gc_threads (nat n_threads USED_IF_THREADS)
+wakeup_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     nat i;
-    for (i=1; i < n_threads; i++) {
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
        inc_running();
         debugTrace(DEBUG_gc, "waking up gc thread %d", i);
-        do {
-            ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
-            if (gc_threads[i]->wakeup) {
-                RELEASE_LOCK(&gc_threads[i]->wake_mutex);
-                continue;
-            } else {
-                break;
-            }
-        } while (1);
-       gc_threads[i]->wakeup = rtsTrue;
-       signalCondition(&gc_threads[i]->wake_cond);
-       RELEASE_LOCK(&gc_threads[i]->wake_mutex);
+        if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) barf("wakeup_gc_threads");
+
+       gc_threads[i]->wakeup = GC_THREAD_RUNNING;
+        ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin);
+        RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin);
     }
 #endif
 }
@@ -1107,18 +1119,29 @@ wakeup_gc_threads (nat n_threads USED_IF_THREADS)
 // standby state, otherwise they may still be executing inside
 // any_work(), and may even remain awake until the next GC starts.
 static void
-shutdown_gc_threads (nat n_threads USED_IF_THREADS)
+shutdown_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
     nat i;
-    rtsBool wakeup;
-    for (i=1; i < n_threads; i++) {
-        do {
-            ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
-            wakeup = gc_threads[i]->wakeup;
-            // wakeup is false while the thread is waiting
-            RELEASE_LOCK(&gc_threads[i]->wake_mutex);
-        } while (wakeup);
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
+        while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) { write_barrier(); }
+    }
+#endif
+}
+
+static void
+continue_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    nat i;
+    for (i=0; i < n_threads; i++) {
+        if (i == me) continue;
+        if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) barf("continue_gc_threads");
+        
+        gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
+        ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin);
+        RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin);
     }
 #endif
 }
diff --git a/rts/sm/GC.h b/rts/sm/GC.h
index 6331320..5fb142f 100644
@@ -40,6 +40,10 @@ extern SpinLock gc_alloc_block_sync;
 extern StgWord64 whitehole_spin;
 #endif
 
+void gcWorkerThread (Capability *cap);
+void initGcThreads (void);
+void waitForGcThreads (Capability *cap);
+
 #define WORK_UNIT_WORDS 128
 
 #endif /* GC_H */
diff --git a/rts/sm/GCThread.h b/rts/sm/GCThread.h
index 1b5c5d4..d6af2b1 100644
@@ -113,10 +113,9 @@ typedef struct step_workspace_ {
 typedef struct gc_thread_ {
 #ifdef THREADED_RTS
     OSThreadId id;                 // The OS thread that this struct belongs to
-    Mutex      wake_mutex;
-    Condition  wake_cond;          // So we can go to sleep between GCs
-    rtsBool    wakeup;
-    rtsBool    exit;
+    SpinLock   gc_spin;
+    SpinLock   mut_spin;
+    volatile rtsBool wakeup;
 #endif
     nat thread_index;              // a zero based index identifying the thread
 
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index 6c45cbe..bf7c452 100644
@@ -276,6 +276,10 @@ initStorage( void )
   whitehole_spin = 0;
 #endif
 
+  N = 0;
+
+  initGcThreads();
+
   IF_DEBUG(gc, statDescribeGens());
 
   RELEASE_SM_LOCK;