[project @ 2005-10-27 15:26:06 by simonmar]
author    simonmar <unknown>
          Thu, 27 Oct 2005 15:26:06 +0000 (15:26 +0000)
committer simonmar <unknown>
          Thu, 27 Oct 2005 15:26:06 +0000 (15:26 +0000)
- Very simple work-sharing amongst Capabilities: whenever a Capability
  detects that it has more than one thread in its run queue, it goes
  looking for empty Capabilities and shares the threads on its run
  queue equally amongst the free Capabilities it finds.
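
  As a rough, self-contained sketch of that sharing step (the real
  code is schedulePushWork() in Schedule.c below; Cap, Thread and
  push() are invented stand-ins for Capability, StgTSO and
  appendToRunQueue, and all locking is elided):

      #include <stdio.h>
      #include <stdlib.h>

      typedef struct Thread_ { int id; struct Thread_ *link; } Thread;
      typedef struct { Thread *run_queue_hd; } Cap;

      static void push(Cap *cap, Thread *t)      /* append to run queue */
      {
          Thread **p = &cap->run_queue_hd;
          while (*p) p = &(*p)->link;
          t->link = NULL;
          *p = t;
      }

      int main(void)
      {
          enum { N_CAPS = 4, N_THREADS = 9 };
          Cap caps[N_CAPS] = {{0}};
          int i, c;
          Thread *t, *next;

          /* all the work starts on capability 0 (queue: 1..9) */
          for (i = N_THREADS; i >= 1; i--) {
              t = malloc(sizeof(*t));
              t->id = i;
              t->link = caps[0].run_queue_hd;
              caps[0].run_queue_hd = t;
          }

          /* keep the head thread, then deal the rest out round-robin;
           * i == N_CAPS-1 means "keep one for ourselves this round" */
          t = caps[0].run_queue_hd->link;
          caps[0].run_queue_hd->link = NULL;
          for (i = 0; t != NULL; t = next) {
              next = t->link;
              if (i == N_CAPS - 1) { push(&caps[0], t); i = 0; }
              else                 { push(&caps[1 + i], t); i++; }
          }

          for (c = 0; c < N_CAPS; c++) {
              printf("cap %d:", c);
              for (t = caps[c].run_queue_hd; t; t = t->link)
                  printf(" %d", t->id);
              printf("\n");
          }
          return 0;
      }

  Running it deals threads 1 5 9 / 2 6 / 3 7 / 4 8 across the four
  caps: the pushing capability keeps a share of its own queue rather
  than giving everything away.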

- Unlock the garbage collector's mutable lists by giving each
  Capability a private mutable list per generation.  The private
  mutable lists are moved onto the main mutable lists at each GC.
  This pulls the old-generation update code out from under the
  storage manager mutex, which was one of the last remaining
  (alleged) sources of contention.
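
  A minimal standalone model of the lock-free recording scheme (the
  real code is recordMutableCap() in Capability.h below; Block, Cap
  and WORDS_PER_BLOCK stand in for bdescr, Capability and
  BLOCK_SIZE_W, and only a single generation is modelled):

      #include <stdio.h>
      #include <stdlib.h>

      #define WORDS_PER_BLOCK 4               /* tiny, to force chaining */

      typedef struct Block_ {
          void *slot[WORDS_PER_BLOCK];
          int   free;                         /* next unused slot */
          struct Block_ *link;
      } Block;

      typedef struct { Block *mut_list; } Cap;

      static Block *allocBlock(void) { return calloc(1, sizeof(Block)); }

      /* no lock: only the owning capability touches its private list */
      static void recordMutable(Cap *cap, void *p)
      {
          Block *bd = cap->mut_list;
          if (bd->free == WORDS_PER_BLOCK) {  /* full: chain a new block */
              Block *nbd = allocBlock();
              nbd->link = bd;
              bd = cap->mut_list = nbd;
          }
          bd->slot[bd->free++] = p;
      }

      /* at GC, with all mutators stopped: splice each private list onto
       * the main list and hand the capability a fresh empty block */
      static Block *promote(Cap *caps, int n_caps, Block *main_list)
      {
          int i;
          for (i = 0; i < n_caps; i++) {
              Block *last = caps[i].mut_list;
              while (last->link) last = last->link;
              last->link = main_list;
              main_list = caps[i].mut_list;
              caps[i].mut_list = allocBlock();
          }
          return main_list;
      }

      int main(void)
      {
          Cap caps[2] = { { allocBlock() }, { allocBlock() } };
          Block *bd, *main_list = NULL;
          int i, n = 0;
          for (i = 0; i < 10; i++) recordMutable(&caps[i % 2], &n);
          main_list = promote(caps, 2, main_list);
          for (bd = main_list; bd != NULL; bd = bd->link) n += bd->free;
          printf("%d entries on the main mutable list\n", n);  /* 10 */
          return 0;
      }

  The point is that recordMutable() needs no atomic operations at
  all; the only synchronisation is the stop-the-world moment at which
  promote() runs, which the GC already has.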

- Fix some problems with synchronising when a GC is required; we
  should now synchronise more quickly.
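
  The synchronisation is now a single compare-and-swap on
  waiting_for_gc: one thread wins and becomes the GC leader, and the
  losers yield until the flag clears instead of returning as if no GC
  were pending (see the scheduleDoGC hunk below).  A toy pthreads
  model of that handshake, with all the capability machinery elided:

      #include <pthread.h>
      #include <sched.h>
      #include <stdatomic.h>
      #include <stdio.h>

      static atomic_int waiting_for_gc = 0;

      static void maybe_gc(long me)
      {
          int expected = 0;
          if (!atomic_compare_exchange_strong(&waiting_for_gc,
                                              &expected, 1)) {
              /* someone else is organising a GC: wait until it is done,
               * mirroring the new do/while loop in scheduleDoGC */
              while (atomic_load(&waiting_for_gc)) sched_yield();
              return;
          }
          printf("thread %ld is the GC leader\n", me);
          /* ... grab every capability, collect, release ... */
          atomic_store(&waiting_for_gc, 0);
      }

      static void *worker(void *arg) { maybe_gc((long)arg); return NULL; }

      int main(void)
      {
          pthread_t tid[4];
          long i;
          for (i = 0; i < 4; i++)
              pthread_create(&tid[i], NULL, worker, (void *)i);
          for (i = 0; i < 4; i++)
              pthread_join(tid[i], NULL);
          return 0;
      }

  In the real scheduler the losers call yieldCapability() rather than
  sched_yield(), so they hand their Capability to the leader instead
  of just burning CPU.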

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/GC.c
ghc/rts/Schedule.c
ghc/rts/Storage.c
ghc/rts/Updates.h [moved from ghc/includes/Updates.h with 98% similarity]

index cb921d1..7aacc24 100644 (file)
@@ -142,6 +142,10 @@ initCapability( Capability *cap, nat i )
 
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
     cap->f.stgGCFun        = (F_)__stg_gc_fun;
+
+    cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) * 
+                                    RtsFlags.GcFlags.generations,
+                                    "initCapability");
 }
 
 /* ---------------------------------------------------------------------------
@@ -572,6 +576,29 @@ shutdownCapability (Capability *cap, Task *task)
     // list are both empty.
 }
 
+/* ----------------------------------------------------------------------------
+ * tryGrabCapability
+ *
+ * Attempt to gain control of a Capability if it is free.
+ * 
+ * ------------------------------------------------------------------------- */
+
+rtsBool
+tryGrabCapability (Capability *cap, Task *task)
+{
+    if (cap->running_task != NULL) return rtsFalse;
+    ACQUIRE_LOCK(&cap->lock);
+    if (cap->running_task != NULL) {
+       RELEASE_LOCK(&cap->lock);
+       return rtsFalse;
+    }
+    task->cap = cap;
+    cap->running_task = task;
+    RELEASE_LOCK(&cap->lock);
+    return rtsTrue;
+}
+
+
 #endif /* THREADED_RTS */
 
 
index 3ebb9f0..f9ae894 100644 (file)
@@ -60,6 +60,12 @@ struct Capability_ {
     // this list.
     Task *suspended_ccalling_tasks;
 
+    // One mutable list per generation, so we don't need to take any
+    // locks when updating an old-generation thunk.  These
+    // mini-mut-lists are moved onto the respective gen->mut_list at
+    // each GC.
+    bdescr **mut_lists;
+
 #if defined(THREADED_RTS)
     // Worker Tasks waiting in the wings.  Singly-linked.
     Task *spare_workers;
@@ -146,6 +152,8 @@ extern Capability *last_free_capability;
 //
 void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
 
+INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
+
 #if defined(THREADED_RTS)
 
 // Gives up the current capability IFF there is a higher-priority
@@ -181,6 +189,10 @@ void prodAllCapabilities (void);
 //
 void shutdownCapability (Capability *cap, Task *task);
 
+// Attempt to gain control of a Capability if it is free.
+//
+rtsBool tryGrabCapability (Capability *cap, Task *task);
+
 #else // !THREADED_RTS
 
 // Grab a capability.  (Only in the non-threaded RTS; in the threaded
@@ -190,4 +202,24 @@ extern void grabCapability (Capability **pCap);
 
 #endif /* !THREADED_RTS */
 
+/* -----------------------------------------------------------------------------
+ * INLINE functions... private below here
+ * -------------------------------------------------------------------------- */
+
+INLINE_HEADER void
+recordMutableCap (StgClosure *p, Capability *cap, nat gen)
+{
+    bdescr *bd;
+
+    bd = cap->mut_lists[gen];
+    if (bd->free >= bd->start + BLOCK_SIZE_W) {
+       bdescr *new_bd;
+       new_bd = allocBlock_lock();
+       new_bd->link = bd;
+       bd = new_bd;
+       cap->mut_lists[gen] = bd;
+    }
+    *bd->free++ = (StgWord)p;
+}
+
 #endif /* CAPABILITY_H */
index de41016..c4823bd 100644 (file)
@@ -334,7 +334,7 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc )
   step *stp;
   lnat live, allocated, collected = 0, copied = 0, scavd_copied = 0;
   lnat oldgen_saved_blocks = 0;
-  nat g, s;
+  nat g, s, i;
 
   ACQUIRE_SM_LOCK;
 
@@ -439,6 +439,10 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc )
     if (g != 0) {
        freeChain(generations[g].mut_list);
        generations[g].mut_list = allocBlock();
+       for (i = 0; i < n_capabilities; i++) {
+           freeChain(capabilities[i].mut_lists[g]);
+           capabilities[i].mut_lists[g] = allocBlock();
+       }
     }
 
     for (s = 0; s < generations[g].n_steps; s++) {
@@ -541,6 +545,19 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc )
       stp->scavenged_large_objects = NULL;
       stp->n_scavenged_large_blocks = 0;
     }
+
+    /* Move the private mutable lists from each capability onto the
+     * main mutable list for the generation.
+     */
+    for (i = 0; i < n_capabilities; i++) {
+       for (bd = capabilities[i].mut_lists[g]; 
+            bd->link != NULL; bd = bd->link) {
+           /* nothing */
+       }
+       bd->link = generations[g].mut_list;
+       generations[g].mut_list = capabilities[i].mut_lists[g];
+       capabilities[i].mut_lists[g] = allocBlock();
+    }
   }
 
   /* Allocate a mark stack if we're doing a major collection.
index 0de7679..2bc7472 100644 (file)
@@ -210,6 +210,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+static void schedulePushWork(Capability *cap, Task *task);
 static void scheduleStartSignalHandlers (void);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
@@ -386,6 +387,10 @@ schedule (Capability *initialCapability, Task *task)
       }
 #endif
       
+#ifdef SMP
+      schedulePushWork(cap,task);
+#endif         
+
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -599,6 +604,10 @@ run_thread:
 
     cap->in_haskell = rtsFalse;
 
+    // The TSO might have moved, eg. if it re-entered the RTS and a GC
+    // happened.  So find the new location:
+    t = cap->r.rCurrentTSO;
+
 #ifdef SMP
     // If ret is ThreadBlocked, and this Task is bound to the TSO that
     // blocked, we are in limbo - the TSO is now owned by whatever it
@@ -606,15 +615,16 @@ run_thread:
     // perhaps even on a different Capability.  It may be the case
     // that task->cap != cap.  We better yield this Capability
    // immediately and return to normality.
-    if (ret == ThreadBlocked) continue;
+    if (ret == ThreadBlocked) {
+       IF_DEBUG(scheduler,
+                debugBelch("--<< thread %d (%s) stopped: blocked\n",
+                           t->id, whatNext_strs[t->what_next]));
+       continue;
+    }
 #endif
 
     ASSERT_CAPABILITY_INVARIANTS(cap,task);
 
-    // The TSO might have moved, eg. if it re-entered the RTS and a GC
-    // happened.  So find the new location:
-    t = cap->r.rCurrentTSO;
-
     // And save the current errno in this thread.
     t->saved_errno = errno;
 
@@ -665,6 +675,7 @@ run_thread:
 
     case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+       ASSERT_CAPABILITY_INVARIANTS(cap,task);
        break;
 
     default:
@@ -705,6 +716,87 @@ schedulePreLoop(void)
 #endif
 }
 
+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+static void
+schedulePushWork(Capability *cap, Task *task)
+{
+#ifdef SMP
+    Capability *free_caps[n_capabilities], *cap0;
+    nat i, n_free_caps;
+
+    // Check whether we have more threads on our run queue that we
+    // could hand to another Capability.
+    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
+       return;
+    }
+
+    // First grab as many free Capabilities as we can.
+    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
+       cap0 = &capabilities[i];
+       if (cap != cap0 && tryGrabCapability(cap0,task)) {
+	    if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
+               // it already has some work, we just grabbed it at 
+               // the wrong moment.  Or maybe it's deadlocked!
+               releaseCapability(cap0);
+           } else {
+               free_caps[n_free_caps++] = cap0;
+           }
+       }
+    }
+
+    // we now have n_free_caps free capabilities stashed in
+    // free_caps[].  Share our run queue equally with them.  This is
+    // probably the simplest thing we could do; improvements we might
+    // want to do include:
+    //
+    //   - giving high priority to moving relatively new threads, on 
+    //     the grounds that they haven't had time to build up a
+    //     working set in the cache on this CPU/Capability.
+    //
+    //   - giving low priority to moving long-lived threads
+
+    if (n_free_caps > 0) {
+       StgTSO *prev, *t, *next;
+       IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+       prev = cap->run_queue_hd;
+       t = prev->link;
+       prev->link = END_TSO_QUEUE;
+       i = 0;
+       for (; t != END_TSO_QUEUE; t = next) {
+           next = t->link;
+           t->link = END_TSO_QUEUE;
+           if (t->what_next == ThreadRelocated) {
+               prev->link = t;
+               prev = t;
+           } else if (i == n_free_caps) {
+               i = 0;
+               // keep one for us
+               prev->link = t;
+               prev = t;
+           } else {
+               appendToRunQueue(free_caps[i],t);
+               if (t->bound) { t->bound->cap = free_caps[i]; }
+               i++;
+           }
+       }
+       cap->run_queue_tl = prev;
+
+       // release the capabilities
+       for (i = 0; i < n_free_caps; i++) {
+           task->cap = free_caps[i];
+           releaseCapability(free_caps[i]);
+       }
+    }
+    task->cap = cap; // reset to point to our Capability.
+#endif
+}
+
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ------------------------------------------------------------------------- */
@@ -1775,7 +1867,13 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
     //
        
     was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) return;
+    if (was_waiting) {
+       do {
+           IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+           yieldCapability(&cap,task);
+       } while (waiting_for_gc);
+       return;
+    }
 
     for (i=0; i < n_capabilities; i++) {
	IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities));
@@ -1787,6 +1885,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
+           context_switch = 1;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
@@ -3986,25 +4085,37 @@ printThreadBlockage(StgTSO *tso)
   }
 }
 
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
 {
-  switch (tso->what_next) {
-  case ThreadKilled:
-    debugBelch("has been killed");
-    break;
-  case ThreadComplete:
-    debugBelch("has completed");
-    break;
-  default:
-    printThreadBlockage(tso);
-  }
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+    if (t->what_next == ThreadRelocated) {
+       debugBelch("has been relocated...\n");
+    } else {
+       switch (t->what_next) {
+       case ThreadKilled:
+           debugBelch("has been killed");
+           break;
+       case ThreadComplete:
+           debugBelch("has completed");
+           break;
+       default:
+           printThreadBlockage(t);
+       }
+       debugBelch("\n");
+    }
 }
 
 void
 printAllThreads(void)
 {
-  StgTSO *t;
+  StgTSO *t, *next;
+  nat i;
+  Capability *cap;
 
 # if defined(GRAN)
   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
@@ -4022,20 +4133,23 @@ printAllThreads(void)
   debugBelch("all threads:\n");
 # endif
 
-  for (t = all_threads; t != END_TSO_QUEUE; ) {
-    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
-    {
-      void *label = lookupThreadLabel(t->id);
-      if (label) debugBelch("[\"%s\"] ",(char *)label);
-    }
-    if (t->what_next == ThreadRelocated) {
-       debugBelch("has been relocated...\n");
-       t = t->link;
-    } else {
-       printThreadStatus(t);
-       debugBelch("\n");
-       t = t->global_link;
-    }
+  for (i = 0; i < n_capabilities; i++) {
+      cap = &capabilities[i];
+      debugBelch("threads on capability %d:\n", cap->no);
+      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+         printThreadStatus(t);
+      }
+  }
+
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->why_blocked != NotBlocked) {
+         printThreadStatus(t);
+      }
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+      }
   }
 }
 
@@ -4045,13 +4159,7 @@ printThreadQueue(StgTSO *t)
 {
     nat i = 0;
     for (; t != END_TSO_QUEUE; t = t->link) {
-       debugBelch("\tthread %d @ %p ", t->id, (void *)t);
-       if (t->what_next == ThreadRelocated) {
-           debugBelch("has been relocated...\n");
-       } else {
-           printThreadStatus(t);
-           debugBelch("\n");
-       }
+       printThreadStatus(t);
        i++;
     }
     debugBelch("%d threads on queue\n", i);
index c7ff7c3..57e3d70 100644 (file)
@@ -980,6 +980,11 @@ memInventory(void)
  /* count the blocks we currently have */
 
   for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+      for (i = 0; i < n_capabilities; i++) {
+         for (bd = capabilities[i].mut_lists[g]; bd != NULL; bd = bd->link) {
+             total_blocks += bd->blocks;
+         }
+      }          
       for (bd = generations[g].mut_list; bd != NULL; bd = bd->link) {
          total_blocks += bd->blocks;
       }
similarity index 98%
rename from ghc/includes/Updates.h
rename to ghc/rts/Updates.h
index 4bc6199..06f67e9 100644 (file)
@@ -280,8 +280,9 @@ DEBUG_FILL_SLOP(StgClosure *p)
       and_then;                                                        \
     } else {                                                   \
       DEBUG_FILL_SLOP(p1);                                     \
-      foreign "C" recordMutableGenLock(p1 "ptr",               \
-                generation(TO_W_(bdescr_gen_no(bd))) "ptr");   \
+      foreign "C" recordMutableCap(p1 "ptr",                   \
+                                  MyCapability() "ptr",        \
+                                  bdescr_gen_no(bd));          \
       StgInd_indirectee(p1) = p2;                              \
       SET_INFO(p1, stg_IND_OLDGEN_info);                       \
       LDV_RECORD_CREATE(p1);                                   \