rename StgSync to SpinLock
[ghc-hetmet.git] / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  * (c) The GHC Team 1998-2005
3  * 
4  * STM implementation.
5  *
6  * Overview
7  * --------
8  *
9  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
10  * each transcation has a TRec (transaction record) holding entries for each of the
11  * TVars (transactional variables) that it has accessed.  Each entry records
12  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
13  * the transaction wants to write to the TVar, (d) during commit, the identity of
14  * the TRec that wrote the expected value.  
15  *
16  * Separate TRecs are used for each level in a nest of transactions.  This allows
17  * a nested transaction to be aborted without condemning its enclosing transactions.
18  * This is needed in the implementation of catchRetry.  Note that the "expected value"
19  * in a nested transaction's TRec is the value expected to be *held in memory* if
20  * the transaction commits -- not the "new value" stored in one of the enclosing
21  * transactions.  This means that validation can be done without searching through
22  * a nest of TRecs.
23  *
24  * Concurrency control
25  * -------------------
26  *
27  * Three different concurrency control schemes can be built according to the settings
28  * in STM.h:
29  * 
30  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
31  * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
32  *
33  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
34  * an invocation on the STM interface.  Note that this does not mean that 
35  * transactions are simply serialized -- the lock is only held *within* the 
36  * implementation of stmCommitTransaction, stmWait etc.
37  *
38  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
39  * and, when committing a transaction, no locks are acquired for TVars that have
40  * been read but not updated.
41  *
42  * Concurrency control is implemented in the functions:
43  *
44  *    lock_stm
45  *    unlock_stm
46  *    lock_tvar / cond_lock_tvar
47  *    unlock_tvar
48  *
49  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
50  * implementation of these functions.  
51  *
52  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
53  * using STM_CG_LOCK, and otherwise they are no-ops.
54  *
55  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
56  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
57  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
58  * builds).  This is because locking a TVar is implemented by writing the lock
59  * holder's TRec into the TVar's current_value field:
60  *
61  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
62  *               it contained.
63  *
64  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
65  *               contains a specified value.  Return TRUE if this succeeds,
66  *               FALSE otherwise.
67  *
68  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
69  *               storing a specified value in place of the lock entry.
70  *
71  * Using these operations, the typcial pattern of a commit/validate/wait operation
72  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
73  * the TVars that were only read from still contain their expected values, 
74  * (d) release the locks on the TVars, writing updates to them in the case of a 
75  * commit, (e) unlock the STM.
76  *
77  * Queues of waiting threads hang off the first_watch_queue_entry field of each
78  * TVar.  This may only be manipulated when holding that TVar's lock.  In
79  * particular, when a thread is putting itself to sleep, it mustn't release
80  * the TVar's lock until it has added itself to the wait queue and marked its
81  * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
82  *
83  * ---------------------------------------------------------------------------*/
84
85 #include "PosixSource.h"
86 #include "Rts.h"
87 #include "RtsFlags.h"
88 #include "RtsUtils.h"
89 #include "Storage.h"
90 #include "Schedule.h"
91 #include "SMP.h"
92 #include "STM.h"
93 #include "Trace.h"
94
95 #include <stdlib.h>
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for
102 // THREADED_RTS builds with fine-grained locking.
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
115 // unusualy code paths if genuine contention is rare
116
117 #define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)
118
119 #ifdef SHAKE
120 static const int do_shake = TRUE;
121 #else
122 static const int do_shake = FALSE;
123 #endif
124 static int shake_ctr = 0;
125 static int shake_lim = 1;
126
127 static int shake(void) {
128   if (do_shake) {
129     if (((shake_ctr++) % shake_lim) == 0) {
130       shake_ctr = 1;
131       shake_lim ++;
132       return TRUE;
133     } 
134     return FALSE;
135   } else {
136     return FALSE;
137   }
138 }
139
140 /*......................................................................*/
141
142 // Helper macros for iterating over entries within a transaction
143 // record
144
145 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
146   StgTRecHeader *__t = (_t);                                                    \
147   StgTRecChunk *__c = __t -> current_chunk;                                     \
148   StgWord __limit = __c -> next_entry_idx;                                      \
149   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit);  \
150   while (__c != END_STM_CHUNK_LIST) {                                           \
151     StgWord __i;                                                                \
152     for (__i = 0; __i < __limit; __i ++) {                                      \
153       TRecEntry *_x = &(__c -> entries[__i]);                                   \
154       do { CODE } while (0);                                                    \
155     }                                                                           \
156     __c = __c -> prev_chunk;                                                    \
157     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
158   }                                                                             \
159  exit_for_each:                                                                 \
160   if (FALSE) goto exit_for_each;                                                \
161 } while (0)
162
163 #define BREAK_FOR_EACH goto exit_for_each
164      
165 /*......................................................................*/
166
167 // if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
168 // and wait queue entries without GC
169
170 #define REUSE_MEMORY
171
172 /*......................................................................*/
173
174 #define IF_STM_UNIPROC(__X)  do { } while (0)
175 #define IF_STM_CG_LOCK(__X)  do { } while (0)
176 #define IF_STM_FG_LOCKS(__X) do { } while (0)
177
178 #if defined(STM_UNIPROC)
179 #undef IF_STM_UNIPROC
180 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
181 static const StgBool config_use_read_phase = FALSE;
182
183 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
184   TRACE("%p : lock_stm()", trec);
185 }
186
187 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
188   TRACE("%p : unlock_stm()", trec);
189 }
190
191 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
192                              StgTVar *s STG_UNUSED) {
193   StgClosure *result;
194   TRACE("%p : lock_tvar(%p)", trec, s);
195   result = s -> current_value;
196   return result;
197 }
198
199 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
200                         StgTVar *s STG_UNUSED,
201                         StgClosure *c,
202                         StgBool force_update) {
203   TRACE("%p : unlock_tvar(%p)", trec, s);
204   if (force_update) {
205     s -> current_value = c;
206   }
207 }
208
209 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
210                               StgTVar *s STG_UNUSED,
211                               StgClosure *expected) {
212   StgClosure *result;
213   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
214   result = s -> current_value;
215   TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
216   return (result == expected);
217 }
218
219 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
220   // Nothing -- uniproc
221   return TRUE;
222 }
223
224 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) { 
225   // Nothing -- uniproc
226 }
227 #endif
228
229 #if defined(STM_CG_LOCK) /*........................................*/
230
231 #undef IF_STM_CG_LOCK
232 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
233 static const StgBool config_use_read_phase = FALSE;
234 static volatile StgTRecHeader *smp_locked = NULL;
235
236 static void lock_stm(StgTRecHeader *trec) {
237   while (cas(&smp_locked, NULL, trec) != NULL) { }
238   TRACE("%p : lock_stm()", trec);
239 }
240
241 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
242   TRACE("%p : unlock_stm()", trec);
243   ASSERT (smp_locked == trec);
244   smp_locked = 0;
245 }
246
247 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
248                              StgTVar *s STG_UNUSED) {
249   StgClosure *result;
250   TRACE("%p : lock_tvar(%p)", trec, s);
251   ASSERT (smp_locked == trec);
252   result = s -> current_value;
253   return result;
254 }
255
256 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
257                          StgTVar *s STG_UNUSED,
258                          StgClosure *c,
259                          StgBool force_update) {
260   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
261   ASSERT (smp_locked == trec);
262   if (force_update) {
263     s -> current_value = c;
264   }
265 }
266
267 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
268                                StgTVar *s STG_UNUSED,
269                                StgClosure *expected) {
270   StgClosure *result;
271   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
272   ASSERT (smp_locked == trec);
273   result = s -> current_value;
274   TRACE("%p : %d", result ? "success" : "failure");
275   return (result == expected);
276 }
277
278 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
279   // Nothing -- protected by STM lock
280   return TRUE;
281 }
282
283 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) { 
284   // Nothing -- protected by STM lock
285 }
286 #endif
287
288 #if defined(STM_FG_LOCKS) /*...................................*/
289
290 #undef IF_STM_FG_LOCKS
291 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
292 static const StgBool config_use_read_phase = TRUE;
293
294 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
295   TRACE("%p : lock_stm()", trec);
296 }
297
298 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
299   TRACE("%p : unlock_stm()", trec);
300 }
301
302 static StgClosure *lock_tvar(StgTRecHeader *trec, 
303                              StgTVar *s STG_UNUSED) {
304   StgClosure *result;
305   TRACE("%p : lock_tvar(%p)", trec, s);
306   do {
307     do {
308       result = s -> current_value;
309     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
310   } while (cas((void *)&(s -> current_value),
311                (StgWord)result, (StgWord)trec) != (StgWord)result);
312   return result;
313 }
314
315 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
316                         StgTVar *s,
317                         StgClosure *c,
318                         StgBool force_update STG_UNUSED) {
319   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
320   ASSERT(s -> current_value == (StgClosure *)trec);
321   s -> current_value = c;
322 }
323
324 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
325                               StgTVar *s,
326                               StgClosure *expected) {
327   StgClosure *result;
328   StgWord w;
329   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
330   w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
331   result = (StgClosure *)w;
332   TRACE("%p : %s", trec, result ? "success" : "failure");
333   return (result == expected);
334 }
335
336 static StgBool lock_inv(StgAtomicInvariant *inv) {
337   return (cas(&(inv -> lock), 0, 1) == 0);
338 }
339
340 static void unlock_inv(StgAtomicInvariant *inv) { 
341   ASSERT(inv -> lock == 1);
342   inv -> lock = 0;
343 }
344 #endif
345
346 /*......................................................................*/
347  
348 static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
349   StgClosure *c = q -> closure;
350   StgInfoTable *info = get_itbl(c);
351   return (info -> type) == TSO;
352 }
353
354 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
355   StgClosure *c = q -> closure;
356   StgInfoTable *info = get_itbl(c);
357   return (info -> type) == ATOMIC_INVARIANT;
358 }
359
360 /*......................................................................*/
361  
362 // Helper functions for thread blocking and unblocking
363
364 static void park_tso(StgTSO *tso) {
365   ASSERT(tso -> why_blocked == NotBlocked);
366   tso -> why_blocked = BlockedOnSTM;
367   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
368   TRACE("park_tso on tso=%p", tso);
369 }
370
371 static void unpark_tso(Capability *cap, StgTSO *tso) {
372     // We will continue unparking threads while they remain on one of the wait
373     // queues: it's up to the thread itself to remove it from the wait queues
374     // if it decides to do so when it is scheduled.
375
376     // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
377     // to avoid multiple CPUs unblocking the same TSO, and also to
378     // synchronise with throwTo().
379     lockTSO(tso);
380     if (tso -> why_blocked == BlockedOnSTM) {
381         TRACE("unpark_tso on tso=%p", tso);
382         unblockOne(cap,tso);
383     } else {
384         TRACE("spurious unpark_tso on tso=%p", tso);
385     }
386     unlockTSO(tso);
387 }
388
389 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
390   StgTVarWatchQueue *q;
391   TRACE("unpark_waiters_on tvar=%p", s);
392   for (q = s -> first_watch_queue_entry; 
393        q != END_STM_WATCH_QUEUE; 
394        q = q -> next_queue_entry) {
395     if (watcher_is_tso(q)) {
396       unpark_tso(cap, (StgTSO *)(q -> closure));
397     }
398   }
399 }
400
401 /*......................................................................*/
402
403 // Helper functions for downstream allocation and initialization
404
405 static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
406                                                              StgAtomicInvariant *invariant) {
407   StgInvariantCheckQueue *result;
408   result = (StgInvariantCheckQueue *)allocateLocal(cap, sizeofW(StgInvariantCheckQueue));
409   SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
410   result -> invariant = invariant;
411   result -> my_execution = NO_TREC;
412   return result;
413 }
414
415 static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
416                                                    StgClosure *closure) {
417   StgTVarWatchQueue *result;
418   result = (StgTVarWatchQueue *)allocateLocal(cap, sizeofW(StgTVarWatchQueue));
419   SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
420   result -> closure = closure;
421   return result;
422 }
423
424 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
425   StgTRecChunk *result;
426   result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
427   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
428   result -> prev_chunk = END_STM_CHUNK_LIST;
429   result -> next_entry_idx = 0;
430   return result;
431 }
432
433 static StgTRecHeader *new_stg_trec_header(Capability *cap,
434                                           StgTRecHeader *enclosing_trec) {
435   StgTRecHeader *result;
436   result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
437   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
438
439   result -> enclosing_trec = enclosing_trec;
440   result -> current_chunk = new_stg_trec_chunk(cap);
441   result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
442
443   if (enclosing_trec == NO_TREC) {
444     result -> state = TREC_ACTIVE;
445   } else {
446     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
447            enclosing_trec -> state == TREC_CONDEMNED);
448     result -> state = enclosing_trec -> state;
449   }
450
451   return result;  
452 }
453
454 /*......................................................................*/
455
456 // Allocation / deallocation functions that retain per-capability lists
457 // of closures that can be re-used
458
459 static StgInvariantCheckQueue *alloc_stg_invariant_check_queue(Capability *cap,
460                                                                StgAtomicInvariant *invariant) {
461   StgInvariantCheckQueue *result = NULL;
462   if (cap -> free_invariant_check_queues == END_INVARIANT_CHECK_QUEUE) {
463     result = new_stg_invariant_check_queue(cap, invariant);
464   } else {
465     result = cap -> free_invariant_check_queues;
466     result -> invariant = invariant;
467     result -> my_execution = NO_TREC;
468     cap -> free_invariant_check_queues = result -> next_queue_entry;
469   }
470   return result;
471 }
472
473 static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
474                                                      StgClosure *closure) {
475   StgTVarWatchQueue *result = NULL;
476   if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
477     result = new_stg_tvar_watch_queue(cap, closure);
478   } else {
479     result = cap -> free_tvar_watch_queues;
480     result -> closure = closure;
481     cap -> free_tvar_watch_queues = result -> next_queue_entry;
482   }
483   return result;
484 }
485
486 static void free_stg_tvar_watch_queue(Capability *cap,
487                                       StgTVarWatchQueue *wq) {
488 #if defined(REUSE_MEMORY)
489   wq -> next_queue_entry = cap -> free_tvar_watch_queues;
490   cap -> free_tvar_watch_queues = wq;
491 #endif
492 }
493
494 static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
495   StgTRecChunk *result = NULL;
496   if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
497     result = new_stg_trec_chunk(cap);
498   } else {
499     result = cap -> free_trec_chunks;
500     cap -> free_trec_chunks = result -> prev_chunk;
501     result -> prev_chunk = END_STM_CHUNK_LIST;
502     result -> next_entry_idx = 0;
503   }
504   return result;
505 }
506
507 static void free_stg_trec_chunk(Capability *cap, 
508                                 StgTRecChunk *c) {
509 #if defined(REUSE_MEMORY)
510   c -> prev_chunk = cap -> free_trec_chunks;
511   cap -> free_trec_chunks = c;
512 #endif
513 }
514
515 static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
516                                             StgTRecHeader *enclosing_trec) {
517   StgTRecHeader *result = NULL;
518   if (cap -> free_trec_headers == NO_TREC) {
519     result = new_stg_trec_header(cap, enclosing_trec);
520   } else {
521     result = cap -> free_trec_headers;
522     cap -> free_trec_headers = result -> enclosing_trec;
523     result -> enclosing_trec = enclosing_trec;
524     result -> current_chunk -> next_entry_idx = 0;
525     result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
526     if (enclosing_trec == NO_TREC) {
527       result -> state = TREC_ACTIVE;
528     } else {
529       ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
530              enclosing_trec -> state == TREC_CONDEMNED);
531       result -> state = enclosing_trec -> state;
532     }
533   }
534   return result;
535 }
536
537 static void free_stg_trec_header(Capability *cap,
538                                  StgTRecHeader *trec) {
539 #if defined(REUSE_MEMORY)
540   StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
541   while (chunk != END_STM_CHUNK_LIST) {
542     StgTRecChunk *prev_chunk = chunk -> prev_chunk;
543     free_stg_trec_chunk(cap, chunk);
544     chunk = prev_chunk;
545   } 
546   trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
547   trec -> enclosing_trec = cap -> free_trec_headers;
548   cap -> free_trec_headers = trec;
549 #endif
550 }
551
552 /*......................................................................*/
553
554 // Helper functions for managing waiting lists
555
556 static void build_watch_queue_entries_for_trec(Capability *cap,
557                                                StgTSO *tso, 
558                                                StgTRecHeader *trec) {
559   ASSERT(trec != NO_TREC);
560   ASSERT(trec -> enclosing_trec == NO_TREC);
561   ASSERT(trec -> state == TREC_ACTIVE);
562
563   TRACE("%p : build_watch_queue_entries_for_trec()", trec);
564
565   FOR_EACH_ENTRY(trec, e, {
566     StgTVar *s;
567     StgTVarWatchQueue *q;
568     StgTVarWatchQueue *fq;
569     s = e -> tvar;
570     TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
571     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
572     NACQ_ASSERT(s -> current_value == e -> expected_value);
573     fq = s -> first_watch_queue_entry;
574     q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
575     q -> next_queue_entry = fq;
576     q -> prev_queue_entry = END_STM_WATCH_QUEUE;
577     if (fq != END_STM_WATCH_QUEUE) {
578       fq -> prev_queue_entry = q;
579     }
580     s -> first_watch_queue_entry = q;
581     e -> new_value = (StgClosure *) q;
582   });
583 }
584
585 static void remove_watch_queue_entries_for_trec(Capability *cap,
586                                                 StgTRecHeader *trec) {
587   ASSERT(trec != NO_TREC);
588   ASSERT(trec -> enclosing_trec == NO_TREC);
589   ASSERT(trec -> state == TREC_WAITING ||
590          trec -> state == TREC_CONDEMNED);
591
592   TRACE("%p : remove_watch_queue_entries_for_trec()", trec);
593
594   FOR_EACH_ENTRY(trec, e, {
595     StgTVar *s;
596     StgTVarWatchQueue *pq;
597     StgTVarWatchQueue *nq;
598     StgTVarWatchQueue *q;
599     StgClosure *saw;
600     s = e -> tvar;
601     saw = lock_tvar(trec, s);
602     q = (StgTVarWatchQueue *) (e -> new_value);
603     TRACE("%p : removing tso=%p from watch queue for tvar=%p", 
604           trec, 
605           q -> closure, 
606           s);
607     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
608     nq = q -> next_queue_entry;
609     pq = q -> prev_queue_entry;
610     if (nq != END_STM_WATCH_QUEUE) {
611       nq -> prev_queue_entry = pq;
612     }
613     if (pq != END_STM_WATCH_QUEUE) {
614       pq -> next_queue_entry = nq;
615     } else {
616       ASSERT (s -> first_watch_queue_entry == q);
617       s -> first_watch_queue_entry = nq;
618     }
619     free_stg_tvar_watch_queue(cap, q);
620     unlock_tvar(trec, s, saw, FALSE);
621   });
622 }
623  
624 /*......................................................................*/
625  
626 static TRecEntry *get_new_entry(Capability *cap,
627                                 StgTRecHeader *t) {
628   TRecEntry *result;
629   StgTRecChunk *c;
630   int i;
631
632   c = t -> current_chunk;
633   i = c -> next_entry_idx;
634   ASSERT(c != END_STM_CHUNK_LIST);
635
636   if (i < TREC_CHUNK_NUM_ENTRIES) {
637     // Continue to use current chunk
638     result = &(c -> entries[i]);
639     c -> next_entry_idx ++;
640   } else {
641     // Current chunk is full: allocate a fresh one
642     StgTRecChunk *nc;
643     nc = alloc_stg_trec_chunk(cap);
644     nc -> prev_chunk = c;
645     nc -> next_entry_idx = 1;
646     t -> current_chunk = nc;
647     result = &(nc -> entries[0]);
648   }
649
650   return result;
651 }
652
653 /*......................................................................*/
654
655 static void merge_update_into(Capability *cap,
656                               StgTRecHeader *t,
657                               StgTVar *tvar,
658                               StgClosure *expected_value,
659                               StgClosure *new_value) {
660   int found;
661   
662   // Look for an entry in this trec
663   found = FALSE;
664   FOR_EACH_ENTRY(t, e, {
665     StgTVar *s;
666     s = e -> tvar;
667     if (s == tvar) {
668       found = TRUE;
669       if (e -> expected_value != expected_value) {
670         // Must abort if the two entries start from different values
671         TRACE("%p : update entries inconsistent at %p (%p vs %p)", 
672               t, tvar, e -> expected_value, expected_value);
673         t -> state = TREC_CONDEMNED;
674       } 
675       e -> new_value = new_value;
676       BREAK_FOR_EACH;
677     }
678   });
679
680   if (!found) {
681     // No entry so far in this trec
682     TRecEntry *ne;
683     ne = get_new_entry(cap, t);
684     ne -> tvar = tvar;
685     ne -> expected_value = expected_value;
686     ne -> new_value = new_value;
687   }
688 }
689
690 /*......................................................................*/
691
692 static void merge_read_into(Capability *cap,
693                             StgTRecHeader *t,
694                             StgTVar *tvar,
695                             StgClosure *expected_value) {
696   int found;
697   
698   // Look for an entry in this trec
699   found = FALSE;
700   FOR_EACH_ENTRY(t, e, {
701     StgTVar *s;
702     s = e -> tvar;
703     if (s == tvar) {
704       found = TRUE;
705       if (e -> expected_value != expected_value) {
706         // Must abort if the two entries start from different values
707         TRACE("%p : read entries inconsistent at %p (%p vs %p)", 
708               t, tvar, e -> expected_value, expected_value);
709         t -> state = TREC_CONDEMNED;
710       } 
711       BREAK_FOR_EACH;
712     }
713   });
714
715   if (!found) {
716     // No entry so far in this trec
717     TRecEntry *ne;
718     ne = get_new_entry(cap, t);
719     ne -> tvar = tvar;
720     ne -> expected_value = expected_value;
721     ne -> new_value = expected_value;
722   }
723 }
724
725 /*......................................................................*/
726
727 static StgBool entry_is_update(TRecEntry *e) {
728   StgBool result;
729   result = (e -> expected_value != e -> new_value);
730   return result;
731
732
733 #if defined(STM_FG_LOCKS)
734 static StgBool entry_is_read_only(TRecEntry *e) {
735   StgBool result;
736   result = (e -> expected_value == e -> new_value);
737   return result;
738
739
740 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
741   StgClosure *c;
742   StgBool result;
743   c = s -> current_value;
744   result = (c == (StgClosure *) h);
745   return result;  
746 }
747 #endif
748
749 // revert_ownership : release a lock on a TVar, storing back
750 // the value that it held when the lock was acquired.  "revert_all"
751 // is set in stmWait and stmReWait when we acquired locks on all of 
752 // the TVars involved.  "revert_all" is not set in commit operations
753 // where we don't lock TVars that have been read from but not updated.
754
755 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
756                              StgBool revert_all STG_UNUSED) {
757 #if defined(STM_FG_LOCKS) 
758   FOR_EACH_ENTRY(trec, e, {
759     if (revert_all || entry_is_update(e)) {
760       StgTVar *s;
761       s = e -> tvar;
762       if (tvar_is_locked(s, trec)) {
763         unlock_tvar(trec, s, e -> expected_value, TRUE);
764       }
765     }
766   });
767 #endif
768 }
769
770 /*......................................................................*/
771
772 // validate_and_acquire_ownership : this performs the twin functions
773 // of checking that the TVars referred to by entries in trec hold the
774 // expected values and:
775 // 
776 //   - locking the TVar (on updated TVars during commit, or all TVars
777 //     during wait)
778 //
779 //   - recording the identity of the TRec who wrote the value seen in the
780 //     TVar (on non-updated TVars during commit).  These values are 
781 //     stashed in the TRec entries and are then checked in check_read_only
782 //     to ensure that an atomic snapshot of all of these locations has been
783 //     seen.
784
785 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
786                                                int acquire_all,
787                                                int retain_ownership) {
788   StgBool result;
789
790   if (shake()) {
791     TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
792     return FALSE;
793   }
794
795   ASSERT ((trec -> state == TREC_ACTIVE) || 
796           (trec -> state == TREC_WAITING) ||
797           (trec -> state == TREC_CONDEMNED));
798   result = !((trec -> state) == TREC_CONDEMNED);
799   if (result) {
800     FOR_EACH_ENTRY(trec, e, {
801       StgTVar *s;
802       s = e -> tvar;
803       if (acquire_all || entry_is_update(e)) {
804         TRACE("%p : trying to acquire %p", trec, s);
805         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
806           TRACE("%p : failed to acquire %p", trec, s);
807           result = FALSE;
808           BREAK_FOR_EACH;
809         }
810       } else {
811         ASSERT(config_use_read_phase);
812         IF_STM_FG_LOCKS({
813           TRACE("%p : will need to check %p", trec, s);
814           if (s -> current_value != e -> expected_value) {
815             TRACE("%p : doesn't match", trec);
816             result = FALSE;
817             BREAK_FOR_EACH;
818           }
819           e -> num_updates = s -> num_updates;
820           if (s -> current_value != e -> expected_value) {
821             TRACE("%p : doesn't match (race)", trec);
822             result = FALSE;
823             BREAK_FOR_EACH;
824           } else {
825             TRACE("%p : need to check version %ld", trec, e -> num_updates);
826           }
827         });
828       }
829     });
830   }
831
832   if ((!result) || (!retain_ownership)) {
833     revert_ownership(trec, acquire_all);
834   }
835   
836   return result;
837 }
838
839 // check_read_only : check that we've seen an atomic snapshot of the
840 // non-updated TVars accessed by a trec.  This checks that the last TRec to
841 // commit an update to the TVar is unchanged since the value was stashed in
842 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
843 // all of them contained their expected values at the start of the call to
844 // check_read_only.
845 //
846 // The paper "Concurrent programming without locks" (under submission), or
847 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
848 // this kind of algorithm.
849
850 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
851   StgBool result = TRUE;
852
853   ASSERT (config_use_read_phase);
854   IF_STM_FG_LOCKS({
855     FOR_EACH_ENTRY(trec, e, {
856       StgTVar *s;
857       s = e -> tvar;
858       if (entry_is_read_only(e)) {
859         TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
860         if (s -> num_updates != e -> num_updates) {
861           // ||s -> current_value != e -> expected_value) {
862           TRACE("%p : mismatch", trec);
863           result = FALSE;
864           BREAK_FOR_EACH;
865         }
866       }
867     });
868   });
869
870   return result;
871 }
872
873
874 /************************************************************************/
875
876 void stmPreGCHook() {
877   nat i;
878
879   lock_stm(NO_TREC);
880   TRACE("stmPreGCHook");
881   for (i = 0; i < n_capabilities; i ++) {
882     Capability *cap = &capabilities[i];
883     cap -> free_tvar_watch_queues = END_STM_WATCH_QUEUE;
884     cap -> free_trec_chunks = END_STM_CHUNK_LIST;
885     cap -> free_trec_headers = NO_TREC;
886   }
887   unlock_stm(NO_TREC);
888 }
889
890 /************************************************************************/
891
892 // check_read_only relies on version numbers held in TVars' "num_updates" 
893 // fields not wrapping around while a transaction is committed.  The version
894 // number is incremented each time an update is committed to the TVar
895 // This is unlikely to wrap around when 32-bit integers are used for the counts, 
896 // but to ensure correctness we maintain a shared count on the maximum
897 // number of commit operations that may occur and check that this has 
898 // not increased by more than 2^32 during a commit.
899
900 #define TOKEN_BATCH_SIZE 1024
901
902 static volatile StgInt64 max_commits = 0;
903
904 #if defined(THREADED_RTS)
905 static volatile StgBool token_locked = FALSE;
906
907 static void getTokenBatch(Capability *cap) {
908   while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
909   max_commits += TOKEN_BATCH_SIZE;
910   TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
911   cap -> transaction_tokens = TOKEN_BATCH_SIZE;
912   token_locked = FALSE;
913 }
914
915 static void getToken(Capability *cap) {
916   if (cap -> transaction_tokens == 0) {
917     getTokenBatch(cap);
918   }
919   cap -> transaction_tokens --;
920 }
921 #else
922 static void getToken(Capability *cap STG_UNUSED) {
923   // Nothing
924 }
925 #endif
926
927 /*......................................................................*/
928
929 StgTRecHeader *stmStartTransaction(Capability *cap,
930                                    StgTRecHeader *outer) {
931   StgTRecHeader *t;
932   TRACE("%p : stmStartTransaction with %d tokens", 
933         outer, 
934         cap -> transaction_tokens);
935
936   getToken(cap);
937
938   t = alloc_stg_trec_header(cap, outer);
939   TRACE("%p : stmStartTransaction()=%p", outer, t);
940   return t;
941 }
942
943 /*......................................................................*/
944
945 void stmAbortTransaction(Capability *cap,
946                          StgTRecHeader *trec) {
947   StgTRecHeader *et;
948   TRACE("%p : stmAbortTransaction", trec);
949   ASSERT (trec != NO_TREC);
950   ASSERT ((trec -> state == TREC_ACTIVE) || 
951           (trec -> state == TREC_WAITING) ||
952           (trec -> state == TREC_CONDEMNED));
953
954   lock_stm(trec);
955
956   et = trec -> enclosing_trec;
957   if (et == NO_TREC) {
958     // We're a top-level transaction: remove any watch queue entries that
959     // we may have.
960     TRACE("%p : aborting top-level transaction", trec);
961
962     if (trec -> state == TREC_WAITING) {
963       ASSERT (trec -> enclosing_trec == NO_TREC);
964       TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
965       remove_watch_queue_entries_for_trec(cap, trec);
966     } 
967
968   } else {
969     // We're a nested transaction: merge our read set into our parent's
970     TRACE("%p : retaining read-set into parent %p", trec, et);
971
972     FOR_EACH_ENTRY(trec, e, {
973       StgTVar *s = e -> tvar;
974       merge_read_into(cap, et, s, e -> expected_value);
975     });
976   } 
977
978   trec -> state = TREC_ABORTED;
979   unlock_stm(trec);
980
981   TRACE("%p : stmAbortTransaction done", trec);
982 }
983
984 /*......................................................................*/
985
986 void stmFreeAbortedTRec(Capability *cap,
987                         StgTRecHeader *trec) {
988   TRACE("%p : stmFreeAbortedTRec", trec);
989   ASSERT (trec != NO_TREC);
990   ASSERT ((trec -> state == TREC_CONDEMNED) ||
991           (trec -> state == TREC_ABORTED));
992
993   free_stg_trec_header(cap, trec);
994
995   TRACE("%p : stmFreeAbortedTRec done", trec);
996 }
997
998 /*......................................................................*/
999
1000 void stmCondemnTransaction(Capability *cap,
1001                            StgTRecHeader *trec) {
1002   TRACE("%p : stmCondemnTransaction", trec);
1003   ASSERT (trec != NO_TREC);
1004   ASSERT ((trec -> state == TREC_ACTIVE) || 
1005           (trec -> state == TREC_WAITING) ||
1006           (trec -> state == TREC_CONDEMNED));
1007
1008   lock_stm(trec);
1009   if (trec -> state == TREC_WAITING) {
1010     ASSERT (trec -> enclosing_trec == NO_TREC);
1011     TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
1012     remove_watch_queue_entries_for_trec(cap, trec);
1013   } 
1014   trec -> state = TREC_CONDEMNED;
1015   unlock_stm(trec);
1016
1017   TRACE("%p : stmCondemnTransaction done", trec);
1018 }
1019
1020 /*......................................................................*/
1021
1022 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
1023   StgTRecHeader *outer;
1024   TRACE("%p : stmGetEnclosingTRec", trec);
1025   outer = trec -> enclosing_trec;
1026   TRACE("%p : stmGetEnclosingTRec()=%p", trec, outer);
1027   return outer;
1028 }
1029
1030 /*......................................................................*/
1031
1032 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
1033   StgTRecHeader *t;
1034   StgBool result;
1035
1036   TRACE("%p : stmValidateNestOfTransactions", trec);
1037   ASSERT(trec != NO_TREC);
1038   ASSERT((trec -> state == TREC_ACTIVE) || 
1039          (trec -> state == TREC_WAITING) ||
1040          (trec -> state == TREC_CONDEMNED));
1041
1042   lock_stm(trec);
1043
1044   t = trec;
1045   result = TRUE;
1046   while (t != NO_TREC) {
1047     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
1048     t = t -> enclosing_trec;
1049   }
1050
1051   if (!result && trec -> state != TREC_WAITING) {
1052     trec -> state = TREC_CONDEMNED; 
1053   }
1054
1055   unlock_stm(trec);
1056
1057   TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
1058   return result;
1059 }
1060
1061 /*......................................................................*/
1062
1063 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
1064   TRecEntry *result = NULL;
1065
1066   TRACE("%p : get_entry_for TVar %p", trec, tvar);
1067   ASSERT(trec != NO_TREC);
1068
1069   do {
1070     FOR_EACH_ENTRY(trec, e, {
1071       if (e -> tvar == tvar) {
1072         result = e;
1073         if (in != NULL) {
1074           *in = trec;
1075         }
1076         BREAK_FOR_EACH;
1077       }
1078     });
1079     trec = trec -> enclosing_trec;
1080   } while (result == NULL && trec != NO_TREC);
1081
1082   return result;    
1083 }
1084
1085 /*......................................................................*/
1086
1087 /*
1088  * Add/remove links between an invariant TVars.  The caller must have 
1089  * locked the TVars involved and the invariant.
1090  */
1091
1092 static void disconnect_invariant(Capability *cap,
1093                                  StgAtomicInvariant *inv) {
1094   StgTRecHeader *last_execution = inv -> last_execution;
1095
1096   TRACE("unhooking last execution inv=%p trec=%p", inv, last_execution);
1097
1098   FOR_EACH_ENTRY(last_execution, e, {
1099     StgTVar *s = e -> tvar;
1100     StgTVarWatchQueue *q = s -> first_watch_queue_entry;
1101     StgBool found = FALSE;
1102     TRACE("  looking for trec on tvar=%p", s);
1103     for (q = s -> first_watch_queue_entry; 
1104          q != END_STM_WATCH_QUEUE; 
1105          q = q -> next_queue_entry) {
1106       if (q -> closure == (StgClosure*)inv) {
1107         StgTVarWatchQueue *pq;
1108         StgTVarWatchQueue *nq;
1109         nq = q -> next_queue_entry;
1110         pq = q -> prev_queue_entry;
1111         if (nq != END_STM_WATCH_QUEUE) {
1112           nq -> prev_queue_entry = pq;
1113         }
1114         if (pq != END_STM_WATCH_QUEUE) {
1115           pq -> next_queue_entry = nq;
1116         } else {
1117           ASSERT (s -> first_watch_queue_entry == q);
1118           s -> first_watch_queue_entry = nq;
1119         }
1120         TRACE("  found it in watch queue entry %p", q);
1121         free_stg_tvar_watch_queue(cap, q);
1122         found = TRUE;
1123         break;
1124       }
1125     }
1126     ASSERT(found);
1127   });
1128   inv -> last_execution = NO_TREC;
1129 }
1130
1131 static void connect_invariant_to_trec(Capability *cap,
1132                                       StgAtomicInvariant *inv, 
1133                                       StgTRecHeader *my_execution) {
1134   TRACE("connecting execution inv=%p trec=%p", inv, my_execution);
1135
1136   ASSERT(inv -> last_execution == NO_TREC);
1137
1138   FOR_EACH_ENTRY(my_execution, e, {
1139     StgTVar *s = e -> tvar;
1140     StgTVarWatchQueue *q = alloc_stg_tvar_watch_queue(cap, (StgClosure*)inv);
1141     StgTVarWatchQueue *fq = s -> first_watch_queue_entry;
1142
1143     // We leave "last_execution" holding the values that will be
1144     // in the heap after the transaction we're in the process
1145     // of committing has finished.
1146     TRecEntry *entry = get_entry_for(my_execution -> enclosing_trec, s, NULL);
1147     if (entry != NULL) {
1148       e -> expected_value = entry -> new_value;
1149       e -> new_value = entry -> new_value;
1150     }
1151
1152     TRACE("  linking trec on tvar=%p value=%p q=%p", s, e -> expected_value, q);
1153     q -> next_queue_entry = fq;
1154     q -> prev_queue_entry = END_STM_WATCH_QUEUE;
1155     if (fq != END_STM_WATCH_QUEUE) {
1156       fq -> prev_queue_entry = q;
1157     }
1158     s -> first_watch_queue_entry = q;
1159   });
1160
1161   inv -> last_execution = my_execution;
1162 }
1163
1164 /*
1165  * Add a new invariant to the trec's list of invariants to check on commit
1166  */
1167 void stmAddInvariantToCheck(Capability *cap, 
1168                             StgTRecHeader *trec,
1169                             StgClosure *code) {
1170   StgAtomicInvariant *invariant;
1171   StgInvariantCheckQueue *q;
1172   TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
1173   ASSERT(trec != NO_TREC);
1174   ASSERT(trec -> state == TREC_ACTIVE ||
1175          trec -> state == TREC_CONDEMNED);
1176
1177
1178   // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
1179   //    to signal that this is a new invariant in the current atomic block
1180
1181   invariant = (StgAtomicInvariant *) allocateLocal(cap, sizeofW(StgAtomicInvariant));
1182   TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
1183   SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
1184   invariant -> code = code;
1185   invariant -> last_execution = NO_TREC;
1186
1187   // 2. Allocate an StgInvariantCheckQueue entry, link it to the current trec
1188
1189   q = alloc_stg_invariant_check_queue(cap, invariant);
1190   TRACE("%p : stmAddInvariantToCheck allocated q=%p", trec, q);
1191   q -> invariant = invariant;
1192   q -> my_execution = NO_TREC;
1193   q -> next_queue_entry = trec -> invariants_to_check;
1194   trec -> invariants_to_check = q;
1195
1196   TRACE("%p : stmAddInvariantToCheck done", trec);
1197 }
1198
1199 /*
1200  * Fill in the trec's list of invariants that might be violated by the 
1201  * current transaction.  
1202  */
1203
1204 StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
1205   StgTRecChunk *c;
1206   TRACE("%p : stmGetInvariantsToCheck, head was %p", 
1207         trec,
1208         trec -> invariants_to_check);
1209
1210   ASSERT(trec != NO_TREC);
1211   ASSERT ((trec -> state == TREC_ACTIVE) || 
1212           (trec -> state == TREC_WAITING) ||
1213           (trec -> state == TREC_CONDEMNED));
1214   ASSERT(trec -> enclosing_trec == NO_TREC);
1215
1216   lock_stm(trec);
1217   c = trec -> current_chunk;
1218   while (c != END_STM_CHUNK_LIST) {
1219     unsigned int i;
1220     for (i = 0; i < c -> next_entry_idx; i ++) {
1221       TRecEntry *e = &(c -> entries[i]);
1222       if (entry_is_update(e)) {
1223         StgTVar *s = e -> tvar;
1224         StgClosure *old = lock_tvar(trec, s);
1225                 
1226         // Pick up any invariants on the TVar being updated
1227         // by entry "e"
1228
1229         StgTVarWatchQueue *q;
1230         TRACE("%p : checking for invariants on %p", trec, s);
1231         for (q = s -> first_watch_queue_entry;
1232              q != END_STM_WATCH_QUEUE;
1233              q = q -> next_queue_entry) {
1234           if (watcher_is_invariant(q)) {
1235             StgBool found = FALSE;
1236             StgInvariantCheckQueue *q2;
1237             TRACE("%p : Touching invariant %p", trec, q -> closure);
1238             for (q2 = trec -> invariants_to_check;
1239                  q2 != END_INVARIANT_CHECK_QUEUE;
1240                  q2 = q2 -> next_queue_entry) {
1241               if (q2 -> invariant == (StgAtomicInvariant*)(q -> closure)) {
1242                 TRACE("%p : Already found %p", trec, q -> closure);
1243                 found = TRUE;
1244                 break;
1245               }
1246             }
1247             
1248             if (!found) {
1249               StgInvariantCheckQueue *q3;
1250               TRACE("%p : Not already found %p", trec, q -> closure);
1251               q3 = alloc_stg_invariant_check_queue(cap,
1252                                                    (StgAtomicInvariant*) q -> closure);
1253               q3 -> next_queue_entry = trec -> invariants_to_check;
1254               trec -> invariants_to_check = q3;
1255             }
1256           }
1257         }
1258
1259         unlock_tvar(trec, s, old, FALSE);
1260       }
1261     }
1262     c = c -> prev_chunk;
1263   }
1264
1265   unlock_stm(trec);
1266
1267   TRACE("%p : stmGetInvariantsToCheck, head now %p", 
1268         trec,
1269         trec -> invariants_to_check);
1270
1271   return (trec -> invariants_to_check);
1272 }
1273
1274 /*......................................................................*/
1275
1276 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
1277   int result;
1278   StgInt64 max_commits_at_start = max_commits;
1279   StgBool touched_invariants;
1280   StgBool use_read_phase;
1281
1282   TRACE("%p : stmCommitTransaction()", trec);
1283   ASSERT (trec != NO_TREC);
1284
1285   lock_stm(trec);
1286
1287   ASSERT (trec -> enclosing_trec == NO_TREC);
1288   ASSERT ((trec -> state == TREC_ACTIVE) || 
1289           (trec -> state == TREC_CONDEMNED));
1290
1291   // touched_invariants is true if we've written to a TVar with invariants 
1292   // attached to it, or if we're trying to add a new invariant to the system.
1293
1294   touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
1295
1296   // If we have touched invariants then (i) lock the invariant, and (ii) add
1297   // the invariant's read set to our own.  Step (i) is needed to serialize
1298   // concurrent transactions that attempt to make conflicting updates
1299   // to the invariant's trec (suppose it read from t1 and t2, and that one
1300   // concurrent transcation writes only to t1, and a second writes only to
1301   // t2).  Step (ii) is needed so that both transactions will lock t1 and t2
1302   // to gain access to their wait lists (and hence be able to unhook the
1303   // invariant from both tvars).
1304
1305   if (touched_invariants) {
1306     StgInvariantCheckQueue *q = trec -> invariants_to_check;
1307     TRACE("%p : locking invariants", trec);
1308     while (q != END_INVARIANT_CHECK_QUEUE) {
1309       StgTRecHeader *inv_old_trec;
1310       StgAtomicInvariant *inv;
1311       TRACE("%p : locking invariant %p", trec, q -> invariant);
1312       inv = q -> invariant;
1313       if (!lock_inv(inv)) {
1314         TRACE("%p : failed to lock %p", trec, inv);
1315         trec -> state = TREC_CONDEMNED;
1316         break;
1317       }
1318
1319       inv_old_trec = inv -> last_execution;
1320       if (inv_old_trec != NO_TREC) {
1321         StgTRecChunk *c = inv_old_trec -> current_chunk;
1322         while (c != END_STM_CHUNK_LIST) {
1323           unsigned int i;
1324           for (i = 0; i < c -> next_entry_idx; i ++) {
1325             TRecEntry *e = &(c -> entries[i]);
1326             TRACE("%p : ensuring we lock TVars for %p", trec, e -> tvar);
1327             merge_read_into (cap, trec, e -> tvar, e -> expected_value);
1328           }
1329           c = c -> prev_chunk;
1330         }
1331       }
1332       q = q -> next_queue_entry;
1333     }
1334     TRACE("%p : finished locking invariants", trec);
1335   }
1336
1337   // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
1338   // (i) the configuration lets us use a read phase, and (ii) we've not
1339   // touched or introduced any invariants.  
1340   //
1341   // In principle we could extend the implementation to support a read-phase
1342   // and invariants, but it complicates the logic: the links between
1343   // invariants and TVars are managed by the TVar watch queues which are
1344   // protected by the TVar's locks.
1345
1346   use_read_phase = ((config_use_read_phase) && (!touched_invariants));
1347
1348   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
1349   if (result) {
1350     // We now know that all the updated locations hold their expected values.
1351     ASSERT (trec -> state == TREC_ACTIVE);
1352
1353     if (use_read_phase) {
1354       StgInt64 max_commits_at_end;
1355       StgInt64 max_concurrent_commits;
1356       TRACE("%p : doing read check", trec);
1357       result = check_read_only(trec);
1358       TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
1359
1360       max_commits_at_end = max_commits;
1361       max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
1362                                 (n_capabilities * TOKEN_BATCH_SIZE));
1363       if (((max_concurrent_commits >> 32) > 0) || shake()) {
1364         result = FALSE;
1365       }
1366     }
1367     
1368     if (result) {
1369       // We now know that all of the read-only locations held their exepcted values
1370       // at the end of the call to validate_and_acquire_ownership.  This forms the
1371       // linearization point of the commit.
1372
1373       // 1. If we have touched or introduced any invariants then unhook them
1374       //    from the TVars they depended on last time they were executed
1375       //    and hook them on the TVars that they now depend on.
1376       if (touched_invariants) {
1377         StgInvariantCheckQueue *q = trec -> invariants_to_check;
1378         while (q != END_INVARIANT_CHECK_QUEUE) {
1379           StgAtomicInvariant *inv = q -> invariant;
1380           if (inv -> last_execution != NO_TREC) {
1381             disconnect_invariant(cap, inv);
1382           }
1383
1384           TRACE("%p : hooking up new execution trec=%p", trec, q -> my_execution);
1385           connect_invariant_to_trec(cap, inv, q -> my_execution);
1386
1387           TRACE("%p : unlocking invariant %p", trec, inv);
1388           unlock_inv(inv);
1389
1390           q = q -> next_queue_entry;
1391         }
1392       }
1393
1394       // 2. Make the updates required by the transaction
1395       FOR_EACH_ENTRY(trec, e, {
1396         StgTVar *s;
1397         s = e -> tvar;
1398         if ((!use_read_phase) || (e -> new_value != e -> expected_value)) {
1399           // Either the entry is an update or we're not using a read phase:
1400           // write the value back to the TVar, unlocking it if necessary.
1401
1402           ACQ_ASSERT(tvar_is_locked(s, trec));
1403           TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
1404           unpark_waiters_on(cap,s);
1405           IF_STM_FG_LOCKS({
1406             s -> num_updates ++;
1407           });
1408           unlock_tvar(trec, s, e -> new_value, TRUE);
1409         } 
1410         ACQ_ASSERT(!tvar_is_locked(s, trec));
1411       });
1412     } else {
1413       revert_ownership(trec, FALSE);
1414     }
1415   } 
1416
1417   unlock_stm(trec);
1418
1419   free_stg_trec_header(cap, trec);
1420
1421   TRACE("%p : stmCommitTransaction()=%d", trec, result);
1422
1423   return result;
1424 }
1425
1426 /*......................................................................*/
1427
1428 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
1429   StgTRecHeader *et;
1430   int result;
1431   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
1432   TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
1433   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
1434
1435   lock_stm(trec);
1436
1437   et = trec -> enclosing_trec;
1438   result = validate_and_acquire_ownership(trec, (!config_use_read_phase), TRUE);
1439   if (result) {
1440     // We now know that all the updated locations hold their expected values.
1441
1442     if (config_use_read_phase) {
1443       TRACE("%p : doing read check", trec);
1444       result = check_read_only(trec);
1445     }
1446     if (result) {
1447       // We now know that all of the read-only locations held their exepcted values
1448       // at the end of the call to validate_and_acquire_ownership.  This forms the
1449       // linearization point of the commit.
1450
1451       TRACE("%p : read-check succeeded", trec);
1452       FOR_EACH_ENTRY(trec, e, {
1453         // Merge each entry into the enclosing transaction record, release all
1454         // locks.
1455         
1456         StgTVar *s;
1457         s = e -> tvar;
1458         if (entry_is_update(e)) {
1459           unlock_tvar(trec, s, e -> expected_value, FALSE);
1460         }
1461         merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
1462         ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
1463       });
1464     } else {
1465       revert_ownership(trec, FALSE);
1466     }
1467   } 
1468
1469   unlock_stm(trec);
1470
1471   free_stg_trec_header(cap, trec);
1472
1473   TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);
1474
1475   return result;
1476 }
1477
1478 /*......................................................................*/
1479
1480 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
1481   int result;
1482   TRACE("%p : stmWait(%p)", trec, tso);
1483   ASSERT (trec != NO_TREC);
1484   ASSERT (trec -> enclosing_trec == NO_TREC);
1485   ASSERT ((trec -> state == TREC_ACTIVE) || 
1486           (trec -> state == TREC_CONDEMNED));
1487
1488   lock_stm(trec);
1489   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1490   if (result) {
1491     // The transaction is valid so far so we can actually start waiting.
1492     // (Otherwise the transaction was not valid and the thread will have to
1493     // retry it).
1494
1495     // Put ourselves to sleep.  We retain locks on all the TVars involved
1496     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
1497     // in the TSO, (c) TREC_WAITING in the Trec.  
1498     build_watch_queue_entries_for_trec(cap, tso, trec);
1499     park_tso(tso);
1500     trec -> state = TREC_WAITING;
1501
1502     // We haven't released ownership of the transaction yet.  The TSO
1503     // has been put on the wait queue for the TVars it is waiting for,
1504     // but we haven't yet tidied up the TSO's stack and made it safe
1505     // to wake up the TSO.  Therefore, we must wait until the TSO is
1506     // safe to wake up before we release ownership - when all is well,
1507     // the runtime will call stmWaitUnlock() below, with the same
1508     // TRec.
1509
1510   } else {
1511     unlock_stm(trec);
1512     free_stg_trec_header(cap, trec);
1513   }
1514
1515   TRACE("%p : stmWait(%p)=%d", trec, tso, result);
1516   return result;
1517 }
1518
1519
1520 void
1521 stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
1522     revert_ownership(trec, TRUE);
1523     unlock_stm(trec);
1524 }
1525
1526 /*......................................................................*/
1527
1528 StgBool stmReWait(Capability *cap, StgTSO *tso) {
1529   int result;
1530   StgTRecHeader *trec = tso->trec;
1531
1532   TRACE("%p : stmReWait", trec);
1533   ASSERT (trec != NO_TREC);
1534   ASSERT (trec -> enclosing_trec == NO_TREC);
1535   ASSERT ((trec -> state == TREC_WAITING) || 
1536           (trec -> state == TREC_CONDEMNED));
1537
1538   lock_stm(trec);
1539   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1540   TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
1541   if (result) {
1542     // The transaction remains valid -- do nothing because it is already on
1543     // the wait queues
1544     ASSERT (trec -> state == TREC_WAITING);
1545     park_tso(tso);
1546     revert_ownership(trec, TRUE);
1547   } else {
1548     // The transcation has become invalid.  We can now remove it from the wait
1549     // queues.
1550     if (trec -> state != TREC_CONDEMNED) {
1551       remove_watch_queue_entries_for_trec (cap, trec);
1552     }
1553     free_stg_trec_header(cap, trec);
1554   }
1555   unlock_stm(trec);
1556
1557   TRACE("%p : stmReWait()=%d", trec, result);
1558   return result;
1559 }
1560
1561 /*......................................................................*/
1562
1563 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
1564   StgClosure *result;
1565   result = tvar -> current_value;
1566
1567 #if defined(STM_FG_LOCKS)
1568   while (GET_INFO(result) == &stg_TREC_HEADER_info) {
1569     TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
1570     result = tvar -> current_value;
1571   }
1572 #endif
1573
1574   TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
1575   return result;
1576 }
1577
1578 /*......................................................................*/
1579
1580 StgClosure *stmReadTVar(Capability *cap,
1581                         StgTRecHeader *trec, 
1582                         StgTVar *tvar) {
1583   StgTRecHeader *entry_in;
1584   StgClosure *result = NULL;
1585   TRecEntry *entry = NULL;
1586   TRACE("%p : stmReadTVar(%p)", trec, tvar);
1587   ASSERT (trec != NO_TREC);
1588   ASSERT (trec -> state == TREC_ACTIVE || 
1589           trec -> state == TREC_CONDEMNED);
1590
1591   entry = get_entry_for(trec, tvar, &entry_in);
1592
1593   if (entry != NULL) {
1594     if (entry_in == trec) {
1595       // Entry found in our trec
1596       result = entry -> new_value;
1597     } else {
1598       // Entry found in another trec
1599       TRecEntry *new_entry = get_new_entry(cap, trec);
1600       new_entry -> tvar = tvar;
1601       new_entry -> expected_value = entry -> expected_value;
1602       new_entry -> new_value = entry -> new_value;
1603       result = new_entry -> new_value;
1604     } 
1605   } else {
1606     // No entry found
1607     StgClosure *current_value = read_current_value(trec, tvar);
1608     TRecEntry *new_entry = get_new_entry(cap, trec);
1609     new_entry -> tvar = tvar;
1610     new_entry -> expected_value = current_value;
1611     new_entry -> new_value = current_value;
1612     result = current_value;
1613   }
1614
1615   TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
1616   return result;
1617 }
1618
1619 /*......................................................................*/
1620
1621 void stmWriteTVar(Capability *cap,
1622                   StgTRecHeader *trec,
1623                   StgTVar *tvar, 
1624                   StgClosure *new_value) {
1625
1626   StgTRecHeader *entry_in;
1627   TRecEntry *entry = NULL;
1628   TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
1629   ASSERT (trec != NO_TREC);
1630   ASSERT (trec -> state == TREC_ACTIVE || 
1631           trec -> state == TREC_CONDEMNED);
1632
1633   entry = get_entry_for(trec, tvar, &entry_in);
1634
1635   if (entry != NULL) {
1636     if (entry_in == trec) {
1637       // Entry found in our trec
1638       entry -> new_value = new_value;
1639     } else {
1640       // Entry found in another trec
1641       TRecEntry *new_entry = get_new_entry(cap, trec);
1642       new_entry -> tvar = tvar;
1643       new_entry -> expected_value = entry -> expected_value;
1644       new_entry -> new_value = new_value;
1645     } 
1646   } else {
1647     // No entry found
1648     StgClosure *current_value = read_current_value(trec, tvar);
1649     TRecEntry *new_entry = get_new_entry(cap, trec);
1650     new_entry -> tvar = tvar;
1651     new_entry -> expected_value = current_value;
1652     new_entry -> new_value = new_value;
1653   }
1654
1655   TRACE("%p : stmWriteTVar done", trec);
1656 }
1657
1658 /*......................................................................*/
1659
1660 StgTVar *stmNewTVar(Capability *cap,
1661                     StgClosure *new_value) {
1662   StgTVar *result;
1663   result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
1664   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
1665   result -> current_value = new_value;
1666   result -> first_watch_queue_entry = END_STM_WATCH_QUEUE;
1667 #if defined(THREADED_RTS)
1668   result -> num_updates = 0;
1669 #endif
1670   return result;
1671 }
1672
1673 /*......................................................................*/