Refactoring and tidy up
[ghc-hetmet.git] / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  * (c) The GHC Team 1998-2005
3  * 
4  * STM implementation.
5  *
6  * Overview
7  * --------
8  *
9  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
10  * each transcation has a TRec (transaction record) holding entries for each of the
11  * TVars (transactional variables) that it has accessed.  Each entry records
12  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
13  * the transaction wants to write to the TVar, (d) during commit, the identity of
14  * the TRec that wrote the expected value.  
15  *
16  * Separate TRecs are used for each level in a nest of transactions.  This allows
17  * a nested transaction to be aborted without condemning its enclosing transactions.
18  * This is needed in the implementation of catchRetry.  Note that the "expected value"
19  * in a nested transaction's TRec is the value expected to be *held in memory* if
20  * the transaction commits -- not the "new value" stored in one of the enclosing
21  * transactions.  This means that validation can be done without searching through
22  * a nest of TRecs.
23  *
24  * Concurrency control
25  * -------------------
26  *
27  * Three different concurrency control schemes can be built according to the settings
28  * in STM.h:
29  * 
30  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
31  * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
32  *
33  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
34  * an invocation on the STM interface.  Note that this does not mean that 
35  * transactions are simply serialized -- the lock is only held *within* the 
36  * implementation of stmCommitTransaction, stmWait etc.
37  *
38  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
39  * and, when committing a transaction, no locks are acquired for TVars that have
40  * been read but not updated.
41  *
42  * Concurrency control is implemented in the functions:
43  *
44  *    lock_stm
45  *    unlock_stm
46  *    lock_tvar / cond_lock_tvar
47  *    unlock_tvar
48  *
49  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
50  * implementation of these functions.  
51  *
52  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
53  * using STM_CG_LOCK, and otherwise they are no-ops.
54  *
55  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
56  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
57  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
58  * builds).  This is because locking a TVar is implemented by writing the lock
59  * holder's TRec into the TVar's current_value field:
60  *
61  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
62  *               it contained.
63  *
64  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
65  *               contains a specified value.  Return TRUE if this succeeds,
66  *               FALSE otherwise.
67  *
68  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
69  *               storing a specified value in place of the lock entry.
70  *
71  * Using these operations, the typcial pattern of a commit/validate/wait operation
72  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
73  * the TVars that were only read from still contain their expected values, 
74  * (d) release the locks on the TVars, writing updates to them in the case of a 
75  * commit, (e) unlock the STM.
76  *
77  * Queues of waiting threads hang off the first_watch_queue_entry
78  * field of each TVar.  This may only be manipulated when holding that
79  * TVar's lock.  In particular, when a thread is putting itself to
80  * sleep, it mustn't release the TVar's lock until it has added itself
81  * to the wait queue and marked its TSO as BlockedOnSTM -- this makes
82  * sure that other threads will know to wake it.
83  *
84  * ---------------------------------------------------------------------------*/
85
86 #include "PosixSource.h"
87 #include "Rts.h"
88
89 #include "RtsUtils.h"
90 #include "Schedule.h"
91 #include "STM.h"
92 #include "Trace.h"
93 #include "Threads.h"
94
95 #include <stdio.h>
96
97 #define TRUE 1
98 #define FALSE 0
99
100 // ACQ_ASSERT is used for assertions which are only required for
101 // THREADED_RTS builds with fine-grained locking.
102
103 #if defined(STM_FG_LOCKS)
104 #define ACQ_ASSERT(_X) ASSERT(_X)
105 #define NACQ_ASSERT(_X) /*Nothing*/
106 #else
107 #define ACQ_ASSERT(_X) /*Nothing*/
108 #define NACQ_ASSERT(_X) ASSERT(_X)
109 #endif
110
111 /*......................................................................*/
112
113 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
114 // unusualy code paths if genuine contention is rare
115
116 #define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)
117
118 #ifdef SHAKE
119 static const int do_shake = TRUE;
120 #else
121 static const int do_shake = FALSE;
122 #endif
123 static int shake_ctr = 0;
124 static int shake_lim = 1;
125
126 static int shake(void) {
127   if (do_shake) {
128     if (((shake_ctr++) % shake_lim) == 0) {
129       shake_ctr = 1;
130       shake_lim ++;
131       return TRUE;
132     } 
133     return FALSE;
134   } else {
135     return FALSE;
136   }
137 }
138
139 /*......................................................................*/
140
141 // Helper macros for iterating over entries within a transaction
142 // record
143
144 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
145   StgTRecHeader *__t = (_t);                                                    \
146   StgTRecChunk *__c = __t -> current_chunk;                                     \
147   StgWord __limit = __c -> next_entry_idx;                                      \
148   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit);  \
149   while (__c != END_STM_CHUNK_LIST) {                                           \
150     StgWord __i;                                                                \
151     for (__i = 0; __i < __limit; __i ++) {                                      \
152       TRecEntry *_x = &(__c -> entries[__i]);                                   \
153       do { CODE } while (0);                                                    \
154     }                                                                           \
155     __c = __c -> prev_chunk;                                                    \
156     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
157   }                                                                             \
158  exit_for_each:                                                                 \
159   if (FALSE) goto exit_for_each;                                                \
160 } while (0)
161
162 #define BREAK_FOR_EACH goto exit_for_each
163      
164 /*......................................................................*/
165
166 // if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
167 // and wait queue entries without GC
168
169 #define REUSE_MEMORY
170
171 /*......................................................................*/
172
173 #define IF_STM_UNIPROC(__X)  do { } while (0)
174 #define IF_STM_CG_LOCK(__X)  do { } while (0)
175 #define IF_STM_FG_LOCKS(__X) do { } while (0)
176
177 #if defined(STM_UNIPROC)
178 #undef IF_STM_UNIPROC
179 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
180 static const StgBool config_use_read_phase = FALSE;
181
182 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
183   TRACE("%p : lock_stm()", trec);
184 }
185
186 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
187   TRACE("%p : unlock_stm()", trec);
188 }
189
190 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
191                              StgTVar *s STG_UNUSED) {
192   StgClosure *result;
193   TRACE("%p : lock_tvar(%p)", trec, s);
194   result = s -> current_value;
195   return result;
196 }
197
198 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
199                         StgTVar *s STG_UNUSED,
200                         StgClosure *c,
201                         StgBool force_update) {
202   TRACE("%p : unlock_tvar(%p)", trec, s);
203   if (force_update) {
204     s -> current_value = c;
205   }
206 }
207
208 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
209                               StgTVar *s STG_UNUSED,
210                               StgClosure *expected) {
211   StgClosure *result;
212   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
213   result = s -> current_value;
214   TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
215   return (result == expected);
216 }
217
218 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
219   // Nothing -- uniproc
220   return TRUE;
221 }
222
223 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) { 
224   // Nothing -- uniproc
225 }
226 #endif
227
228 #if defined(STM_CG_LOCK) /*........................................*/
229
230 #undef IF_STM_CG_LOCK
231 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
232 static const StgBool config_use_read_phase = FALSE;
233 static volatile StgTRecHeader *smp_locked = NULL;
234
235 static void lock_stm(StgTRecHeader *trec) {
236   while (cas(&smp_locked, NULL, trec) != NULL) { }
237   TRACE("%p : lock_stm()", trec);
238 }
239
240 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
241   TRACE("%p : unlock_stm()", trec);
242   ASSERT (smp_locked == trec);
243   smp_locked = 0;
244 }
245
246 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
247                              StgTVar *s STG_UNUSED) {
248   StgClosure *result;
249   TRACE("%p : lock_tvar(%p)", trec, s);
250   ASSERT (smp_locked == trec);
251   result = s -> current_value;
252   return result;
253 }
254
255 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
256                          StgTVar *s STG_UNUSED,
257                          StgClosure *c,
258                          StgBool force_update) {
259   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
260   ASSERT (smp_locked == trec);
261   if (force_update) {
262     s -> current_value = c;
263   }
264 }
265
266 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
267                                StgTVar *s STG_UNUSED,
268                                StgClosure *expected) {
269   StgClosure *result;
270   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
271   ASSERT (smp_locked == trec);
272   result = s -> current_value;
273   TRACE("%p : %d", result ? "success" : "failure");
274   return (result == expected);
275 }
276
277 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
278   // Nothing -- protected by STM lock
279   return TRUE;
280 }
281
282 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) { 
283   // Nothing -- protected by STM lock
284 }
285 #endif
286
287 #if defined(STM_FG_LOCKS) /*...................................*/
288
289 #undef IF_STM_FG_LOCKS
290 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
291 static const StgBool config_use_read_phase = TRUE;
292
293 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
294   TRACE("%p : lock_stm()", trec);
295 }
296
297 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
298   TRACE("%p : unlock_stm()", trec);
299 }
300
301 static StgClosure *lock_tvar(StgTRecHeader *trec, 
302                              StgTVar *s STG_UNUSED) {
303   StgClosure *result;
304   TRACE("%p : lock_tvar(%p)", trec, s);
305   do {
306     do {
307       result = s -> current_value;
308     } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
309   } while (cas((void *)&(s -> current_value),
310                (StgWord)result, (StgWord)trec) != (StgWord)result);
311   return result;
312 }
313
314 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
315                         StgTVar *s,
316                         StgClosure *c,
317                         StgBool force_update STG_UNUSED) {
318   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
319   ASSERT(s -> current_value == (StgClosure *)trec);
320   s -> current_value = c;
321 }
322
323 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
324                               StgTVar *s,
325                               StgClosure *expected) {
326   StgClosure *result;
327   StgWord w;
328   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
329   w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
330   result = (StgClosure *)w;
331   TRACE("%p : %s", trec, result ? "success" : "failure");
332   return (result == expected);
333 }
334
335 static StgBool lock_inv(StgAtomicInvariant *inv) {
336   return (cas(&(inv -> lock), 0, 1) == 0);
337 }
338
339 static void unlock_inv(StgAtomicInvariant *inv) { 
340   ASSERT(inv -> lock == 1);
341   inv -> lock = 0;
342 }
343 #endif
344
345 /*......................................................................*/
346  
347 static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
348   StgClosure *c = q -> closure;
349   StgInfoTable *info = get_itbl(c);
350   return (info -> type) == TSO;
351 }
352
353 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
354   StgClosure *c = q -> closure;
355   return (c->header.info == &stg_ATOMIC_INVARIANT_info);
356 }
357
358 /*......................................................................*/
359  
360 // Helper functions for thread blocking and unblocking
361
362 static void park_tso(StgTSO *tso) {
363   ASSERT(tso -> why_blocked == NotBlocked);
364   tso -> why_blocked = BlockedOnSTM;
365   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
366   TRACE("park_tso on tso=%p", tso);
367 }
368
369 static void unpark_tso(Capability *cap, StgTSO *tso) {
370     // We will continue unparking threads while they remain on one of the wait
371     // queues: it's up to the thread itself to remove it from the wait queues
372     // if it decides to do so when it is scheduled.
373
374     // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
375     // to avoid multiple CPUs unblocking the same TSO, and also to
376     // synchronise with throwTo().
377     lockTSO(tso);
378     if (tso -> why_blocked == BlockedOnSTM) {
379         TRACE("unpark_tso on tso=%p", tso);
380         tryWakeupThread(cap,tso);
381     } else {
382         TRACE("spurious unpark_tso on tso=%p", tso);
383     }
384     unlockTSO(tso);
385 }
386
387 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
388   StgTVarWatchQueue *q;
389   StgTVarWatchQueue *trail;
390   TRACE("unpark_waiters_on tvar=%p", s);
391   // unblock TSOs in reverse order, to be a bit fairer (#2319)
392   for (q = s -> first_watch_queue_entry, trail = q;
393        q != END_STM_WATCH_QUEUE;
394        q = q -> next_queue_entry) {
395     trail = q;
396   }
397   q = trail;
398   for (;
399        q != END_STM_WATCH_QUEUE; 
400        q = q -> prev_queue_entry) {
401     if (watcher_is_tso(q)) {
402       unpark_tso(cap, (StgTSO *)(q -> closure));
403     }
404   }
405 }
406
407 /*......................................................................*/
408
409 // Helper functions for downstream allocation and initialization
410
411 static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
412                                                              StgAtomicInvariant *invariant) {
413   StgInvariantCheckQueue *result;
414   result = (StgInvariantCheckQueue *)allocate(cap, sizeofW(StgInvariantCheckQueue));
415   SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
416   result -> invariant = invariant;
417   result -> my_execution = NO_TREC;
418   return result;
419 }
420
421 static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
422                                                    StgClosure *closure) {
423   StgTVarWatchQueue *result;
424   result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
425   SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
426   result -> closure = closure;
427   return result;
428 }
429
430 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
431   StgTRecChunk *result;
432   result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
433   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
434   result -> prev_chunk = END_STM_CHUNK_LIST;
435   result -> next_entry_idx = 0;
436   return result;
437 }
438
439 static StgTRecHeader *new_stg_trec_header(Capability *cap,
440                                           StgTRecHeader *enclosing_trec) {
441   StgTRecHeader *result;
442   result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
443   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
444
445   result -> enclosing_trec = enclosing_trec;
446   result -> current_chunk = new_stg_trec_chunk(cap);
447   result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
448
449   if (enclosing_trec == NO_TREC) {
450     result -> state = TREC_ACTIVE;
451   } else {
452     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
453            enclosing_trec -> state == TREC_CONDEMNED);
454     result -> state = enclosing_trec -> state;
455   }
456
457   return result;  
458 }
459
460 /*......................................................................*/
461
462 // Allocation / deallocation functions that retain per-capability lists
463 // of closures that can be re-used
464
465 static StgInvariantCheckQueue *alloc_stg_invariant_check_queue(Capability *cap,
466                                                                StgAtomicInvariant *invariant) {
467   StgInvariantCheckQueue *result = NULL;
468   if (cap -> free_invariant_check_queues == END_INVARIANT_CHECK_QUEUE) {
469     result = new_stg_invariant_check_queue(cap, invariant);
470   } else {
471     result = cap -> free_invariant_check_queues;
472     result -> invariant = invariant;
473     result -> my_execution = NO_TREC;
474     cap -> free_invariant_check_queues = result -> next_queue_entry;
475   }
476   return result;
477 }
478
479 static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
480                                                      StgClosure *closure) {
481   StgTVarWatchQueue *result = NULL;
482   if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
483     result = new_stg_tvar_watch_queue(cap, closure);
484   } else {
485     result = cap -> free_tvar_watch_queues;
486     result -> closure = closure;
487     cap -> free_tvar_watch_queues = result -> next_queue_entry;
488   }
489   return result;
490 }
491
492 static void free_stg_tvar_watch_queue(Capability *cap,
493                                       StgTVarWatchQueue *wq) {
494 #if defined(REUSE_MEMORY)
495   wq -> next_queue_entry = cap -> free_tvar_watch_queues;
496   cap -> free_tvar_watch_queues = wq;
497 #endif
498 }
499
500 static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
501   StgTRecChunk *result = NULL;
502   if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
503     result = new_stg_trec_chunk(cap);
504   } else {
505     result = cap -> free_trec_chunks;
506     cap -> free_trec_chunks = result -> prev_chunk;
507     result -> prev_chunk = END_STM_CHUNK_LIST;
508     result -> next_entry_idx = 0;
509   }
510   return result;
511 }
512
513 static void free_stg_trec_chunk(Capability *cap, 
514                                 StgTRecChunk *c) {
515 #if defined(REUSE_MEMORY)
516   c -> prev_chunk = cap -> free_trec_chunks;
517   cap -> free_trec_chunks = c;
518 #endif
519 }
520
521 static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
522                                             StgTRecHeader *enclosing_trec) {
523   StgTRecHeader *result = NULL;
524   if (cap -> free_trec_headers == NO_TREC) {
525     result = new_stg_trec_header(cap, enclosing_trec);
526   } else {
527     result = cap -> free_trec_headers;
528     cap -> free_trec_headers = result -> enclosing_trec;
529     result -> enclosing_trec = enclosing_trec;
530     result -> current_chunk -> next_entry_idx = 0;
531     result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
532     if (enclosing_trec == NO_TREC) {
533       result -> state = TREC_ACTIVE;
534     } else {
535       ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
536              enclosing_trec -> state == TREC_CONDEMNED);
537       result -> state = enclosing_trec -> state;
538     }
539   }
540   return result;
541 }
542
543 static void free_stg_trec_header(Capability *cap,
544                                  StgTRecHeader *trec) {
545 #if defined(REUSE_MEMORY)
546   StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
547   while (chunk != END_STM_CHUNK_LIST) {
548     StgTRecChunk *prev_chunk = chunk -> prev_chunk;
549     free_stg_trec_chunk(cap, chunk);
550     chunk = prev_chunk;
551   } 
552   trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
553   trec -> enclosing_trec = cap -> free_trec_headers;
554   cap -> free_trec_headers = trec;
555 #endif
556 }
557
558 /*......................................................................*/
559
560 // Helper functions for managing waiting lists
561
562 static void build_watch_queue_entries_for_trec(Capability *cap,
563                                                StgTSO *tso, 
564                                                StgTRecHeader *trec) {
565   ASSERT(trec != NO_TREC);
566   ASSERT(trec -> enclosing_trec == NO_TREC);
567   ASSERT(trec -> state == TREC_ACTIVE);
568
569   TRACE("%p : build_watch_queue_entries_for_trec()", trec);
570
571   FOR_EACH_ENTRY(trec, e, {
572     StgTVar *s;
573     StgTVarWatchQueue *q;
574     StgTVarWatchQueue *fq;
575     s = e -> tvar;
576     TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
577     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
578     NACQ_ASSERT(s -> current_value == e -> expected_value);
579     fq = s -> first_watch_queue_entry;
580     q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
581     q -> next_queue_entry = fq;
582     q -> prev_queue_entry = END_STM_WATCH_QUEUE;
583     if (fq != END_STM_WATCH_QUEUE) {
584       fq -> prev_queue_entry = q;
585     }
586     s -> first_watch_queue_entry = q;
587     e -> new_value = (StgClosure *) q;
588   });
589 }
590
591 static void remove_watch_queue_entries_for_trec(Capability *cap,
592                                                 StgTRecHeader *trec) {
593   ASSERT(trec != NO_TREC);
594   ASSERT(trec -> enclosing_trec == NO_TREC);
595   ASSERT(trec -> state == TREC_WAITING ||
596          trec -> state == TREC_CONDEMNED);
597
598   TRACE("%p : remove_watch_queue_entries_for_trec()", trec);
599
600   FOR_EACH_ENTRY(trec, e, {
601     StgTVar *s;
602     StgTVarWatchQueue *pq;
603     StgTVarWatchQueue *nq;
604     StgTVarWatchQueue *q;
605     StgClosure *saw;
606     s = e -> tvar;
607     saw = lock_tvar(trec, s);
608     q = (StgTVarWatchQueue *) (e -> new_value);
609     TRACE("%p : removing tso=%p from watch queue for tvar=%p", 
610           trec, 
611           q -> closure, 
612           s);
613     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
614     nq = q -> next_queue_entry;
615     pq = q -> prev_queue_entry;
616     if (nq != END_STM_WATCH_QUEUE) {
617       nq -> prev_queue_entry = pq;
618     }
619     if (pq != END_STM_WATCH_QUEUE) {
620       pq -> next_queue_entry = nq;
621     } else {
622       ASSERT (s -> first_watch_queue_entry == q);
623       s -> first_watch_queue_entry = nq;
624     }
625     free_stg_tvar_watch_queue(cap, q);
626     unlock_tvar(trec, s, saw, FALSE);
627   });
628 }
629  
630 /*......................................................................*/
631  
632 static TRecEntry *get_new_entry(Capability *cap,
633                                 StgTRecHeader *t) {
634   TRecEntry *result;
635   StgTRecChunk *c;
636   int i;
637
638   c = t -> current_chunk;
639   i = c -> next_entry_idx;
640   ASSERT(c != END_STM_CHUNK_LIST);
641
642   if (i < TREC_CHUNK_NUM_ENTRIES) {
643     // Continue to use current chunk
644     result = &(c -> entries[i]);
645     c -> next_entry_idx ++;
646   } else {
647     // Current chunk is full: allocate a fresh one
648     StgTRecChunk *nc;
649     nc = alloc_stg_trec_chunk(cap);
650     nc -> prev_chunk = c;
651     nc -> next_entry_idx = 1;
652     t -> current_chunk = nc;
653     result = &(nc -> entries[0]);
654   }
655
656   return result;
657 }
658
659 /*......................................................................*/
660
661 static void merge_update_into(Capability *cap,
662                               StgTRecHeader *t,
663                               StgTVar *tvar,
664                               StgClosure *expected_value,
665                               StgClosure *new_value) {
666   int found;
667   
668   // Look for an entry in this trec
669   found = FALSE;
670   FOR_EACH_ENTRY(t, e, {
671     StgTVar *s;
672     s = e -> tvar;
673     if (s == tvar) {
674       found = TRUE;
675       if (e -> expected_value != expected_value) {
676         // Must abort if the two entries start from different values
677         TRACE("%p : update entries inconsistent at %p (%p vs %p)", 
678               t, tvar, e -> expected_value, expected_value);
679         t -> state = TREC_CONDEMNED;
680       } 
681       e -> new_value = new_value;
682       BREAK_FOR_EACH;
683     }
684   });
685
686   if (!found) {
687     // No entry so far in this trec
688     TRecEntry *ne;
689     ne = get_new_entry(cap, t);
690     ne -> tvar = tvar;
691     ne -> expected_value = expected_value;
692     ne -> new_value = new_value;
693   }
694 }
695
696 /*......................................................................*/
697
698 static void merge_read_into(Capability *cap,
699                             StgTRecHeader *t,
700                             StgTVar *tvar,
701                             StgClosure *expected_value) {
702   int found;
703   
704   // Look for an entry in this trec
705   found = FALSE;
706   FOR_EACH_ENTRY(t, e, {
707     StgTVar *s;
708     s = e -> tvar;
709     if (s == tvar) {
710       found = TRUE;
711       if (e -> expected_value != expected_value) {
712         // Must abort if the two entries start from different values
713         TRACE("%p : read entries inconsistent at %p (%p vs %p)", 
714               t, tvar, e -> expected_value, expected_value);
715         t -> state = TREC_CONDEMNED;
716       } 
717       BREAK_FOR_EACH;
718     }
719   });
720
721   if (!found) {
722     // No entry so far in this trec
723     TRecEntry *ne;
724     ne = get_new_entry(cap, t);
725     ne -> tvar = tvar;
726     ne -> expected_value = expected_value;
727     ne -> new_value = expected_value;
728   }
729 }
730
731 /*......................................................................*/
732
733 static StgBool entry_is_update(TRecEntry *e) {
734   StgBool result;
735   result = (e -> expected_value != e -> new_value);
736   return result;
737
738
739 #if defined(STM_FG_LOCKS)
740 static StgBool entry_is_read_only(TRecEntry *e) {
741   StgBool result;
742   result = (e -> expected_value == e -> new_value);
743   return result;
744
745
746 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
747   StgClosure *c;
748   StgBool result;
749   c = s -> current_value;
750   result = (c == (StgClosure *) h);
751   return result;  
752 }
753 #endif
754
755 // revert_ownership : release a lock on a TVar, storing back
756 // the value that it held when the lock was acquired.  "revert_all"
757 // is set in stmWait and stmReWait when we acquired locks on all of 
758 // the TVars involved.  "revert_all" is not set in commit operations
759 // where we don't lock TVars that have been read from but not updated.
760
761 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
762                              StgBool revert_all STG_UNUSED) {
763 #if defined(STM_FG_LOCKS) 
764   FOR_EACH_ENTRY(trec, e, {
765     if (revert_all || entry_is_update(e)) {
766       StgTVar *s;
767       s = e -> tvar;
768       if (tvar_is_locked(s, trec)) {
769         unlock_tvar(trec, s, e -> expected_value, TRUE);
770       }
771     }
772   });
773 #endif
774 }
775
776 /*......................................................................*/
777
778 // validate_and_acquire_ownership : this performs the twin functions
779 // of checking that the TVars referred to by entries in trec hold the
780 // expected values and:
781 // 
782 //   - locking the TVar (on updated TVars during commit, or all TVars
783 //     during wait)
784 //
785 //   - recording the identity of the TRec who wrote the value seen in the
786 //     TVar (on non-updated TVars during commit).  These values are 
787 //     stashed in the TRec entries and are then checked in check_read_only
788 //     to ensure that an atomic snapshot of all of these locations has been
789 //     seen.
790
791 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
792                                                int acquire_all,
793                                                int retain_ownership) {
794   StgBool result;
795
796   if (shake()) {
797     TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
798     return FALSE;
799   }
800
801   ASSERT ((trec -> state == TREC_ACTIVE) || 
802           (trec -> state == TREC_WAITING) ||
803           (trec -> state == TREC_CONDEMNED));
804   result = !((trec -> state) == TREC_CONDEMNED);
805   if (result) {
806     FOR_EACH_ENTRY(trec, e, {
807       StgTVar *s;
808       s = e -> tvar;
809       if (acquire_all || entry_is_update(e)) {
810         TRACE("%p : trying to acquire %p", trec, s);
811         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
812           TRACE("%p : failed to acquire %p", trec, s);
813           result = FALSE;
814           BREAK_FOR_EACH;
815         }
816       } else {
817         ASSERT(config_use_read_phase);
818         IF_STM_FG_LOCKS({
819           TRACE("%p : will need to check %p", trec, s);
820           if (s -> current_value != e -> expected_value) {
821             TRACE("%p : doesn't match", trec);
822             result = FALSE;
823             BREAK_FOR_EACH;
824           }
825           e -> num_updates = s -> num_updates;
826           if (s -> current_value != e -> expected_value) {
827             TRACE("%p : doesn't match (race)", trec);
828             result = FALSE;
829             BREAK_FOR_EACH;
830           } else {
831             TRACE("%p : need to check version %ld", trec, e -> num_updates);
832           }
833         });
834       }
835     });
836   }
837
838   if ((!result) || (!retain_ownership)) {
839     revert_ownership(trec, acquire_all);
840   }
841   
842   return result;
843 }
844
845 // check_read_only : check that we've seen an atomic snapshot of the
846 // non-updated TVars accessed by a trec.  This checks that the last TRec to
847 // commit an update to the TVar is unchanged since the value was stashed in
848 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
849 // all of them contained their expected values at the start of the call to
850 // check_read_only.
851 //
852 // The paper "Concurrent programming without locks" (under submission), or
853 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
854 // this kind of algorithm.
855
856 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
857   StgBool result = TRUE;
858
859   ASSERT (config_use_read_phase);
860   IF_STM_FG_LOCKS({
861     FOR_EACH_ENTRY(trec, e, {
862       StgTVar *s;
863       s = e -> tvar;
864       if (entry_is_read_only(e)) {
865         TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
866         if (s -> num_updates != e -> num_updates) {
867           // ||s -> current_value != e -> expected_value) {
868           TRACE("%p : mismatch", trec);
869           result = FALSE;
870           BREAK_FOR_EACH;
871         }
872       }
873     });
874   });
875
876   return result;
877 }
878
879
880 /************************************************************************/
881
882 void stmPreGCHook (Capability *cap) {
883   lock_stm(NO_TREC);
884   TRACE("stmPreGCHook");
885   cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE;
886   cap->free_trec_chunks = END_STM_CHUNK_LIST;
887   cap->free_trec_headers = NO_TREC;
888   unlock_stm(NO_TREC);
889 }
890
891 /************************************************************************/
892
893 // check_read_only relies on version numbers held in TVars' "num_updates" 
894 // fields not wrapping around while a transaction is committed.  The version
895 // number is incremented each time an update is committed to the TVar
896 // This is unlikely to wrap around when 32-bit integers are used for the counts, 
897 // but to ensure correctness we maintain a shared count on the maximum
898 // number of commit operations that may occur and check that this has 
899 // not increased by more than 2^32 during a commit.
900
901 #define TOKEN_BATCH_SIZE 1024
902
903 static volatile StgInt64 max_commits = 0;
904
905 #if defined(THREADED_RTS)
906 static volatile StgBool token_locked = FALSE;
907
908 static void getTokenBatch(Capability *cap) {
909   while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
910   max_commits += TOKEN_BATCH_SIZE;
911   TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
912   cap -> transaction_tokens = TOKEN_BATCH_SIZE;
913   token_locked = FALSE;
914 }
915
916 static void getToken(Capability *cap) {
917   if (cap -> transaction_tokens == 0) {
918     getTokenBatch(cap);
919   }
920   cap -> transaction_tokens --;
921 }
922 #else
923 static void getToken(Capability *cap STG_UNUSED) {
924   // Nothing
925 }
926 #endif
927
928 /*......................................................................*/
929
930 StgTRecHeader *stmStartTransaction(Capability *cap,
931                                    StgTRecHeader *outer) {
932   StgTRecHeader *t;
933   TRACE("%p : stmStartTransaction with %d tokens", 
934         outer, 
935         cap -> transaction_tokens);
936
937   getToken(cap);
938
939   t = alloc_stg_trec_header(cap, outer);
940   TRACE("%p : stmStartTransaction()=%p", outer, t);
941   return t;
942 }
943
944 /*......................................................................*/
945
946 void stmAbortTransaction(Capability *cap,
947                          StgTRecHeader *trec) {
948   StgTRecHeader *et;
949   TRACE("%p : stmAbortTransaction", trec);
950   ASSERT (trec != NO_TREC);
951   ASSERT ((trec -> state == TREC_ACTIVE) || 
952           (trec -> state == TREC_WAITING) ||
953           (trec -> state == TREC_CONDEMNED));
954
955   lock_stm(trec);
956
957   et = trec -> enclosing_trec;
958   if (et == NO_TREC) {
959     // We're a top-level transaction: remove any watch queue entries that
960     // we may have.
961     TRACE("%p : aborting top-level transaction", trec);
962
963     if (trec -> state == TREC_WAITING) {
964       ASSERT (trec -> enclosing_trec == NO_TREC);
965       TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
966       remove_watch_queue_entries_for_trec(cap, trec);
967     } 
968
969   } else {
970     // We're a nested transaction: merge our read set into our parent's
971     TRACE("%p : retaining read-set into parent %p", trec, et);
972
973     FOR_EACH_ENTRY(trec, e, {
974       StgTVar *s = e -> tvar;
975       merge_read_into(cap, et, s, e -> expected_value);
976     });
977   } 
978
979   trec -> state = TREC_ABORTED;
980   unlock_stm(trec);
981
982   TRACE("%p : stmAbortTransaction done", trec);
983 }
984
985 /*......................................................................*/
986
987 void stmFreeAbortedTRec(Capability *cap,
988                         StgTRecHeader *trec) {
989   TRACE("%p : stmFreeAbortedTRec", trec);
990   ASSERT (trec != NO_TREC);
991   ASSERT ((trec -> state == TREC_CONDEMNED) ||
992           (trec -> state == TREC_ABORTED));
993
994   free_stg_trec_header(cap, trec);
995
996   TRACE("%p : stmFreeAbortedTRec done", trec);
997 }
998
999 /*......................................................................*/
1000
1001 void stmCondemnTransaction(Capability *cap,
1002                            StgTRecHeader *trec) {
1003   TRACE("%p : stmCondemnTransaction", trec);
1004   ASSERT (trec != NO_TREC);
1005   ASSERT ((trec -> state == TREC_ACTIVE) || 
1006           (trec -> state == TREC_WAITING) ||
1007           (trec -> state == TREC_CONDEMNED));
1008
1009   lock_stm(trec);
1010   if (trec -> state == TREC_WAITING) {
1011     ASSERT (trec -> enclosing_trec == NO_TREC);
1012     TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
1013     remove_watch_queue_entries_for_trec(cap, trec);
1014   } 
1015   trec -> state = TREC_CONDEMNED;
1016   unlock_stm(trec);
1017
1018   TRACE("%p : stmCondemnTransaction done", trec);
1019 }
1020
1021 /*......................................................................*/
1022
1023 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
1024   StgTRecHeader *t;
1025   StgBool result;
1026
1027   TRACE("%p : stmValidateNestOfTransactions", trec);
1028   ASSERT(trec != NO_TREC);
1029   ASSERT((trec -> state == TREC_ACTIVE) || 
1030          (trec -> state == TREC_WAITING) ||
1031          (trec -> state == TREC_CONDEMNED));
1032
1033   lock_stm(trec);
1034
1035   t = trec;
1036   result = TRUE;
1037   while (t != NO_TREC) {
1038     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
1039     t = t -> enclosing_trec;
1040   }
1041
1042   if (!result && trec -> state != TREC_WAITING) {
1043     trec -> state = TREC_CONDEMNED; 
1044   }
1045
1046   unlock_stm(trec);
1047
1048   TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
1049   return result;
1050 }
1051
1052 /*......................................................................*/
1053
1054 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
1055   TRecEntry *result = NULL;
1056
1057   TRACE("%p : get_entry_for TVar %p", trec, tvar);
1058   ASSERT(trec != NO_TREC);
1059
1060   do {
1061     FOR_EACH_ENTRY(trec, e, {
1062       if (e -> tvar == tvar) {
1063         result = e;
1064         if (in != NULL) {
1065           *in = trec;
1066         }
1067         BREAK_FOR_EACH;
1068       }
1069     });
1070     trec = trec -> enclosing_trec;
1071   } while (result == NULL && trec != NO_TREC);
1072
1073   return result;    
1074 }
1075
1076 /*......................................................................*/
1077
1078 /*
1079  * Add/remove links between an invariant TVars.  The caller must have 
1080  * locked the TVars involved and the invariant.
1081  */
1082
1083 static void disconnect_invariant(Capability *cap,
1084                                  StgAtomicInvariant *inv) {
1085   StgTRecHeader *last_execution = inv -> last_execution;
1086
1087   TRACE("unhooking last execution inv=%p trec=%p", inv, last_execution);
1088
1089   FOR_EACH_ENTRY(last_execution, e, {
1090     StgTVar *s = e -> tvar;
1091     StgTVarWatchQueue *q = s -> first_watch_queue_entry;
1092     StgBool found = FALSE;
1093     TRACE("  looking for trec on tvar=%p", s);
1094     for (q = s -> first_watch_queue_entry; 
1095          q != END_STM_WATCH_QUEUE; 
1096          q = q -> next_queue_entry) {
1097       if (q -> closure == (StgClosure*)inv) {
1098         StgTVarWatchQueue *pq;
1099         StgTVarWatchQueue *nq;
1100         nq = q -> next_queue_entry;
1101         pq = q -> prev_queue_entry;
1102         if (nq != END_STM_WATCH_QUEUE) {
1103           nq -> prev_queue_entry = pq;
1104         }
1105         if (pq != END_STM_WATCH_QUEUE) {
1106           pq -> next_queue_entry = nq;
1107         } else {
1108           ASSERT (s -> first_watch_queue_entry == q);
1109           s -> first_watch_queue_entry = nq;
1110         }
1111         TRACE("  found it in watch queue entry %p", q);
1112         free_stg_tvar_watch_queue(cap, q);
1113         found = TRUE;
1114         break;
1115       }
1116     }
1117     ASSERT(found);
1118   });
1119   inv -> last_execution = NO_TREC;
1120 }
1121
1122 static void connect_invariant_to_trec(Capability *cap,
1123                                       StgAtomicInvariant *inv, 
1124                                       StgTRecHeader *my_execution) {
1125   TRACE("connecting execution inv=%p trec=%p", inv, my_execution);
1126
1127   ASSERT(inv -> last_execution == NO_TREC);
1128
1129   FOR_EACH_ENTRY(my_execution, e, {
1130     StgTVar *s = e -> tvar;
1131     StgTVarWatchQueue *q = alloc_stg_tvar_watch_queue(cap, (StgClosure*)inv);
1132     StgTVarWatchQueue *fq = s -> first_watch_queue_entry;
1133
1134     // We leave "last_execution" holding the values that will be
1135     // in the heap after the transaction we're in the process
1136     // of committing has finished.
1137     TRecEntry *entry = get_entry_for(my_execution -> enclosing_trec, s, NULL);
1138     if (entry != NULL) {
1139       e -> expected_value = entry -> new_value;
1140       e -> new_value = entry -> new_value;
1141     }
1142
1143     TRACE("  linking trec on tvar=%p value=%p q=%p", s, e -> expected_value, q);
1144     q -> next_queue_entry = fq;
1145     q -> prev_queue_entry = END_STM_WATCH_QUEUE;
1146     if (fq != END_STM_WATCH_QUEUE) {
1147       fq -> prev_queue_entry = q;
1148     }
1149     s -> first_watch_queue_entry = q;
1150   });
1151
1152   inv -> last_execution = my_execution;
1153 }
1154
1155 /*
1156  * Add a new invariant to the trec's list of invariants to check on commit
1157  */
1158 void stmAddInvariantToCheck(Capability *cap, 
1159                             StgTRecHeader *trec,
1160                             StgClosure *code) {
1161   StgAtomicInvariant *invariant;
1162   StgInvariantCheckQueue *q;
1163   TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
1164   ASSERT(trec != NO_TREC);
1165   ASSERT(trec -> state == TREC_ACTIVE ||
1166          trec -> state == TREC_CONDEMNED);
1167
1168
1169   // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
1170   //    to signal that this is a new invariant in the current atomic block
1171
1172   invariant = (StgAtomicInvariant *) allocate(cap, sizeofW(StgAtomicInvariant));
1173   TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
1174   SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
1175   invariant -> code = code;
1176   invariant -> last_execution = NO_TREC;
1177   invariant -> lock = 0;
1178
1179   // 2. Allocate an StgInvariantCheckQueue entry, link it to the current trec
1180
1181   q = alloc_stg_invariant_check_queue(cap, invariant);
1182   TRACE("%p : stmAddInvariantToCheck allocated q=%p", trec, q);
1183   q -> invariant = invariant;
1184   q -> my_execution = NO_TREC;
1185   q -> next_queue_entry = trec -> invariants_to_check;
1186   trec -> invariants_to_check = q;
1187
1188   TRACE("%p : stmAddInvariantToCheck done", trec);
1189 }
1190
1191 /*
1192  * Fill in the trec's list of invariants that might be violated by the 
1193  * current transaction.  
1194  */
1195
1196 StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
1197   StgTRecChunk *c;
1198   TRACE("%p : stmGetInvariantsToCheck, head was %p", 
1199         trec,
1200         trec -> invariants_to_check);
1201
1202   ASSERT(trec != NO_TREC);
1203   ASSERT ((trec -> state == TREC_ACTIVE) || 
1204           (trec -> state == TREC_WAITING) ||
1205           (trec -> state == TREC_CONDEMNED));
1206   ASSERT(trec -> enclosing_trec == NO_TREC);
1207
1208   lock_stm(trec);
1209   c = trec -> current_chunk;
1210   while (c != END_STM_CHUNK_LIST) {
1211     unsigned int i;
1212     for (i = 0; i < c -> next_entry_idx; i ++) {
1213       TRecEntry *e = &(c -> entries[i]);
1214       if (entry_is_update(e)) {
1215         StgTVar *s = e -> tvar;
1216         StgClosure *old = lock_tvar(trec, s);
1217                 
1218         // Pick up any invariants on the TVar being updated
1219         // by entry "e"
1220
1221         StgTVarWatchQueue *q;
1222         TRACE("%p : checking for invariants on %p", trec, s);
1223         for (q = s -> first_watch_queue_entry;
1224              q != END_STM_WATCH_QUEUE;
1225              q = q -> next_queue_entry) {
1226           if (watcher_is_invariant(q)) {
1227             StgBool found = FALSE;
1228             StgInvariantCheckQueue *q2;
1229             TRACE("%p : Touching invariant %p", trec, q -> closure);
1230             for (q2 = trec -> invariants_to_check;
1231                  q2 != END_INVARIANT_CHECK_QUEUE;
1232                  q2 = q2 -> next_queue_entry) {
1233               if (q2 -> invariant == (StgAtomicInvariant*)(q -> closure)) {
1234                 TRACE("%p : Already found %p", trec, q -> closure);
1235                 found = TRUE;
1236                 break;
1237               }
1238             }
1239             
1240             if (!found) {
1241               StgInvariantCheckQueue *q3;
1242               TRACE("%p : Not already found %p", trec, q -> closure);
1243               q3 = alloc_stg_invariant_check_queue(cap,
1244                                                    (StgAtomicInvariant*) q -> closure);
1245               q3 -> next_queue_entry = trec -> invariants_to_check;
1246               trec -> invariants_to_check = q3;
1247             }
1248           }
1249         }
1250
1251         unlock_tvar(trec, s, old, FALSE);
1252       }
1253     }
1254     c = c -> prev_chunk;
1255   }
1256
1257   unlock_stm(trec);
1258
1259   TRACE("%p : stmGetInvariantsToCheck, head now %p", 
1260         trec,
1261         trec -> invariants_to_check);
1262
1263   return (trec -> invariants_to_check);
1264 }
1265
1266 /*......................................................................*/
1267
1268 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
1269   int result;
1270   StgInt64 max_commits_at_start = max_commits;
1271   StgBool touched_invariants;
1272   StgBool use_read_phase;
1273
1274   TRACE("%p : stmCommitTransaction()", trec);
1275   ASSERT (trec != NO_TREC);
1276
1277   lock_stm(trec);
1278
1279   ASSERT (trec -> enclosing_trec == NO_TREC);
1280   ASSERT ((trec -> state == TREC_ACTIVE) || 
1281           (trec -> state == TREC_CONDEMNED));
1282
1283   // touched_invariants is true if we've written to a TVar with invariants 
1284   // attached to it, or if we're trying to add a new invariant to the system.
1285
1286   touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
1287
1288   // If we have touched invariants then (i) lock the invariant, and (ii) add
1289   // the invariant's read set to our own.  Step (i) is needed to serialize
1290   // concurrent transactions that attempt to make conflicting updates
1291   // to the invariant's trec (suppose it read from t1 and t2, and that one
1292   // concurrent transcation writes only to t1, and a second writes only to
1293   // t2).  Step (ii) is needed so that both transactions will lock t1 and t2
1294   // to gain access to their wait lists (and hence be able to unhook the
1295   // invariant from both tvars).
1296
1297   if (touched_invariants) {
1298     StgInvariantCheckQueue *q = trec -> invariants_to_check;
1299     TRACE("%p : locking invariants", trec);
1300     while (q != END_INVARIANT_CHECK_QUEUE) {
1301       StgTRecHeader *inv_old_trec;
1302       StgAtomicInvariant *inv;
1303       TRACE("%p : locking invariant %p", trec, q -> invariant);
1304       inv = q -> invariant;
1305       if (!lock_inv(inv)) {
1306         TRACE("%p : failed to lock %p", trec, inv);
1307         trec -> state = TREC_CONDEMNED;
1308         break;
1309       }
1310
1311       inv_old_trec = inv -> last_execution;
1312       if (inv_old_trec != NO_TREC) {
1313         StgTRecChunk *c = inv_old_trec -> current_chunk;
1314         while (c != END_STM_CHUNK_LIST) {
1315           unsigned int i;
1316           for (i = 0; i < c -> next_entry_idx; i ++) {
1317             TRecEntry *e = &(c -> entries[i]);
1318             TRACE("%p : ensuring we lock TVars for %p", trec, e -> tvar);
1319             merge_read_into (cap, trec, e -> tvar, e -> expected_value);
1320           }
1321           c = c -> prev_chunk;
1322         }
1323       }
1324       q = q -> next_queue_entry;
1325     }
1326     TRACE("%p : finished locking invariants", trec);
1327   }
1328
1329   // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
1330   // (i) the configuration lets us use a read phase, and (ii) we've not
1331   // touched or introduced any invariants.  
1332   //
1333   // In principle we could extend the implementation to support a read-phase
1334   // and invariants, but it complicates the logic: the links between
1335   // invariants and TVars are managed by the TVar watch queues which are
1336   // protected by the TVar's locks.
1337
1338   use_read_phase = ((config_use_read_phase) && (!touched_invariants));
1339
1340   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
1341   if (result) {
1342     // We now know that all the updated locations hold their expected values.
1343     ASSERT (trec -> state == TREC_ACTIVE);
1344
1345     if (use_read_phase) {
1346       StgInt64 max_commits_at_end;
1347       StgInt64 max_concurrent_commits;
1348       TRACE("%p : doing read check", trec);
1349       result = check_read_only(trec);
1350       TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
1351
1352       max_commits_at_end = max_commits;
1353       max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
1354                                 (n_capabilities * TOKEN_BATCH_SIZE));
1355       if (((max_concurrent_commits >> 32) > 0) || shake()) {
1356         result = FALSE;
1357       }
1358     }
1359     
1360     if (result) {
1361       // We now know that all of the read-only locations held their exepcted values
1362       // at the end of the call to validate_and_acquire_ownership.  This forms the
1363       // linearization point of the commit.
1364
1365       // 1. If we have touched or introduced any invariants then unhook them
1366       //    from the TVars they depended on last time they were executed
1367       //    and hook them on the TVars that they now depend on.
1368       if (touched_invariants) {
1369         StgInvariantCheckQueue *q = trec -> invariants_to_check;
1370         while (q != END_INVARIANT_CHECK_QUEUE) {
1371           StgAtomicInvariant *inv = q -> invariant;
1372           if (inv -> last_execution != NO_TREC) {
1373             disconnect_invariant(cap, inv);
1374           }
1375
1376           TRACE("%p : hooking up new execution trec=%p", trec, q -> my_execution);
1377           connect_invariant_to_trec(cap, inv, q -> my_execution);
1378
1379           TRACE("%p : unlocking invariant %p", trec, inv);
1380           unlock_inv(inv);
1381
1382           q = q -> next_queue_entry;
1383         }
1384       }
1385
1386       // 2. Make the updates required by the transaction
1387       FOR_EACH_ENTRY(trec, e, {
1388         StgTVar *s;
1389         s = e -> tvar;
1390         if ((!use_read_phase) || (e -> new_value != e -> expected_value)) {
1391           // Either the entry is an update or we're not using a read phase:
1392           // write the value back to the TVar, unlocking it if necessary.
1393
1394           ACQ_ASSERT(tvar_is_locked(s, trec));
1395           TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
1396           unpark_waiters_on(cap,s);
1397           IF_STM_FG_LOCKS({
1398             s -> num_updates ++;
1399           });
1400           unlock_tvar(trec, s, e -> new_value, TRUE);
1401         } 
1402         ACQ_ASSERT(!tvar_is_locked(s, trec));
1403       });
1404     } else {
1405       revert_ownership(trec, FALSE);
1406     }
1407   } 
1408
1409   unlock_stm(trec);
1410
1411   free_stg_trec_header(cap, trec);
1412
1413   TRACE("%p : stmCommitTransaction()=%d", trec, result);
1414
1415   return result;
1416 }
1417
1418 /*......................................................................*/
1419
1420 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
1421   StgTRecHeader *et;
1422   int result;
1423   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
1424   TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
1425   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
1426
1427   lock_stm(trec);
1428
1429   et = trec -> enclosing_trec;
1430   result = validate_and_acquire_ownership(trec, (!config_use_read_phase), TRUE);
1431   if (result) {
1432     // We now know that all the updated locations hold their expected values.
1433
1434     if (config_use_read_phase) {
1435       TRACE("%p : doing read check", trec);
1436       result = check_read_only(trec);
1437     }
1438     if (result) {
1439       // We now know that all of the read-only locations held their exepcted values
1440       // at the end of the call to validate_and_acquire_ownership.  This forms the
1441       // linearization point of the commit.
1442
1443       TRACE("%p : read-check succeeded", trec);
1444       FOR_EACH_ENTRY(trec, e, {
1445         // Merge each entry into the enclosing transaction record, release all
1446         // locks.
1447         
1448         StgTVar *s;
1449         s = e -> tvar;
1450         if (entry_is_update(e)) {
1451           unlock_tvar(trec, s, e -> expected_value, FALSE);
1452         }
1453         merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
1454         ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
1455       });
1456     } else {
1457       revert_ownership(trec, FALSE);
1458     }
1459   } 
1460
1461   unlock_stm(trec);
1462
1463   free_stg_trec_header(cap, trec);
1464
1465   TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);
1466
1467   return result;
1468 }
1469
1470 /*......................................................................*/
1471
1472 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
1473   int result;
1474   TRACE("%p : stmWait(%p)", trec, tso);
1475   ASSERT (trec != NO_TREC);
1476   ASSERT (trec -> enclosing_trec == NO_TREC);
1477   ASSERT ((trec -> state == TREC_ACTIVE) || 
1478           (trec -> state == TREC_CONDEMNED));
1479
1480   lock_stm(trec);
1481   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1482   if (result) {
1483     // The transaction is valid so far so we can actually start waiting.
1484     // (Otherwise the transaction was not valid and the thread will have to
1485     // retry it).
1486
1487     // Put ourselves to sleep.  We retain locks on all the TVars involved
1488     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
1489     // in the TSO, (c) TREC_WAITING in the Trec.  
1490     build_watch_queue_entries_for_trec(cap, tso, trec);
1491     park_tso(tso);
1492     trec -> state = TREC_WAITING;
1493
1494     // We haven't released ownership of the transaction yet.  The TSO
1495     // has been put on the wait queue for the TVars it is waiting for,
1496     // but we haven't yet tidied up the TSO's stack and made it safe
1497     // to wake up the TSO.  Therefore, we must wait until the TSO is
1498     // safe to wake up before we release ownership - when all is well,
1499     // the runtime will call stmWaitUnlock() below, with the same
1500     // TRec.
1501
1502   } else {
1503     unlock_stm(trec);
1504     free_stg_trec_header(cap, trec);
1505   }
1506
1507   TRACE("%p : stmWait(%p)=%d", trec, tso, result);
1508   return result;
1509 }
1510
1511
1512 void
1513 stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
1514     revert_ownership(trec, TRUE);
1515     unlock_stm(trec);
1516 }
1517
1518 /*......................................................................*/
1519
1520 StgBool stmReWait(Capability *cap, StgTSO *tso) {
1521   int result;
1522   StgTRecHeader *trec = tso->trec;
1523
1524   TRACE("%p : stmReWait", trec);
1525   ASSERT (trec != NO_TREC);
1526   ASSERT (trec -> enclosing_trec == NO_TREC);
1527   ASSERT ((trec -> state == TREC_WAITING) || 
1528           (trec -> state == TREC_CONDEMNED));
1529
1530   lock_stm(trec);
1531   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1532   TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
1533   if (result) {
1534     // The transaction remains valid -- do nothing because it is already on
1535     // the wait queues
1536     ASSERT (trec -> state == TREC_WAITING);
1537     park_tso(tso);
1538     revert_ownership(trec, TRUE);
1539   } else {
1540     // The transcation has become invalid.  We can now remove it from the wait
1541     // queues.
1542     if (trec -> state != TREC_CONDEMNED) {
1543       remove_watch_queue_entries_for_trec (cap, trec);
1544     }
1545     free_stg_trec_header(cap, trec);
1546   }
1547   unlock_stm(trec);
1548
1549   TRACE("%p : stmReWait()=%d", trec, result);
1550   return result;
1551 }
1552
1553 /*......................................................................*/
1554
1555 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
1556   StgClosure *result;
1557   result = tvar -> current_value;
1558
1559 #if defined(STM_FG_LOCKS)
1560   while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
1561     TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
1562     result = tvar -> current_value;
1563   }
1564 #endif
1565
1566   TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
1567   return result;
1568 }
1569
1570 /*......................................................................*/
1571
1572 StgClosure *stmReadTVar(Capability *cap,
1573                         StgTRecHeader *trec, 
1574                         StgTVar *tvar) {
1575   StgTRecHeader *entry_in = NULL;
1576   StgClosure *result = NULL;
1577   TRecEntry *entry = NULL;
1578   TRACE("%p : stmReadTVar(%p)", trec, tvar);
1579   ASSERT (trec != NO_TREC);
1580   ASSERT (trec -> state == TREC_ACTIVE || 
1581           trec -> state == TREC_CONDEMNED);
1582
1583   entry = get_entry_for(trec, tvar, &entry_in);
1584
1585   if (entry != NULL) {
1586     if (entry_in == trec) {
1587       // Entry found in our trec
1588       result = entry -> new_value;
1589     } else {
1590       // Entry found in another trec
1591       TRecEntry *new_entry = get_new_entry(cap, trec);
1592       new_entry -> tvar = tvar;
1593       new_entry -> expected_value = entry -> expected_value;
1594       new_entry -> new_value = entry -> new_value;
1595       result = new_entry -> new_value;
1596     } 
1597   } else {
1598     // No entry found
1599     StgClosure *current_value = read_current_value(trec, tvar);
1600     TRecEntry *new_entry = get_new_entry(cap, trec);
1601     new_entry -> tvar = tvar;
1602     new_entry -> expected_value = current_value;
1603     new_entry -> new_value = current_value;
1604     result = current_value;
1605   }
1606
1607   TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
1608   return result;
1609 }
1610
1611 /*......................................................................*/
1612
1613 void stmWriteTVar(Capability *cap,
1614                   StgTRecHeader *trec,
1615                   StgTVar *tvar, 
1616                   StgClosure *new_value) {
1617
1618   StgTRecHeader *entry_in = NULL;
1619   TRecEntry *entry = NULL;
1620   TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
1621   ASSERT (trec != NO_TREC);
1622   ASSERT (trec -> state == TREC_ACTIVE || 
1623           trec -> state == TREC_CONDEMNED);
1624
1625   entry = get_entry_for(trec, tvar, &entry_in);
1626
1627   if (entry != NULL) {
1628     if (entry_in == trec) {
1629       // Entry found in our trec
1630       entry -> new_value = new_value;
1631     } else {
1632       // Entry found in another trec
1633       TRecEntry *new_entry = get_new_entry(cap, trec);
1634       new_entry -> tvar = tvar;
1635       new_entry -> expected_value = entry -> expected_value;
1636       new_entry -> new_value = new_value;
1637     } 
1638   } else {
1639     // No entry found
1640     StgClosure *current_value = read_current_value(trec, tvar);
1641     TRecEntry *new_entry = get_new_entry(cap, trec);
1642     new_entry -> tvar = tvar;
1643     new_entry -> expected_value = current_value;
1644     new_entry -> new_value = new_value;
1645   }
1646
1647   TRACE("%p : stmWriteTVar done", trec);
1648 }
1649
1650 /*......................................................................*/
1651
1652 StgTVar *stmNewTVar(Capability *cap,
1653                     StgClosure *new_value) {
1654   StgTVar *result;
1655   result = (StgTVar *)allocate(cap, sizeofW(StgTVar));
1656   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
1657   result -> current_value = new_value;
1658   result -> first_watch_queue_entry = END_STM_WATCH_QUEUE;
1659 #if defined(THREADED_RTS)
1660   result -> num_updates = 0;
1661 #endif
1662   return result;
1663 }
1664
1665 /*......................................................................*/