New implementation of BLACKHOLEs
[ghc-hetmet.git] / rts / STM.c
index 6bf20f9..f98d201 100644 (file)
--- a/rts/STM.c
+++ b/rts/STM.c
  * (d) release the locks on the TVars, writing updates to them in the case of a 
  * commit, (e) unlock the STM.
  *
- * Queues of waiting threads hang off the first_watch_queue_entry field of each
- * TVar.  This may only be manipulated when holding that TVar's lock.  In
- * particular, when a thread is putting itself to sleep, it mustn't release
- * the TVar's lock until it has added itself to the wait queue and marked its
- * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
+ * Queues of waiting threads hang off the first_watch_queue_entry
+ * field of each TVar.  This may only be manipulated when holding that
+ * TVar's lock.  In particular, when a thread is putting itself to
+ * sleep, it mustn't release the TVar's lock until it has added itself
+ * to the wait queue and marked its TSO as BlockedOnSTM -- this makes
+ * sure that other threads will know to wake it.
  *
  * ---------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
 #include "Rts.h"
-#include "RtsFlags.h"
+
 #include "RtsUtils.h"
 #include "Schedule.h"
-#include "SMP.h"
 #include "STM.h"
-#include "Storage.h"
 #include "Trace.h"
+#include "Threads.h"
 
-#include <stdlib.h>
 #include <stdio.h>
 
 #define TRUE 1
@@ -306,7 +305,7 @@ static StgClosure *lock_tvar(StgTRecHeader *trec,
   do {
     do {
       result = s -> current_value;
-    } while (GET_INFO(result) == &stg_TREC_HEADER_info);
+    } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
   } while (cas((void *)&(s -> current_value),
               (StgWord)result, (StgWord)trec) != (StgWord)result);
   return result;
@@ -353,8 +352,7 @@ static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
 
 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
   StgClosure *c = q -> closure;
-  StgInfoTable *info = get_itbl(c);
-  return (info -> type) == ATOMIC_INVARIANT;
+  return (c->header.info == &stg_ATOMIC_INVARIANT_info);
 }
 
 /*......................................................................*/
@@ -379,7 +377,7 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
     lockTSO(tso);
     if (tso -> why_blocked == BlockedOnSTM) {
        TRACE("unpark_tso on tso=%p", tso);
-       unblockOne(cap,tso);
+        tryWakeupThread(cap,tso);
     } else {
        TRACE("spurious unpark_tso on tso=%p", tso);
     }
@@ -388,10 +386,18 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
 
 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
   StgTVarWatchQueue *q;
+  StgTVarWatchQueue *trail;
   TRACE("unpark_waiters_on tvar=%p", s);
-  for (q = s -> first_watch_queue_entry; 
-       q != END_STM_WATCH_QUEUE; 
+  // unblock TSOs in reverse order, to be a bit fairer (#2319)
+  for (q = s -> first_watch_queue_entry, trail = q;
+       q != END_STM_WATCH_QUEUE;
        q = q -> next_queue_entry) {
+    trail = q;
+  }
+  q = trail;
+  for (;
+       q != END_STM_WATCH_QUEUE; 
+       q = q -> prev_queue_entry) {
     if (watcher_is_tso(q)) {
       unpark_tso(cap, (StgTSO *)(q -> closure));
     }
@@ -405,7 +411,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
 static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
                                                             StgAtomicInvariant *invariant) {
   StgInvariantCheckQueue *result;
-  result = (StgInvariantCheckQueue *)allocateLocal(cap, sizeofW(StgInvariantCheckQueue));
+  result = (StgInvariantCheckQueue *)allocate(cap, sizeofW(StgInvariantCheckQueue));
   SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
   result -> invariant = invariant;
   result -> my_execution = NO_TREC;
@@ -415,7 +421,7 @@ static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
 static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
                                                   StgClosure *closure) {
   StgTVarWatchQueue *result;
-  result = (StgTVarWatchQueue *)allocateLocal(cap, sizeofW(StgTVarWatchQueue));
+  result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
   SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
   result -> closure = closure;
   return result;
@@ -423,7 +429,7 @@ static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
 
 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
   StgTRecChunk *result;
-  result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
+  result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
   result -> prev_chunk = END_STM_CHUNK_LIST;
   result -> next_entry_idx = 0;
@@ -433,7 +439,7 @@ static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
 static StgTRecHeader *new_stg_trec_header(Capability *cap,
                                           StgTRecHeader *enclosing_trec) {
   StgTRecHeader *result;
-  result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
+  result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
 
   result -> enclosing_trec = enclosing_trec;
@@ -596,8 +602,9 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
     StgTVarWatchQueue *pq;
     StgTVarWatchQueue *nq;
     StgTVarWatchQueue *q;
+    StgClosure *saw;
     s = e -> tvar;
-    StgClosure *saw = lock_tvar(trec, s);
+    saw = lock_tvar(trec, s);
     q = (StgTVarWatchQueue *) (e -> new_value);
     TRACE("%p : removing tso=%p from watch queue for tvar=%p", 
          trec, 
@@ -906,7 +913,7 @@ static volatile StgBool token_locked = FALSE;
 static void getTokenBatch(Capability *cap) {
   while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
   max_commits += TOKEN_BATCH_SIZE;
-  TRACE("%p : cap got token batch, max_commits=%lld", cap, max_commits);
+  TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
   cap -> transaction_tokens = TOKEN_BATCH_SIZE;
   token_locked = FALSE;
 }
@@ -943,6 +950,7 @@ StgTRecHeader *stmStartTransaction(Capability *cap,
 
 void stmAbortTransaction(Capability *cap,
                          StgTRecHeader *trec) {
+  StgTRecHeader *et;
   TRACE("%p : stmAbortTransaction", trec);
   ASSERT (trec != NO_TREC);
   ASSERT ((trec -> state == TREC_ACTIVE) || 
@@ -951,7 +959,7 @@ void stmAbortTransaction(Capability *cap,
 
   lock_stm(trec);
 
-  StgTRecHeader *et = trec -> enclosing_trec;
+  et = trec -> enclosing_trec;
   if (et == NO_TREC) {
     // We're a top-level transaction: remove any watch queue entries that
     // we may have.
@@ -1017,16 +1025,6 @@ void stmCondemnTransaction(Capability *cap,
 
 /*......................................................................*/
 
-StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
-  StgTRecHeader *outer;
-  TRACE("%p : stmGetEnclosingTRec", trec);
-  outer = trec -> enclosing_trec;
-  TRACE("%p : stmGetEnclosingTRec()=%p", trec, outer);
-  return outer;
-}
-
-/*......................................................................*/
-
 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
   StgTRecHeader *t;
   StgBool result;
@@ -1165,18 +1163,18 @@ static void connect_invariant_to_trec(Capability *cap,
 void stmAddInvariantToCheck(Capability *cap, 
                            StgTRecHeader *trec,
                            StgClosure *code) {
+  StgAtomicInvariant *invariant;
+  StgInvariantCheckQueue *q;
   TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
   ASSERT(trec != NO_TREC);
   ASSERT(trec -> state == TREC_ACTIVE ||
         trec -> state == TREC_CONDEMNED);
 
-  StgAtomicInvariant *invariant;
-  StgInvariantCheckQueue *q;
 
   // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
   //    to signal that this is a new invariant in the current atomic block
 
-  invariant = (StgAtomicInvariant *) allocateLocal(cap, sizeofW(StgAtomicInvariant));
+  invariant = (StgAtomicInvariant *) allocate(cap, sizeofW(StgAtomicInvariant));
   TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
   SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
   invariant -> code = code;
@@ -1200,6 +1198,7 @@ void stmAddInvariantToCheck(Capability *cap,
  */
 
 StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
+  StgTRecChunk *c;
   TRACE("%p : stmGetInvariantsToCheck, head was %p", 
        trec,
        trec -> invariants_to_check);
@@ -1211,7 +1210,7 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *
   ASSERT(trec -> enclosing_trec == NO_TREC);
 
   lock_stm(trec);
-  StgTRecChunk *c = trec -> current_chunk;
+  c = trec -> current_chunk;
   while (c != END_STM_CHUNK_LIST) {
     unsigned int i;
     for (i = 0; i < c -> next_entry_idx; i ++) {
@@ -1223,15 +1222,15 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *
        // Pick up any invariants on the TVar being updated
        // by entry "e"
 
-       TRACE("%p : checking for invariants on %p", trec, s);
        StgTVarWatchQueue *q;
+       TRACE("%p : checking for invariants on %p", trec, s);
        for (q = s -> first_watch_queue_entry;
             q != END_STM_WATCH_QUEUE;
             q = q -> next_queue_entry) {
          if (watcher_is_invariant(q)) {
-           TRACE("%p : Touching invariant %p", trec, q -> closure);
            StgBool found = FALSE;
            StgInvariantCheckQueue *q2;
+           TRACE("%p : Touching invariant %p", trec, q -> closure);
            for (q2 = trec -> invariants_to_check;
                 q2 != END_INVARIANT_CHECK_QUEUE;
                 q2 = q2 -> next_queue_entry) {
@@ -1243,8 +1242,8 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *
            }
            
            if (!found) {
-             TRACE("%p : Not already found %p", trec, q -> closure);
              StgInvariantCheckQueue *q3;
+             TRACE("%p : Not already found %p", trec, q -> closure);
              q3 = alloc_stg_invariant_check_queue(cap,
                                                   (StgAtomicInvariant*) q -> closure);
              q3 -> next_queue_entry = trec -> invariants_to_check;
@@ -1273,6 +1272,8 @@ StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *
 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   int result;
   StgInt64 max_commits_at_start = max_commits;
+  StgBool touched_invariants;
+  StgBool use_read_phase;
 
   TRACE("%p : stmCommitTransaction()", trec);
   ASSERT (trec != NO_TREC);
@@ -1286,7 +1287,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   // touched_invariants is true if we've written to a TVar with invariants 
   // attached to it, or if we're trying to add a new invariant to the system.
 
-  StgBool touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
+  touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
 
   // If we have touched invariants then (i) lock the invariant, and (ii) add
   // the invariant's read set to our own.  Step (i) is needed to serialize
@@ -1298,18 +1299,20 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   // invariant from both tvars).
 
   if (touched_invariants) {
-    TRACE("%p : locking invariants", trec);
     StgInvariantCheckQueue *q = trec -> invariants_to_check;
+    TRACE("%p : locking invariants", trec);
     while (q != END_INVARIANT_CHECK_QUEUE) {
+      StgTRecHeader *inv_old_trec;
+      StgAtomicInvariant *inv;
       TRACE("%p : locking invariant %p", trec, q -> invariant);
-      StgAtomicInvariant *inv = q -> invariant;
+      inv = q -> invariant;
       if (!lock_inv(inv)) {
         TRACE("%p : failed to lock %p", trec, inv);
         trec -> state = TREC_CONDEMNED;
         break;
       }
 
-      StgTRecHeader *inv_old_trec = inv -> last_execution;
+      inv_old_trec = inv -> last_execution;
       if (inv_old_trec != NO_TREC) {
        StgTRecChunk *c = inv_old_trec -> current_chunk;
        while (c != END_STM_CHUNK_LIST) {
@@ -1336,7 +1339,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
   // invariants and TVars are managed by the TVar watch queues which are
   // protected by the TVar's locks.
 
-  StgBool use_read_phase = ((config_use_read_phase) && (!touched_invariants));
+  use_read_phase = ((config_use_read_phase) && (!touched_invariants));
 
   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
   if (result) {
@@ -1344,12 +1347,13 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
     ASSERT (trec -> state == TREC_ACTIVE);
 
     if (use_read_phase) {
+      StgInt64 max_commits_at_end;
+      StgInt64 max_concurrent_commits;
       TRACE("%p : doing read check", trec);
       result = check_read_only(trec);
       TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
 
-      StgInt64 max_commits_at_end = max_commits;
-      StgInt64 max_concurrent_commits;
+      max_commits_at_end = max_commits;
       max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
                                 (n_capabilities * TOKEN_BATCH_SIZE));
       if (((max_concurrent_commits >> 32) > 0) || shake()) {
@@ -1557,7 +1561,7 @@ static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *t
   result = tvar -> current_value;
 
 #if defined(STM_FG_LOCKS)
-  while (GET_INFO(result) == &stg_TREC_HEADER_info) {
+  while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
     TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
     result = tvar -> current_value;
   }
@@ -1572,7 +1576,7 @@ static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *t
 StgClosure *stmReadTVar(Capability *cap,
                         StgTRecHeader *trec, 
                        StgTVar *tvar) {
-  StgTRecHeader *entry_in;
+  StgTRecHeader *entry_in = NULL;
   StgClosure *result = NULL;
   TRecEntry *entry = NULL;
   TRACE("%p : stmReadTVar(%p)", trec, tvar);
@@ -1615,7 +1619,7 @@ void stmWriteTVar(Capability *cap,
                  StgTVar *tvar, 
                  StgClosure *new_value) {
 
-  StgTRecHeader *entry_in;
+  StgTRecHeader *entry_in = NULL;
   TRecEntry *entry = NULL;
   TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
   ASSERT (trec != NO_TREC);
@@ -1652,7 +1656,7 @@ void stmWriteTVar(Capability *cap,
 StgTVar *stmNewTVar(Capability *cap,
                     StgClosure *new_value) {
   StgTVar *result;
-  result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
+  result = (StgTVar *)allocate(cap, sizeofW(StgTVar));
   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
   result -> current_value = new_value;
   result -> first_watch_queue_entry = END_STM_WATCH_QUEUE;