[project @ 2005-11-21 15:58:47 by tharris]
authortharris <unknown>
Mon, 21 Nov 2005 15:58:47 +0000 (15:58 +0000)
committertharris <unknown>
Mon, 21 Nov 2005 15:58:47 +0000 (15:58 +0000)
Re-use temporary storage in the STM implementation

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Exception.cmm
ghc/rts/GC.c
ghc/rts/GCCompact.c
ghc/rts/PrimOps.cmm
ghc/rts/RtsStartup.c
ghc/rts/STM.c
ghc/rts/Schedule.c

index 5872f42..2379c32 100644 (file)
@@ -20,6 +20,7 @@
 #include "Rts.h"
 #include "RtsUtils.h"
 #include "RtsFlags.h"
+#include "STM.h"
 #include "OSThreads.h"
 #include "Capability.h"
 #include "Schedule.h"
@@ -155,6 +156,11 @@ initCapability( Capability *cap, nat i )
     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
     }
+
+    cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+    cap->free_trec_chunks = END_STM_CHUNK_LIST;
+    cap->free_trec_headers = NO_TREC;
+    cap->transaction_tokens = 0;
 }
 
 /* ---------------------------------------------------------------------------
index f9ae894..2a04e41 100644 (file)
@@ -81,6 +81,12 @@ struct Capability_ {
     Task *returning_tasks_hd; // Singly-linked, with head/tail
     Task *returning_tasks_tl;
 #endif
+
+    // Per-capability STM-related data
+    StgTVarWaitQueue *free_tvar_wait_queues;
+    StgTRecChunk *free_trec_chunks;
+    StgTRecHeader *free_trec_headers;
+    nat transaction_tokens;
 }; // typedef Capability, defined in RtsAPI.h
 
 
index 4007b78..3fdfdfd 100644 (file)
@@ -360,7 +360,7 @@ retry_pop_stack:
       W_ r;
       trec = StgTSO_trec(CurrentTSO);
       r = foreign "C" stmValidateNestOfTransactions(trec "ptr");
-      foreign "C" stmAbortTransaction(trec "ptr");
+      foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr");
       StgTSO_trec(CurrentTSO) = NO_TREC;
       if (r) {
         // Transaction was valid: continue searching for a catch frame
@@ -369,7 +369,7 @@ retry_pop_stack:
       } else {
         // Transaction was not valid: we retry the exception (otherwise continue
         // with a further call to raiseExceptionHelper)
-        "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr");
+        "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr");
         StgTSO_trec(CurrentTSO) = trec;
         R1 = StgAtomicallyFrame_code(Sp);
         Sp_adj(-1);
index bc8546a..513d14a 100644 (file)
@@ -3051,9 +3051,6 @@ scavenge(step *stp)
        evac_gen = 0;
        tvar->current_value = evacuate((StgClosure*)tvar->current_value);
        tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
-#if defined(SMP)
-       tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by);
-#endif
        evac_gen = saved_evac_gen;
        failed_to_evac = rtsTrue; // mutable
        p += sizeofW(StgTVar);
@@ -3408,9 +3405,6 @@ linear_scan:
            evac_gen = 0;
            tvar->current_value = evacuate((StgClosure*)tvar->current_value);
            tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
-#if defined(SMP)
-            tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by);
-#endif
            evac_gen = saved_evac_gen;
            failed_to_evac = rtsTrue; // mutable
            break;
@@ -3732,9 +3726,6 @@ scavenge_one(StgPtr p)
        evac_gen = 0;
        tvar->current_value = evacuate((StgClosure*)tvar->current_value);
        tvar->first_wait_queue_entry = (StgTVarWaitQueue *)evacuate((StgClosure*)tvar->first_wait_queue_entry);
-#if defined(SMP)
-       tvar->last_update_by = (StgTRecHeader *)evacuate((StgClosure*)tvar->last_update_by);
-#endif
        evac_gen = saved_evac_gen;
        failed_to_evac = rtsTrue; // mutable
        break;
index 775aa75..e53429c 100644 (file)
@@ -688,9 +688,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
         StgTVar *tvar = (StgTVar *)p;
        thread((StgPtr)&tvar->current_value);
        thread((StgPtr)&tvar->first_wait_queue_entry);
-#if defined(SMP)
-       thread((StgPtr)&tvar->last_update_by);
-#endif
        return p + sizeofW(StgTVar);
     }
     
index b4e95f3..b25a1e5 100644 (file)
@@ -1048,7 +1048,7 @@ INFO_TABLE_RET(stg_atomically_frame,
    trec = StgTSO_trec(CurrentTSO);
    if (StgAtomicallyFrame_waiting(frame)) {
      /* The TSO is currently waiting: should we stop waiting? */
-     valid = foreign "C" stmReWait(CurrentTSO "ptr");
+     valid = foreign "C" stmReWait(MyCapability() "ptr", CurrentTSO "ptr");
      if (valid) {
        /* Previous attempt is still valid: no point trying again yet */
          IF_NOT_REG_R1(Sp_adj(-2);
@@ -1268,6 +1268,8 @@ retry_pop_stack:
       r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", other_trec "ptr");
       if (r) {
         r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
+      } else {
+        foreign "C" stmAbortTransaction(MyCapability() "ptr", trec "ptr");
       }
       if (r) {
         // Merge between siblings succeeded: commit it back to enclosing transaction
index f9b1c85..10cd451 100644 (file)
@@ -217,8 +217,6 @@ hs_init(int *argc, char **argv[])
     startupAsyncIO();
 #endif
 
-    initSTM();
-
 #ifdef RTS_GTK_FRONTPANEL
     if (RtsFlags.GcFlags.frontpanel) {
        initFrontPanel();
index 15369cb..bc8e9bf 100644 (file)
@@ -1,5 +1,4 @@
 /* -----------------------------------------------------------------------------
- *
  * (c) The GHC Team 1998-2005
  * 
  * STM implementation.
@@ -173,6 +172,13 @@ static int shake(void) {
      
 /*......................................................................*/
 
+// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
+// and wait queue entries without GC
+
+#define REUSE_MEMORY
+
+/*......................................................................*/
+
 #define IF_STM_UNIPROC(__X)  do { } while (0)
 #define IF_STM_CG_LOCK(__X)  do { } while (0)
 #define IF_STM_FG_LOCKS(__X) do { } while (0)
@@ -350,7 +356,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
 
 /*......................................................................*/
 
-// Helper functions for allocation and initialization
+// Helper functions for downstream allocation and initialization
 
 static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
                                                  StgTSO *waiting_tso) {
@@ -392,6 +398,89 @@ static StgTRecHeader *new_stg_trec_header(Capability *cap,
 
 /*......................................................................*/
 
+// Allocation / deallocation functions that retain per-capability lists
+// of closures that can be re-used
+
+static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap,
+                                                   StgTSO *waiting_tso) {
+  StgTVarWaitQueue *result = NULL;
+  if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) {
+    result = new_stg_tvar_wait_queue(cap, waiting_tso);
+  } else {
+    result = cap -> free_tvar_wait_queues;
+    result -> waiting_tso = waiting_tso;
+    cap -> free_tvar_wait_queues = result -> next_queue_entry;
+  }
+  return result;
+}
+
+static void free_stg_tvar_wait_queue(Capability *cap,
+                                     StgTVarWaitQueue *wq) {
+#if defined(REUSE_MEMORY)
+  wq -> next_queue_entry = cap -> free_tvar_wait_queues;
+  cap -> free_tvar_wait_queues = wq;
+#endif
+}
+
+static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
+  StgTRecChunk *result = NULL;
+  if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
+    result = new_stg_trec_chunk(cap);
+  } else {
+    result = cap -> free_trec_chunks;
+    cap -> free_trec_chunks = result -> prev_chunk;
+    result -> prev_chunk = END_STM_CHUNK_LIST;
+    result -> next_entry_idx = 0;
+  }
+  return result;
+}
+
+static void free_stg_trec_chunk(Capability *cap, 
+                                StgTRecChunk *c) {
+#if defined(REUSE_MEMORY)
+  c -> prev_chunk = cap -> free_trec_chunks;
+  cap -> free_trec_chunks = c;
+#endif
+}
+
+static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
+                                            StgTRecHeader *enclosing_trec) {
+  StgTRecHeader *result = NULL;
+  if (cap -> free_trec_headers == NO_TREC) {
+    result = new_stg_trec_header(cap, enclosing_trec);
+  } else {
+    result = cap -> free_trec_headers;
+    cap -> free_trec_headers = result -> enclosing_trec;
+    result -> enclosing_trec = enclosing_trec;
+    result -> current_chunk -> next_entry_idx = 0;
+    if (enclosing_trec == NO_TREC) {
+      result -> state = TREC_ACTIVE;
+    } else {
+      ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
+             enclosing_trec -> state == TREC_CONDEMNED);
+      result -> state = enclosing_trec -> state;
+    }
+  }
+  return result;
+}
+
+static void free_stg_trec_header(Capability *cap,
+                                 StgTRecHeader *trec) {
+#if defined(REUSE_MEMORY)
+  StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
+  while (chunk != END_STM_CHUNK_LIST) {
+    StgTRecChunk *prev_chunk = chunk -> prev_chunk;
+    free_stg_trec_chunk(cap, chunk);
+    chunk = prev_chunk;
+  } 
+  trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
+  trec -> enclosing_trec = cap -> free_trec_headers;
+  cap -> free_trec_headers = trec;
+#endif
+}
+
+/*......................................................................*/
+
 // Helper functions for managing waiting lists
 
 static void build_wait_queue_entries_for_trec(Capability *cap,
@@ -412,7 +501,7 @@ static void build_wait_queue_entries_for_trec(Capability *cap,
     ACQ_ASSERT(s -> current_value == trec);
     NACQ_ASSERT(s -> current_value == e -> expected_value);
     fq = s -> first_wait_queue_entry;
-    q = new_stg_tvar_wait_queue(cap, tso);
+    q = alloc_stg_tvar_wait_queue(cap, tso);
     q -> next_queue_entry = fq;
     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
     if (fq != END_STM_WAIT_QUEUE) {
@@ -423,7 +512,8 @@ static void build_wait_queue_entries_for_trec(Capability *cap,
   });
 }
 
-static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
+static void remove_wait_queue_entries_for_trec(Capability *cap,
+                                               StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> enclosing_trec == NO_TREC);
   ASSERT(trec -> state == TREC_WAITING ||
@@ -452,6 +542,7 @@ static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
       ASSERT (s -> first_wait_queue_entry == q);
       s -> first_wait_queue_entry = nq;
     }
+    free_stg_tvar_wait_queue(cap, q);
     unlock_tvar(trec, s, saw, FALSE);
   });
 }
@@ -475,7 +566,7 @@ static TRecEntry *get_new_entry(Capability *cap,
   } else {
     // Current chunk is full: allocate a fresh one
     StgTRecChunk *nc;
-    nc = new_stg_trec_chunk(cap);
+    nc = alloc_stg_trec_chunk(cap);
     nc -> prev_chunk = c;
     nc -> next_entry_idx = 1;
     t -> current_chunk = nc;
@@ -614,13 +705,13 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
             result = FALSE;
             BREAK_FOR_EACH;
           }
-          e -> saw_update_by = s -> last_update_by;
+          e -> num_updates = s -> num_updates;
           if (s -> current_value != e -> expected_value) {
             TRACE("%p : doesn't match (race)\n", trec);
             result = FALSE;
             BREAK_FOR_EACH;
           } else {
-            TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
+            TRACE("%p : need to check version %d\n", trec, e -> num_updates);
           }
         });
       }
@@ -654,8 +745,8 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
       StgTVar *s;
       s = e -> tvar;
       if (entry_is_read_only(e)) {
-        TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
-        if (s -> last_update_by != e -> saw_update_by) {
+        TRACE("%p : check_read_only for TVar %p, saw %d\n", trec, s, e -> num_updates);
+        if (s -> num_updates != e -> num_updates) {
           // ||s -> current_value != e -> expected_value) {
           TRACE("%p : mismatch\n", trec);
           result = FALSE;
@@ -672,31 +763,75 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
 /************************************************************************/
 
 void stmPreGCHook() {
+  nat i;
+
   lock_stm(NO_TREC);
   TRACE("stmPreGCHook\n");
+  for (i = 0; i < n_capabilities; i ++) {
+    Capability *cap = &capabilities[i];
+    cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+    cap -> free_trec_chunks = END_STM_CHUNK_LIST;
+    cap -> free_trec_headers = NO_TREC;
+  }
   unlock_stm(NO_TREC);
 }
 
 /************************************************************************/
 
-void initSTM() {
-  TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
+// check_read_only relies on version numbers held in TVars' "num_updates" 
+// fields not wrapping around while a transaction is committed.  The version
+// number is incremented each time an update is committed to the TVar
+// This is unlikely to wrap around when 32-bit integers are used for the counts, 
+// but to ensure correctness we maintain a shared count on the maximum
+// number of commit operations that may occur and check that this has 
+// not increased by more than 2^32 during a commit.
+
+#define TOKEN_BATCH_SIZE 1024
+
+static volatile StgInt64 max_commits = 0;
+
+static volatile StgBool token_locked = FALSE;
+
+#if defined(SMP)
+static void getTokenBatch(Capability *cap) {
+  while (cas(&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
+  max_commits += TOKEN_BATCH_SIZE;
+  cap -> transaction_tokens = TOKEN_BATCH_SIZE;
+  token_locked = FALSE;
+}
+
+static void getToken(Capability *cap) {
+  if (cap -> transaction_tokens == 0) {
+    getTokenBatch(cap);
+  }
+  cap -> transaction_tokens --;
 }
+#else
+static void getToken(Capability *cap STG_UNUSED) {
+  // Nothing
+}
+#endif
 
 /*......................................................................*/
 
 StgTRecHeader *stmStartTransaction(Capability *cap,
                                    StgTRecHeader *outer) {
   StgTRecHeader *t;
-  TRACE("%p : stmStartTransaction\n", outer);
-  t = new_stg_trec_header(cap, outer);
+  TRACE("%p : stmStartTransaction with %d tokens\n", 
+        outer, 
+        cap -> transaction_tokens);
+
+  getToken(cap);
+
+  t = alloc_stg_trec_header(cap, outer);
   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
   return t;
 }
 
 /*......................................................................*/
 
-void stmAbortTransaction(StgTRecHeader *trec) {
+void stmAbortTransaction(Capability *cap,
+                         StgTRecHeader *trec) {
   TRACE("%p : stmAbortTransaction\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
@@ -707,17 +842,20 @@ void stmAbortTransaction(StgTRecHeader *trec) {
   if (trec -> state == TREC_WAITING) {
     ASSERT (trec -> enclosing_trec == NO_TREC);
     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
-    remove_wait_queue_entries_for_trec(trec);
+    remove_wait_queue_entries_for_trec(cap, trec);
   } 
   trec -> state = TREC_ABORTED;
   unlock_stm(trec);
 
+  free_stg_trec_header(cap, trec);
+
   TRACE("%p : stmAbortTransaction done\n", trec);
 }
 
 /*......................................................................*/
 
-void stmCondemnTransaction(StgTRecHeader *trec) {
+void stmCondemnTransaction(Capability *cap,
+                           StgTRecHeader *trec) {
   TRACE("%p : stmCondemnTransaction\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
@@ -728,7 +866,7 @@ void stmCondemnTransaction(StgTRecHeader *trec) {
   if (trec -> state == TREC_WAITING) {
     ASSERT (trec -> enclosing_trec == NO_TREC);
     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
-    remove_wait_queue_entries_for_trec(trec);
+    remove_wait_queue_entries_for_trec(cap, trec);
   } 
   trec -> state = TREC_CONDEMNED;
   unlock_stm(trec);
@@ -781,6 +919,7 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
 
 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   int result;
+  StgInt64 max_commits_at_start = max_commits;
 
   TRACE("%p : stmCommitTransaction()\n", trec);
   ASSERT (trec != NO_TREC);
@@ -800,6 +939,14 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
       TRACE("%p : doing read check\n", trec);
       result = check_read_only(trec);
       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
+
+      StgInt64 max_commits_at_end = max_commits;
+      StgInt64 max_concurrent_commits;
+      max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
+                                (n_capabilities * TOKEN_BATCH_SIZE));
+      if (((max_concurrent_commits >> 32) > 0) || shake()) {
+        result = FALSE;
+      }
     }
     
     if (result) {
@@ -818,7 +965,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
           unpark_waiters_on(cap,s);
           IF_STM_FG_LOCKS({
-            s -> last_update_by = trec;
+            s -> num_updates ++;
           });
           unlock_tvar(trec, s, e -> new_value, TRUE);
         } 
@@ -831,6 +978,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
 
   unlock_stm(trec);
 
+  free_stg_trec_header(cap, trec);
+
   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
 
   return result;
@@ -883,6 +1032,8 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
 
   unlock_stm(trec);
 
+  free_stg_trec_header(cap, trec);
+
   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
 
   return result;
@@ -922,6 +1073,7 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
 
   } else {
     unlock_stm(trec);
+    free_stg_trec_header(cap, trec);
   }
 
   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
@@ -930,14 +1082,14 @@ StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
 
 
 void
-stmWaitUnlock(Capability *cap, StgTRecHeader *trec) {
+stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
     revert_ownership(trec, TRUE);
     unlock_stm(trec);
 }
 
 /*......................................................................*/
 
-StgBool stmReWait(StgTSO *tso) {
+StgBool stmReWait(Capability *cap, StgTSO *tso) {
   int result;
   StgTRecHeader *trec = tso->trec;
 
@@ -960,9 +1112,9 @@ StgBool stmReWait(StgTSO *tso) {
     // The transcation has become invalid.  We can now remove it from the wait
     // queues.
     if (trec -> state != TREC_CONDEMNED) {
-      remove_wait_queue_entries_for_trec (trec);
+      remove_wait_queue_entries_for_trec (cap, trec);
     }
-
+    free_stg_trec_header(cap, trec);
   }
   unlock_stm(trec);
 
@@ -1099,7 +1251,7 @@ StgTVar *stmNewTVar(Capability *cap,
   result -> current_value = new_value;
   result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
 #if defined(SMP)
-  result -> last_update_by = NO_TREC;
+  result -> num_updates = 0;
 #endif
   return result;
 }
index 4891bbf..a82b6a7 100644 (file)
@@ -3802,7 +3802,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
        case ATOMICALLY_FRAME:
            if (stop_at_atomically) {
                ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
-               stmCondemnTransaction(tso -> trec);
+               stmCondemnTransaction(cap, tso -> trec);
 #ifdef REG_R1
                tso->sp = frame;
 #else
@@ -3829,8 +3829,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
            // and will not be visible after the abort.
            IF_DEBUG(stm,
                     debugBelch("Found atomically block delivering async exception\n"));
-           stmAbortTransaction(tso -> trec);
-           tso -> trec = stmGetEnclosingTRec(tso -> trec);
+            StgTRecHeader *trec = tso -> trec;
+            StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+            stmAbortTransaction(cap, trec);
+            tso -> trec = outer;
            break;
            
        default: