74894ecf53821e8eff26495b24373403912b1f41
[ghc-hetmet.git] / ghc / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1998-2005
4  * 
5  * STM implementation.
6  *
7  * Overview
8  * --------
9  *
10  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
11  * each transcation has a TRec (transaction record) holding entries for each of the
12  * TVars (transactional variables) that it has accessed.  Each entry records
13  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
14  * the transaction wants to write to the TVar, (d) during commit, the identity of
15  * the TRec that wrote the expected value.  
16  *
17  * Separate TRecs are used for each level in a nest of transactions.  This allows
18  * a nested transaction to be aborted without condemning its enclosing transactions.
19  * This is needed in the implementation of catchRetry.  Note that the "expected value"
20  * in a nested transaction's TRec is the value expected to be *held in memory* if
21  * the transaction commits -- not the "new value" stored in one of the enclosing
22  * transactions.  This means that validation can be done without searching through
23  * a nest of TRecs.
24  *
25  * Concurrency control
26  * -------------------
27  *
28  * Three different concurrency control schemes can be built according to the settings
29  * in STM.h:
30  * 
31  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
32  * In the Haskell RTS this means it is suitable only for non-SMP builds.
33  *
34  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
35  * an invocation on the STM interface.  Note that this does not mean that 
36  * transactions are simply serialized -- the lock is only held *within* the 
37  * implementation of stmCommitTransaction, stmWait etc.
38  *
39  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
40  * and, when committing a transaction, no locks are acquired for TVars that have
41  * been read but not updated.
42  *
43  * Concurrency control is implemented in the functions:
44  *
45  *    lock_stm
46  *    unlock_stm
47  *    lock_tvar / cond_lock_tvar
48  *    unlock_tvar
49  *
50  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
51  * implementation of these functions.  
52  *
53  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
54  * using STM_CG_LOCK, and otherwise they are no-ops.
55  *
56  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
57  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
58  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
59  * builds).  This is because locking a TVar is implemented by writing the lock
60  * holder's TRec into the TVar's current_value field:
61  *
62  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
63  *               it contained.
64  *
65  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
66  *               contains a specified value.  Return TRUE if this succeeds,
67  *               FALSE otherwise.
68  *
69  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
70  *               storing a specified value in place of the lock entry.
71  *
72  * Using these operations, the typcial pattern of a commit/validate/wait operation
73  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
74  * the TVars that were only read from still contain their expected values, 
75  * (d) release the locks on the TVars, writing updates to them in the case of a 
76  * commit, (e) unlock the STM.
77  *
78  * Queues of waiting threads hang off the first_wait_queue_entry field of each
79  * TVar.  This may only be manipulated when holding that TVar's lock.  In
80  * particular, when a thread is putting itself to sleep, it mustn't release
81  * the TVar's lock until it has added itself to the wait queue and marked its
82  * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
83  *
84  * ---------------------------------------------------------------------------*/
85
86 #include "PosixSource.h"
87 #include "Rts.h"
88 #include "RtsFlags.h"
89 #include "RtsUtils.h"
90 #include "Schedule.h"
91 #include "SMP.h"
92 #include "STM.h"
93 #include "Storage.h"
94
95 #include <stdlib.h>
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for SMP builds with
102 // fine-grained locking. 
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
115 // unusualy code paths if genuine contention is rare
116
117 #if defined(DEBUG)
118 #define SHAKE
119 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
120 #else
121 #define TRACE(_x...) /*Nothing*/
122 #endif
123
124 #ifdef SHAKE
125 static const int do_shake = TRUE;
126 #else
127 static const int do_shake = FALSE;
128 #endif
129 static int shake_ctr = 0;
130 static int shake_lim = 1;
131
132 static int shake(void) {
133   if (do_shake) {
134     if (((shake_ctr++) % shake_lim) == 0) {
135       shake_ctr = 1;
136       shake_lim ++;
137       return TRUE;
138     } 
139     return FALSE;
140   } else {
141     return FALSE;
142   }
143 }
144
145 /*......................................................................*/
146
147 // Helper macros for iterating over entries within a transaction
148 // record
149
150 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
151   StgTRecHeader *__t = (_t);                                                    \
152   StgTRecChunk *__c = __t -> current_chunk;                                     \
153   StgWord __limit = __c -> next_entry_idx;                                      \
154   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
155   while (__c != END_STM_CHUNK_LIST) {                                           \
156     StgWord __i;                                                                \
157     for (__i = 0; __i < __limit; __i ++) {                                      \
158       TRecEntry *_x = &(__c -> entries[__i]);                                   \
159       do { CODE } while (0);                                                    \
160     }                                                                           \
161     __c = __c -> prev_chunk;                                                    \
162     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
163   }                                                                             \
164  exit_for_each:                                                                 \
165   if (FALSE) goto exit_for_each;                                                \
166 } while (0)
167
168 #define BREAK_FOR_EACH goto exit_for_each
169      
170 /*......................................................................*/
171
172 #define IF_STM_UNIPROC(__X)  do { } while (0)
173 #define IF_STM_CG_LOCK(__X)  do { } while (0)
174 #define IF_STM_FG_LOCKS(__X) do { } while (0)
175
176 #if defined(STM_UNIPROC)
177 #undef IF_STM_UNIPROC
178 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
179 static const StgBool use_read_phase = FALSE;
180
181 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
182   TRACE("%p : lock_stm()\n", trec);
183 }
184
185 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
186   TRACE("%p : unlock_stm()\n", trec);
187 }
188
189 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
190                              StgTVar *s STG_UNUSED) {
191   StgClosure *result;
192   TRACE("%p : lock_tvar(%p)\n", trec, s);
193   result = s -> current_value;
194   return result;
195 }
196
197 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
198                         StgTVar *s STG_UNUSED,
199                         StgClosure *c,
200                         StgBool force_update) {
201   TRACE("%p : unlock_tvar(%p)\n", trec, s);
202   if (force_update) {
203     s -> current_value = c;
204   }
205 }
206
207 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
208                               StgTVar *s STG_UNUSED,
209                               StgClosure *expected) {
210   StgClosure *result;
211   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
212   result = s -> current_value;
213   TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
214   return (result == expected);
215 }
216 #endif
217
218 #if defined(STM_CG_LOCK) /*........................................*/
219
220 #undef IF_STM_CG_LOCK
221 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
222 static const StgBool use_read_phase = FALSE;
223 static volatile StgTRecHeader *smp_locked = NULL;
224
225 static void lock_stm(StgTRecHeader *trec) {
226   while (cas(&smp_locked, NULL, trec) != NULL) { }
227   TRACE("%p : lock_stm()\n", trec);
228 }
229
230 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
231   TRACE("%p : unlock_stm()\n", trec);
232   ASSERT (smp_locked == trec);
233   smp_locked = 0;
234 }
235
236 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
237                              StgTVar *s STG_UNUSED) {
238   StgClosure *result;
239   TRACE("%p : lock_tvar(%p)\n", trec, s);
240   ASSERT (smp_locked == trec);
241   result = s -> current_value;
242   return result;
243 }
244
245 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
246                          StgTVar *s STG_UNUSED,
247                          StgClosure *c,
248                          StgBool force_update) {
249   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
250   ASSERT (smp_locked == trec);
251   if (force_update) {
252     s -> current_value = c;
253   }
254 }
255
256 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
257                                StgTVar *s STG_UNUSED,
258                                StgClosure *expected) {
259   StgClosure *result;
260   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
261   ASSERT (smp_locked == trec);
262   result = s -> current_value;
263   TRACE("%p : %d\n", result ? "success" : "failure");
264   return (result == expected);
265 }
266 #endif
267
268 #if defined(STM_FG_LOCKS) /*...................................*/
269
270 #undef IF_STM_FG_LOCKS
271 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
272 static const StgBool use_read_phase = TRUE;
273
274 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
275   TRACE("%p : lock_stm()\n", trec);
276 }
277
278 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
279   TRACE("%p : unlock_stm()\n", trec);
280 }
281
282 static StgClosure *lock_tvar(StgTRecHeader *trec, 
283                              StgTVar *s STG_UNUSED) {
284   StgClosure *result;
285   TRACE("%p : lock_tvar(%p)\n", trec, s);
286   do {
287     do {
288       result = s -> current_value;
289     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
290   } while (cas(&(s -> current_value), result, trec) != result);
291   return result;
292 }
293
294 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
295                         StgTVar *s,
296                         StgClosure *c,
297                         StgBool force_update STG_UNUSED) {
298   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
299   ASSERT(s -> current_value == trec);
300   s -> current_value = c;
301 }
302
303 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
304                               StgTVar *s,
305                               StgClosure *expected) {
306   StgClosure *result;
307   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
308   result = cas(&(s -> current_value), expected, trec);
309   TRACE("%p : %s\n", trec, result ? "success" : "failure");
310   return (result == expected);
311 }
312 #endif
313
314 /*......................................................................*/
315
316 // Helper functions for thread blocking and unblocking
317
318 static void park_tso(StgTSO *tso) {
319   ACQUIRE_LOCK(&sched_mutex);
320   ASSERT(tso -> why_blocked == NotBlocked);
321   tso -> why_blocked = BlockedOnSTM;
322   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
323   RELEASE_LOCK(&sched_mutex);
324   TRACE("park_tso on tso=%p\n", tso);
325 }
326
327 static void unpark_tso(StgTSO *tso) {
328   // We will continue unparking threads while they remain on one of the wait
329   // queues: it's up to the thread itself to remove it from the wait queues
330   // if it decides to do so when it is scheduled.
331   if (tso -> why_blocked == BlockedOnSTM) {
332     TRACE("unpark_tso on tso=%p\n", tso);
333     ACQUIRE_LOCK(&sched_mutex);
334     tso -> why_blocked = NotBlocked;
335     PUSH_ON_RUN_QUEUE(tso);
336     RELEASE_LOCK(&sched_mutex);
337   } else {
338     TRACE("spurious unpark_tso on tso=%p\n", tso);
339   }
340 }
341
342 static void unpark_waiters_on(StgTVar *s) {
343   StgTVarWaitQueue *q;
344   TRACE("unpark_waiters_on tvar=%p\n", s);
345   for (q = s -> first_wait_queue_entry; 
346        q != END_STM_WAIT_QUEUE; 
347        q = q -> next_queue_entry) {
348     unpark_tso(q -> waiting_tso);
349   }
350 }
351
352 /*......................................................................*/
353
354 // Helper functions for allocation and initialization
355
356 static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgRegTable *reg,
357                                                  StgTSO *waiting_tso) {
358   StgTVarWaitQueue *result;
359   result = (StgTVarWaitQueue *)allocateLocal(reg, sizeofW(StgTVarWaitQueue));
360   SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
361   result -> waiting_tso = waiting_tso;
362   return result;
363 }
364
365 static StgTRecChunk *new_stg_trec_chunk(StgRegTable *reg) {
366   StgTRecChunk *result;
367   result = (StgTRecChunk *)allocateLocal(reg, sizeofW(StgTRecChunk));
368   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
369   result -> prev_chunk = END_STM_CHUNK_LIST;
370   result -> next_entry_idx = 0;
371   return result;
372 }
373
374 static StgTRecHeader *new_stg_trec_header(StgRegTable *reg,
375                                           StgTRecHeader *enclosing_trec) {
376   StgTRecHeader *result;
377   result = (StgTRecHeader *) allocateLocal(reg, sizeofW(StgTRecHeader));
378   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
379
380   result -> enclosing_trec = enclosing_trec;
381   result -> current_chunk = new_stg_trec_chunk(reg);
382
383   if (enclosing_trec == NO_TREC) {
384     result -> state = TREC_ACTIVE;
385   } else {
386     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
387            enclosing_trec -> state == TREC_CONDEMNED);
388     result -> state = enclosing_trec -> state;
389   }
390
391   return result;  
392 }
393
394 static StgTVar *new_tvar(StgRegTable *reg,
395                          StgClosure *new_value) {
396   StgTVar *result;
397   result = (StgTVar *)allocateLocal(reg, sizeofW(StgTVar));
398   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
399   result -> current_value = new_value;
400   result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
401 #if defined(SMP)
402   result -> last_update_by = NO_TREC;
403 #endif
404   return result;
405 }
406
407 /*......................................................................*/
408
409 // Helper functions for managing waiting lists
410
411 static void build_wait_queue_entries_for_trec(StgRegTable *reg,
412                                       StgTSO *tso, 
413                                       StgTRecHeader *trec) {
414   ASSERT(trec != NO_TREC);
415   ASSERT(trec -> enclosing_trec == NO_TREC);
416   ASSERT(trec -> state == TREC_ACTIVE);
417
418   TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
419
420   FOR_EACH_ENTRY(trec, e, {
421     StgTVar *s;
422     StgTVarWaitQueue *q;
423     StgTVarWaitQueue *fq;
424     s = e -> tvar;
425     TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
426     ACQ_ASSERT(s -> current_value == trec);
427     NACQ_ASSERT(s -> current_value == e -> expected_value);
428     fq = s -> first_wait_queue_entry;
429     q = new_stg_tvar_wait_queue(reg, tso);
430     q -> next_queue_entry = fq;
431     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
432     if (fq != END_STM_WAIT_QUEUE) {
433       fq -> prev_queue_entry = q;
434     }
435     s -> first_wait_queue_entry = q;
436     e -> new_value = (StgClosure *) q;
437   });
438 }
439
440 static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
441   ASSERT(trec != NO_TREC);
442   ASSERT(trec -> enclosing_trec == NO_TREC);
443   ASSERT(trec -> state == TREC_WAITING ||
444          trec -> state == TREC_CONDEMNED);
445
446   TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
447
448   FOR_EACH_ENTRY(trec, e, {
449     StgTVar *s;
450     StgTVarWaitQueue *pq;
451     StgTVarWaitQueue *nq;
452     StgTVarWaitQueue *q;
453     s = e -> tvar;
454     StgClosure *saw = lock_tvar(trec, s);
455     q = (StgTVarWaitQueue *) (e -> new_value);
456     TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
457     ACQ_ASSERT(s -> current_value == trec);
458     nq = q -> next_queue_entry;
459     pq = q -> prev_queue_entry;
460     if (nq != END_STM_WAIT_QUEUE) {
461       nq -> prev_queue_entry = pq;
462     }
463     if (pq != END_STM_WAIT_QUEUE) {
464       pq -> next_queue_entry = nq;
465     } else {
466       ASSERT (s -> first_wait_queue_entry == q);
467       s -> first_wait_queue_entry = nq;
468     }
469     unlock_tvar(trec, s, saw, FALSE);
470   });
471 }
472  
473 /*......................................................................*/
474  
475 static TRecEntry *get_new_entry(StgRegTable *reg,
476                                 StgTRecHeader *t) {
477   TRecEntry *result;
478   StgTRecChunk *c;
479   int i;
480
481   c = t -> current_chunk;
482   i = c -> next_entry_idx;
483   ASSERT(c != END_STM_CHUNK_LIST);
484
485   if (i < TREC_CHUNK_NUM_ENTRIES) {
486     // Continue to use current chunk
487     result = &(c -> entries[i]);
488     c -> next_entry_idx ++;
489   } else {
490     // Current chunk is full: allocate a fresh one
491     StgTRecChunk *nc;
492     nc = new_stg_trec_chunk(reg);
493     nc -> prev_chunk = c;
494     nc -> next_entry_idx = 1;
495     t -> current_chunk = nc;
496     result = &(nc -> entries[0]);
497   }
498
499   return result;
500 }
501
502 /*......................................................................*/
503
504 static void merge_update_into(StgRegTable *reg,
505                               StgTRecHeader *t,
506                               StgTVar *tvar,
507                               StgClosure *expected_value,
508                               StgClosure *new_value) {
509   int found;
510   
511   // Look for an entry in this trec
512   found = FALSE;
513   FOR_EACH_ENTRY(t, e, {
514     StgTVar *s;
515     s = e -> tvar;
516     if (s == tvar) {
517       found = TRUE;
518       if (e -> expected_value != expected_value) {
519         // Must abort if the two entries start from different values
520         TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
521               t, tvar, e -> expected_value, expected_value);
522         t -> state = TREC_CONDEMNED;
523       } 
524       e -> new_value = new_value;
525       BREAK_FOR_EACH;
526     }
527   });
528
529   if (!found) {
530     // No entry so far in this trec
531     TRecEntry *ne;
532     ne = get_new_entry(reg, t);
533     ne -> tvar = tvar;
534     ne -> expected_value = expected_value;
535     ne -> new_value = new_value;
536   }
537 }
538
539 /*......................................................................*/
540
541 static StgBool entry_is_update(TRecEntry *e) {
542   StgBool result;
543   result = (e -> expected_value != e -> new_value);
544   return result;
545
546
547 static StgBool entry_is_read_only(TRecEntry *e) {
548   StgBool result;
549   result = (e -> expected_value == e -> new_value);
550   return result;
551
552
553 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
554   StgClosure *c;
555   StgBool result;
556   c = s -> current_value;
557   result = (c == (StgClosure *) h);
558   return result;  
559 }
560
561 // revert_ownership : release a lock on a TVar, storing back
562 // the value that it held when the lock was acquired.  "revert_all"
563 // is set in stmWait and stmReWait when we acquired locks on all of 
564 // the TVars involved.  "revert_all" is not set in commit operations
565 // where we don't lock TVars that have been read from but not updated.
566
567 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
568                              StgBool revert_all STG_UNUSED) {
569 #if defined(STM_FG_LOCKS) 
570   FOR_EACH_ENTRY(trec, e, {
571     if (revert_all || entry_is_update(e)) {
572       StgTVar *s;
573       s = e -> tvar;
574       if (tvar_is_locked(s, trec)) {
575         unlock_tvar(trec, s, e -> expected_value, TRUE);
576       }
577     }
578   });
579 #endif
580 }
581
582 /*......................................................................*/
583
584 // validate_and_acquire_ownership : this performs the twin functions
585 // of checking that the TVars referred to by entries in trec hold the
586 // expected values and:
587 // 
588 //   - locking the TVar (on updated TVars during commit, or all TVars
589 //     during wait)
590 //
591 //   - recording the identity of the TRec who wrote the value seen in the
592 //     TVar (on non-updated TVars during commit).  These values are 
593 //     stashed in the TRec entries and are then checked in check_read_only
594 //     to ensure that an atomic snapshot of all of these locations has been
595 //     seen.
596
597 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
598                                                int acquire_all,
599                                                int retain_ownership) {
600   StgBool result;
601
602   if (shake()) {
603     TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
604     return FALSE;
605   }
606
607   ASSERT ((trec -> state == TREC_ACTIVE) || 
608           (trec -> state == TREC_WAITING) ||
609           (trec -> state == TREC_CONDEMNED));
610   result = !((trec -> state) == TREC_CONDEMNED);
611   if (result) {
612     FOR_EACH_ENTRY(trec, e, {
613       StgTVar *s;
614       s = e -> tvar;
615       if (acquire_all || entry_is_update(e)) {
616         TRACE("%p : trying to acquire %p\n", trec, s);
617         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
618           TRACE("%p : failed to acquire %p\n", trec, s);
619           result = FALSE;
620           BREAK_FOR_EACH;
621         }
622       } else {
623         ASSERT(use_read_phase);
624         IF_STM_FG_LOCKS({
625           TRACE("%p : will need to check %p\n", trec, s);
626           if (s -> current_value != e -> expected_value) {
627             TRACE("%p : doesn't match\n", trec);
628             result = FALSE;
629             BREAK_FOR_EACH;
630           }
631           e -> saw_update_by = s -> last_update_by;
632           if (s -> current_value != e -> expected_value) {
633             TRACE("%p : doesn't match (race)\n", trec);
634             result = FALSE;
635             BREAK_FOR_EACH;
636           } else {
637             TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
638           }
639         });
640       }
641     });
642   }
643
644   if ((!result) || (!retain_ownership)) {
645     revert_ownership(trec, acquire_all);
646   }
647   
648   return result;
649 }
650
651 // check_read_only : check that we've seen an atomic snapshot of the
652 // non-updated TVars accessed by a trec.  This checks that the last TRec to
653 // commit an update to the TVar is unchanged since the value was stashed in
654 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
655 // all of them contained their expected values at the start of the call to
656 // check_read_only.
657 //
658 // The paper "Concurrent programming without locks" (under submission), or
659 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
660 // this kind of algorithm.
661
662 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
663   StgBool result = TRUE;
664
665   ASSERT (use_read_phase);
666   IF_STM_FG_LOCKS({
667     FOR_EACH_ENTRY(trec, e, {
668       StgTVar *s;
669       s = e -> tvar;
670       if (entry_is_read_only(e)) {
671         TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
672         if (s -> last_update_by != e -> saw_update_by) {
673           // ||s -> current_value != e -> expected_value) {
674           TRACE("%p : mismatch\n", trec);
675           result = FALSE;
676           BREAK_FOR_EACH;
677         }
678       }
679     });
680   });
681
682   return result;
683 }
684
685
686 /************************************************************************/
687
688 void stmPreGCHook() {
689   lock_stm(NO_TREC);
690   TRACE("stmPreGCHook\n");
691   unlock_stm(NO_TREC);
692 }
693
694 /************************************************************************/
695
696 void initSTM() {
697   TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
698 }
699
700 /*......................................................................*/
701
702 StgTRecHeader *stmStartTransaction(StgRegTable *reg,
703                                    StgTRecHeader *outer) {
704   StgTRecHeader *t;
705   TRACE("%p : stmStartTransaction\n", outer);
706   t = new_stg_trec_header(reg, outer);
707   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
708   return t;
709 }
710
711 /*......................................................................*/
712
713 void stmAbortTransaction(StgTRecHeader *trec) {
714   TRACE("%p : stmAbortTransaction\n", trec);
715   ASSERT (trec != NO_TREC);
716   ASSERT ((trec -> state == TREC_ACTIVE) || 
717           (trec -> state == TREC_WAITING) ||
718           (trec -> state == TREC_CONDEMNED));
719
720   lock_stm(trec);
721   if (trec -> state == TREC_WAITING) {
722     ASSERT (trec -> enclosing_trec == NO_TREC);
723     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
724     remove_wait_queue_entries_for_trec(trec);
725   } 
726   trec -> state = TREC_ABORTED;
727   unlock_stm(trec);
728
729   TRACE("%p : stmAbortTransaction done\n", trec);
730 }
731
732 /*......................................................................*/
733
734 void stmCondemnTransaction(StgTRecHeader *trec) {
735   TRACE("%p : stmCondemnTransaction\n", trec);
736   ASSERT (trec != NO_TREC);
737   ASSERT ((trec -> state == TREC_ACTIVE) || 
738           (trec -> state == TREC_WAITING) ||
739           (trec -> state == TREC_CONDEMNED));
740
741   lock_stm(trec);
742   if (trec -> state == TREC_WAITING) {
743     ASSERT (trec -> enclosing_trec == NO_TREC);
744     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
745     remove_wait_queue_entries_for_trec(trec);
746   } 
747   trec -> state = TREC_CONDEMNED;
748   unlock_stm(trec);
749
750   TRACE("%p : stmCondemnTransaction done\n", trec);
751 }
752
753 /*......................................................................*/
754
755 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
756   StgTRecHeader *outer;
757   TRACE("%p : stmGetEnclosingTRec\n", trec);
758   outer = trec -> enclosing_trec;
759   TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
760   return outer;
761 }
762
763 /*......................................................................*/
764
765 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
766   StgTRecHeader *t;
767   StgBool result;
768
769   TRACE("%p : stmValidateNestOfTransactions\n", trec);
770   ASSERT(trec != NO_TREC);
771   ASSERT((trec -> state == TREC_ACTIVE) || 
772          (trec -> state == TREC_WAITING) ||
773          (trec -> state == TREC_CONDEMNED));
774
775   lock_stm(trec);
776
777   t = trec;
778   result = TRUE;
779   while (t != NO_TREC) {
780     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
781     t = t -> enclosing_trec;
782   }
783
784   if (!result && trec -> state != TREC_WAITING) {
785     trec -> state = TREC_CONDEMNED; 
786   }
787
788   unlock_stm(trec);
789
790   TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
791   return result;
792 }
793
794 /*......................................................................*/
795
796 StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
797   int result;
798   TRACE("%p : stmCommitTransaction()\n", trec);
799   ASSERT (trec != NO_TREC);
800   ASSERT (trec -> enclosing_trec == NO_TREC);
801   ASSERT ((trec -> state == TREC_ACTIVE) || 
802           (trec -> state == TREC_CONDEMNED));
803
804   lock_stm(trec);
805   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
806   if (result) {
807     // We now know that all the updated locations hold their expected values.
808     ASSERT (trec -> state == TREC_ACTIVE);
809
810     if (use_read_phase) {
811       TRACE("%p : doing read check\n", trec);
812       result = check_read_only(trec);
813       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
814     }
815     
816     if (result) {
817       // We now know that all of the read-only locations held their exepcted values
818       // at the end of the call to validate_and_acquire_ownership.  This forms the
819       // linearization point of the commit.
820       
821       FOR_EACH_ENTRY(trec, e, {
822         StgTVar *s;
823         s = e -> tvar;
824         if (e -> new_value != e -> expected_value) {
825           // Entry is an update: write the value back to the TVar, unlocking it if
826           // necessary.
827
828           ACQ_ASSERT(tvar_is_locked(s, trec));
829           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
830           unpark_waiters_on(s);
831           IF_STM_FG_LOCKS({
832             s -> last_update_by = trec;
833           });
834           unlock_tvar(trec, s, e -> new_value, TRUE);
835         } 
836         ACQ_ASSERT(!tvar_is_locked(s, trec));
837       });
838     } else {
839       revert_ownership(trec, FALSE);
840     }
841   } 
842
843   unlock_stm(trec);
844
845   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
846
847   return result;
848 }
849
850 /*......................................................................*/
851
852 StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
853   StgTRecHeader *et;
854   int result;
855   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
856   TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
857   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
858
859   lock_stm(trec);
860
861   et = trec -> enclosing_trec;
862   result = validate_and_acquire_ownership(trec, FALSE, TRUE);
863   if (result) {
864     // We now know that all the updated locations hold their expected values.
865
866     if (use_read_phase) {
867       TRACE("%p : doing read check\n", trec);
868       result = check_read_only(trec);
869     }
870     if (result) {
871       // We now know that all of the read-only locations held their exepcted values
872       // at the end of the call to validate_and_acquire_ownership.  This forms the
873       // linearization point of the commit.
874
875       if (result) {
876         TRACE("%p : read-check succeeded\n", trec);
877         FOR_EACH_ENTRY(trec, e, {
878           // Merge each entry into the enclosing transaction record, release all
879           // locks.
880
881           StgTVar *s;
882           s = e -> tvar;
883           if (entry_is_update(e)) {
884             unlock_tvar(trec, s, e -> expected_value, FALSE);
885           }
886           merge_update_into(reg, et, s, e -> expected_value, e -> new_value);
887           ACQ_ASSERT(s -> current_value != trec);
888         });
889       } else {
890         revert_ownership(trec, FALSE);
891       }
892     }
893   } 
894
895   unlock_stm(trec);
896
897   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
898
899   return result;
900 }
901
902 /*......................................................................*/
903
904 StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) {
905   int result;
906   TRACE("%p : stmWait(%p)\n", trec, tso);
907   ASSERT (trec != NO_TREC);
908   ASSERT (trec -> enclosing_trec == NO_TREC);
909   ASSERT ((trec -> state == TREC_ACTIVE) || 
910           (trec -> state == TREC_CONDEMNED));
911
912   lock_stm(trec);
913   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
914   if (result) {
915     // The transaction is valid so far so we can actually start waiting.
916     // (Otherwise the transaction was not valid and the thread will have to
917     // retry it).
918
919     // Put ourselves to sleep.  We retain locks on all the TVars involved
920     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
921     // in the TSO, (c) TREC_WAITING in the Trec.  
922     build_wait_queue_entries_for_trec(reg, tso, trec);
923     park_tso(tso);
924     trec -> state = TREC_WAITING;
925
926     // As soon as we start releasing ownership, another thread may find us 
927     // and wake us up.  This may happen even before we have finished 
928     // releasing ownership.
929     revert_ownership(trec, TRUE);
930   }  
931
932   unlock_stm(trec);
933
934   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
935   return result;
936 }
937
938 /*......................................................................*/
939
940 StgBool stmReWait(StgTSO *tso) {
941   int result;
942   StgTRecHeader *trec = tso->trec;
943
944   TRACE("%p : stmReWait\n", trec);
945   ASSERT (trec != NO_TREC);
946   ASSERT (trec -> enclosing_trec == NO_TREC);
947   ASSERT ((trec -> state == TREC_WAITING) || 
948           (trec -> state == TREC_CONDEMNED));
949
950   lock_stm(trec);
951   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
952   TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
953   if (result) {
954     // The transaction remains valid -- do nothing because it is already on
955     // the wait queues
956     ASSERT (trec -> state == TREC_WAITING);
957     park_tso(tso);
958     revert_ownership(trec, TRUE);
959   } else {
960     // The transcation has become invalid.  We can now remove it from the wait
961     // queues.
962     if (trec -> state != TREC_CONDEMNED) {
963       remove_wait_queue_entries_for_trec (trec);
964     }
965
966   }
967   unlock_stm(trec);
968
969   TRACE("%p : stmReWait()=%d\n", trec, result);
970   return result;
971 }
972
973 /*......................................................................*/
974
975 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
976   TRecEntry *result = NULL;
977
978   TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
979   ASSERT(trec != NO_TREC);
980
981   do {
982     FOR_EACH_ENTRY(trec, e, {
983       if (e -> tvar == tvar) {
984         result = e;
985         if (in != NULL) {
986           *in = trec;
987         }
988         BREAK_FOR_EACH;
989       }
990     });
991     trec = trec -> enclosing_trec;
992   } while (result == NULL && trec != NO_TREC);
993
994   return result;    
995 }
996
997 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
998   StgClosure *result;
999   result = tvar -> current_value;
1000
1001 #if defined(STM_FG_LOCKS)
1002   while (GET_INFO(result) == &stg_TREC_HEADER_info) {
1003     TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
1004     result = tvar -> current_value;
1005   }
1006 #endif
1007
1008   TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
1009   return result;
1010 }
1011
1012 /*......................................................................*/
1013
1014 StgClosure *stmReadTVar(StgRegTable *reg,
1015                         StgTRecHeader *trec, 
1016                         StgTVar *tvar) {
1017   StgTRecHeader *entry_in;
1018   StgClosure *result = NULL;
1019   TRecEntry *entry = NULL;
1020   TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
1021   ASSERT (trec != NO_TREC);
1022   ASSERT (trec -> state == TREC_ACTIVE || 
1023           trec -> state == TREC_CONDEMNED);
1024
1025   entry = get_entry_for(trec, tvar, &entry_in);
1026
1027   if (entry != NULL) {
1028     if (entry_in == trec) {
1029       // Entry found in our trec
1030       result = entry -> new_value;
1031     } else {
1032       // Entry found in another trec
1033       TRecEntry *new_entry = get_new_entry(reg, trec);
1034       new_entry -> tvar = tvar;
1035       new_entry -> expected_value = entry -> expected_value;
1036       new_entry -> new_value = entry -> new_value;
1037       result = new_entry -> new_value;
1038     } 
1039   } else {
1040     // No entry found
1041     StgClosure *current_value = read_current_value(trec, tvar);
1042     TRecEntry *new_entry = get_new_entry(reg, trec);
1043     new_entry -> tvar = tvar;
1044     new_entry -> expected_value = current_value;
1045     new_entry -> new_value = current_value;
1046     result = current_value;
1047   }
1048
1049   TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
1050   return result;
1051 }
1052
1053 /*......................................................................*/
1054
1055 void stmWriteTVar(StgRegTable *reg,
1056                   StgTRecHeader *trec,
1057                   StgTVar *tvar, 
1058                   StgClosure *new_value) {
1059
1060   StgTRecHeader *entry_in;
1061   TRecEntry *entry = NULL;
1062   TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1063   ASSERT (trec != NO_TREC);
1064   ASSERT (trec -> state == TREC_ACTIVE || 
1065           trec -> state == TREC_CONDEMNED);
1066
1067   entry = get_entry_for(trec, tvar, &entry_in);
1068
1069   if (entry != NULL) {
1070     if (entry_in == trec) {
1071       // Entry found in our trec
1072       entry -> new_value = new_value;
1073     } else {
1074       // Entry found in another trec
1075       TRecEntry *new_entry = get_new_entry(reg, trec);
1076       new_entry -> tvar = tvar;
1077       new_entry -> expected_value = entry -> expected_value;
1078       new_entry -> new_value = new_value;
1079     } 
1080   } else {
1081     // No entry found
1082     StgClosure *current_value = read_current_value(trec, tvar);
1083     TRecEntry *new_entry = get_new_entry(reg, trec);
1084     new_entry -> tvar = tvar;
1085     new_entry -> expected_value = current_value;
1086     new_entry -> new_value = new_value;
1087   }
1088
1089   TRACE("%p : stmWriteTVar done\n", trec);
1090 }
1091
1092 /*......................................................................*/
1093
1094 StgTVar *stmNewTVar(StgRegTable *reg,
1095                     StgClosure *new_value) {
1096   StgTVar *result;
1097   result = new_tvar(reg, new_value);
1098   return result;
1099 }
1100
1101 /*......................................................................*/
1102