Cleanup after the OPTIONS parsing was moved.
[ghc-hetmet.git] / ghc / rts / STM.c
index 74894ec..d3283a9 100644 (file)
@@ -1,5 +1,4 @@
 /* -----------------------------------------------------------------------------
- *
  * (c) The GHC Team 1998-2005
  * 
  * STM implementation.
@@ -29,7 +28,7 @@
  * in STM.h:
  * 
  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
- * In the Haskell RTS this means it is suitable only for non-SMP builds.
+ * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
  *
  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
  * an invocation on the STM interface.  Note that this does not mean that 
@@ -98,8 +97,8 @@
 #define TRUE 1
 #define FALSE 0
 
-// ACQ_ASSERT is used for assertions which are only required for SMP builds with
-// fine-grained locking. 
+// ACQ_ASSERT is used for assertions which are only required for
+// THREADED_RTS builds with fine-grained locking.
 
 #if defined(STM_FG_LOCKS)
 #define ACQ_ASSERT(_X) ASSERT(_X)
 
 #if defined(DEBUG)
 #define SHAKE
+#if defined(THREADED_RTS)
+#define TRACE(_x...) IF_DEBUG(stm, debugBelch("STM  (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()); debugBelch ( _x ))
+#else
 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
+#endif
 #else
 #define TRACE(_x...) /*Nothing*/
 #endif
@@ -169,6 +172,13 @@ static int shake(void) {
      
 /*......................................................................*/
 
+// if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
+// and wait queue entries without GC
+
+#define REUSE_MEMORY
+
+/*......................................................................*/
+
 #define IF_STM_UNIPROC(__X)  do { } while (0)
 #define IF_STM_CG_LOCK(__X)  do { } while (0)
 #define IF_STM_FG_LOCKS(__X) do { } while (0)
@@ -316,69 +326,64 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec,
 // Helper functions for thread blocking and unblocking
 
 static void park_tso(StgTSO *tso) {
-  ACQUIRE_LOCK(&sched_mutex);
   ASSERT(tso -> why_blocked == NotBlocked);
   tso -> why_blocked = BlockedOnSTM;
   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
-  RELEASE_LOCK(&sched_mutex);
   TRACE("park_tso on tso=%p\n", tso);
 }
 
-static void unpark_tso(StgTSO *tso) {
+static void unpark_tso(Capability *cap, StgTSO *tso) {
   // We will continue unparking threads while they remain on one of the wait
   // queues: it's up to the thread itself to remove it from the wait queues
   // if it decides to do so when it is scheduled.
   if (tso -> why_blocked == BlockedOnSTM) {
     TRACE("unpark_tso on tso=%p\n", tso);
-    ACQUIRE_LOCK(&sched_mutex);
-    tso -> why_blocked = NotBlocked;
-    PUSH_ON_RUN_QUEUE(tso);
-    RELEASE_LOCK(&sched_mutex);
+    unblockOne(cap,tso);
   } else {
     TRACE("spurious unpark_tso on tso=%p\n", tso);
   }
 }
 
-static void unpark_waiters_on(StgTVar *s) {
+static void unpark_waiters_on(Capability *cap, StgTVar *s) {
   StgTVarWaitQueue *q;
   TRACE("unpark_waiters_on tvar=%p\n", s);
   for (q = s -> first_wait_queue_entry; 
        q != END_STM_WAIT_QUEUE; 
        q = q -> next_queue_entry) {
-    unpark_tso(q -> waiting_tso);
+    unpark_tso(cap, q -> waiting_tso);
   }
 }
 
 /*......................................................................*/
 
-// Helper functions for allocation and initialization
+// Helper functions for downstream allocation and initialization
 
-static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgRegTable *reg,
+static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
                                                  StgTSO *waiting_tso) {
   StgTVarWaitQueue *result;
-  result = (StgTVarWaitQueue *)allocateLocal(reg, sizeofW(StgTVarWaitQueue));
+  result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
   SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
   result -> waiting_tso = waiting_tso;
   return result;
 }
 
-static StgTRecChunk *new_stg_trec_chunk(StgRegTable *reg) {
+static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
   StgTRecChunk *result;
-  result = (StgTRecChunk *)allocateLocal(reg, sizeofW(StgTRecChunk));
+  result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
   result -> prev_chunk = END_STM_CHUNK_LIST;
   result -> next_entry_idx = 0;
   return result;
 }
 
-static StgTRecHeader *new_stg_trec_header(StgRegTable *reg,
+static StgTRecHeader *new_stg_trec_header(Capability *cap,
                                           StgTRecHeader *enclosing_trec) {
   StgTRecHeader *result;
-  result = (StgTRecHeader *) allocateLocal(reg, sizeofW(StgTRecHeader));
+  result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
 
   result -> enclosing_trec = enclosing_trec;
-  result -> current_chunk = new_stg_trec_chunk(reg);
+  result -> current_chunk = new_stg_trec_chunk(cap);
 
   if (enclosing_trec == NO_TREC) {
     result -> state = TREC_ACTIVE;
@@ -391,24 +396,94 @@ static StgTRecHeader *new_stg_trec_header(StgRegTable *reg,
   return result;  
 }
 
-static StgTVar *new_tvar(StgRegTable *reg,
-                         StgClosure *new_value) {
-  StgTVar *result;
-  result = (StgTVar *)allocateLocal(reg, sizeofW(StgTVar));
-  SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
-  result -> current_value = new_value;
-  result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
-#if defined(SMP)
-  result -> last_update_by = NO_TREC;
+/*......................................................................*/
+
+// Allocation / deallocation functions that retain per-capability lists
+// of closures that can be re-used
+
+static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap,
+                                                   StgTSO *waiting_tso) {
+  StgTVarWaitQueue *result = NULL;
+  if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) {
+    result = new_stg_tvar_wait_queue(cap, waiting_tso);
+  } else {
+    result = cap -> free_tvar_wait_queues;
+    result -> waiting_tso = waiting_tso;
+    cap -> free_tvar_wait_queues = result -> next_queue_entry;
+  }
+  return result;
+}
+
+static void free_stg_tvar_wait_queue(Capability *cap,
+                                     StgTVarWaitQueue *wq) {
+#if defined(REUSE_MEMORY)
+  wq -> next_queue_entry = cap -> free_tvar_wait_queues;
+  cap -> free_tvar_wait_queues = wq;
 #endif
+}
+
+static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
+  StgTRecChunk *result = NULL;
+  if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
+    result = new_stg_trec_chunk(cap);
+  } else {
+    result = cap -> free_trec_chunks;
+    cap -> free_trec_chunks = result -> prev_chunk;
+    result -> prev_chunk = END_STM_CHUNK_LIST;
+    result -> next_entry_idx = 0;
+  }
   return result;
 }
 
+static void free_stg_trec_chunk(Capability *cap, 
+                                StgTRecChunk *c) {
+#if defined(REUSE_MEMORY)
+  c -> prev_chunk = cap -> free_trec_chunks;
+  cap -> free_trec_chunks = c;
+#endif
+}
+
+static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
+                                            StgTRecHeader *enclosing_trec) {
+  StgTRecHeader *result = NULL;
+  if (cap -> free_trec_headers == NO_TREC) {
+    result = new_stg_trec_header(cap, enclosing_trec);
+  } else {
+    result = cap -> free_trec_headers;
+    cap -> free_trec_headers = result -> enclosing_trec;
+    result -> enclosing_trec = enclosing_trec;
+    result -> current_chunk -> next_entry_idx = 0;
+    if (enclosing_trec == NO_TREC) {
+      result -> state = TREC_ACTIVE;
+    } else {
+      ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
+             enclosing_trec -> state == TREC_CONDEMNED);
+      result -> state = enclosing_trec -> state;
+    }
+  }
+  return result;
+}
+
+static void free_stg_trec_header(Capability *cap,
+                                 StgTRecHeader *trec) {
+#if defined(REUSE_MEMORY)
+  StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
+  while (chunk != END_STM_CHUNK_LIST) {
+    StgTRecChunk *prev_chunk = chunk -> prev_chunk;
+    free_stg_trec_chunk(cap, chunk);
+    chunk = prev_chunk;
+  } 
+  trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
+  trec -> enclosing_trec = cap -> free_trec_headers;
+  cap -> free_trec_headers = trec;
+#endif
+}
+
 /*......................................................................*/
 
 // Helper functions for managing waiting lists
 
-static void build_wait_queue_entries_for_trec(StgRegTable *reg,
+static void build_wait_queue_entries_for_trec(Capability *cap,
                                       StgTSO *tso, 
                                       StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
@@ -426,7 +501,7 @@ static void build_wait_queue_entries_for_trec(StgRegTable *reg,
     ACQ_ASSERT(s -> current_value == trec);
     NACQ_ASSERT(s -> current_value == e -> expected_value);
     fq = s -> first_wait_queue_entry;
-    q = new_stg_tvar_wait_queue(reg, tso);
+    q = alloc_stg_tvar_wait_queue(cap, tso);
     q -> next_queue_entry = fq;
     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
     if (fq != END_STM_WAIT_QUEUE) {
@@ -437,7 +512,8 @@ static void build_wait_queue_entries_for_trec(StgRegTable *reg,
   });
 }
 
-static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
+static void remove_wait_queue_entries_for_trec(Capability *cap,
+                                               StgTRecHeader *trec) {
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> enclosing_trec == NO_TREC);
   ASSERT(trec -> state == TREC_WAITING ||
@@ -466,13 +542,14 @@ static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
       ASSERT (s -> first_wait_queue_entry == q);
       s -> first_wait_queue_entry = nq;
     }
+    free_stg_tvar_wait_queue(cap, q);
     unlock_tvar(trec, s, saw, FALSE);
   });
 }
  
 /*......................................................................*/
  
-static TRecEntry *get_new_entry(StgRegTable *reg,
+static TRecEntry *get_new_entry(Capability *cap,
                                 StgTRecHeader *t) {
   TRecEntry *result;
   StgTRecChunk *c;
@@ -489,7 +566,7 @@ static TRecEntry *get_new_entry(StgRegTable *reg,
   } else {
     // Current chunk is full: allocate a fresh one
     StgTRecChunk *nc;
-    nc = new_stg_trec_chunk(reg);
+    nc = alloc_stg_trec_chunk(cap);
     nc -> prev_chunk = c;
     nc -> next_entry_idx = 1;
     t -> current_chunk = nc;
@@ -501,7 +578,7 @@ static TRecEntry *get_new_entry(StgRegTable *reg,
 
 /*......................................................................*/
 
-static void merge_update_into(StgRegTable *reg,
+static void merge_update_into(Capability *cap,
                               StgTRecHeader *t,
                               StgTVar *tvar,
                               StgClosure *expected_value,
@@ -529,7 +606,7 @@ static void merge_update_into(StgRegTable *reg,
   if (!found) {
     // No entry so far in this trec
     TRecEntry *ne;
-    ne = get_new_entry(reg, t);
+    ne = get_new_entry(cap, t);
     ne -> tvar = tvar;
     ne -> expected_value = expected_value;
     ne -> new_value = new_value;
@@ -544,6 +621,7 @@ static StgBool entry_is_update(TRecEntry *e) {
   return result;
 } 
 
+#if defined(STM_FG_LOCKS)
 static StgBool entry_is_read_only(TRecEntry *e) {
   StgBool result;
   result = (e -> expected_value == e -> new_value);
@@ -557,6 +635,7 @@ static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
   result = (c == (StgClosure *) h);
   return result;  
 }
+#endif
 
 // revert_ownership : release a lock on a TVar, storing back
 // the value that it held when the lock was acquired.  "revert_all"
@@ -628,13 +707,13 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
             result = FALSE;
             BREAK_FOR_EACH;
           }
-          e -> saw_update_by = s -> last_update_by;
+          e -> num_updates = s -> num_updates;
           if (s -> current_value != e -> expected_value) {
             TRACE("%p : doesn't match (race)\n", trec);
             result = FALSE;
             BREAK_FOR_EACH;
           } else {
-            TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
+            TRACE("%p : need to check version %d\n", trec, e -> num_updates);
           }
         });
       }
@@ -668,8 +747,8 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
       StgTVar *s;
       s = e -> tvar;
       if (entry_is_read_only(e)) {
-        TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
-        if (s -> last_update_by != e -> saw_update_by) {
+        TRACE("%p : check_read_only for TVar %p, saw %d\n", trec, s, e -> num_updates);
+        if (s -> num_updates != e -> num_updates) {
           // ||s -> current_value != e -> expected_value) {
           TRACE("%p : mismatch\n", trec);
           result = FALSE;
@@ -686,31 +765,75 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
 /************************************************************************/
 
 void stmPreGCHook() {
+  nat i;
+
   lock_stm(NO_TREC);
   TRACE("stmPreGCHook\n");
+  for (i = 0; i < n_capabilities; i ++) {
+    Capability *cap = &capabilities[i];
+    cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE;
+    cap -> free_trec_chunks = END_STM_CHUNK_LIST;
+    cap -> free_trec_headers = NO_TREC;
+  }
   unlock_stm(NO_TREC);
 }
 
 /************************************************************************/
 
-void initSTM() {
-  TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
+// check_read_only relies on version numbers held in TVars' "num_updates" 
+// fields not wrapping around while a transaction is committed.  The version
+// number is incremented each time an update is committed to the TVar
+// This is unlikely to wrap around when 32-bit integers are used for the counts, 
+// but to ensure correctness we maintain a shared count on the maximum
+// number of commit operations that may occur and check that this has 
+// not increased by more than 2^32 during a commit.
+
+#define TOKEN_BATCH_SIZE 1024
+
+static volatile StgInt64 max_commits = 0;
+
+static volatile StgBool token_locked = FALSE;
+
+#if defined(THREADED_RTS)
+static void getTokenBatch(Capability *cap) {
+  while (cas(&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
+  max_commits += TOKEN_BATCH_SIZE;
+  cap -> transaction_tokens = TOKEN_BATCH_SIZE;
+  token_locked = FALSE;
+}
+
+static void getToken(Capability *cap) {
+  if (cap -> transaction_tokens == 0) {
+    getTokenBatch(cap);
+  }
+  cap -> transaction_tokens --;
+}
+#else
+static void getToken(Capability *cap STG_UNUSED) {
+  // Nothing
 }
+#endif
 
 /*......................................................................*/
 
-StgTRecHeader *stmStartTransaction(StgRegTable *reg,
+StgTRecHeader *stmStartTransaction(Capability *cap,
                                    StgTRecHeader *outer) {
   StgTRecHeader *t;
-  TRACE("%p : stmStartTransaction\n", outer);
-  t = new_stg_trec_header(reg, outer);
+  TRACE("%p : stmStartTransaction with %d tokens\n", 
+        outer, 
+        cap -> transaction_tokens);
+
+  getToken(cap);
+
+  t = alloc_stg_trec_header(cap, outer);
   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
   return t;
 }
 
 /*......................................................................*/
 
-void stmAbortTransaction(StgTRecHeader *trec) {
+void stmAbortTransaction(Capability *cap,
+                         StgTRecHeader *trec) {
   TRACE("%p : stmAbortTransaction\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
@@ -721,17 +844,20 @@ void stmAbortTransaction(StgTRecHeader *trec) {
   if (trec -> state == TREC_WAITING) {
     ASSERT (trec -> enclosing_trec == NO_TREC);
     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
-    remove_wait_queue_entries_for_trec(trec);
+    remove_wait_queue_entries_for_trec(cap, trec);
   } 
   trec -> state = TREC_ABORTED;
   unlock_stm(trec);
 
+  free_stg_trec_header(cap, trec);
+
   TRACE("%p : stmAbortTransaction done\n", trec);
 }
 
 /*......................................................................*/
 
-void stmCondemnTransaction(StgTRecHeader *trec) {
+void stmCondemnTransaction(Capability *cap,
+                           StgTRecHeader *trec) {
   TRACE("%p : stmCondemnTransaction\n", trec);
   ASSERT (trec != NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
@@ -742,7 +868,7 @@ void stmCondemnTransaction(StgTRecHeader *trec) {
   if (trec -> state == TREC_WAITING) {
     ASSERT (trec -> enclosing_trec == NO_TREC);
     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
-    remove_wait_queue_entries_for_trec(trec);
+    remove_wait_queue_entries_for_trec(cap, trec);
   } 
   trec -> state = TREC_CONDEMNED;
   unlock_stm(trec);
@@ -793,15 +919,19 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
 
 /*......................................................................*/
 
-StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
+StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   int result;
+  StgInt64 max_commits_at_start = max_commits;
+
   TRACE("%p : stmCommitTransaction()\n", trec);
   ASSERT (trec != NO_TREC);
+
+  lock_stm(trec);
+
   ASSERT (trec -> enclosing_trec == NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
           (trec -> state == TREC_CONDEMNED));
 
-  lock_stm(trec);
   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
   if (result) {
     // We now know that all the updated locations hold their expected values.
@@ -811,6 +941,14 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
       TRACE("%p : doing read check\n", trec);
       result = check_read_only(trec);
       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
+
+      StgInt64 max_commits_at_end = max_commits;
+      StgInt64 max_concurrent_commits;
+      max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
+                                (n_capabilities * TOKEN_BATCH_SIZE));
+      if (((max_concurrent_commits >> 32) > 0) || shake()) {
+        result = FALSE;
+      }
     }
     
     if (result) {
@@ -827,9 +965,9 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
 
           ACQ_ASSERT(tvar_is_locked(s, trec));
           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
-          unpark_waiters_on(s);
+          unpark_waiters_on(cap,s);
           IF_STM_FG_LOCKS({
-            s -> last_update_by = trec;
+            s -> num_updates ++;
           });
           unlock_tvar(trec, s, e -> new_value, TRUE);
         } 
@@ -842,6 +980,8 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
 
   unlock_stm(trec);
 
+  free_stg_trec_header(cap, trec);
+
   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
 
   return result;
@@ -849,7 +989,7 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
 
 /*......................................................................*/
 
-StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
+StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
   StgTRecHeader *et;
   int result;
   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
@@ -859,7 +999,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
   lock_stm(trec);
 
   et = trec -> enclosing_trec;
-  result = validate_and_acquire_ownership(trec, FALSE, TRUE);
+  result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
   if (result) {
     // We now know that all the updated locations hold their expected values.
 
@@ -883,7 +1023,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
           if (entry_is_update(e)) {
             unlock_tvar(trec, s, e -> expected_value, FALSE);
           }
-          merge_update_into(reg, et, s, e -> expected_value, e -> new_value);
+          merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
           ACQ_ASSERT(s -> current_value != trec);
         });
       } else {
@@ -894,6 +1034,8 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
 
   unlock_stm(trec);
 
+  free_stg_trec_header(cap, trec);
+
   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
 
   return result;
@@ -901,7 +1043,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
 
 /*......................................................................*/
 
-StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) {
+StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
   int result;
   TRACE("%p : stmWait(%p)\n", trec, tso);
   ASSERT (trec != NO_TREC);
@@ -919,25 +1061,37 @@ StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) {
     // Put ourselves to sleep.  We retain locks on all the TVars involved
     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
     // in the TSO, (c) TREC_WAITING in the Trec.  
-    build_wait_queue_entries_for_trec(reg, tso, trec);
+    build_wait_queue_entries_for_trec(cap, tso, trec);
     park_tso(tso);
     trec -> state = TREC_WAITING;
 
-    // As soon as we start releasing ownership, another thread may find us 
-    // and wake us up.  This may happen even before we have finished 
-    // releasing ownership.
-    revert_ownership(trec, TRUE);
-  }  
+    // We haven't released ownership of the transaction yet.  The TSO
+    // has been put on the wait queue for the TVars it is waiting for,
+    // but we haven't yet tidied up the TSO's stack and made it safe
+    // to wake up the TSO.  Therefore, we must wait until the TSO is
+    // safe to wake up before we release ownership - when all is well,
+    // the runtime will call stmWaitUnlock() below, with the same
+    // TRec.
 
-  unlock_stm(trec);
+  } else {
+    unlock_stm(trec);
+    free_stg_trec_header(cap, trec);
+  }
 
   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
   return result;
 }
 
+
+void
+stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
+    revert_ownership(trec, TRUE);
+    unlock_stm(trec);
+}
+
 /*......................................................................*/
 
-StgBool stmReWait(StgTSO *tso) {
+StgBool stmReWait(Capability *cap, StgTSO *tso) {
   int result;
   StgTRecHeader *trec = tso->trec;
 
@@ -960,9 +1114,9 @@ StgBool stmReWait(StgTSO *tso) {
     // The transcation has become invalid.  We can now remove it from the wait
     // queues.
     if (trec -> state != TREC_CONDEMNED) {
-      remove_wait_queue_entries_for_trec (trec);
+      remove_wait_queue_entries_for_trec (cap, trec);
     }
-
+    free_stg_trec_header(cap, trec);
   }
   unlock_stm(trec);
 
@@ -1011,7 +1165,7 @@ static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *t
 
 /*......................................................................*/
 
-StgClosure *stmReadTVar(StgRegTable *reg,
+StgClosure *stmReadTVar(Capability *cap,
                         StgTRecHeader *trec, 
                        StgTVar *tvar) {
   StgTRecHeader *entry_in;
@@ -1030,7 +1184,7 @@ StgClosure *stmReadTVar(StgRegTable *reg,
       result = entry -> new_value;
     } else {
       // Entry found in another trec
-      TRecEntry *new_entry = get_new_entry(reg, trec);
+      TRecEntry *new_entry = get_new_entry(cap, trec);
       new_entry -> tvar = tvar;
       new_entry -> expected_value = entry -> expected_value;
       new_entry -> new_value = entry -> new_value;
@@ -1039,7 +1193,7 @@ StgClosure *stmReadTVar(StgRegTable *reg,
   } else {
     // No entry found
     StgClosure *current_value = read_current_value(trec, tvar);
-    TRecEntry *new_entry = get_new_entry(reg, trec);
+    TRecEntry *new_entry = get_new_entry(cap, trec);
     new_entry -> tvar = tvar;
     new_entry -> expected_value = current_value;
     new_entry -> new_value = current_value;
@@ -1052,7 +1206,7 @@ StgClosure *stmReadTVar(StgRegTable *reg,
 
 /*......................................................................*/
 
-void stmWriteTVar(StgRegTable *reg,
+void stmWriteTVar(Capability *cap,
                   StgTRecHeader *trec,
                  StgTVar *tvar, 
                  StgClosure *new_value) {
@@ -1072,7 +1226,7 @@ void stmWriteTVar(StgRegTable *reg,
       entry -> new_value = new_value;
     } else {
       // Entry found in another trec
-      TRecEntry *new_entry = get_new_entry(reg, trec);
+      TRecEntry *new_entry = get_new_entry(cap, trec);
       new_entry -> tvar = tvar;
       new_entry -> expected_value = entry -> expected_value;
       new_entry -> new_value = new_value;
@@ -1080,7 +1234,7 @@ void stmWriteTVar(StgRegTable *reg,
   } else {
     // No entry found
     StgClosure *current_value = read_current_value(trec, tvar);
-    TRecEntry *new_entry = get_new_entry(reg, trec);
+    TRecEntry *new_entry = get_new_entry(cap, trec);
     new_entry -> tvar = tvar;
     new_entry -> expected_value = current_value;
     new_entry -> new_value = new_value;
@@ -1091,12 +1245,17 @@ void stmWriteTVar(StgRegTable *reg,
 
 /*......................................................................*/
 
-StgTVar *stmNewTVar(StgRegTable *reg,
+StgTVar *stmNewTVar(Capability *cap,
                     StgClosure *new_value) {
   StgTVar *result;
-  result = new_tvar(reg, new_value);
+  result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
+  SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
+  result -> current_value = new_value;
+  result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
+#if defined(THREADED_RTS)
+  result -> num_updates = 0;
+#endif
   return result;
 }
 
 /*......................................................................*/
-