Fix scoped type variables for expression type signatures
[ghc-hetmet.git] / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  * (c) The GHC Team 1998-2005
3  * 
4  * STM implementation.
5  *
6  * Overview
7  * --------
8  *
9  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
10  * each transcation has a TRec (transaction record) holding entries for each of the
11  * TVars (transactional variables) that it has accessed.  Each entry records
12  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
13  * the transaction wants to write to the TVar, (d) during commit, the identity of
14  * the TRec that wrote the expected value.  
15  *
16  * Separate TRecs are used for each level in a nest of transactions.  This allows
17  * a nested transaction to be aborted without condemning its enclosing transactions.
18  * This is needed in the implementation of catchRetry.  Note that the "expected value"
19  * in a nested transaction's TRec is the value expected to be *held in memory* if
20  * the transaction commits -- not the "new value" stored in one of the enclosing
21  * transactions.  This means that validation can be done without searching through
22  * a nest of TRecs.
23  *
24  * Concurrency control
25  * -------------------
26  *
27  * Three different concurrency control schemes can be built according to the settings
28  * in STM.h:
29  * 
30  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
31  * In the Haskell RTS this means it is suitable only for non-THREADED_RTS builds.
32  *
33  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
34  * an invocation on the STM interface.  Note that this does not mean that 
35  * transactions are simply serialized -- the lock is only held *within* the 
36  * implementation of stmCommitTransaction, stmWait etc.
37  *
38  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
39  * and, when committing a transaction, no locks are acquired for TVars that have
40  * been read but not updated.
41  *
42  * Concurrency control is implemented in the functions:
43  *
44  *    lock_stm
45  *    unlock_stm
46  *    lock_tvar / cond_lock_tvar
47  *    unlock_tvar
48  *
49  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
50  * implementation of these functions.  
51  *
52  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
53  * using STM_CG_LOCK, and otherwise they are no-ops.
54  *
55  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
56  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
57  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
58  * builds).  This is because locking a TVar is implemented by writing the lock
59  * holder's TRec into the TVar's current_value field:
60  *
61  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
62  *               it contained.
63  *
64  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
65  *               contains a specified value.  Return TRUE if this succeeds,
66  *               FALSE otherwise.
67  *
68  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
69  *               storing a specified value in place of the lock entry.
70  *
71  * Using these operations, the typcial pattern of a commit/validate/wait operation
72  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
73  * the TVars that were only read from still contain their expected values, 
74  * (d) release the locks on the TVars, writing updates to them in the case of a 
75  * commit, (e) unlock the STM.
76  *
77  * Queues of waiting threads hang off the first_wait_queue_entry field of each
78  * TVar.  This may only be manipulated when holding that TVar's lock.  In
79  * particular, when a thread is putting itself to sleep, it mustn't release
80  * the TVar's lock until it has added itself to the wait queue and marked its
81  * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
82  *
83  * ---------------------------------------------------------------------------*/
84
85 #include "PosixSource.h"
86 #include "Rts.h"
87 #include "RtsFlags.h"
88 #include "RtsUtils.h"
89 #include "Schedule.h"
90 #include "SMP.h"
91 #include "STM.h"
92 #include "Storage.h"
93 #include "Trace.h"
94
95 #include <stdlib.h>
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for
102 // THREADED_RTS builds with fine-grained locking.
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
115 // unusualy code paths if genuine contention is rare
116
117 #define TRACE(_x...) debugTrace(DEBUG_stm, "STM: " _x)
118
119 #ifdef SHAKE
120 static const int do_shake = TRUE;
121 #else
122 static const int do_shake = FALSE;
123 #endif
124 static int shake_ctr = 0;
125 static int shake_lim = 1;
126
127 static int shake(void) {
128   if (do_shake) {
129     if (((shake_ctr++) % shake_lim) == 0) {
130       shake_ctr = 1;
131       shake_lim ++;
132       return TRUE;
133     } 
134     return FALSE;
135   } else {
136     return FALSE;
137   }
138 }
139
140 /*......................................................................*/
141
142 // Helper macros for iterating over entries within a transaction
143 // record
144
145 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
146   StgTRecHeader *__t = (_t);                                                    \
147   StgTRecChunk *__c = __t -> current_chunk;                                     \
148   StgWord __limit = __c -> next_entry_idx;                                      \
149   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
150   while (__c != END_STM_CHUNK_LIST) {                                           \
151     StgWord __i;                                                                \
152     for (__i = 0; __i < __limit; __i ++) {                                      \
153       TRecEntry *_x = &(__c -> entries[__i]);                                   \
154       do { CODE } while (0);                                                    \
155     }                                                                           \
156     __c = __c -> prev_chunk;                                                    \
157     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
158   }                                                                             \
159  exit_for_each:                                                                 \
160   if (FALSE) goto exit_for_each;                                                \
161 } while (0)
162
163 #define BREAK_FOR_EACH goto exit_for_each
164      
165 /*......................................................................*/
166
167 // if REUSE_MEMORY is defined then attempt to re-use descriptors, log chunks,
168 // and wait queue entries without GC
169
170 #define REUSE_MEMORY
171
172 /*......................................................................*/
173
174 #define IF_STM_UNIPROC(__X)  do { } while (0)
175 #define IF_STM_CG_LOCK(__X)  do { } while (0)
176 #define IF_STM_FG_LOCKS(__X) do { } while (0)
177
178 #if defined(STM_UNIPROC)
179 #undef IF_STM_UNIPROC
180 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
181 static const StgBool use_read_phase = FALSE;
182
183 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
184   TRACE("%p : lock_stm()\n", trec);
185 }
186
187 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
188   TRACE("%p : unlock_stm()\n", trec);
189 }
190
191 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
192                              StgTVar *s STG_UNUSED) {
193   StgClosure *result;
194   TRACE("%p : lock_tvar(%p)\n", trec, s);
195   result = s -> current_value;
196   return result;
197 }
198
199 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
200                         StgTVar *s STG_UNUSED,
201                         StgClosure *c,
202                         StgBool force_update) {
203   TRACE("%p : unlock_tvar(%p)\n", trec, s);
204   if (force_update) {
205     s -> current_value = c;
206   }
207 }
208
209 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
210                               StgTVar *s STG_UNUSED,
211                               StgClosure *expected) {
212   StgClosure *result;
213   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
214   result = s -> current_value;
215   TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
216   return (result == expected);
217 }
218 #endif
219
220 #if defined(STM_CG_LOCK) /*........................................*/
221
222 #undef IF_STM_CG_LOCK
223 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
224 static const StgBool use_read_phase = FALSE;
225 static volatile StgTRecHeader *smp_locked = NULL;
226
227 static void lock_stm(StgTRecHeader *trec) {
228   while (cas(&smp_locked, NULL, trec) != NULL) { }
229   TRACE("%p : lock_stm()\n", trec);
230 }
231
232 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
233   TRACE("%p : unlock_stm()\n", trec);
234   ASSERT (smp_locked == trec);
235   smp_locked = 0;
236 }
237
238 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
239                              StgTVar *s STG_UNUSED) {
240   StgClosure *result;
241   TRACE("%p : lock_tvar(%p)\n", trec, s);
242   ASSERT (smp_locked == trec);
243   result = s -> current_value;
244   return result;
245 }
246
247 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
248                          StgTVar *s STG_UNUSED,
249                          StgClosure *c,
250                          StgBool force_update) {
251   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
252   ASSERT (smp_locked == trec);
253   if (force_update) {
254     s -> current_value = c;
255   }
256 }
257
258 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
259                                StgTVar *s STG_UNUSED,
260                                StgClosure *expected) {
261   StgClosure *result;
262   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
263   ASSERT (smp_locked == trec);
264   result = s -> current_value;
265   TRACE("%p : %d\n", result ? "success" : "failure");
266   return (result == expected);
267 }
268 #endif
269
270 #if defined(STM_FG_LOCKS) /*...................................*/
271
272 #undef IF_STM_FG_LOCKS
273 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
274 static const StgBool use_read_phase = TRUE;
275
276 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
277   TRACE("%p : lock_stm()\n", trec);
278 }
279
280 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
281   TRACE("%p : unlock_stm()\n", trec);
282 }
283
284 static StgClosure *lock_tvar(StgTRecHeader *trec, 
285                              StgTVar *s STG_UNUSED) {
286   StgClosure *result;
287   TRACE("%p : lock_tvar(%p)\n", trec, s);
288   do {
289     do {
290       result = s -> current_value;
291     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
292   } while (cas((void *)&(s -> current_value),
293                (StgWord)result, (StgWord)trec) != (StgWord)result);
294   return result;
295 }
296
297 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
298                         StgTVar *s,
299                         StgClosure *c,
300                         StgBool force_update STG_UNUSED) {
301   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
302   ASSERT(s -> current_value == (StgClosure *)trec);
303   s -> current_value = c;
304 }
305
306 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
307                               StgTVar *s,
308                               StgClosure *expected) {
309   StgClosure *result;
310   StgWord w;
311   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
312   w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
313   result = (StgClosure *)w;
314   TRACE("%p : %s\n", trec, result ? "success" : "failure");
315   return (result == expected);
316 }
317 #endif
318
319 /*......................................................................*/
320
321 // Helper functions for thread blocking and unblocking
322
323 static void park_tso(StgTSO *tso) {
324   ASSERT(tso -> why_blocked == NotBlocked);
325   tso -> why_blocked = BlockedOnSTM;
326   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
327   TRACE("park_tso on tso=%p\n", tso);
328 }
329
330 static void unpark_tso(Capability *cap, StgTSO *tso) {
331     // We will continue unparking threads while they remain on one of the wait
332     // queues: it's up to the thread itself to remove it from the wait queues
333     // if it decides to do so when it is scheduled.
334
335     // Unblocking a TSO from BlockedOnSTM is done under the TSO lock,
336     // to avoid multiple CPUs unblocking the same TSO, and also to
337     // synchronise with throwTo().
338     lockTSO(tso);
339     if (tso -> why_blocked == BlockedOnSTM) {
340         TRACE("unpark_tso on tso=%p\n", tso);
341         unblockOne(cap,tso);
342     } else {
343         TRACE("spurious unpark_tso on tso=%p\n", tso);
344     }
345     unlockTSO(tso);
346 }
347
348 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
349   StgTVarWaitQueue *q;
350   TRACE("unpark_waiters_on tvar=%p\n", s);
351   for (q = s -> first_wait_queue_entry; 
352        q != END_STM_WAIT_QUEUE; 
353        q = q -> next_queue_entry) {
354     unpark_tso(cap, q -> waiting_tso);
355   }
356 }
357
358 /*......................................................................*/
359
360 // Helper functions for downstream allocation and initialization
361
362 static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
363                                                  StgTSO *waiting_tso) {
364   StgTVarWaitQueue *result;
365   result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
366   SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
367   result -> waiting_tso = waiting_tso;
368   return result;
369 }
370
371 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
372   StgTRecChunk *result;
373   result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
374   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
375   result -> prev_chunk = END_STM_CHUNK_LIST;
376   result -> next_entry_idx = 0;
377   return result;
378 }
379
380 static StgTRecHeader *new_stg_trec_header(Capability *cap,
381                                           StgTRecHeader *enclosing_trec) {
382   StgTRecHeader *result;
383   result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
384   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
385
386   result -> enclosing_trec = enclosing_trec;
387   result -> current_chunk = new_stg_trec_chunk(cap);
388
389   if (enclosing_trec == NO_TREC) {
390     result -> state = TREC_ACTIVE;
391   } else {
392     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
393            enclosing_trec -> state == TREC_CONDEMNED);
394     result -> state = enclosing_trec -> state;
395   }
396
397   return result;  
398 }
399
400 /*......................................................................*/
401
402 // Allocation / deallocation functions that retain per-capability lists
403 // of closures that can be re-used
404
405 static StgTVarWaitQueue *alloc_stg_tvar_wait_queue(Capability *cap,
406                                                    StgTSO *waiting_tso) {
407   StgTVarWaitQueue *result = NULL;
408   if (cap -> free_tvar_wait_queues == END_STM_WAIT_QUEUE) {
409     result = new_stg_tvar_wait_queue(cap, waiting_tso);
410   } else {
411     result = cap -> free_tvar_wait_queues;
412     result -> waiting_tso = waiting_tso;
413     cap -> free_tvar_wait_queues = result -> next_queue_entry;
414   }
415   return result;
416 }
417
418 static void free_stg_tvar_wait_queue(Capability *cap,
419                                      StgTVarWaitQueue *wq) {
420 #if defined(REUSE_MEMORY)
421   wq -> next_queue_entry = cap -> free_tvar_wait_queues;
422   cap -> free_tvar_wait_queues = wq;
423 #endif
424 }
425
426 static StgTRecChunk *alloc_stg_trec_chunk(Capability *cap) {
427   StgTRecChunk *result = NULL;
428   if (cap -> free_trec_chunks == END_STM_CHUNK_LIST) {
429     result = new_stg_trec_chunk(cap);
430   } else {
431     result = cap -> free_trec_chunks;
432     cap -> free_trec_chunks = result -> prev_chunk;
433     result -> prev_chunk = END_STM_CHUNK_LIST;
434     result -> next_entry_idx = 0;
435   }
436   return result;
437 }
438
439 static void free_stg_trec_chunk(Capability *cap, 
440                                 StgTRecChunk *c) {
441 #if defined(REUSE_MEMORY)
442   c -> prev_chunk = cap -> free_trec_chunks;
443   cap -> free_trec_chunks = c;
444 #endif
445 }
446
447 static StgTRecHeader *alloc_stg_trec_header(Capability *cap,
448                                             StgTRecHeader *enclosing_trec) {
449   StgTRecHeader *result = NULL;
450   if (cap -> free_trec_headers == NO_TREC) {
451     result = new_stg_trec_header(cap, enclosing_trec);
452   } else {
453     result = cap -> free_trec_headers;
454     cap -> free_trec_headers = result -> enclosing_trec;
455     result -> enclosing_trec = enclosing_trec;
456     result -> current_chunk -> next_entry_idx = 0;
457     if (enclosing_trec == NO_TREC) {
458       result -> state = TREC_ACTIVE;
459     } else {
460       ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
461              enclosing_trec -> state == TREC_CONDEMNED);
462       result -> state = enclosing_trec -> state;
463     }
464   }
465   return result;
466 }
467
468 static void free_stg_trec_header(Capability *cap,
469                                  StgTRecHeader *trec) {
470 #if defined(REUSE_MEMORY)
471   StgTRecChunk *chunk = trec -> current_chunk -> prev_chunk;
472   while (chunk != END_STM_CHUNK_LIST) {
473     StgTRecChunk *prev_chunk = chunk -> prev_chunk;
474     free_stg_trec_chunk(cap, chunk);
475     chunk = prev_chunk;
476   } 
477   trec -> current_chunk -> prev_chunk = END_STM_CHUNK_LIST;
478   trec -> enclosing_trec = cap -> free_trec_headers;
479   cap -> free_trec_headers = trec;
480 #endif
481 }
482
483 /*......................................................................*/
484
485 // Helper functions for managing waiting lists
486
487 static void build_wait_queue_entries_for_trec(Capability *cap,
488                                       StgTSO *tso, 
489                                       StgTRecHeader *trec) {
490   ASSERT(trec != NO_TREC);
491   ASSERT(trec -> enclosing_trec == NO_TREC);
492   ASSERT(trec -> state == TREC_ACTIVE);
493
494   TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
495
496   FOR_EACH_ENTRY(trec, e, {
497     StgTVar *s;
498     StgTVarWaitQueue *q;
499     StgTVarWaitQueue *fq;
500     s = e -> tvar;
501     TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
502     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
503     NACQ_ASSERT(s -> current_value == e -> expected_value);
504     fq = s -> first_wait_queue_entry;
505     q = alloc_stg_tvar_wait_queue(cap, tso);
506     q -> next_queue_entry = fq;
507     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
508     if (fq != END_STM_WAIT_QUEUE) {
509       fq -> prev_queue_entry = q;
510     }
511     s -> first_wait_queue_entry = q;
512     e -> new_value = (StgClosure *) q;
513   });
514 }
515
516 static void remove_wait_queue_entries_for_trec(Capability *cap,
517                                                StgTRecHeader *trec) {
518   ASSERT(trec != NO_TREC);
519   ASSERT(trec -> enclosing_trec == NO_TREC);
520   ASSERT(trec -> state == TREC_WAITING ||
521          trec -> state == TREC_CONDEMNED);
522
523   TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
524
525   FOR_EACH_ENTRY(trec, e, {
526     StgTVar *s;
527     StgTVarWaitQueue *pq;
528     StgTVarWaitQueue *nq;
529     StgTVarWaitQueue *q;
530     s = e -> tvar;
531     StgClosure *saw = lock_tvar(trec, s);
532     q = (StgTVarWaitQueue *) (e -> new_value);
533     TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
534     ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
535     nq = q -> next_queue_entry;
536     pq = q -> prev_queue_entry;
537     if (nq != END_STM_WAIT_QUEUE) {
538       nq -> prev_queue_entry = pq;
539     }
540     if (pq != END_STM_WAIT_QUEUE) {
541       pq -> next_queue_entry = nq;
542     } else {
543       ASSERT (s -> first_wait_queue_entry == q);
544       s -> first_wait_queue_entry = nq;
545     }
546     free_stg_tvar_wait_queue(cap, q);
547     unlock_tvar(trec, s, saw, FALSE);
548   });
549 }
550  
551 /*......................................................................*/
552  
553 static TRecEntry *get_new_entry(Capability *cap,
554                                 StgTRecHeader *t) {
555   TRecEntry *result;
556   StgTRecChunk *c;
557   int i;
558
559   c = t -> current_chunk;
560   i = c -> next_entry_idx;
561   ASSERT(c != END_STM_CHUNK_LIST);
562
563   if (i < TREC_CHUNK_NUM_ENTRIES) {
564     // Continue to use current chunk
565     result = &(c -> entries[i]);
566     c -> next_entry_idx ++;
567   } else {
568     // Current chunk is full: allocate a fresh one
569     StgTRecChunk *nc;
570     nc = alloc_stg_trec_chunk(cap);
571     nc -> prev_chunk = c;
572     nc -> next_entry_idx = 1;
573     t -> current_chunk = nc;
574     result = &(nc -> entries[0]);
575   }
576
577   return result;
578 }
579
580 /*......................................................................*/
581
582 static void merge_update_into(Capability *cap,
583                               StgTRecHeader *t,
584                               StgTVar *tvar,
585                               StgClosure *expected_value,
586                               StgClosure *new_value) {
587   int found;
588   
589   // Look for an entry in this trec
590   found = FALSE;
591   FOR_EACH_ENTRY(t, e, {
592     StgTVar *s;
593     s = e -> tvar;
594     if (s == tvar) {
595       found = TRUE;
596       if (e -> expected_value != expected_value) {
597         // Must abort if the two entries start from different values
598         TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
599               t, tvar, e -> expected_value, expected_value);
600         t -> state = TREC_CONDEMNED;
601       } 
602       e -> new_value = new_value;
603       BREAK_FOR_EACH;
604     }
605   });
606
607   if (!found) {
608     // No entry so far in this trec
609     TRecEntry *ne;
610     ne = get_new_entry(cap, t);
611     ne -> tvar = tvar;
612     ne -> expected_value = expected_value;
613     ne -> new_value = new_value;
614   }
615 }
616
617 /*......................................................................*/
618
619 static StgBool entry_is_update(TRecEntry *e) {
620   StgBool result;
621   result = (e -> expected_value != e -> new_value);
622   return result;
623
624
625 #if defined(STM_FG_LOCKS)
626 static StgBool entry_is_read_only(TRecEntry *e) {
627   StgBool result;
628   result = (e -> expected_value == e -> new_value);
629   return result;
630
631
632 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
633   StgClosure *c;
634   StgBool result;
635   c = s -> current_value;
636   result = (c == (StgClosure *) h);
637   return result;  
638 }
639 #endif
640
641 // revert_ownership : release a lock on a TVar, storing back
642 // the value that it held when the lock was acquired.  "revert_all"
643 // is set in stmWait and stmReWait when we acquired locks on all of 
644 // the TVars involved.  "revert_all" is not set in commit operations
645 // where we don't lock TVars that have been read from but not updated.
646
647 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
648                              StgBool revert_all STG_UNUSED) {
649 #if defined(STM_FG_LOCKS) 
650   FOR_EACH_ENTRY(trec, e, {
651     if (revert_all || entry_is_update(e)) {
652       StgTVar *s;
653       s = e -> tvar;
654       if (tvar_is_locked(s, trec)) {
655         unlock_tvar(trec, s, e -> expected_value, TRUE);
656       }
657     }
658   });
659 #endif
660 }
661
662 /*......................................................................*/
663
664 // validate_and_acquire_ownership : this performs the twin functions
665 // of checking that the TVars referred to by entries in trec hold the
666 // expected values and:
667 // 
668 //   - locking the TVar (on updated TVars during commit, or all TVars
669 //     during wait)
670 //
671 //   - recording the identity of the TRec who wrote the value seen in the
672 //     TVar (on non-updated TVars during commit).  These values are 
673 //     stashed in the TRec entries and are then checked in check_read_only
674 //     to ensure that an atomic snapshot of all of these locations has been
675 //     seen.
676
677 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
678                                                int acquire_all,
679                                                int retain_ownership) {
680   StgBool result;
681
682   if (shake()) {
683     TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
684     return FALSE;
685   }
686
687   ASSERT ((trec -> state == TREC_ACTIVE) || 
688           (trec -> state == TREC_WAITING) ||
689           (trec -> state == TREC_CONDEMNED));
690   result = !((trec -> state) == TREC_CONDEMNED);
691   if (result) {
692     FOR_EACH_ENTRY(trec, e, {
693       StgTVar *s;
694       s = e -> tvar;
695       if (acquire_all || entry_is_update(e)) {
696         TRACE("%p : trying to acquire %p\n", trec, s);
697         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
698           TRACE("%p : failed to acquire %p\n", trec, s);
699           result = FALSE;
700           BREAK_FOR_EACH;
701         }
702       } else {
703         ASSERT(use_read_phase);
704         IF_STM_FG_LOCKS({
705           TRACE("%p : will need to check %p\n", trec, s);
706           if (s -> current_value != e -> expected_value) {
707             TRACE("%p : doesn't match\n", trec);
708             result = FALSE;
709             BREAK_FOR_EACH;
710           }
711           e -> num_updates = s -> num_updates;
712           if (s -> current_value != e -> expected_value) {
713             TRACE("%p : doesn't match (race)\n", trec);
714             result = FALSE;
715             BREAK_FOR_EACH;
716           } else {
717             TRACE("%p : need to check version %ld\n", trec, e -> num_updates);
718           }
719         });
720       }
721     });
722   }
723
724   if ((!result) || (!retain_ownership)) {
725     revert_ownership(trec, acquire_all);
726   }
727   
728   return result;
729 }
730
731 // check_read_only : check that we've seen an atomic snapshot of the
732 // non-updated TVars accessed by a trec.  This checks that the last TRec to
733 // commit an update to the TVar is unchanged since the value was stashed in
734 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
735 // all of them contained their expected values at the start of the call to
736 // check_read_only.
737 //
738 // The paper "Concurrent programming without locks" (under submission), or
739 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
740 // this kind of algorithm.
741
742 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
743   StgBool result = TRUE;
744
745   ASSERT (use_read_phase);
746   IF_STM_FG_LOCKS({
747     FOR_EACH_ENTRY(trec, e, {
748       StgTVar *s;
749       s = e -> tvar;
750       if (entry_is_read_only(e)) {
751         TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
752         if (s -> num_updates != e -> num_updates) {
753           // ||s -> current_value != e -> expected_value) {
754           TRACE("%p : mismatch\n", trec);
755           result = FALSE;
756           BREAK_FOR_EACH;
757         }
758       }
759     });
760   });
761
762   return result;
763 }
764
765
766 /************************************************************************/
767
768 void stmPreGCHook() {
769   nat i;
770
771   lock_stm(NO_TREC);
772   TRACE("stmPreGCHook\n");
773   for (i = 0; i < n_capabilities; i ++) {
774     Capability *cap = &capabilities[i];
775     cap -> free_tvar_wait_queues = END_STM_WAIT_QUEUE;
776     cap -> free_trec_chunks = END_STM_CHUNK_LIST;
777     cap -> free_trec_headers = NO_TREC;
778   }
779   unlock_stm(NO_TREC);
780 }
781
782 /************************************************************************/
783
784 // check_read_only relies on version numbers held in TVars' "num_updates" 
785 // fields not wrapping around while a transaction is committed.  The version
786 // number is incremented each time an update is committed to the TVar
787 // This is unlikely to wrap around when 32-bit integers are used for the counts, 
788 // but to ensure correctness we maintain a shared count on the maximum
789 // number of commit operations that may occur and check that this has 
790 // not increased by more than 2^32 during a commit.
791
792 #define TOKEN_BATCH_SIZE 1024
793
794 static volatile StgInt64 max_commits = 0;
795
796 #if defined(THREADED_RTS)
797 static volatile StgBool token_locked = FALSE;
798
799 static void getTokenBatch(Capability *cap) {
800   while (cas((void *)&token_locked, FALSE, TRUE) == TRUE) { /* nothing */ }
801   max_commits += TOKEN_BATCH_SIZE;
802   cap -> transaction_tokens = TOKEN_BATCH_SIZE;
803   token_locked = FALSE;
804 }
805
806 static void getToken(Capability *cap) {
807   if (cap -> transaction_tokens == 0) {
808     getTokenBatch(cap);
809   }
810   cap -> transaction_tokens --;
811 }
812 #else
813 static void getToken(Capability *cap STG_UNUSED) {
814   // Nothing
815 }
816 #endif
817
818 /*......................................................................*/
819
820 StgTRecHeader *stmStartTransaction(Capability *cap,
821                                    StgTRecHeader *outer) {
822   StgTRecHeader *t;
823   TRACE("%p : stmStartTransaction with %d tokens\n", 
824         outer, 
825         cap -> transaction_tokens);
826
827   getToken(cap);
828
829   t = alloc_stg_trec_header(cap, outer);
830   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
831   return t;
832 }
833
834 /*......................................................................*/
835
836 void stmAbortTransaction(Capability *cap,
837                          StgTRecHeader *trec) {
838   TRACE("%p : stmAbortTransaction\n", trec);
839   ASSERT (trec != NO_TREC);
840   ASSERT ((trec -> state == TREC_ACTIVE) || 
841           (trec -> state == TREC_WAITING) ||
842           (trec -> state == TREC_CONDEMNED));
843
844   lock_stm(trec);
845   if (trec -> state == TREC_WAITING) {
846     ASSERT (trec -> enclosing_trec == NO_TREC);
847     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
848     remove_wait_queue_entries_for_trec(cap, trec);
849   } 
850   trec -> state = TREC_ABORTED;
851   unlock_stm(trec);
852
853   free_stg_trec_header(cap, trec);
854
855   TRACE("%p : stmAbortTransaction done\n", trec);
856 }
857
858 /*......................................................................*/
859
860 void stmCondemnTransaction(Capability *cap,
861                            StgTRecHeader *trec) {
862   TRACE("%p : stmCondemnTransaction\n", trec);
863   ASSERT (trec != NO_TREC);
864   ASSERT ((trec -> state == TREC_ACTIVE) || 
865           (trec -> state == TREC_WAITING) ||
866           (trec -> state == TREC_CONDEMNED));
867
868   lock_stm(trec);
869   if (trec -> state == TREC_WAITING) {
870     ASSERT (trec -> enclosing_trec == NO_TREC);
871     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
872     remove_wait_queue_entries_for_trec(cap, trec);
873   } 
874   trec -> state = TREC_CONDEMNED;
875   unlock_stm(trec);
876
877   TRACE("%p : stmCondemnTransaction done\n", trec);
878 }
879
880 /*......................................................................*/
881
882 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
883   StgTRecHeader *outer;
884   TRACE("%p : stmGetEnclosingTRec\n", trec);
885   outer = trec -> enclosing_trec;
886   TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
887   return outer;
888 }
889
890 /*......................................................................*/
891
892 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
893   StgTRecHeader *t;
894   StgBool result;
895
896   TRACE("%p : stmValidateNestOfTransactions\n", trec);
897   ASSERT(trec != NO_TREC);
898   ASSERT((trec -> state == TREC_ACTIVE) || 
899          (trec -> state == TREC_WAITING) ||
900          (trec -> state == TREC_CONDEMNED));
901
902   lock_stm(trec);
903
904   t = trec;
905   result = TRUE;
906   while (t != NO_TREC) {
907     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
908     t = t -> enclosing_trec;
909   }
910
911   if (!result && trec -> state != TREC_WAITING) {
912     trec -> state = TREC_CONDEMNED; 
913   }
914
915   unlock_stm(trec);
916
917   TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
918   return result;
919 }
920
921 /*......................................................................*/
922
923 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
924   int result;
925   StgInt64 max_commits_at_start = max_commits;
926
927   TRACE("%p : stmCommitTransaction()\n", trec);
928   ASSERT (trec != NO_TREC);
929
930   lock_stm(trec);
931
932   ASSERT (trec -> enclosing_trec == NO_TREC);
933   ASSERT ((trec -> state == TREC_ACTIVE) || 
934           (trec -> state == TREC_CONDEMNED));
935
936   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
937   if (result) {
938     // We now know that all the updated locations hold their expected values.
939     ASSERT (trec -> state == TREC_ACTIVE);
940
941     if (use_read_phase) {
942       TRACE("%p : doing read check\n", trec);
943       result = check_read_only(trec);
944       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
945
946       StgInt64 max_commits_at_end = max_commits;
947       StgInt64 max_concurrent_commits;
948       max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
949                                 (n_capabilities * TOKEN_BATCH_SIZE));
950       if (((max_concurrent_commits >> 32) > 0) || shake()) {
951         result = FALSE;
952       }
953     }
954     
955     if (result) {
956       // We now know that all of the read-only locations held their exepcted values
957       // at the end of the call to validate_and_acquire_ownership.  This forms the
958       // linearization point of the commit.
959       
960       FOR_EACH_ENTRY(trec, e, {
961         StgTVar *s;
962         s = e -> tvar;
963         if (e -> new_value != e -> expected_value) {
964           // Entry is an update: write the value back to the TVar, unlocking it if
965           // necessary.
966
967           ACQ_ASSERT(tvar_is_locked(s, trec));
968           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
969           unpark_waiters_on(cap,s);
970           IF_STM_FG_LOCKS({
971             s -> num_updates ++;
972           });
973           unlock_tvar(trec, s, e -> new_value, TRUE);
974         } 
975         ACQ_ASSERT(!tvar_is_locked(s, trec));
976       });
977     } else {
978       revert_ownership(trec, FALSE);
979     }
980   } 
981
982   unlock_stm(trec);
983
984   free_stg_trec_header(cap, trec);
985
986   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
987
988   return result;
989 }
990
991 /*......................................................................*/
992
993 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
994   StgTRecHeader *et;
995   int result;
996   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
997   TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
998   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
999
1000   lock_stm(trec);
1001
1002   et = trec -> enclosing_trec;
1003   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
1004   if (result) {
1005     // We now know that all the updated locations hold their expected values.
1006
1007     if (use_read_phase) {
1008       TRACE("%p : doing read check\n", trec);
1009       result = check_read_only(trec);
1010     }
1011     if (result) {
1012       // We now know that all of the read-only locations held their exepcted values
1013       // at the end of the call to validate_and_acquire_ownership.  This forms the
1014       // linearization point of the commit.
1015
1016       if (result) {
1017         TRACE("%p : read-check succeeded\n", trec);
1018         FOR_EACH_ENTRY(trec, e, {
1019           // Merge each entry into the enclosing transaction record, release all
1020           // locks.
1021
1022           StgTVar *s;
1023           s = e -> tvar;
1024           if (entry_is_update(e)) {
1025             unlock_tvar(trec, s, e -> expected_value, FALSE);
1026           }
1027           merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
1028           ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
1029         });
1030       } else {
1031         revert_ownership(trec, FALSE);
1032       }
1033     }
1034   } 
1035
1036   unlock_stm(trec);
1037
1038   free_stg_trec_header(cap, trec);
1039
1040   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
1041
1042   return result;
1043 }
1044
1045 /*......................................................................*/
1046
1047 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
1048   int result;
1049   TRACE("%p : stmWait(%p)\n", trec, tso);
1050   ASSERT (trec != NO_TREC);
1051   ASSERT (trec -> enclosing_trec == NO_TREC);
1052   ASSERT ((trec -> state == TREC_ACTIVE) || 
1053           (trec -> state == TREC_CONDEMNED));
1054
1055   lock_stm(trec);
1056   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1057   if (result) {
1058     // The transaction is valid so far so we can actually start waiting.
1059     // (Otherwise the transaction was not valid and the thread will have to
1060     // retry it).
1061
1062     // Put ourselves to sleep.  We retain locks on all the TVars involved
1063     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
1064     // in the TSO, (c) TREC_WAITING in the Trec.  
1065     build_wait_queue_entries_for_trec(cap, tso, trec);
1066     park_tso(tso);
1067     trec -> state = TREC_WAITING;
1068
1069     // We haven't released ownership of the transaction yet.  The TSO
1070     // has been put on the wait queue for the TVars it is waiting for,
1071     // but we haven't yet tidied up the TSO's stack and made it safe
1072     // to wake up the TSO.  Therefore, we must wait until the TSO is
1073     // safe to wake up before we release ownership - when all is well,
1074     // the runtime will call stmWaitUnlock() below, with the same
1075     // TRec.
1076
1077   } else {
1078     unlock_stm(trec);
1079     free_stg_trec_header(cap, trec);
1080   }
1081
1082   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
1083   return result;
1084 }
1085
1086
1087 void
1088 stmWaitUnlock(Capability *cap STG_UNUSED, StgTRecHeader *trec) {
1089     revert_ownership(trec, TRUE);
1090     unlock_stm(trec);
1091 }
1092
1093 /*......................................................................*/
1094
1095 StgBool stmReWait(Capability *cap, StgTSO *tso) {
1096   int result;
1097   StgTRecHeader *trec = tso->trec;
1098
1099   TRACE("%p : stmReWait\n", trec);
1100   ASSERT (trec != NO_TREC);
1101   ASSERT (trec -> enclosing_trec == NO_TREC);
1102   ASSERT ((trec -> state == TREC_WAITING) || 
1103           (trec -> state == TREC_CONDEMNED));
1104
1105   lock_stm(trec);
1106   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
1107   TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
1108   if (result) {
1109     // The transaction remains valid -- do nothing because it is already on
1110     // the wait queues
1111     ASSERT (trec -> state == TREC_WAITING);
1112     park_tso(tso);
1113     revert_ownership(trec, TRUE);
1114   } else {
1115     // The transcation has become invalid.  We can now remove it from the wait
1116     // queues.
1117     if (trec -> state != TREC_CONDEMNED) {
1118       remove_wait_queue_entries_for_trec (cap, trec);
1119     }
1120     free_stg_trec_header(cap, trec);
1121   }
1122   unlock_stm(trec);
1123
1124   TRACE("%p : stmReWait()=%d\n", trec, result);
1125   return result;
1126 }
1127
1128 /*......................................................................*/
1129
1130 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
1131   TRecEntry *result = NULL;
1132
1133   TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
1134   ASSERT(trec != NO_TREC);
1135
1136   do {
1137     FOR_EACH_ENTRY(trec, e, {
1138       if (e -> tvar == tvar) {
1139         result = e;
1140         if (in != NULL) {
1141           *in = trec;
1142         }
1143         BREAK_FOR_EACH;
1144       }
1145     });
1146     trec = trec -> enclosing_trec;
1147   } while (result == NULL && trec != NO_TREC);
1148
1149   return result;    
1150 }
1151
1152 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
1153   StgClosure *result;
1154   result = tvar -> current_value;
1155
1156 #if defined(STM_FG_LOCKS)
1157   while (GET_INFO(result) == &stg_TREC_HEADER_info) {
1158     TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
1159     result = tvar -> current_value;
1160   }
1161 #endif
1162
1163   TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
1164   return result;
1165 }
1166
1167 /*......................................................................*/
1168
1169 StgClosure *stmReadTVar(Capability *cap,
1170                         StgTRecHeader *trec, 
1171                         StgTVar *tvar) {
1172   StgTRecHeader *entry_in;
1173   StgClosure *result = NULL;
1174   TRecEntry *entry = NULL;
1175   TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
1176   ASSERT (trec != NO_TREC);
1177   ASSERT (trec -> state == TREC_ACTIVE || 
1178           trec -> state == TREC_CONDEMNED);
1179
1180   entry = get_entry_for(trec, tvar, &entry_in);
1181
1182   if (entry != NULL) {
1183     if (entry_in == trec) {
1184       // Entry found in our trec
1185       result = entry -> new_value;
1186     } else {
1187       // Entry found in another trec
1188       TRecEntry *new_entry = get_new_entry(cap, trec);
1189       new_entry -> tvar = tvar;
1190       new_entry -> expected_value = entry -> expected_value;
1191       new_entry -> new_value = entry -> new_value;
1192       result = new_entry -> new_value;
1193     } 
1194   } else {
1195     // No entry found
1196     StgClosure *current_value = read_current_value(trec, tvar);
1197     TRecEntry *new_entry = get_new_entry(cap, trec);
1198     new_entry -> tvar = tvar;
1199     new_entry -> expected_value = current_value;
1200     new_entry -> new_value = current_value;
1201     result = current_value;
1202   }
1203
1204   TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
1205   return result;
1206 }
1207
1208 /*......................................................................*/
1209
1210 void stmWriteTVar(Capability *cap,
1211                   StgTRecHeader *trec,
1212                   StgTVar *tvar, 
1213                   StgClosure *new_value) {
1214
1215   StgTRecHeader *entry_in;
1216   TRecEntry *entry = NULL;
1217   TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1218   ASSERT (trec != NO_TREC);
1219   ASSERT (trec -> state == TREC_ACTIVE || 
1220           trec -> state == TREC_CONDEMNED);
1221
1222   entry = get_entry_for(trec, tvar, &entry_in);
1223
1224   if (entry != NULL) {
1225     if (entry_in == trec) {
1226       // Entry found in our trec
1227       entry -> new_value = new_value;
1228     } else {
1229       // Entry found in another trec
1230       TRecEntry *new_entry = get_new_entry(cap, trec);
1231       new_entry -> tvar = tvar;
1232       new_entry -> expected_value = entry -> expected_value;
1233       new_entry -> new_value = new_value;
1234     } 
1235   } else {
1236     // No entry found
1237     StgClosure *current_value = read_current_value(trec, tvar);
1238     TRecEntry *new_entry = get_new_entry(cap, trec);
1239     new_entry -> tvar = tvar;
1240     new_entry -> expected_value = current_value;
1241     new_entry -> new_value = new_value;
1242   }
1243
1244   TRACE("%p : stmWriteTVar done\n", trec);
1245 }
1246
1247 /*......................................................................*/
1248
1249 StgTVar *stmNewTVar(Capability *cap,
1250                     StgClosure *new_value) {
1251   StgTVar *result;
1252   result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
1253   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
1254   result -> current_value = new_value;
1255   result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
1256 #if defined(THREADED_RTS)
1257   result -> num_updates = 0;
1258 #endif
1259   return result;
1260 }
1261
1262 /*......................................................................*/