Massive patch for the first months work adding System FC to GHC #15
[ghc-hetmet.git] / rts / STM.c
index d3283a9..01155b1 100644 (file)
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -90,6 +90,7 @@
 #include "SMP.h"
 #include "STM.h"
 #include "Storage.h"
+#include "Trace.h"
 
 #include <stdlib.h>
 #include <stdio.h>
 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
 // unusualy code paths if genuine contention is rare
 
-#if defined(DEBUG)
-#define SHAKE
-#if defined(THREADED_RTS)
-#define TRACE(_x...) IF_DEBUG(stm, debugBelch("STM  (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()); debugBelch ( _x ))
-#else
-#define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
-#endif
-#else
-#define TRACE(_x...) /*Nothing*/
-#endif
+#define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)
 
 #ifdef SHAKE
 static const int do_shake = TRUE;
@@ -297,7 +289,8 @@ static StgClosure *lock_tvar(StgTRecHeader *trec,
     do {
       result = s -> current_value;
     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
-  } while (cas(&(s -> current_value), result, trec) != result);
+  } while (cas((void *)&(s -> current_value),
+              (StgWord)result, (StgWord)trec) != (StgWord)result);
   return result;
 }
 
@@ -306,7 +299,7 @@ static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
                         StgClosure *c,
                         StgBool force_update STG_UNUSED) {
   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
-  ASSERT(s -> current_value == trec);
+  ASSERT(s -> current_value == (StgClosure *)trec);
   s -> current_value = c;
 }
 
@@ -314,8 +307,10 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec,
                               StgTVar *s,
                               StgClosure *expected) {
   StgClosure *result;
+  StgWord w;
   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
-  result = cas(&(s -> current_value), expected, trec);
+  w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
+  result = (StgClosure *)w;
   TRACE("%p : %s\n", trec, result ? "success" : "failure");
   return (result == expected);
 }
@@ -333,15 +328,21 @@ static void park_tso(StgTSO *tso) {
 }
 
 static void unpark_tso(Capability *cap, StgTSO *tso) {
-  // We will continue unparking threads while they remain on one of the wait
-  // queues: it's up to the thread itself to remove it from the wait queues
-  // if it decides to do so when it is scheduled.
-  if (tso -> why_blocked == BlockedOnSTM) {
-    TRACE("unpark_tso on tso=%p\n", tso);
-    unblockOne(cap,tso);
-  } else {
-    TRACE("spurious unpark_tso on tso=%p\n", tso);
-  }
+    // We will continue unparking threads while they remain on one of the wait
+    // queues: it's up to the thread itself to remove it from the wait queues
+    // if it decides to do so when it is scheduled.
+
+    // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
+    // to avoid multiple CPUs unblocking the same TSO, and also to
+    // synchronise with throwTo().
+    lockTSO(tso);
+    if (tso -> why_blocked == BlockedOnSTM) {
+       TRACE("unpark_tso on tso=%p\n", tso);
+       unblockOne(cap,tso);
+    } else {
+       TRACE("spurious unpark_tso on tso=%p\n", tso);
+    }
+    unlockTSO(tso);
 }
 
 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
@@ -498,7 +499,7 @@ static void build_wait_queue_entries_for_trec(Capability *cap,
     StgTVarWaitQueue *fq;
     s = e -> tvar;
     TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
-    ACQ_ASSERT(s -> current_value == trec);
+    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
     NACQ_ASSERT(s -> current_value == e -> expected_value);
     fq = s -> first_wait_queue_entry;
     q = alloc_stg_tvar_wait_queue(cap, tso);
@@ -530,7 +531,7 @@ static void remove_wait_queue_entries_for_trec(Capability *cap,
     StgClosure *saw = lock_tvar(trec, s);
     q = (StgTVarWaitQueue *) (e -> new_value);
     TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
-    ACQ_ASSERT(s -> current_value == trec);
+    ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
     nq = q -> next_queue_entry;
     pq = q -> prev_queue_entry;
     if (nq != END_STM_WAIT_QUEUE) {
@@ -713,7 +714,7 @@ static StgBool validate_and_acquire_ownership (StgTRecHeader *trec,
             result = FALSE;
             BREAK_FOR_EACH;
           } else {
-            TRACE("%p : need to check version %d\n", trec, e -> num_updates);
+            TRACE("%p : need to check version %ld\n", trec, e -> num_updates);
           }
         });
       }
@@ -747,7 +748,7 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
       StgTVar *s;
       s = e -> tvar;
       if (entry_is_read_only(e)) {
-        TRACE("%p : check_read_only for TVar %p, saw %d\n", trec, s, e -> num_updates);
+        TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
         if (s -> num_updates != e -> num_updates) {
           // ||s -> current_value != e -> expected_value) {
           TRACE("%p : mismatch\n", trec);
@@ -792,11 +793,11 @@ void stmPreGCHook() {
 
 static volatile StgInt64 max_commits = 0;
 
+#if defined(THREADED_RTS)
 static volatile StgBool token_locked = FALSE;
 
-#if defined(THREADED_RTS)
 static void getTokenBatch(Capability *cap) {
-  while (cas(&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
+  while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
   max_commits += TOKEN_BATCH_SIZE;
   cap -> transaction_tokens = TOKEN_BATCH_SIZE;
   token_locked = FALSE;
@@ -1024,7 +1025,7 @@ StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
             unlock_tvar(trec, s, e -> expected_value, FALSE);
           }
           merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
-          ACQ_ASSERT(s -> current_value != trec);
+          ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
         });
       } else {
         revert_ownership(trec, FALSE);