f98d201b07696a37d8e72e1ca27c206634db1804
[ghc-hetmet.git] / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  * (c) The GHC Team 1998-2005
3  * 
4  * STM implementation.
5  *
6  * Overview
7  * --------
8  *
9  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
10  * each transcation has a TRec (transaction record) holding entries for each of the
11  * TVars (transactional variables) that it has accessed.  Each entry records
12  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
13  * the transaction wants to write to the TVar, (d) during commit, the identity of
14  * the TRec that wrote the expected value.  
15  *
16  * Separate TRecs are used for each level in a nest of transactions.  This allows
17  * a nested transaction to be aborted without condemning its enclosing transactions.
18  * This is needed in the implementation of catchRetry.  Note that the "expected value"
19  * in a nested transaction's TRec is the value expected to be *held in memory* if
20  * the transaction commits -- not the "new value" stored in one of the enclosing
21  * transactions.  This means that validation can be done without searching through
22  * a nest of TRecs.
23  *
24  * Concurrency control
25  * -------------------
26  *
27  * Three different concurrency control schemes can be built according to the settings
28  * in STM.h:
29  * 
30  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
31  * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
32  *
33  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
34  * an invocation on the STM interface.  Note that this does not mean that 
35  * transactions are simply serialized -- the lock is only held *within* the 
36  * implementation of stmCommitTransaction, stmWait etc.
37  *
38  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
39  * and, when committing a transaction, no locks are acquired for TVars that have
40  * been read but not updated.
41  *
42  * Concurrency control is implemented in the functions:
43  *
44  *    lock_stm
45  *    unlock_stm
46  *    lock_tvar / cond_lock_tvar
47  *    unlock_tvar
48  *
49  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
50  * implementation of these functions.  
51  *
52  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
53  * using STM_CG_LOCK, and otherwise they are no-ops.
54  *
55  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
56  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
57  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
58  * builds).  This is because locking a TVar is implemented by writing the lock
59  * holder's TRec into the TVar's current_value field:
60  *
61  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
62  *               it contained.
63  *
64  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
65  *               contains a specified value.  Return TRUE if this succeeds,
66  *               FALSE otherwise.
67  *
68  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
69  *               storing a specified value in place of the lock entry.
70  *
71  * Using these operations, the typcial pattern of a commit/validate/wait operation
72  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
73  * the TVars that were only read from still contain their expected values, 
74  * (d) release the locks on the TVars, writing updates to them in the case of a 
75  * commit, (e) unlock the STM.
76  *
77  * Queues of waiting threads hang off the first_watch_queue_entry
78  * field of each TVar.  This may only be manipulated when holding that
79  * TVar's lock.  In particular, when a thread is putting itself to
80  * sleep, it mustn't release the TVar's lock until it has added itself
81  * to the wait queue and marked its TSO as BlockedOnSTM -- this makes
82  * sure that other threads will know to wake it.
83  *
84  * ---------------------------------------------------------------------------*/
85
86 #include "PosixSource.h"
87 #include "Rts.h"
88
89 #include "RtsUtils.h"
90 #include "Schedule.h"
91 #include "STM.h"
92 #include "Trace.h"
93 #include "Threads.h"
94
95 #include <stdio.h>
96
97 #define TRUE 1
98 #define FALSE 0
99
100 // ACQ_ASSERT is used for assertions which are only required for
101 // THREADED_RTS builds with fine-grained locking.
102
103 #if defined(STM_FG_LOCKS)
104 #define ACQ_ASSERT(_X) ASSERT(_X)
105 #define NACQ_ASSERT(_X) /*Nothing*/
106 #else
107 #define ACQ_ASSERT(_X) /*Nothing*/
108 #define NACQ_ASSERT(_X) ASSERT(_X)
109 #endif
110
111 /*......................................................................*/
112
113 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
114 // unusualy code paths if genuine contention is rare
115
116 #define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)
117
118 #ifdef SHAKE
119 static const int do_shake = TRUE;
120 #else
121 static const int do_shake = FALSE;
122 #endif
123 static int shake_ctr = 0;
124 static int shake_lim = 1;
125
126 static int shake(void) {
127   if (do_shake) {
128     if (((shake_ctr++) % shake_lim) == 0) {
129       shake_ctr = 1;
130       shake_lim ++;
131       return TRUE;
132     } 
133     return FALSE;
134   } else {
135     return FALSE;
136   }
137 }
138
139 /*......................................................................*/
140
141 // Helper macros for iterating over entries within a transaction
142 // record
143
144 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
145   StgTRecHeader *__t = (_t);                                                    \
146   StgTRecChunk *__c = __t -> current_chunk;                                     \
147   StgWord __limit = __c -> next_entry_idx;                                      \
148   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld", __t, __c, __limit);  \
149   while (__c != END_STM_CHUNK_LIST) {                                           \
150     StgWord __i;                                                                \
151     for (__i = 0; __i < __limit; __i ++) {                                      \
152       TRecEntry *_x = &(__c -> entries[__i]);                                   \
153       do { CODE } while (0);                                                    \
154     }                                                                           \
155     __c = __c -> prev_chunk;                                                    \
156     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
157   }                                                                             \
158  exit_for_each:                                                                 \
159   if (FALSE) goto exit_for_each;                                                \
160 } while (0)
161
162 #define BREAK_FOR_EACH goto exit_for_each
163      
164 /*......................................................................*/
165
166 // if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
167 // and wait queue entries without GC
168
169 #define REUSE_MEMORY
170
171 /*......................................................................*/
172
173 #define IF_STM_UNIPROC(__X)  do { } while (0)
174 #define IF_STM_CG_LOCK(__X)  do { } while (0)
175 #define IF_STM_FG_LOCKS(__X) do { } while (0)
176
177 #if defined(STM_UNIPROC)
178 #undef IF_STM_UNIPROC
179 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
180 static const StgBool config_use_read_phase = FALSE;
181
182 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
183   TRACE("%p : lock_stm()", trec);
184 }
185
186 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
187   TRACE("%p : unlock_stm()", trec);
188 }
189
190 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
191                              StgTVar *s STG_UNUSED) {
192   StgClosure *result;
193   TRACE("%p : lock_tvar(%p)", trec, s);
194   result = s -> current_value;
195   return result;
196 }
197
198 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
199                         StgTVar *s STG_UNUSED,
200                         StgClosure *c,
201                         StgBool force_update) {
202   TRACE("%p : unlock_tvar(%p)", trec, s);
203   if (force_update) {
204     s -> current_value = c;
205   }
206 }
207
208 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
209                               StgTVar *s STG_UNUSED,
210                               StgClosure *expected) {
211   StgClosure *result;
212   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
213   result = s -> current_value;
214   TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
215   return (result == expected);
216 }
217
218 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
219   // Nothing -- uniproc
220   return TRUE;
221 }
222
223 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) { 
224   // Nothing -- uniproc
225 }
226 #endif
227
228 #if defined(STM_CG_LOCK) /*........................................*/
229
230 #undef IF_STM_CG_LOCK
231 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
232 static const StgBool config_use_read_phase = FALSE;
233 static volatile StgTRecHeader *smp_locked = NULL;
234
235 static void lock_stm(StgTRecHeader *trec) {
236   while (cas(&smp_locked, NULL, trec) != NULL) { }
237   TRACE("%p : lock_stm()", trec);
238 }
239
240 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
241   TRACE("%p : unlock_stm()", trec);
242   ASSERT (smp_locked == trec);
243   smp_locked = 0;
244 }
245
246 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
247                              StgTVar *s STG_UNUSED) {
248   StgClosure *result;
249   TRACE("%p : lock_tvar(%p)", trec, s);
250   ASSERT (smp_locked == trec);
251   result = s -> current_value;
252   return result;
253 }
254
255 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
256                          StgTVar *s STG_UNUSED,
257                          StgClosure *c,
258                          StgBool force_update) {
259   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
260   ASSERT (smp_locked == trec);
261   if (force_update) {
262     s -> current_value = c;
263   }
264 }
265
266 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
267                                StgTVar *s STG_UNUSED,
268                                StgClosure *expected) {
269   StgClosure *result;
270   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
271   ASSERT (smp_locked == trec);
272   result = s -> current_value;
273   TRACE("%p : %d", result ? "success" : "failure");
274   return (result == expected);
275 }
276
277 static StgBool lock_inv(StgAtomicInvariant *inv STG_UNUSED) {
278   // Nothing -- protected by STM lock
279   return TRUE;
280 }
281
282 static void unlock_inv(StgAtomicInvariant *inv STG_UNUSED) { 
283   // Nothing -- protected by STM lock
284 }
285 #endif
286
287 #if defined(STM_FG_LOCKS) /*...................................*/
288
289 #undef IF_STM_FG_LOCKS
290 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
291 static const StgBool config_use_read_phase = TRUE;
292
293 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
294   TRACE("%p : lock_stm()", trec);
295 }
296
297 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
298   TRACE("%p : unlock_stm()", trec);
299 }
300
301 static StgClosure *lock_tvar(StgTRecHeader *trec, 
302                              StgTVar *s STG_UNUSED) {
303   StgClosure *result;
304   TRACE("%p : lock_tvar(%p)", trec, s);
305   do {
306     do {
307       result = s -> current_value;
308     } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
309   } while (cas((void *)&(s -> current_value),
310                (StgWord)result, (StgWord)trec) != (StgWord)result);
311   return result;
312 }
313
314 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
315                         StgTVar *s,
316                         StgClosure *c,
317                         StgBool force_update STG_UNUSED) {
318   TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
319   ASSERT(s -> current_value == (StgClosure *)trec);
320   s -> current_value = c;
321 }
322
323 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
324                               StgTVar *s,
325                               StgClosure *expected) {
326   StgClosure *result;
327   StgWord w;
328   TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
329   w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
330   result = (StgClosure *)w;
331   TRACE("%p : %s", trec, result ? "success" : "failure");
332   return (result == expected);
333 }
334
335 static StgBool lock_inv(StgAtomicInvariant *inv) {
336   return (cas(&(inv -> lock), 0, 1) == 0);
337 }
338
339 static void unlock_inv(StgAtomicInvariant *inv) { 
340   ASSERT(inv -> lock == 1);
341   inv -> lock = 0;
342 }
343 #endif
344
345 /*......................................................................*/
346  
347 static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
348   StgClosure *c = q -> closure;
349   StgInfoTable *info = get_itbl(c);
350   return (info -> type) == TSO;
351 }
352
353 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
354   StgClosure *c = q -> closure;
355   return (c->header.info == &stg_ATOMIC_INVARIANT_info);
356 }
357
358 /*......................................................................*/
359  
360 // Helper functions for thread blocking and unblocking
361
362 static void park_tso(StgTSO *tso) {
363   ASSERT(tso -> why_blocked == NotBlocked);
364   tso -> why_blocked = BlockedOnSTM;
365   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
366   TRACE("park_tso on tso=%p", tso);
367 }
368
369 static void unpark_tso(Capability *cap, StgTSO *tso) {
370     // We will continue unparking threads while they remain on one of the wait
371     // queues: it's up to the thread itself to remove it from the wait queues
372     // if it decides to do so when it is scheduled.
373
374     // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
375     // to avoid multiple CPUs unblocking the same TSO, and also to
376     // synchronise with throwTo().
377     lockTSO(tso);
378     if (tso -> why_blocked == BlockedOnSTM) {
379         TRACE("unpark_tso on tso=%p", tso);
380         tryWakeupThread(cap,tso);
381     } else {
382         TRACE("spurious unpark_tso on tso=%p", tso);
383     }
384     unlockTSO(tso);
385 }
386
387 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
388   StgTVarWatchQueue *q;
389   StgTVarWatchQueue *trail;
390   TRACE("unpark_waiters_on tvar=%p", s);
391   // unblock TSOs in reverse order, to be a bit fairer (#2319)
392   for (q = s -> first_watch_queue_entry, trail = q;
393        q != END_STM_WATCH_QUEUE;
394        q = q -> next_queue_entry) {
395     trail = q;
396   }
397   q = trail;
398   for (;
399        q != END_STM_WATCH_QUEUE; 
400        q = q -> prev_queue_entry) {
401     if (watcher_is_tso(q)) {
402       unpark_tso(cap, (StgTSO *)(q -> closure));
403     }
404   }
405 }
406
407 /*......................................................................*/
408
409 // Helper functions for downstream allocation and initialization
410
411 static StgInvariantCheckQueue *new_stg_invariant_check_queue(Capability *cap,
412                                                              StgAtomicInvariant *invariant) {
413   StgInvariantCheckQueue *result;
414   result = (StgInvariantCheckQueue *)allocate(cap, sizeofW(StgInvariantCheckQueue));
415   SET_HDR (result, &stg_INVARIANT_CHECK_QUEUE_info, CCS_SYSTEM);
416   result -> invariant = invariant;
417   result -> my_execution = NO_TREC;
418   return result;
419 }
420
421 static StgTVarWatchQueue *new_stg_tvar_watch_queue(Capability *cap,
422                                                    StgClosure *closure) {
423   StgTVarWatchQueue *result;
424   result = (StgTVarWatchQueue *)allocate(cap, sizeofW(StgTVarWatchQueue));
425   SET_HDR (result, &stg_TVAR_WATCH_QUEUE_info, CCS_SYSTEM);
426   result -> closure = closure;
427   return result;
428 }
429
430 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
431   StgTRecChunk *result;
432   result = (StgTRecChunk *)allocate(cap, sizeofW(StgTRecChunk));
433   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
434   result -> prev_chunk = END_STM_CHUNK_LIST;
435   result -> next_entry_idx = 0;
436   return result;
437 }
438
439 static StgTRecHeader *new_stg_trec_header(Capability *cap,
440                                           StgTRecHeader *enclosing_trec) {
441   StgTRecHeader *result;
442   result = (StgTRecHeader *) allocate(cap, sizeofW(StgTRecHeader));
443   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
444
445   result -> enclosing_trec = enclosing_trec;
446   result -> current_chunk = new_stg_trec_chunk(cap);
447   result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
448
449   if (enclosing_trec == NO_TREC) {
450     result -> state = TREC_ACTIVE;
451   } else {
452     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
453            enclosing_trec -> state == TREC_CONDEMNED);
454     result -> state = enclosing_trec -> state;
455   }
456
457   return result;  
458 }
459
460 /*......................................................................*/
461
462 // Allocation / deallocation functions that retain per-capability lists
463 // of closures that can be re-used
464
465 static StgInvariantCheckQueue *alloc_stg_invariant_check_queue(Capability *cap,
466                                                                StgAtomicInvariant *invariant) {
467   StgInvariantCheckQueue *result = NULL;
468   if (cap -> free_invariant_check_queues == END_INVARIANT_CHECK_QUEUE) {
469     result = new_stg_invariant_check_queue(cap, invariant);
470   } else {
471     result = cap -> free_invariant_check_queues;
472     result -> invariant = invariant;
473     result -> my_execution = NO_TREC;
474     cap -> free_invariant_check_queues = result -> next_queue_entry;
475   }
476   return result;
477 }
478
479 static StgTVarWatchQueue *alloc_stg_tvar_watch_queue(Capability *cap,
480                                                      StgClosure *closure) {
481   StgTVarWatchQueue *result = NULL;
482   if (cap -> free_tvar_watch_queues == END_STM_WATCH_QUEUE) {
483     result = new_stg_tvar_watch_queue(cap, closure);
484   } else {
485     result = cap -> free_tvar_watch_queues;
486     result -> closure = closure;
487     cap -> free_tvar_watch_queues = result -> next_queue_entry;
488   }
489   return result;
490 }
491
492 static void free_stg_tvar_watch_queue(Capability *cap,
493                                       StgTVarWatchQueue *wq) {
494 #if defined(REUSE_MEMORY)
495   wq -> next_queue_entry = cap -> free_tvar_watch_queues;
496   cap -> free_tvar_watch_queues = wq;
497 #endif
498 }
499
500 static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
501   StgTRecChunk *result = NULL;
502   if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
503     result = new_stg_trec_chunk(cap);
504   } else {
505     result = cap -> free_trec_chunks;
506     cap -> free_trec_chunks = result -> prev_chunk;
507     result -> prev_chunk = END_STM_CHUNK_LIST;
508     result -> next_entry_idx = 0;
509   }
510   return result;
511 }
512
513 static void free_stg_trec_chunk(Capability *cap, 
514                                 StgTRecChunk *c) {
515 #if defined(REUSE_MEMORY)
516   c -> prev_chunk = cap -> free_trec_chunks;
517   cap -> free_trec_chunks = c;
518 #endif
519 }
520
521 static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
522                                             StgTRecHeader *enclosing_trec) {
523   StgTRecHeader *result = NULL;
524   if (cap -> free_trec_headers == NO_TREC) {
525     result = new_stg_trec_header(cap, enclosing_trec);
526   } else {
527     result = cap -> free_trec_headers;
528     cap -> free_trec_headers = result -> enclosing_trec;
529     result -> enclosing_trec = enclosing_trec;
530     result -> current_chunk -> next_entry_idx = 0;
531     result -> invariants_to_check = END_INVARIANT_CHECK_QUEUE;
532     if (enclosing_trec == NO_TREC) {
533       result -> state = TREC_ACTIVE;
534     } else {
535       ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
536              enclosing_trec -> state == TREC_CONDEMNED);
537       result -> state = enclosing_trec -> state;
538     }
539   }
540   return result;
541 }
542
543 static void free_stg_trec_header(Capability *cap,
544                                  StgTRecHeader *trec) {
545 #if defined(REUSE_MEMORY)
546   StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
547   while (chunk != END_STM_CHUNK_LIST) {
548     StgTRecChunk *prev_chunk = chunk -> prev_chunk;
549     free_stg_trec_chunk(cap, chunk);
550     chunk = prev_chunk;
551   } 
552   trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
553   trec -> enclosing_trec = cap -> free_trec_headers;
554   cap -> free_trec_headers = trec;
555 #endif
556 }
557
558 /*......................................................................*/
559
560 // Helper functions for managing waiting lists
561
562 static void build_watch_queue_entries_for_trec(Capability *cap,
563                                                StgTSO *tso, 
564                                                StgTRecHeader *trec) {
565   ASSERT(trec != NO_TREC);
566   ASSERT(trec -> enclosing_trec == NO_TREC);
567   ASSERT(trec -> state == TREC_ACTIVE);
568
569   TRACE("%p : build_watch_queue_entries_for_trec()", trec);
570
571   FOR_EACH_ENTRY(trec, e, {
572     StgTVar *s;
573     StgTVarWatchQueue *q;
574     StgTVarWatchQueue *fq;
575     s = e -> tvar;
576     TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
577     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
578     NACQ_ASSERT(s -> current_value == e -> expected_value);
579     fq = s -> first_watch_queue_entry;
580     q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
581     q -> next_queue_entry = fq;
582     q -> prev_queue_entry = END_STM_WATCH_QUEUE;
583     if (fq != END_STM_WATCH_QUEUE) {
584       fq -> prev_queue_entry = q;
585     }
586     s -> first_watch_queue_entry = q;
587     e -> new_value = (StgClosure *) q;
588   });
589 }
590
591 static void remove_watch_queue_entries_for_trec(Capability *cap,
592                                                 StgTRecHeader *trec) {
593   ASSERT(trec != NO_TREC);
594   ASSERT(trec -> enclosing_trec == NO_TREC);
595   ASSERT(trec -> state == TREC_WAITING ||
596          trec -> state == TREC_CONDEMNED);
597
598   TRACE("%p : remove_watch_queue_entries_for_trec()", trec);
599
600   FOR_EACH_ENTRY(trec, e, {
601     StgTVar *s;
602     StgTVarWatchQueue *pq;
603     StgTVarWatchQueue *nq;
604     StgTVarWatchQueue *q;
605     StgClosure *saw;
606     s = e -> tvar;
607     saw = lock_tvar(trec, s);
608     q = (StgTVarWatchQueue *) (e -> new_value);
609     TRACE("%p : removing tso=%p from watch queue for tvar=%p", 
610           trec, 
611           q -> closure, 
612           s);
613     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
614     nq = q -> next_queue_entry;
615     pq = q -> prev_queue_entry;
616     if (nq != END_STM_WATCH_QUEUE) {
617       nq -> prev_queue_entry = pq;
618     }
619     if (pq != END_STM_WATCH_QUEUE) {
620       pq -> next_queue_entry = nq;
621     } else {
622       ASSERT (s -> first_watch_queue_entry == q);
623       s -> first_watch_queue_entry = nq;
624     }
625     free_stg_tvar_watch_queue(cap, q);
626     unlock_tvar(trec, s, saw, FALSE);
627   });
628 }
629  
630 /*......................................................................*/
631  
632 static TRecEntry *get_new_entry(Capability *cap,
633                                 StgTRecHeader *t) {
634   TRecEntry *result;
635   StgTRecChunk *c;
636   int i;
637
638   c = t -> current_chunk;
639   i = c -> next_entry_idx;
640   ASSERT(c != END_STM_CHUNK_LIST);
641
642   if (i < TREC_CHUNK_NUM_ENTRIES) {
643     // Continue to use current chunk
644     result = &(c -> entries[i]);
645     c -> next_entry_idx ++;
646   } else {
647     // Current chunk is full: allocate a fresh one
648     StgTRecChunk *nc;
649     nc = alloc_stg_trec_chunk(cap);
650     nc -> prev_chunk = c;
651     nc -> next_entry_idx = 1;
652     t -> current_chunk = nc;
653     result = &(nc -> entries[0]);
654   }
655
656   return result;
657 }
658
659 /*......................................................................*/
660
661 static void merge_update_into(Capability *cap,
662                               StgTRecHeader *t,
663                               StgTVar *tvar,
664                               StgClosure *expected_value,
665                               StgClosure *new_value) {
666   int found;
667   
668   // Look for an entry in this trec
669   found = FALSE;
670   FOR_EACH_ENTRY(t, e, {
671     StgTVar *s;
672     s = e -> tvar;
673     if (s == tvar) {
674       found = TRUE;
675       if (e -> expected_value != expected_value) {
676         // Must abort if the two entries start from different values
677         TRACE("%p : update entries inconsistent at %p (%p vs %p)", 
678               t, tvar, e -> expected_value, expected_value);
679         t -> state = TREC_CONDEMNED;
680       } 
681       e -> new_value = new_value;
682       BREAK_FOR_EACH;
683     }
684   });
685
686   if (!found) {
687     // No entry so far in this trec
688     TRecEntry *ne;
689     ne = get_new_entry(cap, t);
690     ne -> tvar = tvar;
691     ne -> expected_value = expected_value;
692     ne -> new_value = new_value;
693   }
694 }
695
696 /*......................................................................*/
697
698 static void merge_read_into(Capability *cap,
699                             StgTRecHeader *t,
700                             StgTVar *tvar,
701                             StgClosure *expected_value) {
702   int found;
703   
704   // Look for an entry in this trec
705   found = FALSE;
706   FOR_EACH_ENTRY(t, e, {
707     StgTVar *s;
708     s = e -> tvar;
709     if (s == tvar) {
710       found = TRUE;
711       if (e -> expected_value != expected_value) {
712         // Must abort if the two entries start from different values
713         TRACE("%p : read entries inconsistent at %p (%p vs %p)", 
714               t, tvar, e -> expected_value, expected_value);
715         t -> state = TREC_CONDEMNED;
716       } 
717       BREAK_FOR_EACH;
718     }
719   });
720
721   if (!found) {
722     // No entry so far in this trec
723     TRecEntry *ne;
724     ne = get_new_entry(cap, t);
725     ne -> tvar = tvar;
726     ne -> expected_value = expected_value;
727     ne -> new_value = expected_value;
728   }
729 }
730
731 /*......................................................................*/
732
733 static StgBool entry_is_update(TRecEntry *e) {
734   StgBool result;
735   result = (e -> expected_value != e -> new_value);
736   return result;
737
738
739 #if defined(STM_FG_LOCKS)
740 static StgBool entry_is_read_only(TRecEntry *e) {
741   StgBool result;
742   result = (e -> expected_value == e -> new_value);
743   return result;
744
745
746 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
747   StgClosure *c;
748   StgBool result;
749   c = s -> current_value;
750   result = (c == (StgClosure *) h);
751   return result;  
752 }
753 #endif
754
755 // revert_ownership : release a lock on a TVar, storing back
756 // the value that it held when the lock was acquired.  "revert_all"
757 // is set in stmWait and stmReWait when we acquired locks on all of 
758 // the TVars involved.  "revert_all" is not set in commit operations
759 // where we don't lock TVars that have been read from but not updated.
760
761 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
762                              StgBool revert_all STG_UNUSED) {
763 #if defined(STM_FG_LOCKS) 
764   FOR_EACH_ENTRY(trec, e, {
765     if (revert_all || entry_is_update(e)) {
766       StgTVar *s;
767       s = e -> tvar;
768       if (tvar_is_locked(s, trec)) {
769         unlock_tvar(trec, s, e -> expected_value, TRUE);
770       }
771     }
772   });
773 #endif
774 }
775
776 /*......................................................................*/
777
778 // validate_and_acquire_ownership : this performs the twin functions
779 // of checking that the TVars referred to by entries in trec hold the
780 // expected values and:
781 // 
782 //   - locking the TVar (on updated TVars during commit, or all TVars
783 //     during wait)
784 //
785 //   - recording the identity of the TRec who wrote the value seen in the
786 //     TVar (on non-updated TVars during commit).  These values are 
787 //     stashed in the TRec entries and are then checked in check_read_only
788 //     to ensure that an atomic snapshot of all of these locations has been
789 //     seen.
790
791 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
792                                                int acquire_all,
793                                                int retain_ownership) {
794   StgBool result;
795
796   if (shake()) {
797     TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
798     return FALSE;
799   }
800
801   ASSERT ((trec -> state == TREC_ACTIVE) || 
802           (trec -> state == TREC_WAITING) ||
803           (trec -> state == TREC_CONDEMNED));
804   result = !((trec -> state) == TREC_CONDEMNED);
805   if (result) {
806     FOR_EACH_ENTRY(trec, e, {
807       StgTVar *s;
808       s = e -> tvar;
809       if (acquire_all || entry_is_update(e)) {
810         TRACE("%p : trying to acquire %p", trec, s);
811         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
812           TRACE("%p : failed to acquire %p", trec, s);
813           result = FALSE;
814           BREAK_FOR_EACH;
815         }
816       } else {
817         ASSERT(config_use_read_phase);
818         IF_STM_FG_LOCKS({
819           TRACE("%p : will need to check %p", trec, s);
820           if (s -> current_value != e -> expected_value) {
821             TRACE("%p : doesn't match", trec);
822             result = FALSE;
823             BREAK_FOR_EACH;
824           }
825           e -> num_updates = s -> num_updates;
826           if (s -> current_value != e -> expected_value) {
827             TRACE("%p : doesn't match (race)", trec);
828             result = FALSE;
829             BREAK_FOR_EACH;
830           } else {
831             TRACE("%p : need to check version %ld", trec, e -> num_updates);
832           }
833         });
834       }
835     });
836   }
837
838   if ((!result) || (!retain_ownership)) {
839     revert_ownership(trec, acquire_all);
840   }
841   
842   return result;
843 }
844
845 // check_read_only : check that we've seen an atomic snapshot of the
846 // non-updated TVars accessed by a trec.  This checks that the last TRec to
847 // commit an update to the TVar is unchanged since the value was stashed in
848 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
849 // all of them contained their expected values at the start of the call to
850 // check_read_only.
851 //
852 // The paper "Concurrent programming without locks" (under submission), or
853 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
854 // this kind of algorithm.
855
856 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
857   StgBool result = TRUE;
858
859   ASSERT (config_use_read_phase);
860   IF_STM_FG_LOCKS({
861     FOR_EACH_ENTRY(trec, e, {
862       StgTVar *s;
863       s = e -> tvar;
864       if (entry_is_read_only(e)) {
865         TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
866         if (s -> num_updates != e -> num_updates) {
867           // ||s -> current_value != e -> expected_value) {
868           TRACE("%p : mismatch", trec);
869           result = FALSE;
870           BREAK_FOR_EACH;
871         }
872       }
873     });
874   });
875
876   return result;
877 }
878
879
880 /************************************************************************/
881
882 void stmPreGCHook() {
883   nat i;
884
885   lock_stm(NO_TREC);
886   TRACE("stmPreGCHook");
887   for (i = 0; i < n_capabilities; i ++) {
888     Capability *cap = &capabilities[i];
889     cap -> free_tvar_watch_queues = END_STM_WATCH_QUEUE;
890     cap -> free_trec_chunks = END_STM_CHUNK_LIST;
891     cap -> free_trec_headers = NO_TREC;
892   }
893   unlock_stm(NO_TREC);
894 }
895
896 /************************************************************************/
897
898 // check_read_only relies on version numbers held in TVars' "num_updates" 
899 // fields not wrapping around while a transaction is committed.  The version
900 // number is incremented each time an update is committed to the TVar
901 // This is unlikely to wrap around when 32-bit integers are used for the counts, 
902 // but to ensure correctness we maintain a shared count on the maximum
903 // number of commit operations that may occur and check that this has 
904 // not increased by more than 2^32 during a commit.
905
906 #define TOKEN_BATCH_SIZE 1024
907
908 static volatile StgInt64 max_commits = 0;
909
910 #if defined(THREADED_RTS)
911 static volatile StgBool token_locked = FALSE;
912
913 static void getTokenBatch(Capability *cap) {
914   while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
915   max_commits += TOKEN_BATCH_SIZE;
916   TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
917   cap -> transaction_tokens = TOKEN_BATCH_SIZE;
918   token_locked = FALSE;
919 }
920
921 static void getToken(Capability *cap) {
922   if (cap -> transaction_tokens == 0) {
923     getTokenBatch(cap);
924   }
925   cap -> transaction_tokens --;
926 }
927 #else
928 static void getToken(Capability *cap STG_UNUSED) {
929   // Nothing
930 }
931 #endif
932
933 /*......................................................................*/
934
935 StgTRecHeader *stmStartTransaction(Capability *cap,
936                                    StgTRecHeader *outer) {
937   StgTRecHeader *t;
938   TRACE("%p : stmStartTransaction with %d tokens", 
939         outer, 
940         cap -> transaction_tokens);
941
942   getToken(cap);
943
944   t = alloc_stg_trec_header(cap, outer);
945   TRACE("%p : stmStartTransaction()=%p", outer, t);
946   return t;
947 }
948
949 /*......................................................................*/
950
951 void stmAbortTransaction(Capability *cap,
952                          StgTRecHeader *trec) {
953   StgTRecHeader *et;
954   TRACE("%p : stmAbortTransaction", trec);
955   ASSERT (trec != NO_TREC);
956   ASSERT ((trec -> state == TREC_ACTIVE) || 
957           (trec -> state == TREC_WAITING) ||
958           (trec -> state == TREC_CONDEMNED));
959
960   lock_stm(trec);
961
962   et = trec -> enclosing_trec;
963   if (et == NO_TREC) {
964     // We're a top-level transaction: remove any watch queue entries that
965     // we may have.
966     TRACE("%p : aborting top-level transaction", trec);
967
968     if (trec -> state == TREC_WAITING) {
969       ASSERT (trec -> enclosing_trec == NO_TREC);
970       TRACE("%p : stmAbortTransaction aborting waiting transaction", trec);
971       remove_watch_queue_entries_for_trec(cap, trec);
972     } 
973
974   } else {
975     // We're a nested transaction: merge our read set into our parent's
976     TRACE("%p : retaining read-set into parent %p", trec, et);
977
978     FOR_EACH_ENTRY(trec, e, {
979       StgTVar *s = e -> tvar;
980       merge_read_into(cap, et, s, e -> expected_value);
981     });
982   } 
983
984   trec -> state = TREC_ABORTED;
985   unlock_stm(trec);
986
987   TRACE("%p : stmAbortTransaction done", trec);
988 }
989
990 /*......................................................................*/
991
992 void stmFreeAbortedTRec(Capability *cap,
993                         StgTRecHeader *trec) {
994   TRACE("%p : stmFreeAbortedTRec", trec);
995   ASSERT (trec != NO_TREC);
996   ASSERT ((trec -> state == TREC_CONDEMNED) ||
997           (trec -> state == TREC_ABORTED));
998
999   free_stg_trec_header(cap, trec);
1000
1001   TRACE("%p : stmFreeAbortedTRec done", trec);
1002 }
1003
1004 /*......................................................................*/
1005
1006 void stmCondemnTransaction(Capability *cap,
1007                            StgTRecHeader *trec) {
1008   TRACE("%p : stmCondemnTransaction", trec);
1009   ASSERT (trec != NO_TREC);
1010   ASSERT ((trec -> state == TREC_ACTIVE) || 
1011           (trec -> state == TREC_WAITING) ||
1012           (trec -> state == TREC_CONDEMNED));
1013
1014   lock_stm(trec);
1015   if (trec -> state == TREC_WAITING) {
1016     ASSERT (trec -> enclosing_trec == NO_TREC);
1017     TRACE("%p : stmCondemnTransaction condemning waiting transaction", trec);
1018     remove_watch_queue_entries_for_trec(cap, trec);
1019   } 
1020   trec -> state = TREC_CONDEMNED;
1021   unlock_stm(trec);
1022
1023   TRACE("%p : stmCondemnTransaction done", trec);
1024 }
1025
1026 /*......................................................................*/
1027
1028 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
1029   StgTRecHeader *t;
1030   StgBool result;
1031
1032   TRACE("%p : stmValidateNestOfTransactions", trec);
1033   ASSERT(trec != NO_TREC);
1034   ASSERT((trec -> state == TREC_ACTIVE) || 
1035          (trec -> state == TREC_WAITING) ||
1036          (trec -> state == TREC_CONDEMNED));
1037
1038   lock_stm(trec);
1039
1040   t = trec;
1041   result = TRUE;
1042   while (t != NO_TREC) {
1043     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
1044     t = t -> enclosing_trec;
1045   }
1046
1047   if (!result && trec -> state != TREC_WAITING) {
1048     trec -> state = TREC_CONDEMNED; 
1049   }
1050
1051   unlock_stm(trec);
1052
1053   TRACE("%p : stmValidateNestOfTransactions()=%d", trec, result);
1054   return result;
1055 }
1056
1057 /*......................................................................*/
1058
1059 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
1060   TRecEntry *result = NULL;
1061
1062   TRACE("%p : get_entry_for TVar %p", trec, tvar);
1063   ASSERT(trec != NO_TREC);
1064
1065   do {
1066     FOR_EACH_ENTRY(trec, e, {
1067       if (e -> tvar == tvar) {
1068         result = e;
1069         if (in != NULL) {
1070           *in = trec;
1071         }
1072         BREAK_FOR_EACH;
1073       }
1074     });
1075     trec = trec -> enclosing_trec;
1076   } while (result == NULL && trec != NO_TREC);
1077
1078   return result;    
1079 }
1080
1081 /*......................................................................*/
1082
1083 /*
1084  * Add/remove links between an invariant TVars.  The caller must have 
1085  * locked the TVars involved and the invariant.
1086  */
1087
1088 static void disconnect_invariant(Capability *cap,
1089                                  StgAtomicInvariant *inv) {
1090   StgTRecHeader *last_execution = inv -> last_execution;
1091
1092   TRACE("unhooking last execution inv=%p trec=%p", inv, last_execution);
1093
1094   FOR_EACH_ENTRY(last_execution, e, {
1095     StgTVar *s = e -> tvar;
1096     StgTVarWatchQueue *q = s -> first_watch_queue_entry;
1097     StgBool found = FALSE;
1098     TRACE("  looking for trec on tvar=%p", s);
1099     for (q = s -> first_watch_queue_entry; 
1100          q != END_STM_WATCH_QUEUE; 
1101          q = q -> next_queue_entry) {
1102       if (q -> closure == (StgClosure*)inv) {
1103         StgTVarWatchQueue *pq;
1104         StgTVarWatchQueue *nq;
1105         nq = q -> next_queue_entry;
1106         pq = q -> prev_queue_entry;
1107         if (nq != END_STM_WATCH_QUEUE) {
1108           nq -> prev_queue_entry = pq;
1109         }
1110         if (pq != END_STM_WATCH_QUEUE) {
1111           pq -> next_queue_entry = nq;
1112         } else {
1113           ASSERT (s -> first_watch_queue_entry == q);
1114           s -> first_watch_queue_entry = nq;
1115         }
1116         TRACE("  found it in watch queue entry %p", q);
1117         free_stg_tvar_watch_queue(cap, q);
1118         found = TRUE;
1119         break;
1120       }
1121     }
1122     ASSERT(found);
1123   });
1124   inv -> last_execution = NO_TREC;
1125 }
1126
1127 static void connect_invariant_to_trec(Capability *cap,
1128                                       StgAtomicInvariant *inv, 
1129                                       StgTRecHeader *my_execution) {
1130   TRACE("connecting execution inv=%p trec=%p", inv, my_execution);
1131
1132   ASSERT(inv -> last_execution == NO_TREC);
1133
1134   FOR_EACH_ENTRY(my_execution, e, {
1135     StgTVar *s = e -> tvar;
1136     StgTVarWatchQueue *q = alloc_stg_tvar_watch_queue(cap, (StgClosure*)inv);
1137     StgTVarWatchQueue *fq = s -> first_watch_queue_entry;
1138
1139     // We leave "last_execution" holding the values that will be
1140     // in the heap after the transaction we're in the process
1141     // of committing has finished.
1142     TRecEntry *entry = get_entry_for(my_execution -> enclosing_trec, s, NULL);
1143     if (entry != NULL) {
1144       e -> expected_value = entry -> new_value;
1145       e -> new_value = entry -> new_value;
1146     }
1147
1148     TRACE("  linking trec on tvar=%p value=%p q=%p", s, e -> expected_value, q);
1149     q -> next_queue_entry = fq;
1150     q -> prev_queue_entry = END_STM_WATCH_QUEUE;
1151     if (fq != END_STM_WATCH_QUEUE) {
1152       fq -> prev_queue_entry = q;
1153     }
1154     s -> first_watch_queue_entry = q;
1155   });
1156
1157   inv -> last_execution = my_execution;
1158 }
1159
1160 /*
1161  * Add a new invariant to the trec's list of invariants to check on commit
1162  */
1163 void stmAddInvariantToCheck(Capability *cap, 
1164                             StgTRecHeader *trec,
1165                             StgClosure *code) {
1166   StgAtomicInvariant *invariant;
1167   StgInvariantCheckQueue *q;
1168   TRACE("%p : stmAddInvariantToCheck closure=%p", trec, code);
1169   ASSERT(trec != NO_TREC);
1170   ASSERT(trec -> state == TREC_ACTIVE ||
1171          trec -> state == TREC_CONDEMNED);
1172
1173
1174   // 1. Allocate an StgAtomicInvariant, set last_execution to NO_TREC
1175   //    to signal that this is a new invariant in the current atomic block
1176
1177   invariant = (StgAtomicInvariant *) allocate(cap, sizeofW(StgAtomicInvariant));
1178   TRACE("%p : stmAddInvariantToCheck allocated invariant=%p", trec, invariant);
1179   SET_HDR (invariant, &stg_ATOMIC_INVARIANT_info, CCS_SYSTEM);
1180   invariant -> code = code;
1181   invariant -> last_execution = NO_TREC;
1182
1183   // 2. Allocate an StgInvariantCheckQueue entry, link it to the current trec
1184
1185   q = alloc_stg_invariant_check_queue(cap, invariant);
1186   TRACE("%p : stmAddInvariantToCheck allocated q=%p", trec, q);
1187   q -> invariant = invariant;
1188   q -> my_execution = NO_TREC;
1189   q -> next_queue_entry = trec -> invariants_to_check;
1190   trec -> invariants_to_check = q;
1191
1192   TRACE("%p : stmAddInvariantToCheck done", trec);
1193 }
1194
1195 /*
1196  * Fill in the trec's list of invariants that might be violated by the 
1197  * current transaction.  
1198  */
1199
1200 StgInvariantCheckQueue *stmGetInvariantsToCheck(Capability *cap, StgTRecHeader *trec) {
1201   StgTRecChunk *c;
1202   TRACE("%p : stmGetInvariantsToCheck, head was %p", 
1203         trec,
1204         trec -> invariants_to_check);
1205
1206   ASSERT(trec != NO_TREC);
1207   ASSERT ((trec -> state == TREC_ACTIVE) || 
1208           (trec -> state == TREC_WAITING) ||
1209           (trec -> state == TREC_CONDEMNED));
1210   ASSERT(trec -> enclosing_trec == NO_TREC);
1211
1212   lock_stm(trec);
1213   c = trec -> current_chunk;
1214   while (c != END_STM_CHUNK_LIST) {
1215     unsigned int i;
1216     for (i = 0; i < c -> next_entry_idx; i ++) {
1217       TRecEntry *e = &(c -> entries[i]);
1218       if (entry_is_update(e)) {
1219         StgTVar *s = e -> tvar;
1220         StgClosure *old = lock_tvar(trec, s);
1221                 
1222         // Pick up any invariants on the TVar being updated
1223         // by entry "e"
1224
1225         StgTVarWatchQueue *q;
1226         TRACE("%p : checking for invariants on %p", trec, s);
1227         for (q = s -> first_watch_queue_entry;
1228              q != END_STM_WATCH_QUEUE;
1229              q = q -> next_queue_entry) {
1230           if (watcher_is_invariant(q)) {
1231             StgBool found = FALSE;
1232             StgInvariantCheckQueue *q2;
1233             TRACE("%p : Touching invariant %p", trec, q -> closure);
1234             for (q2 = trec -> invariants_to_check;
1235                  q2 != END_INVARIANT_CHECK_QUEUE;
1236                  q2 = q2 -> next_queue_entry) {
1237               if (q2 -> invariant == (StgAtomicInvariant*)(q -> closure)) {
1238                 TRACE("%p : Already found %p", trec, q -> closure);
1239                 found = TRUE;
1240                 break;
1241               }
1242             }
1243             
1244             if (!found) {
1245               StgInvariantCheckQueue *q3;
1246               TRACE("%p : Not already found %p", trec, q -> closure);
1247               q3 = alloc_stg_invariant_check_queue(cap,
1248                                                    (StgAtomicInvariant*) q -> closure);
1249               q3 -> next_queue_entry = trec -> invariants_to_check;
1250               trec -> invariants_to_check = q3;
1251             }
1252           }
1253         }
1254
1255         unlock_tvar(trec, s, old, FALSE);
1256       }
1257     }
1258     c = c -> prev_chunk;
1259   }
1260
1261   unlock_stm(trec);
1262
1263   TRACE("%p : stmGetInvariantsToCheck, head now %p", 
1264         trec,
1265         trec -> invariants_to_check);
1266
1267   return (trec -> invariants_to_check);
1268 }
1269
1270 /*......................................................................*/
1271
1272 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
1273   int result;
1274   StgInt64 max_commits_at_start = max_commits;
1275   StgBool touched_invariants;
1276   StgBool use_read_phase;
1277
1278   TRACE("%p : stmCommitTransaction()", trec);
1279   ASSERT (trec != NO_TREC);
1280
1281   lock_stm(trec);
1282
1283   ASSERT (trec -> enclosing_trec == NO_TREC);
1284   ASSERT ((trec -> state == TREC_ACTIVE) || 
1285           (trec -> state == TREC_CONDEMNED));
1286
1287   // touched_invariants is true if we've written to a TVar with invariants 
1288   // attached to it, or if we're trying to add a new invariant to the system.
1289
1290   touched_invariants = (trec -> invariants_to_check != END_INVARIANT_CHECK_QUEUE);
1291
1292   // If we have touched invariants then (i) lock the invariant, and (ii) add
1293   // the invariant's read set to our own.  Step (i) is needed to serialize
1294   // concurrent transactions that attempt to make conflicting updates
1295   // to the invariant's trec (suppose it read from t1 and t2, and that one
1296   // concurrent transcation writes only to t1, and a second writes only to
1297   // t2).  Step (ii) is needed so that both transactions will lock t1 and t2
1298   // to gain access to their wait lists (and hence be able to unhook the
1299   // invariant from both tvars).
1300
1301   if (touched_invariants) {
1302     StgInvariantCheckQueue *q = trec -> invariants_to_check;
1303     TRACE("%p : locking invariants", trec);
1304     while (q != END_INVARIANT_CHECK_QUEUE) {
1305       StgTRecHeader *inv_old_trec;
1306       StgAtomicInvariant *inv;
1307       TRACE("%p : locking invariant %p", trec, q -> invariant);
1308       inv = q -> invariant;
1309       if (!lock_inv(inv)) {
1310         TRACE("%p : failed to lock %p", trec, inv);
1311         trec -> state = TREC_CONDEMNED;
1312         break;
1313       }
1314
1315       inv_old_trec = inv -> last_execution;
1316       if (inv_old_trec != NO_TREC) {
1317         StgTRecChunk *c = inv_old_trec -> current_chunk;
1318         while (c != END_STM_CHUNK_LIST) {
1319           unsigned int i;
1320           for (i = 0; i < c -> next_entry_idx; i ++) {
1321             TRecEntry *e = &(c -> entries[i]);
1322             TRACE("%p : ensuring we lock TVars for %p", trec, e -> tvar);
1323             merge_read_into (cap, trec, e -> tvar, e -> expected_value);
1324           }
1325           c = c -> prev_chunk;
1326         }
1327       }
1328       q = q -> next_queue_entry;
1329     }
1330     TRACE("%p : finished locking invariants", trec);
1331   }
1332
1333   // Use a read-phase (i.e. don't lock TVars we've read but not updated) if
1334   // (i) the configuration lets us use a read phase, and (ii) we've not
1335   // touched or introduced any invariants.  
1336   //
1337   // In principle we could extend the implementation to support a read-phase
1338   // and invariants, but it complicates the logic: the links between
1339   // invariants and TVars are managed by the TVar watch queues which are
1340   // protected by the TVar's locks.
1341
1342   use_read_phase = ((config_use_read_phase) && (!touched_invariants));
1343
1344   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
1345   if (result) {
1346     // We now know that all the updated locations hold their expected values.
1347     ASSERT (trec -> state == TREC_ACTIVE);
1348
1349     if (use_read_phase) {
1350       StgInt64 max_commits_at_end;
1351       StgInt64 max_concurrent_commits;
1352       TRACE("%p : doing read check", trec);
1353       result = check_read_only(trec);
1354       TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
1355
1356       max_commits_at_end = max_commits;
1357       max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
1358                                 (n_capabilities * TOKEN_BATCH_SIZE));
1359       if (((max_concurrent_commits >> 32) > 0) || shake()) {
1360         result = FALSE;
1361       }
1362     }
1363     
1364     if (result) {
1365       // We now know that all of the read-only locations held their exepcted values
1366       // at the end of the call to validate_and_acquire_ownership.  This forms the
1367       // linearization point of the commit.
1368
1369       // 1. If we have touched or introduced any invariants then unhook them
1370       //    from the TVars they depended on last time they were executed
1371       //    and hook them on the TVars that they now depend on.
1372       if (touched_invariants) {
1373         StgInvariantCheckQueue *q = trec -> invariants_to_check;
1374         while (q != END_INVARIANT_CHECK_QUEUE) {
1375           StgAtomicInvariant *inv = q -> invariant;
1376           if (inv -> last_execution != NO_TREC) {
1377             disconnect_invariant(cap, inv);
1378           }
1379
1380           TRACE("%p : hooking up new execution trec=%p", trec, q -> my_execution);
1381           connect_invariant_to_trec(cap, inv, q -> my_execution);
1382
1383           TRACE("%p : unlocking invariant %p", trec, inv);
1384           unlock_inv(inv);
1385
1386           q = q -> next_queue_entry;
1387         }
1388       }
1389
1390       // 2. Make the updates required by the transaction
1391       FOR_EACH_ENTRY(trec, e, {
1392         StgTVar *s;
1393         s = e -> tvar;
1394         if ((!use_read_phase) || (e -> new_value != e -> expected_value)) {
1395           // Either the entry is an update or we're not using a read phase:
1396           // write the value back to the TVar, unlocking it if necessary.
1397
1398           ACQ_ASSERT(tvar_is_locked(s, trec));
1399           TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
1400           unpark_waiters_on(cap,s);
1401           IF_STM_FG_LOCKS({
1402             s -> num_updates ++;
1403           });
1404           unlock_tvar(trec, s, e -> new_value, TRUE);
1405         } 
1406         ACQ_ASSERT(!tvar_is_locked(s, trec));
1407       });
1408     } else {
1409       revert_ownership(trec, FALSE);
1410     }
1411   } 
1412
1413   unlock_stm(trec);
1414
1415   free_stg_trec_header(cap, trec);
1416
1417   TRACE("%p : stmCommitTransaction()=%d", trec, result);
1418
1419   return result;
1420 }
1421
1422 /*......................................................................*/
1423
1424 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
1425   StgTRecHeader *et;
1426   int result;
1427   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
1428   TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
1429   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
1430
1431   lock_stm(trec);
1432
1433   et = trec -> enclosing_trec;
1434   result = validate_and_acquire_ownership(trec, (!config_use_read_phase), TRUE);
1435   if (result) {
1436     // We now know that all the updated locations hold their expected values.
1437
1438     if (config_use_read_phase) {
1439       TRACE("%p : doing read check", trec);
1440       result = check_read_only(trec);
1441     }
1442     if (result) {
1443       // We now know that all of the read-only locations held their exepcted values
1444       // at the end of the call to validate_and_acquire_ownership.  This forms the
1445       // linearization point of the commit.
1446
1447       TRACE("%p : read-check succeeded", trec);
1448       FOR_EACH_ENTRY(trec, e, {
1449         // Merge each entry into the enclosing transaction record, release all
1450         // locks.
1451         
1452         StgTVar *s;
1453         s = e -> tvar;
1454         if (entry_is_update(e)) {
1455           unlock_tvar(trec, s, e -> expected_value, FALSE);
1456         }
1457         merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
1458         ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
1459       });
1460     } else {
1461       revert_ownership(trec, FALSE);
1462     }
1463   } 
1464
1465   unlock_stm(trec);
1466
1467   free_stg_trec_header(cap, trec);
1468
1469   TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);
1470
1471   return result;
1472 }
1473
1474 /*......................................................................*/
1475
1476 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
1477   int result;
1478   TRACE("%p : stmWait(%p)", trec, tso);
1479   ASSERT (trec != NO_TREC);
1480   ASSERT (trec -> enclosing_trec == NO_TREC);
1481   ASSERT ((trec -> state == TREC_ACTIVE) || 
1482           (trec -> state == TREC_CONDEMNED));
1483
1484   lock_stm(trec);
1485   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1486   if (result) {
1487     // The transaction is valid so far so we can actually start waiting.
1488     // (Otherwise the transaction was not valid and the thread will have to
1489     // retry it).
1490
1491     // Put ourselves to sleep.  We retain locks on all the TVars involved
1492     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
1493     // in the TSO, (c) TREC_WAITING in the Trec.  
1494     build_watch_queue_entries_for_trec(cap, tso, trec);
1495     park_tso(tso);
1496     trec -> state = TREC_WAITING;
1497
1498     // We haven't released ownership of the transaction yet.  The TSO
1499     // has been put on the wait queue for the TVars it is waiting for,
1500     // but we haven't yet tidied up the TSO's stack and made it safe
1501     // to wake up the TSO.  Therefore, we must wait until the TSO is
1502     // safe to wake up before we release ownership - when all is well,
1503     // the runtime will call stmWaitUnlock() below, with the same
1504     // TRec.
1505
1506   } else {
1507     unlock_stm(trec);
1508     free_stg_trec_header(cap, trec);
1509   }
1510
1511   TRACE("%p : stmWait(%p)=%d", trec, tso, result);
1512   return result;
1513 }
1514
1515
1516 void
1517 stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
1518     revert_ownership(trec, TRUE);
1519     unlock_stm(trec);
1520 }
1521
1522 /*......................................................................*/
1523
1524 StgBool stmReWait(Capability *cap, StgTSO *tso) {
1525   int result;
1526   StgTRecHeader *trec = tso->trec;
1527
1528   TRACE("%p : stmReWait", trec);
1529   ASSERT (trec != NO_TREC);
1530   ASSERT (trec -> enclosing_trec == NO_TREC);
1531   ASSERT ((trec -> state == TREC_WAITING) || 
1532           (trec -> state == TREC_CONDEMNED));
1533
1534   lock_stm(trec);
1535   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1536   TRACE("%p : validation %s", trec, result ? "succeeded" : "failed");
1537   if (result) {
1538     // The transaction remains valid -- do nothing because it is already on
1539     // the wait queues
1540     ASSERT (trec -> state == TREC_WAITING);
1541     park_tso(tso);
1542     revert_ownership(trec, TRUE);
1543   } else {
1544     // The transcation has become invalid.  We can now remove it from the wait
1545     // queues.
1546     if (trec -> state != TREC_CONDEMNED) {
1547       remove_watch_queue_entries_for_trec (cap, trec);
1548     }
1549     free_stg_trec_header(cap, trec);
1550   }
1551   unlock_stm(trec);
1552
1553   TRACE("%p : stmReWait()=%d", trec, result);
1554   return result;
1555 }
1556
1557 /*......................................................................*/
1558
1559 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
1560   StgClosure *result;
1561   result = tvar -> current_value;
1562
1563 #if defined(STM_FG_LOCKS)
1564   while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
1565     TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
1566     result = tvar -> current_value;
1567   }
1568 #endif
1569
1570   TRACE("%p : read_current_value(%p)=%p", trec, tvar, result);
1571   return result;
1572 }
1573
1574 /*......................................................................*/
1575
1576 StgClosure *stmReadTVar(Capability *cap,
1577                         StgTRecHeader *trec, 
1578                         StgTVar *tvar) {
1579   StgTRecHeader *entry_in = NULL;
1580   StgClosure *result = NULL;
1581   TRecEntry *entry = NULL;
1582   TRACE("%p : stmReadTVar(%p)", trec, tvar);
1583   ASSERT (trec != NO_TREC);
1584   ASSERT (trec -> state == TREC_ACTIVE || 
1585           trec -> state == TREC_CONDEMNED);
1586
1587   entry = get_entry_for(trec, tvar, &entry_in);
1588
1589   if (entry != NULL) {
1590     if (entry_in == trec) {
1591       // Entry found in our trec
1592       result = entry -> new_value;
1593     } else {
1594       // Entry found in another trec
1595       TRecEntry *new_entry = get_new_entry(cap, trec);
1596       new_entry -> tvar = tvar;
1597       new_entry -> expected_value = entry -> expected_value;
1598       new_entry -> new_value = entry -> new_value;
1599       result = new_entry -> new_value;
1600     } 
1601   } else {
1602     // No entry found
1603     StgClosure *current_value = read_current_value(trec, tvar);
1604     TRecEntry *new_entry = get_new_entry(cap, trec);
1605     new_entry -> tvar = tvar;
1606     new_entry -> expected_value = current_value;
1607     new_entry -> new_value = current_value;
1608     result = current_value;
1609   }
1610
1611   TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
1612   return result;
1613 }
1614
1615 /*......................................................................*/
1616
1617 void stmWriteTVar(Capability *cap,
1618                   StgTRecHeader *trec,
1619                   StgTVar *tvar, 
1620                   StgClosure *new_value) {
1621
1622   StgTRecHeader *entry_in = NULL;
1623   TRecEntry *entry = NULL;
1624   TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
1625   ASSERT (trec != NO_TREC);
1626   ASSERT (trec -> state == TREC_ACTIVE || 
1627           trec -> state == TREC_CONDEMNED);
1628
1629   entry = get_entry_for(trec, tvar, &entry_in);
1630
1631   if (entry != NULL) {
1632     if (entry_in == trec) {
1633       // Entry found in our trec
1634       entry -> new_value = new_value;
1635     } else {
1636       // Entry found in another trec
1637       TRecEntry *new_entry = get_new_entry(cap, trec);
1638       new_entry -> tvar = tvar;
1639       new_entry -> expected_value = entry -> expected_value;
1640       new_entry -> new_value = new_value;
1641     } 
1642   } else {
1643     // No entry found
1644     StgClosure *current_value = read_current_value(trec, tvar);
1645     TRecEntry *new_entry = get_new_entry(cap, trec);
1646     new_entry -> tvar = tvar;
1647     new_entry -> expected_value = current_value;
1648     new_entry -> new_value = new_value;
1649   }
1650
1651   TRACE("%p : stmWriteTVar done", trec);
1652 }
1653
1654 /*......................................................................*/
1655
1656 StgTVar *stmNewTVar(Capability *cap,
1657                     StgClosure *new_value) {
1658   StgTVar *result;
1659   result = (StgTVar *)allocate(cap, sizeofW(StgTVar));
1660   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
1661   result -> current_value = new_value;
1662   result -> first_watch_queue_entry = END_STM_WATCH_QUEUE;
1663 #if defined(THREADED_RTS)
1664   result -> num_updates = 0;
1665 #endif
1666   return result;
1667 }
1668
1669 /*......................................................................*/