e92763fbb0b69cc24c7b22d017044edb7fc442bc
[ghc-hetmet.git] / ghc / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1998-2005
4  * 
5  * STM implementation.
6  *
7  * Overview
8  * --------
9  *
10  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
11  * each transcation has a TRec (transaction record) holding entries for each of the
12  * TVars (transactional variables) that it has accessed.  Each entry records
13  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
14  * the transaction wants to write to the TVar, (d) during commit, the identity of
15  * the TRec that wrote the expected value.  
16  *
17  * Separate TRecs are used for each level in a nest of transactions.  This allows
18  * a nested transaction to be aborted without condemning its enclosing transactions.
19  * This is needed in the implementation of catchRetry.  Note that the "expected value"
20  * in a nested transaction's TRec is the value expected to be *held in memory* if
21  * the transaction commits -- not the "new value" stored in one of the enclosing
22  * transactions.  This means that validation can be done without searching through
23  * a nest of TRecs.
24  *
25  * Concurrency control
26  * -------------------
27  *
28  * Three different concurrency control schemes can be built according to the settings
29  * in STM.h:
30  * 
31  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
32  * In the Haskell RTS this means it is suitable only for non-SMP builds.
33  *
34  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
35  * an invocation on the STM interface.  Note that this does not mean that 
36  * transactions are simply serialized -- the lock is only held *within* the 
37  * implementation of stmCommitTransaction, stmWait etc.
38  *
39  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
40  * and, when committing a transaction, no locks are acquired for TVars that have
41  * been read but not updated.
42  *
43  * Concurrency control is implemented in the functions:
44  *
45  *    lock_stm
46  *    unlock_stm
47  *    lock_tvar / cond_lock_tvar
48  *    unlock_tvar
49  *
50  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
51  * implementation of these functions.  
52  *
53  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
54  * using STM_CG_LOCK, and otherwise they are no-ops.
55  *
56  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
57  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
58  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
59  * builds).  This is because locking a TVar is implemented by writing the lock
60  * holder's TRec into the TVar's current_value field:
61  *
62  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
63  *               it contained.
64  *
65  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
66  *               contains a specified value.  Return TRUE if this succeeds,
67  *               FALSE otherwise.
68  *
69  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
70  *               storing a specified value in place of the lock entry.
71  *
72  * Using these operations, the typcial pattern of a commit/validate/wait operation
73  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
74  * the TVars that were only read from still contain their expected values, 
75  * (d) release the locks on the TVars, writing updates to them in the case of a 
76  * commit, (e) unlock the STM.
77  *
78  * Queues of waiting threads hang off the first_wait_queue_entry field of each
79  * TVar.  This may only be manipulated when holding that TVar's lock.  In
80  * particular, when a thread is putting itself to sleep, it mustn't release
81  * the TVar's lock until it has added itself to the wait queue and marked its
82  * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
83  *
84  * ---------------------------------------------------------------------------*/
85
86 #include "PosixSource.h"
87 #include "Rts.h"
88 #include "RtsFlags.h"
89 #include "RtsUtils.h"
90 #include "Schedule.h"
91 #include "SMP.h"
92 #include "STM.h"
93 #include "Storage.h"
94
95 #include <stdlib.h>
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for SMP builds with
102 // fine-grained locking. 
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
115 // unusualy code paths if genuine contention is rare
116
117 #if defined(DEBUG)
118 #define SHAKE
119 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
120 #else
121 #define TRACE(_x...) /*Nothing*/
122 #endif
123
124 #ifdef SHAKE
125 static const int do_shake = TRUE;
126 #else
127 static const int do_shake = FALSE;
128 #endif
129 static int shake_ctr = 0;
130 static int shake_lim = 1;
131
132 static int shake(void) {
133   if (do_shake) {
134     if (((shake_ctr++) % shake_lim) == 0) {
135       shake_ctr = 1;
136       shake_lim ++;
137       return TRUE;
138     } 
139     return FALSE;
140   } else {
141     return FALSE;
142   }
143 }
144
145 /*......................................................................*/
146
147 // Helper macros for iterating over entries within a transaction
148 // record
149
150 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
151   StgTRecHeader *__t = (_t);                                                    \
152   StgTRecChunk *__c = __t -> current_chunk;                                     \
153   StgWord __limit = __c -> next_entry_idx;                                      \
154   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
155   while (__c != END_STM_CHUNK_LIST) {                                           \
156     StgWord __i;                                                                \
157     for (__i = 0; __i < __limit; __i ++) {                                      \
158       TRecEntry *_x = &(__c -> entries[__i]);                                   \
159       do { CODE } while (0);                                                    \
160     }                                                                           \
161     __c = __c -> prev_chunk;                                                    \
162     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
163   }                                                                             \
164  exit_for_each:                                                                 \
165   if (FALSE) goto exit_for_each;                                                \
166 } while (0)
167
168 #define BREAK_FOR_EACH goto exit_for_each
169      
170 /*......................................................................*/
171
172 #define IF_STM_UNIPROC(__X)  do { } while (0)
173 #define IF_STM_CG_LOCK(__X)  do { } while (0)
174 #define IF_STM_FG_LOCKS(__X) do { } while (0)
175
176 #if defined(STM_UNIPROC)
177 #undef IF_STM_UNIPROC
178 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
179 static const StgBool use_read_phase = FALSE;
180
181 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
182   TRACE("%p : lock_stm()\n", trec);
183 }
184
185 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
186   TRACE("%p : unlock_stm()\n", trec);
187 }
188
189 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
190                              StgTVar *s STG_UNUSED) {
191   StgClosure *result;
192   TRACE("%p : lock_tvar(%p)\n", trec, s);
193   result = s -> current_value;
194   return result;
195 }
196
197 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
198                         StgTVar *s STG_UNUSED,
199                         StgClosure *c,
200                         StgBool force_update) {
201   TRACE("%p : unlock_tvar(%p)\n", trec, s);
202   if (force_update) {
203     s -> current_value = c;
204   }
205 }
206
207 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
208                               StgTVar *s STG_UNUSED,
209                               StgClosure *expected) {
210   StgClosure *result;
211   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
212   result = s -> current_value;
213   TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
214   return (result == expected);
215 }
216 #endif
217
218 #if defined(STM_CG_LOCK) /*........................................*/
219
220 #undef IF_STM_CG_LOCK
221 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
222 static const StgBool use_read_phase = FALSE;
223 static volatile StgTRecHeader *smp_locked = NULL;
224
225 static void lock_stm(StgTRecHeader *trec) {
226   while (cas(&smp_locked, NULL, trec) != NULL) { }
227   TRACE("%p : lock_stm()\n", trec);
228 }
229
230 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
231   TRACE("%p : unlock_stm()\n", trec);
232   ASSERT (smp_locked == trec);
233   smp_locked = 0;
234 }
235
236 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
237                              StgTVar *s STG_UNUSED) {
238   StgClosure *result;
239   TRACE("%p : lock_tvar(%p)\n", trec, s);
240   ASSERT (smp_locked == trec);
241   result = s -> current_value;
242   return result;
243 }
244
245 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
246                          StgTVar *s STG_UNUSED,
247                          StgClosure *c,
248                          StgBool force_update) {
249   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
250   ASSERT (smp_locked == trec);
251   if (force_update) {
252     s -> current_value = c;
253   }
254 }
255
256 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
257                                StgTVar *s STG_UNUSED,
258                                StgClosure *expected) {
259   StgClosure *result;
260   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
261   ASSERT (smp_locked == trec);
262   result = s -> current_value;
263   TRACE("%p : %d\n", result ? "success" : "failure");
264   return (result == expected);
265 }
266 #endif
267
268 #if defined(STM_FG_LOCKS) /*...................................*/
269
270 #undef IF_STM_FG_LOCKS
271 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
272 static const StgBool use_read_phase = TRUE;
273
274 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
275   TRACE("%p : lock_stm()\n", trec);
276 }
277
278 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
279   TRACE("%p : unlock_stm()\n", trec);
280 }
281
282 static StgClosure *lock_tvar(StgTRecHeader *trec, 
283                              StgTVar *s STG_UNUSED) {
284   StgClosure *result;
285   TRACE("%p : lock_tvar(%p)\n", trec, s);
286   do {
287     do {
288       result = s -> current_value;
289     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
290   } while (cas(&(s -> current_value), result, trec) != result);
291   return result;
292 }
293
294 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
295                         StgTVar *s,
296                         StgClosure *c,
297                         StgBool force_update STG_UNUSED) {
298   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
299   ASSERT(s -> current_value == trec);
300   s -> current_value = c;
301 }
302
303 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
304                               StgTVar *s,
305                               StgClosure *expected) {
306   StgClosure *result;
307   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
308   result = cas(&(s -> current_value), expected, trec);
309   TRACE("%p : %s\n", trec, result ? "success" : "failure");
310   return (result == expected);
311 }
312 #endif
313
314 /*......................................................................*/
315
316 // Helper functions for thread blocking and unblocking
317
318 static void park_tso(StgTSO *tso) {
319   ASSERT(tso -> why_blocked == NotBlocked);
320   tso -> why_blocked = BlockedOnSTM;
321   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
322   TRACE("park_tso on tso=%p\n", tso);
323 }
324
325 static void unpark_tso(Capability *cap, StgTSO *tso) {
326   // We will continue unparking threads while they remain on one of the wait
327   // queues: it's up to the thread itself to remove it from the wait queues
328   // if it decides to do so when it is scheduled.
329   if (tso -> why_blocked == BlockedOnSTM) {
330     TRACE("unpark_tso on tso=%p\n", tso);
331     tso -> why_blocked = NotBlocked;
332     pushOnRunQueue(cap,tso);
333   } else {
334     TRACE("spurious unpark_tso on tso=%p\n", tso);
335   }
336 }
337
338 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
339   StgTVarWaitQueue *q;
340   TRACE("unpark_waiters_on tvar=%p\n", s);
341   for (q = s -> first_wait_queue_entry; 
342        q != END_STM_WAIT_QUEUE; 
343        q = q -> next_queue_entry) {
344     unpark_tso(cap, q -> waiting_tso);
345   }
346 }
347
348 /*......................................................................*/
349
350 // Helper functions for allocation and initialization
351
352 static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
353                                                  StgTSO *waiting_tso) {
354   StgTVarWaitQueue *result;
355   result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
356   SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
357   result -> waiting_tso = waiting_tso;
358   return result;
359 }
360
361 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
362   StgTRecChunk *result;
363   result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
364   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
365   result -> prev_chunk = END_STM_CHUNK_LIST;
366   result -> next_entry_idx = 0;
367   return result;
368 }
369
370 static StgTRecHeader *new_stg_trec_header(Capability *cap,
371                                           StgTRecHeader *enclosing_trec) {
372   StgTRecHeader *result;
373   result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
374   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
375
376   result -> enclosing_trec = enclosing_trec;
377   result -> current_chunk = new_stg_trec_chunk(cap);
378
379   if (enclosing_trec == NO_TREC) {
380     result -> state = TREC_ACTIVE;
381   } else {
382     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
383            enclosing_trec -> state == TREC_CONDEMNED);
384     result -> state = enclosing_trec -> state;
385   }
386
387   return result;  
388 }
389
390 static StgTVar *new_tvar(Capability *cap,
391                          StgClosure *new_value) {
392   StgTVar *result;
393   result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
394   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
395   result -> current_value = new_value;
396   result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
397 #if defined(SMP)
398   result -> last_update_by = NO_TREC;
399 #endif
400   return result;
401 }
402
403 /*......................................................................*/
404
405 // Helper functions for managing waiting lists
406
407 static void build_wait_queue_entries_for_trec(Capability *cap,
408                                       StgTSO *tso, 
409                                       StgTRecHeader *trec) {
410   ASSERT(trec != NO_TREC);
411   ASSERT(trec -> enclosing_trec == NO_TREC);
412   ASSERT(trec -> state == TREC_ACTIVE);
413
414   TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
415
416   FOR_EACH_ENTRY(trec, e, {
417     StgTVar *s;
418     StgTVarWaitQueue *q;
419     StgTVarWaitQueue *fq;
420     s = e -> tvar;
421     TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
422     ACQ_ASSERT(s -> current_value == trec);
423     NACQ_ASSERT(s -> current_value == e -> expected_value);
424     fq = s -> first_wait_queue_entry;
425     q = new_stg_tvar_wait_queue(cap, tso);
426     q -> next_queue_entry = fq;
427     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
428     if (fq != END_STM_WAIT_QUEUE) {
429       fq -> prev_queue_entry = q;
430     }
431     s -> first_wait_queue_entry = q;
432     e -> new_value = (StgClosure *) q;
433   });
434 }
435
436 static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
437   ASSERT(trec != NO_TREC);
438   ASSERT(trec -> enclosing_trec == NO_TREC);
439   ASSERT(trec -> state == TREC_WAITING ||
440          trec -> state == TREC_CONDEMNED);
441
442   TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
443
444   FOR_EACH_ENTRY(trec, e, {
445     StgTVar *s;
446     StgTVarWaitQueue *pq;
447     StgTVarWaitQueue *nq;
448     StgTVarWaitQueue *q;
449     s = e -> tvar;
450     StgClosure *saw = lock_tvar(trec, s);
451     q = (StgTVarWaitQueue *) (e -> new_value);
452     TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
453     ACQ_ASSERT(s -> current_value == trec);
454     nq = q -> next_queue_entry;
455     pq = q -> prev_queue_entry;
456     if (nq != END_STM_WAIT_QUEUE) {
457       nq -> prev_queue_entry = pq;
458     }
459     if (pq != END_STM_WAIT_QUEUE) {
460       pq -> next_queue_entry = nq;
461     } else {
462       ASSERT (s -> first_wait_queue_entry == q);
463       s -> first_wait_queue_entry = nq;
464     }
465     unlock_tvar(trec, s, saw, FALSE);
466   });
467 }
468  
469 /*......................................................................*/
470  
471 static TRecEntry *get_new_entry(Capability *cap,
472                                 StgTRecHeader *t) {
473   TRecEntry *result;
474   StgTRecChunk *c;
475   int i;
476
477   c = t -> current_chunk;
478   i = c -> next_entry_idx;
479   ASSERT(c != END_STM_CHUNK_LIST);
480
481   if (i < TREC_CHUNK_NUM_ENTRIES) {
482     // Continue to use current chunk
483     result = &(c -> entries[i]);
484     c -> next_entry_idx ++;
485   } else {
486     // Current chunk is full: allocate a fresh one
487     StgTRecChunk *nc;
488     nc = new_stg_trec_chunk(cap);
489     nc -> prev_chunk = c;
490     nc -> next_entry_idx = 1;
491     t -> current_chunk = nc;
492     result = &(nc -> entries[0]);
493   }
494
495   return result;
496 }
497
498 /*......................................................................*/
499
500 static void merge_update_into(Capability *cap,
501                               StgTRecHeader *t,
502                               StgTVar *tvar,
503                               StgClosure *expected_value,
504                               StgClosure *new_value) {
505   int found;
506   
507   // Look for an entry in this trec
508   found = FALSE;
509   FOR_EACH_ENTRY(t, e, {
510     StgTVar *s;
511     s = e -> tvar;
512     if (s == tvar) {
513       found = TRUE;
514       if (e -> expected_value != expected_value) {
515         // Must abort if the two entries start from different values
516         TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
517               t, tvar, e -> expected_value, expected_value);
518         t -> state = TREC_CONDEMNED;
519       } 
520       e -> new_value = new_value;
521       BREAK_FOR_EACH;
522     }
523   });
524
525   if (!found) {
526     // No entry so far in this trec
527     TRecEntry *ne;
528     ne = get_new_entry(cap, t);
529     ne -> tvar = tvar;
530     ne -> expected_value = expected_value;
531     ne -> new_value = new_value;
532   }
533 }
534
535 /*......................................................................*/
536
537 static StgBool entry_is_update(TRecEntry *e) {
538   StgBool result;
539   result = (e -> expected_value != e -> new_value);
540   return result;
541
542
543 static StgBool entry_is_read_only(TRecEntry *e) {
544   StgBool result;
545   result = (e -> expected_value == e -> new_value);
546   return result;
547
548
549 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
550   StgClosure *c;
551   StgBool result;
552   c = s -> current_value;
553   result = (c == (StgClosure *) h);
554   return result;  
555 }
556
557 // revert_ownership : release a lock on a TVar, storing back
558 // the value that it held when the lock was acquired.  "revert_all"
559 // is set in stmWait and stmReWait when we acquired locks on all of 
560 // the TVars involved.  "revert_all" is not set in commit operations
561 // where we don't lock TVars that have been read from but not updated.
562
563 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
564                              StgBool revert_all STG_UNUSED) {
565 #if defined(STM_FG_LOCKS) 
566   FOR_EACH_ENTRY(trec, e, {
567     if (revert_all || entry_is_update(e)) {
568       StgTVar *s;
569       s = e -> tvar;
570       if (tvar_is_locked(s, trec)) {
571         unlock_tvar(trec, s, e -> expected_value, TRUE);
572       }
573     }
574   });
575 #endif
576 }
577
578 /*......................................................................*/
579
580 // validate_and_acquire_ownership : this performs the twin functions
581 // of checking that the TVars referred to by entries in trec hold the
582 // expected values and:
583 // 
584 //   - locking the TVar (on updated TVars during commit, or all TVars
585 //     during wait)
586 //
587 //   - recording the identity of the TRec who wrote the value seen in the
588 //     TVar (on non-updated TVars during commit).  These values are 
589 //     stashed in the TRec entries and are then checked in check_read_only
590 //     to ensure that an atomic snapshot of all of these locations has been
591 //     seen.
592
593 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
594                                                int acquire_all,
595                                                int retain_ownership) {
596   StgBool result;
597
598   if (shake()) {
599     TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
600     return FALSE;
601   }
602
603   ASSERT ((trec -> state == TREC_ACTIVE) || 
604           (trec -> state == TREC_WAITING) ||
605           (trec -> state == TREC_CONDEMNED));
606   result = !((trec -> state) == TREC_CONDEMNED);
607   if (result) {
608     FOR_EACH_ENTRY(trec, e, {
609       StgTVar *s;
610       s = e -> tvar;
611       if (acquire_all || entry_is_update(e)) {
612         TRACE("%p : trying to acquire %p\n", trec, s);
613         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
614           TRACE("%p : failed to acquire %p\n", trec, s);
615           result = FALSE;
616           BREAK_FOR_EACH;
617         }
618       } else {
619         ASSERT(use_read_phase);
620         IF_STM_FG_LOCKS({
621           TRACE("%p : will need to check %p\n", trec, s);
622           if (s -> current_value != e -> expected_value) {
623             TRACE("%p : doesn't match\n", trec);
624             result = FALSE;
625             BREAK_FOR_EACH;
626           }
627           e -> saw_update_by = s -> last_update_by;
628           if (s -> current_value != e -> expected_value) {
629             TRACE("%p : doesn't match (race)\n", trec);
630             result = FALSE;
631             BREAK_FOR_EACH;
632           } else {
633             TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
634           }
635         });
636       }
637     });
638   }
639
640   if ((!result) || (!retain_ownership)) {
641     revert_ownership(trec, acquire_all);
642   }
643   
644   return result;
645 }
646
647 // check_read_only : check that we've seen an atomic snapshot of the
648 // non-updated TVars accessed by a trec.  This checks that the last TRec to
649 // commit an update to the TVar is unchanged since the value was stashed in
650 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
651 // all of them contained their expected values at the start of the call to
652 // check_read_only.
653 //
654 // The paper "Concurrent programming without locks" (under submission), or
655 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
656 // this kind of algorithm.
657
658 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
659   StgBool result = TRUE;
660
661   ASSERT (use_read_phase);
662   IF_STM_FG_LOCKS({
663     FOR_EACH_ENTRY(trec, e, {
664       StgTVar *s;
665       s = e -> tvar;
666       if (entry_is_read_only(e)) {
667         TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
668         if (s -> last_update_by != e -> saw_update_by) {
669           // ||s -> current_value != e -> expected_value) {
670           TRACE("%p : mismatch\n", trec);
671           result = FALSE;
672           BREAK_FOR_EACH;
673         }
674       }
675     });
676   });
677
678   return result;
679 }
680
681
682 /************************************************************************/
683
684 void stmPreGCHook() {
685   lock_stm(NO_TREC);
686   TRACE("stmPreGCHook\n");
687   unlock_stm(NO_TREC);
688 }
689
690 /************************************************************************/
691
692 void initSTM() {
693   TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
694 }
695
696 /*......................................................................*/
697
698 StgTRecHeader *stmStartTransaction(Capability *cap,
699                                    StgTRecHeader *outer) {
700   StgTRecHeader *t;
701   TRACE("%p : stmStartTransaction\n", outer);
702   t = new_stg_trec_header(cap, outer);
703   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
704   return t;
705 }
706
707 /*......................................................................*/
708
709 void stmAbortTransaction(StgTRecHeader *trec) {
710   TRACE("%p : stmAbortTransaction\n", trec);
711   ASSERT (trec != NO_TREC);
712   ASSERT ((trec -> state == TREC_ACTIVE) || 
713           (trec -> state == TREC_WAITING) ||
714           (trec -> state == TREC_CONDEMNED));
715
716   lock_stm(trec);
717   if (trec -> state == TREC_WAITING) {
718     ASSERT (trec -> enclosing_trec == NO_TREC);
719     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
720     remove_wait_queue_entries_for_trec(trec);
721   } 
722   trec -> state = TREC_ABORTED;
723   unlock_stm(trec);
724
725   TRACE("%p : stmAbortTransaction done\n", trec);
726 }
727
728 /*......................................................................*/
729
730 void stmCondemnTransaction(StgTRecHeader *trec) {
731   TRACE("%p : stmCondemnTransaction\n", trec);
732   ASSERT (trec != NO_TREC);
733   ASSERT ((trec -> state == TREC_ACTIVE) || 
734           (trec -> state == TREC_WAITING) ||
735           (trec -> state == TREC_CONDEMNED));
736
737   lock_stm(trec);
738   if (trec -> state == TREC_WAITING) {
739     ASSERT (trec -> enclosing_trec == NO_TREC);
740     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
741     remove_wait_queue_entries_for_trec(trec);
742   } 
743   trec -> state = TREC_CONDEMNED;
744   unlock_stm(trec);
745
746   TRACE("%p : stmCondemnTransaction done\n", trec);
747 }
748
749 /*......................................................................*/
750
751 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
752   StgTRecHeader *outer;
753   TRACE("%p : stmGetEnclosingTRec\n", trec);
754   outer = trec -> enclosing_trec;
755   TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
756   return outer;
757 }
758
759 /*......................................................................*/
760
761 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
762   StgTRecHeader *t;
763   StgBool result;
764
765   TRACE("%p : stmValidateNestOfTransactions\n", trec);
766   ASSERT(trec != NO_TREC);
767   ASSERT((trec -> state == TREC_ACTIVE) || 
768          (trec -> state == TREC_WAITING) ||
769          (trec -> state == TREC_CONDEMNED));
770
771   lock_stm(trec);
772
773   t = trec;
774   result = TRUE;
775   while (t != NO_TREC) {
776     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
777     t = t -> enclosing_trec;
778   }
779
780   if (!result && trec -> state != TREC_WAITING) {
781     trec -> state = TREC_CONDEMNED; 
782   }
783
784   unlock_stm(trec);
785
786   TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
787   return result;
788 }
789
790 /*......................................................................*/
791
792 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
793   int result;
794
795   TRACE("%p : stmCommitTransaction()\n", trec);
796   ASSERT (trec != NO_TREC);
797   ASSERT (trec -> enclosing_trec == NO_TREC);
798   ASSERT ((trec -> state == TREC_ACTIVE) || 
799           (trec -> state == TREC_CONDEMNED));
800
801   lock_stm(trec);
802   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
803   if (result) {
804     // We now know that all the updated locations hold their expected values.
805     ASSERT (trec -> state == TREC_ACTIVE);
806
807     if (use_read_phase) {
808       TRACE("%p : doing read check\n", trec);
809       result = check_read_only(trec);
810       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
811     }
812     
813     if (result) {
814       // We now know that all of the read-only locations held their exepcted values
815       // at the end of the call to validate_and_acquire_ownership.  This forms the
816       // linearization point of the commit.
817       
818       FOR_EACH_ENTRY(trec, e, {
819         StgTVar *s;
820         s = e -> tvar;
821         if (e -> new_value != e -> expected_value) {
822           // Entry is an update: write the value back to the TVar, unlocking it if
823           // necessary.
824
825           ACQ_ASSERT(tvar_is_locked(s, trec));
826           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
827           unpark_waiters_on(cap,s);
828           IF_STM_FG_LOCKS({
829             s -> last_update_by = trec;
830           });
831           unlock_tvar(trec, s, e -> new_value, TRUE);
832         } 
833         ACQ_ASSERT(!tvar_is_locked(s, trec));
834       });
835     } else {
836       revert_ownership(trec, FALSE);
837     }
838   } 
839
840   unlock_stm(trec);
841
842   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
843
844   return result;
845 }
846
847 /*......................................................................*/
848
849 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
850   StgTRecHeader *et;
851   int result;
852   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
853   TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
854   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
855
856   lock_stm(trec);
857
858   et = trec -> enclosing_trec;
859   result = validate_and_acquire_ownership(trec, FALSE, TRUE);
860   if (result) {
861     // We now know that all the updated locations hold their expected values.
862
863     if (use_read_phase) {
864       TRACE("%p : doing read check\n", trec);
865       result = check_read_only(trec);
866     }
867     if (result) {
868       // We now know that all of the read-only locations held their exepcted values
869       // at the end of the call to validate_and_acquire_ownership.  This forms the
870       // linearization point of the commit.
871
872       if (result) {
873         TRACE("%p : read-check succeeded\n", trec);
874         FOR_EACH_ENTRY(trec, e, {
875           // Merge each entry into the enclosing transaction record, release all
876           // locks.
877
878           StgTVar *s;
879           s = e -> tvar;
880           if (entry_is_update(e)) {
881             unlock_tvar(trec, s, e -> expected_value, FALSE);
882           }
883           merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
884           ACQ_ASSERT(s -> current_value != trec);
885         });
886       } else {
887         revert_ownership(trec, FALSE);
888       }
889     }
890   } 
891
892   unlock_stm(trec);
893
894   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
895
896   return result;
897 }
898
899 /*......................................................................*/
900
901 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
902   int result;
903   TRACE("%p : stmWait(%p)\n", trec, tso);
904   ASSERT (trec != NO_TREC);
905   ASSERT (trec -> enclosing_trec == NO_TREC);
906   ASSERT ((trec -> state == TREC_ACTIVE) || 
907           (trec -> state == TREC_CONDEMNED));
908
909   lock_stm(trec);
910   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
911   if (result) {
912     // The transaction is valid so far so we can actually start waiting.
913     // (Otherwise the transaction was not valid and the thread will have to
914     // retry it).
915
916     // Put ourselves to sleep.  We retain locks on all the TVars involved
917     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
918     // in the TSO, (c) TREC_WAITING in the Trec.  
919     build_wait_queue_entries_for_trec(cap, tso, trec);
920     park_tso(tso);
921     trec -> state = TREC_WAITING;
922
923     // As soon as we start releasing ownership, another thread may find us 
924     // and wake us up.  This may happen even before we have finished 
925     // releasing ownership.
926     revert_ownership(trec, TRUE);
927   }  
928
929   unlock_stm(trec);
930
931   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
932   return result;
933 }
934
935 /*......................................................................*/
936
937 StgBool stmReWait(StgTSO *tso) {
938   int result;
939   StgTRecHeader *trec = tso->trec;
940
941   TRACE("%p : stmReWait\n", trec);
942   ASSERT (trec != NO_TREC);
943   ASSERT (trec -> enclosing_trec == NO_TREC);
944   ASSERT ((trec -> state == TREC_WAITING) || 
945           (trec -> state == TREC_CONDEMNED));
946
947   lock_stm(trec);
948   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
949   TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
950   if (result) {
951     // The transaction remains valid -- do nothing because it is already on
952     // the wait queues
953     ASSERT (trec -> state == TREC_WAITING);
954     park_tso(tso);
955     revert_ownership(trec, TRUE);
956   } else {
957     // The transcation has become invalid.  We can now remove it from the wait
958     // queues.
959     if (trec -> state != TREC_CONDEMNED) {
960       remove_wait_queue_entries_for_trec (trec);
961     }
962
963   }
964   unlock_stm(trec);
965
966   TRACE("%p : stmReWait()=%d\n", trec, result);
967   return result;
968 }
969
970 /*......................................................................*/
971
972 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
973   TRecEntry *result = NULL;
974
975   TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
976   ASSERT(trec != NO_TREC);
977
978   do {
979     FOR_EACH_ENTRY(trec, e, {
980       if (e -> tvar == tvar) {
981         result = e;
982         if (in != NULL) {
983           *in = trec;
984         }
985         BREAK_FOR_EACH;
986       }
987     });
988     trec = trec -> enclosing_trec;
989   } while (result == NULL && trec != NO_TREC);
990
991   return result;    
992 }
993
994 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
995   StgClosure *result;
996   result = tvar -> current_value;
997
998 #if defined(STM_FG_LOCKS)
999   while (GET_INFO(result) == &stg_TREC_HEADER_info) {
1000     TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
1001     result = tvar -> current_value;
1002   }
1003 #endif
1004
1005   TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
1006   return result;
1007 }
1008
1009 /*......................................................................*/
1010
1011 StgClosure *stmReadTVar(Capability *cap,
1012                         StgTRecHeader *trec, 
1013                         StgTVar *tvar) {
1014   StgTRecHeader *entry_in;
1015   StgClosure *result = NULL;
1016   TRecEntry *entry = NULL;
1017   TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
1018   ASSERT (trec != NO_TREC);
1019   ASSERT (trec -> state == TREC_ACTIVE || 
1020           trec -> state == TREC_CONDEMNED);
1021
1022   entry = get_entry_for(trec, tvar, &entry_in);
1023
1024   if (entry != NULL) {
1025     if (entry_in == trec) {
1026       // Entry found in our trec
1027       result = entry -> new_value;
1028     } else {
1029       // Entry found in another trec
1030       TRecEntry *new_entry = get_new_entry(cap, trec);
1031       new_entry -> tvar = tvar;
1032       new_entry -> expected_value = entry -> expected_value;
1033       new_entry -> new_value = entry -> new_value;
1034       result = new_entry -> new_value;
1035     } 
1036   } else {
1037     // No entry found
1038     StgClosure *current_value = read_current_value(trec, tvar);
1039     TRecEntry *new_entry = get_new_entry(cap, trec);
1040     new_entry -> tvar = tvar;
1041     new_entry -> expected_value = current_value;
1042     new_entry -> new_value = current_value;
1043     result = current_value;
1044   }
1045
1046   TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
1047   return result;
1048 }
1049
1050 /*......................................................................*/
1051
1052 void stmWriteTVar(Capability *cap,
1053                   StgTRecHeader *trec,
1054                   StgTVar *tvar, 
1055                   StgClosure *new_value) {
1056
1057   StgTRecHeader *entry_in;
1058   TRecEntry *entry = NULL;
1059   TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1060   ASSERT (trec != NO_TREC);
1061   ASSERT (trec -> state == TREC_ACTIVE || 
1062           trec -> state == TREC_CONDEMNED);
1063
1064   entry = get_entry_for(trec, tvar, &entry_in);
1065
1066   if (entry != NULL) {
1067     if (entry_in == trec) {
1068       // Entry found in our trec
1069       entry -> new_value = new_value;
1070     } else {
1071       // Entry found in another trec
1072       TRecEntry *new_entry = get_new_entry(cap, trec);
1073       new_entry -> tvar = tvar;
1074       new_entry -> expected_value = entry -> expected_value;
1075       new_entry -> new_value = new_value;
1076     } 
1077   } else {
1078     // No entry found
1079     StgClosure *current_value = read_current_value(trec, tvar);
1080     TRecEntry *new_entry = get_new_entry(cap, trec);
1081     new_entry -> tvar = tvar;
1082     new_entry -> expected_value = current_value;
1083     new_entry -> new_value = new_value;
1084   }
1085
1086   TRACE("%p : stmWriteTVar done\n", trec);
1087 }
1088
1089 /*......................................................................*/
1090
1091 StgTVar *stmNewTVar(Capability *cap,
1092                     StgClosure *new_value) {
1093   StgTVar *result;
1094   result = new_tvar(cap, new_value);
1095   return result;
1096 }
1097
1098 /*......................................................................*/
1099