New tracing interface
[ghc-hetmet.git] / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  * (c) The GHC Team 1998-2005
3  * 
4  * STM implementation.
5  *
6  * Overview
7  * --------
8  *
9  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
10  * each transcation has a TRec (transaction record) holding entries for each of the
11  * TVars (transactional variables) that it has accessed.  Each entry records
12  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
13  * the transaction wants to write to the TVar, (d) during commit, the identity of
14  * the TRec that wrote the expected value.  
15  *
16  * Separate TRecs are used for each level in a nest of transactions.  This allows
17  * a nested transaction to be aborted without condemning its enclosing transactions.
18  * This is needed in the implementation of catchRetry.  Note that the "expected value"
19  * in a nested transaction's TRec is the value expected to be *held in memory* if
20  * the transaction commits -- not the "new value" stored in one of the enclosing
21  * transactions.  This means that validation can be done without searching through
22  * a nest of TRecs.
23  *
24  * Concurrency control
25  * -------------------
26  *
27  * Three different concurrency control schemes can be built according to the settings
28  * in STM.h:
29  * 
30  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
31  * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
32  *
33  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
34  * an invocation on the STM interface.  Note that this does not mean that 
35  * transactions are simply serialized -- the lock is only held *within* the 
36  * implementation of stmCommitTransaction, stmWait etc.
37  *
38  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
39  * and, when committing a transaction, no locks are acquired for TVars that have
40  * been read but not updated.
41  *
42  * Concurrency control is implemented in the functions:
43  *
44  *    lock_stm
45  *    unlock_stm
46  *    lock_tvar / cond_lock_tvar
47  *    unlock_tvar
48  *
49  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
50  * implementation of these functions.  
51  *
52  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
53  * using STM_CG_LOCK, and otherwise they are no-ops.
54  *
55  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
56  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
57  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
58  * builds).  This is because locking a TVar is implemented by writing the lock
59  * holder's TRec into the TVar's current_value field:
60  *
61  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
62  *               it contained.
63  *
64  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
65  *               contains a specified value.  Return TRUE if this succeeds,
66  *               FALSE otherwise.
67  *
68  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
69  *               storing a specified value in place of the lock entry.
70  *
71  * Using these operations, the typcial pattern of a commit/validate/wait operation
72  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
73  * the TVars that were only read from still contain their expected values, 
74  * (d) release the locks on the TVars, writing updates to them in the case of a 
75  * commit, (e) unlock the STM.
76  *
77  * Queues of waiting threads hang off the first_wait_queue_entry field of each
78  * TVar.  This may only be manipulated when holding that TVar's lock.  In
79  * particular, when a thread is putting itself to sleep, it mustn't release
80  * the TVar's lock until it has added itself to the wait queue and marked its
81  * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
82  *
83  * ---------------------------------------------------------------------------*/
84
85 #include "PosixSource.h"
86 #include "Rts.h"
87 #include "RtsFlags.h"
88 #include "RtsUtils.h"
89 #include "Schedule.h"
90 #include "SMP.h"
91 #include "STM.h"
92 #include "Storage.h"
93 #include "Trace.h"
94
95 #include <stdlib.h>
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for
102 // THREADED_RTS builds with fine-grained locking.
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
115 // unusualy code paths if genuine contention is rare
116
117 #define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)
118
119 #ifdef SHAKE
120 static const int do_shake = TRUE;
121 #else
122 static const int do_shake = FALSE;
123 #endif
124 static int shake_ctr = 0;
125 static int shake_lim = 1;
126
127 static int shake(void) {
128   if (do_shake) {
129     if (((shake_ctr++) % shake_lim) == 0) {
130       shake_ctr = 1;
131       shake_lim ++;
132       return TRUE;
133     } 
134     return FALSE;
135   } else {
136     return FALSE;
137   }
138 }
139
140 /*......................................................................*/
141
142 // Helper macros for iterating over entries within a transaction
143 // record
144
145 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
146   StgTRecHeader *__t = (_t);                                                    \
147   StgTRecChunk *__c = __t -> current_chunk;                                     \
148   StgWord __limit = __c -> next_entry_idx;                                      \
149   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
150   while (__c != END_STM_CHUNK_LIST) {                                           \
151     StgWord __i;                                                                \
152     for (__i = 0; __i < __limit; __i ++) {                                      \
153       TRecEntry *_x = &(__c -> entries[__i]);                                   \
154       do { CODE } while (0);                                                    \
155     }                                                                           \
156     __c = __c -> prev_chunk;                                                    \
157     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
158   }                                                                             \
159  exit_for_each:                                                                 \
160   if (FALSE) goto exit_for_each;                                                \
161 } while (0)
162
163 #define BREAK_FOR_EACH goto exit_for_each
164      
165 /*......................................................................*/
166
167 // if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
168 // and wait queue entries without GC
169
170 #define REUSE_MEMORY
171
172 /*......................................................................*/
173
174 #define IF_STM_UNIPROC(__X)  do { } while (0)
175 #define IF_STM_CG_LOCK(__X)  do { } while (0)
176 #define IF_STM_FG_LOCKS(__X) do { } while (0)
177
178 #if defined(STM_UNIPROC)
179 #undef IF_STM_UNIPROC
180 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
181 static const StgBool use_read_phase = FALSE;
182
183 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
184   TRACE("%p : lock_stm()\n", trec);
185 }
186
187 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
188   TRACE("%p : unlock_stm()\n", trec);
189 }
190
191 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
192                              StgTVar *s STG_UNUSED) {
193   StgClosure *result;
194   TRACE("%p : lock_tvar(%p)\n", trec, s);
195   result = s -> current_value;
196   return result;
197 }
198
199 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
200                         StgTVar *s STG_UNUSED,
201                         StgClosure *c,
202                         StgBool force_update) {
203   TRACE("%p : unlock_tvar(%p)\n", trec, s);
204   if (force_update) {
205     s -> current_value = c;
206   }
207 }
208
209 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
210                               StgTVar *s STG_UNUSED,
211                               StgClosure *expected) {
212   StgClosure *result;
213   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
214   result = s -> current_value;
215   TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
216   return (result == expected);
217 }
218 #endif
219
220 #if defined(STM_CG_LOCK) /*........................................*/
221
222 #undef IF_STM_CG_LOCK
223 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
224 static const StgBool use_read_phase = FALSE;
225 static volatile StgTRecHeader *smp_locked = NULL;
226
227 static void lock_stm(StgTRecHeader *trec) {
228   while (cas(&smp_locked, NULL, trec) != NULL) { }
229   TRACE("%p : lock_stm()\n", trec);
230 }
231
232 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
233   TRACE("%p : unlock_stm()\n", trec);
234   ASSERT (smp_locked == trec);
235   smp_locked = 0;
236 }
237
238 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
239                              StgTVar *s STG_UNUSED) {
240   StgClosure *result;
241   TRACE("%p : lock_tvar(%p)\n", trec, s);
242   ASSERT (smp_locked == trec);
243   result = s -> current_value;
244   return result;
245 }
246
247 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
248                          StgTVar *s STG_UNUSED,
249                          StgClosure *c,
250                          StgBool force_update) {
251   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
252   ASSERT (smp_locked == trec);
253   if (force_update) {
254     s -> current_value = c;
255   }
256 }
257
258 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
259                                StgTVar *s STG_UNUSED,
260                                StgClosure *expected) {
261   StgClosure *result;
262   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
263   ASSERT (smp_locked == trec);
264   result = s -> current_value;
265   TRACE("%p : %d\n", result ? "success" : "failure");
266   return (result == expected);
267 }
268 #endif
269
270 #if defined(STM_FG_LOCKS) /*...................................*/
271
272 #undef IF_STM_FG_LOCKS
273 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
274 static const StgBool use_read_phase = TRUE;
275
276 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
277   TRACE("%p : lock_stm()\n", trec);
278 }
279
280 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
281   TRACE("%p : unlock_stm()\n", trec);
282 }
283
284 static StgClosure *lock_tvar(StgTRecHeader *trec, 
285                              StgTVar *s STG_UNUSED) {
286   StgClosure *result;
287   TRACE("%p : lock_tvar(%p)\n", trec, s);
288   do {
289     do {
290       result = s -> current_value;
291     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
292   } while (cas((void *)&(s -> current_value),
293                (StgWord)result, (StgWord)trec) != (StgWord)result);
294   return result;
295 }
296
297 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
298                         StgTVar *s,
299                         StgClosure *c,
300                         StgBool force_update STG_UNUSED) {
301   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
302   ASSERT(s -> current_value == (StgClosure *)trec);
303   s -> current_value = c;
304 }
305
306 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
307                               StgTVar *s,
308                               StgClosure *expected) {
309   StgClosure *result;
310   StgWord w;
311   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
312   w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
313   result = (StgClosure *)w;
314   TRACE("%p : %s\n", trec, result ? "success" : "failure");
315   return (result == expected);
316 }
317 #endif
318
319 /*......................................................................*/
320
321 // Helper functions for thread blocking and unblocking
322
323 static void park_tso(StgTSO *tso) {
324   ASSERT(tso -> why_blocked == NotBlocked);
325   tso -> why_blocked = BlockedOnSTM;
326   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
327   TRACE("park_tso on tso=%p\n", tso);
328 }
329
330 static void unpark_tso(Capability *cap, StgTSO *tso) {
331   // We will continue unparking threads while they remain on one of the wait
332   // queues: it's up to the thread itself to remove it from the wait queues
333   // if it decides to do so when it is scheduled.
334   if (tso -> why_blocked == BlockedOnSTM) {
335     TRACE("unpark_tso on tso=%p\n", tso);
336     unblockOne(cap,tso);
337   } else {
338     TRACE("spurious unpark_tso on tso=%p\n", tso);
339   }
340 }
341
342 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
343   StgTVarWaitQueue *q;
344   TRACE("unpark_waiters_on tvar=%p\n", s);
345   for (q = s -> first_wait_queue_entry; 
346        q != END_STM_WAIT_QUEUE; 
347        q = q -> next_queue_entry) {
348     unpark_tso(cap, q -> waiting_tso);
349   }
350 }
351
352 /*......................................................................*/
353
354 // Helper functions for downstream allocation and initialization
355
356 static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
357                                                  StgTSO *waiting_tso) {
358   StgTVarWaitQueue *result;
359   result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
360   SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
361   result -> waiting_tso = waiting_tso;
362   return result;
363 }
364
365 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
366   StgTRecChunk *result;
367   result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
368   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
369   result -> prev_chunk = END_STM_CHUNK_LIST;
370   result -> next_entry_idx = 0;
371   return result;
372 }
373
374 static StgTRecHeader *new_stg_trec_header(Capability *cap,
375                                           StgTRecHeader *enclosing_trec) {
376   StgTRecHeader *result;
377   result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
378   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
379
380   result -> enclosing_trec = enclosing_trec;
381   result -> current_chunk = new_stg_trec_chunk(cap);
382
383   if (enclosing_trec == NO_TREC) {
384     result -> state = TREC_ACTIVE;
385   } else {
386     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
387            enclosing_trec -> state == TREC_CONDEMNED);
388     result -> state = enclosing_trec -> state;
389   }
390
391   return result;  
392 }
393
394 /*......................................................................*/
395
396 // Allocation / deallocation functions that retain per-capability lists
397 // of closures that can be re-used
398
399 static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap,
400                                                    StgTSO *waiting_tso) {
401   StgTVarWaitQueue *result = NULL;
402   if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) {
403     result = new_stg_tvar_wait_queue(cap, waiting_tso);
404   } else {
405     result = cap -> free_tvar_wait_queues;
406     result -> waiting_tso = waiting_tso;
407     cap -> free_tvar_wait_queues = result -> next_queue_entry;
408   }
409   return result;
410 }
411
412 static void free_stg_tvar_wait_queue(Capability *cap,
413                                      StgTVarWaitQueue *wq) {
414 #if defined(REUSE_MEMORY)
415   wq -> next_queue_entry = cap -> free_tvar_wait_queues;
416   cap -> free_tvar_wait_queues = wq;
417 #endif
418 }
419
420 static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
421   StgTRecChunk *result = NULL;
422   if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
423     result = new_stg_trec_chunk(cap);
424   } else {
425     result = cap -> free_trec_chunks;
426     cap -> free_trec_chunks = result -> prev_chunk;
427     result -> prev_chunk = END_STM_CHUNK_LIST;
428     result -> next_entry_idx = 0;
429   }
430   return result;
431 }
432
433 static void free_stg_trec_chunk(Capability *cap, 
434                                 StgTRecChunk *c) {
435 #if defined(REUSE_MEMORY)
436   c -> prev_chunk = cap -> free_trec_chunks;
437   cap -> free_trec_chunks = c;
438 #endif
439 }
440
441 static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
442                                             StgTRecHeader *enclosing_trec) {
443   StgTRecHeader *result = NULL;
444   if (cap -> free_trec_headers == NO_TREC) {
445     result = new_stg_trec_header(cap, enclosing_trec);
446   } else {
447     result = cap -> free_trec_headers;
448     cap -> free_trec_headers = result -> enclosing_trec;
449     result -> enclosing_trec = enclosing_trec;
450     result -> current_chunk -> next_entry_idx = 0;
451     if (enclosing_trec == NO_TREC) {
452       result -> state = TREC_ACTIVE;
453     } else {
454       ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
455              enclosing_trec -> state == TREC_CONDEMNED);
456       result -> state = enclosing_trec -> state;
457     }
458   }
459   return result;
460 }
461
462 static void free_stg_trec_header(Capability *cap,
463                                  StgTRecHeader *trec) {
464 #if defined(REUSE_MEMORY)
465   StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
466   while (chunk != END_STM_CHUNK_LIST) {
467     StgTRecChunk *prev_chunk = chunk -> prev_chunk;
468     free_stg_trec_chunk(cap, chunk);
469     chunk = prev_chunk;
470   } 
471   trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
472   trec -> enclosing_trec = cap -> free_trec_headers;
473   cap -> free_trec_headers = trec;
474 #endif
475 }
476
477 /*......................................................................*/
478
479 // Helper functions for managing waiting lists
480
481 static void build_wait_queue_entries_for_trec(Capability *cap,
482                                       StgTSO *tso, 
483                                       StgTRecHeader *trec) {
484   ASSERT(trec != NO_TREC);
485   ASSERT(trec -> enclosing_trec == NO_TREC);
486   ASSERT(trec -> state == TREC_ACTIVE);
487
488   TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
489
490   FOR_EACH_ENTRY(trec, e, {
491     StgTVar *s;
492     StgTVarWaitQueue *q;
493     StgTVarWaitQueue *fq;
494     s = e -> tvar;
495     TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
496     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
497     NACQ_ASSERT(s -> current_value == e -> expected_value);
498     fq = s -> first_wait_queue_entry;
499     q = alloc_stg_tvar_wait_queue(cap, tso);
500     q -> next_queue_entry = fq;
501     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
502     if (fq != END_STM_WAIT_QUEUE) {
503       fq -> prev_queue_entry = q;
504     }
505     s -> first_wait_queue_entry = q;
506     e -> new_value = (StgClosure *) q;
507   });
508 }
509
510 static void remove_wait_queue_entries_for_trec(Capability *cap,
511                                                StgTRecHeader *trec) {
512   ASSERT(trec != NO_TREC);
513   ASSERT(trec -> enclosing_trec == NO_TREC);
514   ASSERT(trec -> state == TREC_WAITING ||
515          trec -> state == TREC_CONDEMNED);
516
517   TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
518
519   FOR_EACH_ENTRY(trec, e, {
520     StgTVar *s;
521     StgTVarWaitQueue *pq;
522     StgTVarWaitQueue *nq;
523     StgTVarWaitQueue *q;
524     s = e -> tvar;
525     StgClosure *saw = lock_tvar(trec, s);
526     q = (StgTVarWaitQueue *) (e -> new_value);
527     TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
528     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
529     nq = q -> next_queue_entry;
530     pq = q -> prev_queue_entry;
531     if (nq != END_STM_WAIT_QUEUE) {
532       nq -> prev_queue_entry = pq;
533     }
534     if (pq != END_STM_WAIT_QUEUE) {
535       pq -> next_queue_entry = nq;
536     } else {
537       ASSERT (s -> first_wait_queue_entry == q);
538       s -> first_wait_queue_entry = nq;
539     }
540     free_stg_tvar_wait_queue(cap, q);
541     unlock_tvar(trec, s, saw, FALSE);
542   });
543 }
544  
545 /*......................................................................*/
546  
547 static TRecEntry *get_new_entry(Capability *cap,
548                                 StgTRecHeader *t) {
549   TRecEntry *result;
550   StgTRecChunk *c;
551   int i;
552
553   c = t -> current_chunk;
554   i = c -> next_entry_idx;
555   ASSERT(c != END_STM_CHUNK_LIST);
556
557   if (i < TREC_CHUNK_NUM_ENTRIES) {
558     // Continue to use current chunk
559     result = &(c -> entries[i]);
560     c -> next_entry_idx ++;
561   } else {
562     // Current chunk is full: allocate a fresh one
563     StgTRecChunk *nc;
564     nc = alloc_stg_trec_chunk(cap);
565     nc -> prev_chunk = c;
566     nc -> next_entry_idx = 1;
567     t -> current_chunk = nc;
568     result = &(nc -> entries[0]);
569   }
570
571   return result;
572 }
573
574 /*......................................................................*/
575
576 static void merge_update_into(Capability *cap,
577                               StgTRecHeader *t,
578                               StgTVar *tvar,
579                               StgClosure *expected_value,
580                               StgClosure *new_value) {
581   int found;
582   
583   // Look for an entry in this trec
584   found = FALSE;
585   FOR_EACH_ENTRY(t, e, {
586     StgTVar *s;
587     s = e -> tvar;
588     if (s == tvar) {
589       found = TRUE;
590       if (e -> expected_value != expected_value) {
591         // Must abort if the two entries start from different values
592         TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
593               t, tvar, e -> expected_value, expected_value);
594         t -> state = TREC_CONDEMNED;
595       } 
596       e -> new_value = new_value;
597       BREAK_FOR_EACH;
598     }
599   });
600
601   if (!found) {
602     // No entry so far in this trec
603     TRecEntry *ne;
604     ne = get_new_entry(cap, t);
605     ne -> tvar = tvar;
606     ne -> expected_value = expected_value;
607     ne -> new_value = new_value;
608   }
609 }
610
611 /*......................................................................*/
612
613 static StgBool entry_is_update(TRecEntry *e) {
614   StgBool result;
615   result = (e -> expected_value != e -> new_value);
616   return result;
617
618
619 #if defined(STM_FG_LOCKS)
620 static StgBool entry_is_read_only(TRecEntry *e) {
621   StgBool result;
622   result = (e -> expected_value == e -> new_value);
623   return result;
624
625
626 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
627   StgClosure *c;
628   StgBool result;
629   c = s -> current_value;
630   result = (c == (StgClosure *) h);
631   return result;  
632 }
633 #endif
634
635 // revert_ownership : release a lock on a TVar, storing back
636 // the value that it held when the lock was acquired.  "revert_all"
637 // is set in stmWait and stmReWait when we acquired locks on all of 
638 // the TVars involved.  "revert_all" is not set in commit operations
639 // where we don't lock TVars that have been read from but not updated.
640
641 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
642                              StgBool revert_all STG_UNUSED) {
643 #if defined(STM_FG_LOCKS) 
644   FOR_EACH_ENTRY(trec, e, {
645     if (revert_all || entry_is_update(e)) {
646       StgTVar *s;
647       s = e -> tvar;
648       if (tvar_is_locked(s, trec)) {
649         unlock_tvar(trec, s, e -> expected_value, TRUE);
650       }
651     }
652   });
653 #endif
654 }
655
656 /*......................................................................*/
657
658 // validate_and_acquire_ownership : this performs the twin functions
659 // of checking that the TVars referred to by entries in trec hold the
660 // expected values and:
661 // 
662 //   - locking the TVar (on updated TVars during commit, or all TVars
663 //     during wait)
664 //
665 //   - recording the identity of the TRec who wrote the value seen in the
666 //     TVar (on non-updated TVars during commit).  These values are 
667 //     stashed in the TRec entries and are then checked in check_read_only
668 //     to ensure that an atomic snapshot of all of these locations has been
669 //     seen.
670
671 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
672                                                int acquire_all,
673                                                int retain_ownership) {
674   StgBool result;
675
676   if (shake()) {
677     TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
678     return FALSE;
679   }
680
681   ASSERT ((trec -> state == TREC_ACTIVE) || 
682           (trec -> state == TREC_WAITING) ||
683           (trec -> state == TREC_CONDEMNED));
684   result = !((trec -> state) == TREC_CONDEMNED);
685   if (result) {
686     FOR_EACH_ENTRY(trec, e, {
687       StgTVar *s;
688       s = e -> tvar;
689       if (acquire_all || entry_is_update(e)) {
690         TRACE("%p : trying to acquire %p\n", trec, s);
691         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
692           TRACE("%p : failed to acquire %p\n", trec, s);
693           result = FALSE;
694           BREAK_FOR_EACH;
695         }
696       } else {
697         ASSERT(use_read_phase);
698         IF_STM_FG_LOCKS({
699           TRACE("%p : will need to check %p\n", trec, s);
700           if (s -> current_value != e -> expected_value) {
701             TRACE("%p : doesn't match\n", trec);
702             result = FALSE;
703             BREAK_FOR_EACH;
704           }
705           e -> num_updates = s -> num_updates;
706           if (s -> current_value != e -> expected_value) {
707             TRACE("%p : doesn't match (race)\n", trec);
708             result = FALSE;
709             BREAK_FOR_EACH;
710           } else {
711             TRACE("%p : need to check version %ld\n", trec, e -> num_updates);
712           }
713         });
714       }
715     });
716   }
717
718   if ((!result) || (!retain_ownership)) {
719     revert_ownership(trec, acquire_all);
720   }
721   
722   return result;
723 }
724
725 // check_read_only : check that we've seen an atomic snapshot of the
726 // non-updated TVars accessed by a trec.  This checks that the last TRec to
727 // commit an update to the TVar is unchanged since the value was stashed in
728 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
729 // all of them contained their expected values at the start of the call to
730 // check_read_only.
731 //
732 // The paper "Concurrent programming without locks" (under submission), or
733 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
734 // this kind of algorithm.
735
736 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
737   StgBool result = TRUE;
738
739   ASSERT (use_read_phase);
740   IF_STM_FG_LOCKS({
741     FOR_EACH_ENTRY(trec, e, {
742       StgTVar *s;
743       s = e -> tvar;
744       if (entry_is_read_only(e)) {
745         TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
746         if (s -> num_updates != e -> num_updates) {
747           // ||s -> current_value != e -> expected_value) {
748           TRACE("%p : mismatch\n", trec);
749           result = FALSE;
750           BREAK_FOR_EACH;
751         }
752       }
753     });
754   });
755
756   return result;
757 }
758
759
760 /************************************************************************/
761
762 void stmPreGCHook() {
763   nat i;
764
765   lock_stm(NO_TREC);
766   TRACE("stmPreGCHook\n");
767   for (i = 0; i < n_capabilities; i ++) {
768     Capability *cap = &capabilities[i];
769     cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE;
770     cap -> free_trec_chunks = END_STM_CHUNK_LIST;
771     cap -> free_trec_headers = NO_TREC;
772   }
773   unlock_stm(NO_TREC);
774 }
775
776 /************************************************************************/
777
778 // check_read_only relies on version numbers held in TVars' "num_updates" 
779 // fields not wrapping around while a transaction is committed.  The version
780 // number is incremented each time an update is committed to the TVar
781 // This is unlikely to wrap around when 32-bit integers are used for the counts, 
782 // but to ensure correctness we maintain a shared count on the maximum
783 // number of commit operations that may occur and check that this has 
784 // not increased by more than 2^32 during a commit.
785
786 #define TOKEN_BATCH_SIZE 1024
787
788 static volatile StgInt64 max_commits = 0;
789
790 #if defined(THREADED_RTS)
791 static volatile StgBool token_locked = FALSE;
792
793 static void getTokenBatch(Capability *cap) {
794   while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
795   max_commits += TOKEN_BATCH_SIZE;
796   cap -> transaction_tokens = TOKEN_BATCH_SIZE;
797   token_locked = FALSE;
798 }
799
800 static void getToken(Capability *cap) {
801   if (cap -> transaction_tokens == 0) {
802     getTokenBatch(cap);
803   }
804   cap -> transaction_tokens --;
805 }
806 #else
807 static void getToken(Capability *cap STG_UNUSED) {
808   // Nothing
809 }
810 #endif
811
812 /*......................................................................*/
813
814 StgTRecHeader *stmStartTransaction(Capability *cap,
815                                    StgTRecHeader *outer) {
816   StgTRecHeader *t;
817   TRACE("%p : stmStartTransaction with %d tokens\n", 
818         outer, 
819         cap -> transaction_tokens);
820
821   getToken(cap);
822
823   t = alloc_stg_trec_header(cap, outer);
824   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
825   return t;
826 }
827
828 /*......................................................................*/
829
830 void stmAbortTransaction(Capability *cap,
831                          StgTRecHeader *trec) {
832   TRACE("%p : stmAbortTransaction\n", trec);
833   ASSERT (trec != NO_TREC);
834   ASSERT ((trec -> state == TREC_ACTIVE) || 
835           (trec -> state == TREC_WAITING) ||
836           (trec -> state == TREC_CONDEMNED));
837
838   lock_stm(trec);
839   if (trec -> state == TREC_WAITING) {
840     ASSERT (trec -> enclosing_trec == NO_TREC);
841     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
842     remove_wait_queue_entries_for_trec(cap, trec);
843   } 
844   trec -> state = TREC_ABORTED;
845   unlock_stm(trec);
846
847   free_stg_trec_header(cap, trec);
848
849   TRACE("%p : stmAbortTransaction done\n", trec);
850 }
851
852 /*......................................................................*/
853
854 void stmCondemnTransaction(Capability *cap,
855                            StgTRecHeader *trec) {
856   TRACE("%p : stmCondemnTransaction\n", trec);
857   ASSERT (trec != NO_TREC);
858   ASSERT ((trec -> state == TREC_ACTIVE) || 
859           (trec -> state == TREC_WAITING) ||
860           (trec -> state == TREC_CONDEMNED));
861
862   lock_stm(trec);
863   if (trec -> state == TREC_WAITING) {
864     ASSERT (trec -> enclosing_trec == NO_TREC);
865     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
866     remove_wait_queue_entries_for_trec(cap, trec);
867   } 
868   trec -> state = TREC_CONDEMNED;
869   unlock_stm(trec);
870
871   TRACE("%p : stmCondemnTransaction done\n", trec);
872 }
873
874 /*......................................................................*/
875
876 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
877   StgTRecHeader *outer;
878   TRACE("%p : stmGetEnclosingTRec\n", trec);
879   outer = trec -> enclosing_trec;
880   TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
881   return outer;
882 }
883
884 /*......................................................................*/
885
886 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
887   StgTRecHeader *t;
888   StgBool result;
889
890   TRACE("%p : stmValidateNestOfTransactions\n", trec);
891   ASSERT(trec != NO_TREC);
892   ASSERT((trec -> state == TREC_ACTIVE) || 
893          (trec -> state == TREC_WAITING) ||
894          (trec -> state == TREC_CONDEMNED));
895
896   lock_stm(trec);
897
898   t = trec;
899   result = TRUE;
900   while (t != NO_TREC) {
901     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
902     t = t -> enclosing_trec;
903   }
904
905   if (!result && trec -> state != TREC_WAITING) {
906     trec -> state = TREC_CONDEMNED; 
907   }
908
909   unlock_stm(trec);
910
911   TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
912   return result;
913 }
914
915 /*......................................................................*/
916
917 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
918   int result;
919   StgInt64 max_commits_at_start = max_commits;
920
921   TRACE("%p : stmCommitTransaction()\n", trec);
922   ASSERT (trec != NO_TREC);
923
924   lock_stm(trec);
925
926   ASSERT (trec -> enclosing_trec == NO_TREC);
927   ASSERT ((trec -> state == TREC_ACTIVE) || 
928           (trec -> state == TREC_CONDEMNED));
929
930   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
931   if (result) {
932     // We now know that all the updated locations hold their expected values.
933     ASSERT (trec -> state == TREC_ACTIVE);
934
935     if (use_read_phase) {
936       TRACE("%p : doing read check\n", trec);
937       result = check_read_only(trec);
938       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
939
940       StgInt64 max_commits_at_end = max_commits;
941       StgInt64 max_concurrent_commits;
942       max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
943                                 (n_capabilities * TOKEN_BATCH_SIZE));
944       if (((max_concurrent_commits >> 32) > 0) || shake()) {
945         result = FALSE;
946       }
947     }
948     
949     if (result) {
950       // We now know that all of the read-only locations held their exepcted values
951       // at the end of the call to validate_and_acquire_ownership.  This forms the
952       // linearization point of the commit.
953       
954       FOR_EACH_ENTRY(trec, e, {
955         StgTVar *s;
956         s = e -> tvar;
957         if (e -> new_value != e -> expected_value) {
958           // Entry is an update: write the value back to the TVar, unlocking it if
959           // necessary.
960
961           ACQ_ASSERT(tvar_is_locked(s, trec));
962           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
963           unpark_waiters_on(cap,s);
964           IF_STM_FG_LOCKS({
965             s -> num_updates ++;
966           });
967           unlock_tvar(trec, s, e -> new_value, TRUE);
968         } 
969         ACQ_ASSERT(!tvar_is_locked(s, trec));
970       });
971     } else {
972       revert_ownership(trec, FALSE);
973     }
974   } 
975
976   unlock_stm(trec);
977
978   free_stg_trec_header(cap, trec);
979
980   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
981
982   return result;
983 }
984
985 /*......................................................................*/
986
987 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
988   StgTRecHeader *et;
989   int result;
990   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
991   TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
992   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
993
994   lock_stm(trec);
995
996   et = trec -> enclosing_trec;
997   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
998   if (result) {
999     // We now know that all the updated locations hold their expected values.
1000
1001     if (use_read_phase) {
1002       TRACE("%p : doing read check\n", trec);
1003       result = check_read_only(trec);
1004     }
1005     if (result) {
1006       // We now know that all of the read-only locations held their exepcted values
1007       // at the end of the call to validate_and_acquire_ownership.  This forms the
1008       // linearization point of the commit.
1009
1010       if (result) {
1011         TRACE("%p : read-check succeeded\n", trec);
1012         FOR_EACH_ENTRY(trec, e, {
1013           // Merge each entry into the enclosing transaction record, release all
1014           // locks.
1015
1016           StgTVar *s;
1017           s = e -> tvar;
1018           if (entry_is_update(e)) {
1019             unlock_tvar(trec, s, e -> expected_value, FALSE);
1020           }
1021           merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
1022           ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
1023         });
1024       } else {
1025         revert_ownership(trec, FALSE);
1026       }
1027     }
1028   } 
1029
1030   unlock_stm(trec);
1031
1032   free_stg_trec_header(cap, trec);
1033
1034   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
1035
1036   return result;
1037 }
1038
1039 /*......................................................................*/
1040
1041 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
1042   int result;
1043   TRACE("%p : stmWait(%p)\n", trec, tso);
1044   ASSERT (trec != NO_TREC);
1045   ASSERT (trec -> enclosing_trec == NO_TREC);
1046   ASSERT ((trec -> state == TREC_ACTIVE) || 
1047           (trec -> state == TREC_CONDEMNED));
1048
1049   lock_stm(trec);
1050   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1051   if (result) {
1052     // The transaction is valid so far so we can actually start waiting.
1053     // (Otherwise the transaction was not valid and the thread will have to
1054     // retry it).
1055
1056     // Put ourselves to sleep.  We retain locks on all the TVars involved
1057     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
1058     // in the TSO, (c) TREC_WAITING in the Trec.  
1059     build_wait_queue_entries_for_trec(cap, tso, trec);
1060     park_tso(tso);
1061     trec -> state = TREC_WAITING;
1062
1063     // We haven't released ownership of the transaction yet.  The TSO
1064     // has been put on the wait queue for the TVars it is waiting for,
1065     // but we haven't yet tidied up the TSO's stack and made it safe
1066     // to wake up the TSO.  Therefore, we must wait until the TSO is
1067     // safe to wake up before we release ownership - when all is well,
1068     // the runtime will call stmWaitUnlock() below, with the same
1069     // TRec.
1070
1071   } else {
1072     unlock_stm(trec);
1073     free_stg_trec_header(cap, trec);
1074   }
1075
1076   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
1077   return result;
1078 }
1079
1080
1081 void
1082 stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
1083     revert_ownership(trec, TRUE);
1084     unlock_stm(trec);
1085 }
1086
1087 /*......................................................................*/
1088
1089 StgBool stmReWait(Capability *cap, StgTSO *tso) {
1090   int result;
1091   StgTRecHeader *trec = tso->trec;
1092
1093   TRACE("%p : stmReWait\n", trec);
1094   ASSERT (trec != NO_TREC);
1095   ASSERT (trec -> enclosing_trec == NO_TREC);
1096   ASSERT ((trec -> state == TREC_WAITING) || 
1097           (trec -> state == TREC_CONDEMNED));
1098
1099   lock_stm(trec);
1100   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1101   TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
1102   if (result) {
1103     // The transaction remains valid -- do nothing because it is already on
1104     // the wait queues
1105     ASSERT (trec -> state == TREC_WAITING);
1106     park_tso(tso);
1107     revert_ownership(trec, TRUE);
1108   } else {
1109     // The transcation has become invalid.  We can now remove it from the wait
1110     // queues.
1111     if (trec -> state != TREC_CONDEMNED) {
1112       remove_wait_queue_entries_for_trec (cap, trec);
1113     }
1114     free_stg_trec_header(cap, trec);
1115   }
1116   unlock_stm(trec);
1117
1118   TRACE("%p : stmReWait()=%d\n", trec, result);
1119   return result;
1120 }
1121
1122 /*......................................................................*/
1123
1124 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
1125   TRecEntry *result = NULL;
1126
1127   TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
1128   ASSERT(trec != NO_TREC);
1129
1130   do {
1131     FOR_EACH_ENTRY(trec, e, {
1132       if (e -> tvar == tvar) {
1133         result = e;
1134         if (in != NULL) {
1135           *in = trec;
1136         }
1137         BREAK_FOR_EACH;
1138       }
1139     });
1140     trec = trec -> enclosing_trec;
1141   } while (result == NULL && trec != NO_TREC);
1142
1143   return result;    
1144 }
1145
1146 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
1147   StgClosure *result;
1148   result = tvar -> current_value;
1149
1150 #if defined(STM_FG_LOCKS)
1151   while (GET_INFO(result) == &stg_TREC_HEADER_info) {
1152     TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
1153     result = tvar -> current_value;
1154   }
1155 #endif
1156
1157   TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
1158   return result;
1159 }
1160
1161 /*......................................................................*/
1162
1163 StgClosure *stmReadTVar(Capability *cap,
1164                         StgTRecHeader *trec, 
1165                         StgTVar *tvar) {
1166   StgTRecHeader *entry_in;
1167   StgClosure *result = NULL;
1168   TRecEntry *entry = NULL;
1169   TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
1170   ASSERT (trec != NO_TREC);
1171   ASSERT (trec -> state == TREC_ACTIVE || 
1172           trec -> state == TREC_CONDEMNED);
1173
1174   entry = get_entry_for(trec, tvar, &entry_in);
1175
1176   if (entry != NULL) {
1177     if (entry_in == trec) {
1178       // Entry found in our trec
1179       result = entry -> new_value;
1180     } else {
1181       // Entry found in another trec
1182       TRecEntry *new_entry = get_new_entry(cap, trec);
1183       new_entry -> tvar = tvar;
1184       new_entry -> expected_value = entry -> expected_value;
1185       new_entry -> new_value = entry -> new_value;
1186       result = new_entry -> new_value;
1187     } 
1188   } else {
1189     // No entry found
1190     StgClosure *current_value = read_current_value(trec, tvar);
1191     TRecEntry *new_entry = get_new_entry(cap, trec);
1192     new_entry -> tvar = tvar;
1193     new_entry -> expected_value = current_value;
1194     new_entry -> new_value = current_value;
1195     result = current_value;
1196   }
1197
1198   TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
1199   return result;
1200 }
1201
1202 /*......................................................................*/
1203
1204 void stmWriteTVar(Capability *cap,
1205                   StgTRecHeader *trec,
1206                   StgTVar *tvar, 
1207                   StgClosure *new_value) {
1208
1209   StgTRecHeader *entry_in;
1210   TRecEntry *entry = NULL;
1211   TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1212   ASSERT (trec != NO_TREC);
1213   ASSERT (trec -> state == TREC_ACTIVE || 
1214           trec -> state == TREC_CONDEMNED);
1215
1216   entry = get_entry_for(trec, tvar, &entry_in);
1217
1218   if (entry != NULL) {
1219     if (entry_in == trec) {
1220       // Entry found in our trec
1221       entry -> new_value = new_value;
1222     } else {
1223       // Entry found in another trec
1224       TRecEntry *new_entry = get_new_entry(cap, trec);
1225       new_entry -> tvar = tvar;
1226       new_entry -> expected_value = entry -> expected_value;
1227       new_entry -> new_value = new_value;
1228     } 
1229   } else {
1230     // No entry found
1231     StgClosure *current_value = read_current_value(trec, tvar);
1232     TRecEntry *new_entry = get_new_entry(cap, trec);
1233     new_entry -> tvar = tvar;
1234     new_entry -> expected_value = current_value;
1235     new_entry -> new_value = new_value;
1236   }
1237
1238   TRACE("%p : stmWriteTVar done\n", trec);
1239 }
1240
1241 /*......................................................................*/
1242
1243 StgTVar *stmNewTVar(Capability *cap,
1244                     StgClosure *new_value) {
1245   StgTVar *result;
1246   result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
1247   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
1248   result -> current_value = new_value;
1249   result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
1250 #if defined(THREADED_RTS)
1251   result -> num_updates = 0;
1252 #endif
1253   return result;
1254 }
1255
1256 /*......................................................................*/