[project @ 2005-10-27 15:26:06 by simonmar]
[ghc-hetmet.git] / ghc / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1998-2005
4  * 
5  * STM implementation.
6  *
7  * Overview
8  * --------
9  *
10  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
11  * each transcation has a TRec (transaction record) holding entries for each of the
12  * TVars (transactional variables) that it has accessed.  Each entry records
13  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
14  * the transaction wants to write to the TVar, (d) during commit, the identity of
15  * the TRec that wrote the expected value.  
16  *
17  * Separate TRecs are used for each level in a nest of transactions.  This allows
18  * a nested transaction to be aborted without condemning its enclosing transactions.
19  * This is needed in the implementation of catchRetry.  Note that the "expected value"
20  * in a nested transaction's TRec is the value expected to be *held in memory* if
21  * the transaction commits -- not the "new value" stored in one of the enclosing
22  * transactions.  This means that validation can be done without searching through
23  * a nest of TRecs.
24  *
25  * Concurrency control
26  * -------------------
27  *
28  * Three different concurrency control schemes can be built according to the settings
29  * in STM.h:
30  * 
31  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
32  * In the Haskell RTS this means it is suitable only for non-SMP builds.
33  *
34  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
35  * an invocation on the STM interface.  Note that this does not mean that 
36  * transactions are simply serialized -- the lock is only held *within* the 
37  * implementation of stmCommitTransaction, stmWait etc.
38  *
39  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
40  * and, when committing a transaction, no locks are acquired for TVars that have
41  * been read but not updated.
42  *
43  * Concurrency control is implemented in the functions:
44  *
45  *    lock_stm
46  *    unlock_stm
47  *    lock_tvar / cond_lock_tvar
48  *    unlock_tvar
49  *
50  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
51  * implementation of these functions.  
52  *
53  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
54  * using STM_CG_LOCK, and otherwise they are no-ops.
55  *
56  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
57  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
58  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
59  * builds).  This is because locking a TVar is implemented by writing the lock
60  * holder's TRec into the TVar's current_value field:
61  *
62  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
63  *               it contained.
64  *
65  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
66  *               contains a specified value.  Return TRUE if this succeeds,
67  *               FALSE otherwise.
68  *
69  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
70  *               storing a specified value in place of the lock entry.
71  *
72  * Using these operations, the typcial pattern of a commit/validate/wait operation
73  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
74  * the TVars that were only read from still contain their expected values, 
75  * (d) release the locks on the TVars, writing updates to them in the case of a 
76  * commit, (e) unlock the STM.
77  *
78  * Queues of waiting threads hang off the first_wait_queue_entry field of each
79  * TVar.  This may only be manipulated when holding that TVar's lock.  In
80  * particular, when a thread is putting itself to sleep, it mustn't release
81  * the TVar's lock until it has added itself to the wait queue and marked its
82  * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
83  *
84  * ---------------------------------------------------------------------------*/
85
86 #include "PosixSource.h"
87 #include "Rts.h"
88 #include "RtsFlags.h"
89 #include "RtsUtils.h"
90 #include "Schedule.h"
91 #include "SMP.h"
92 #include "STM.h"
93 #include "Storage.h"
94
95 #include <stdlib.h>
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for SMP builds with
102 // fine-grained locking. 
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
115 // unusualy code paths if genuine contention is rare
116
117 #if defined(DEBUG)
118 #define SHAKE
119 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
120 #else
121 #define TRACE(_x...) /*Nothing*/
122 #endif
123
124 #ifdef SHAKE
125 static const int do_shake = TRUE;
126 #else
127 static const int do_shake = FALSE;
128 #endif
129 static int shake_ctr = 0;
130 static int shake_lim = 1;
131
132 static int shake(void) {
133   if (do_shake) {
134     if (((shake_ctr++) % shake_lim) == 0) {
135       shake_ctr = 1;
136       shake_lim ++;
137       return TRUE;
138     } 
139     return FALSE;
140   } else {
141     return FALSE;
142   }
143 }
144
145 /*......................................................................*/
146
147 // Helper macros for iterating over entries within a transaction
148 // record
149
150 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
151   StgTRecHeader *__t = (_t);                                                    \
152   StgTRecChunk *__c = __t -> current_chunk;                                     \
153   StgWord __limit = __c -> next_entry_idx;                                      \
154   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
155   while (__c != END_STM_CHUNK_LIST) {                                           \
156     StgWord __i;                                                                \
157     for (__i = 0; __i < __limit; __i ++) {                                      \
158       TRecEntry *_x = &(__c -> entries[__i]);                                   \
159       do { CODE } while (0);                                                    \
160     }                                                                           \
161     __c = __c -> prev_chunk;                                                    \
162     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
163   }                                                                             \
164  exit_for_each:                                                                 \
165   if (FALSE) goto exit_for_each;                                                \
166 } while (0)
167
168 #define BREAK_FOR_EACH goto exit_for_each
169      
170 /*......................................................................*/
171
172 #define IF_STM_UNIPROC(__X)  do { } while (0)
173 #define IF_STM_CG_LOCK(__X)  do { } while (0)
174 #define IF_STM_FG_LOCKS(__X) do { } while (0)
175
176 #if defined(STM_UNIPROC)
177 #undef IF_STM_UNIPROC
178 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
179 static const StgBool use_read_phase = FALSE;
180
181 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
182   TRACE("%p : lock_stm()\n", trec);
183 }
184
185 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
186   TRACE("%p : unlock_stm()\n", trec);
187 }
188
189 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
190                              StgTVar *s STG_UNUSED) {
191   StgClosure *result;
192   TRACE("%p : lock_tvar(%p)\n", trec, s);
193   result = s -> current_value;
194   return result;
195 }
196
197 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
198                         StgTVar *s STG_UNUSED,
199                         StgClosure *c,
200                         StgBool force_update) {
201   TRACE("%p : unlock_tvar(%p)\n", trec, s);
202   if (force_update) {
203     s -> current_value = c;
204   }
205 }
206
207 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
208                               StgTVar *s STG_UNUSED,
209                               StgClosure *expected) {
210   StgClosure *result;
211   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
212   result = s -> current_value;
213   TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
214   return (result == expected);
215 }
216 #endif
217
218 #if defined(STM_CG_LOCK) /*........................................*/
219
220 #undef IF_STM_CG_LOCK
221 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
222 static const StgBool use_read_phase = FALSE;
223 static volatile StgTRecHeader *smp_locked = NULL;
224
225 static void lock_stm(StgTRecHeader *trec) {
226   while (cas(&smp_locked, NULL, trec) != NULL) { }
227   TRACE("%p : lock_stm()\n", trec);
228 }
229
230 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
231   TRACE("%p : unlock_stm()\n", trec);
232   ASSERT (smp_locked == trec);
233   smp_locked = 0;
234 }
235
236 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
237                              StgTVar *s STG_UNUSED) {
238   StgClosure *result;
239   TRACE("%p : lock_tvar(%p)\n", trec, s);
240   ASSERT (smp_locked == trec);
241   result = s -> current_value;
242   return result;
243 }
244
245 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
246                          StgTVar *s STG_UNUSED,
247                          StgClosure *c,
248                          StgBool force_update) {
249   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
250   ASSERT (smp_locked == trec);
251   if (force_update) {
252     s -> current_value = c;
253   }
254 }
255
256 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
257                                StgTVar *s STG_UNUSED,
258                                StgClosure *expected) {
259   StgClosure *result;
260   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
261   ASSERT (smp_locked == trec);
262   result = s -> current_value;
263   TRACE("%p : %d\n", result ? "success" : "failure");
264   return (result == expected);
265 }
266 #endif
267
268 #if defined(STM_FG_LOCKS) /*...................................*/
269
270 #undef IF_STM_FG_LOCKS
271 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
272 static const StgBool use_read_phase = TRUE;
273
274 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
275   TRACE("%p : lock_stm()\n", trec);
276 }
277
278 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
279   TRACE("%p : unlock_stm()\n", trec);
280 }
281
282 static StgClosure *lock_tvar(StgTRecHeader *trec, 
283                              StgTVar *s STG_UNUSED) {
284   StgClosure *result;
285   TRACE("%p : lock_tvar(%p)\n", trec, s);
286   do {
287     do {
288       result = s -> current_value;
289     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
290   } while (cas(&(s -> current_value), result, trec) != result);
291   return result;
292 }
293
294 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
295                         StgTVar *s,
296                         StgClosure *c,
297                         StgBool force_update STG_UNUSED) {
298   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
299   ASSERT(s -> current_value == trec);
300   s -> current_value = c;
301 }
302
303 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
304                               StgTVar *s,
305                               StgClosure *expected) {
306   StgClosure *result;
307   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
308   result = cas(&(s -> current_value), expected, trec);
309   TRACE("%p : %s\n", trec, result ? "success" : "failure");
310   return (result == expected);
311 }
312 #endif
313
314 /*......................................................................*/
315
316 // Helper functions for thread blocking and unblocking
317
318 static void park_tso(StgTSO *tso) {
319   ASSERT(tso -> why_blocked == NotBlocked);
320   tso -> why_blocked = BlockedOnSTM;
321   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
322   TRACE("park_tso on tso=%p\n", tso);
323 }
324
325 static void unpark_tso(Capability *cap, StgTSO *tso) {
326   // We will continue unparking threads while they remain on one of the wait
327   // queues: it's up to the thread itself to remove it from the wait queues
328   // if it decides to do so when it is scheduled.
329   if (tso -> why_blocked == BlockedOnSTM) {
330     TRACE("unpark_tso on tso=%p\n", tso);
331     unblockOne(cap,tso);
332   } else {
333     TRACE("spurious unpark_tso on tso=%p\n", tso);
334   }
335 }
336
337 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
338   StgTVarWaitQueue *q;
339   TRACE("unpark_waiters_on tvar=%p\n", s);
340   for (q = s -> first_wait_queue_entry; 
341        q != END_STM_WAIT_QUEUE; 
342        q = q -> next_queue_entry) {
343     unpark_tso(cap, q -> waiting_tso);
344   }
345 }
346
347 /*......................................................................*/
348
349 // Helper functions for allocation and initialization
350
351 static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
352                                                  StgTSO *waiting_tso) {
353   StgTVarWaitQueue *result;
354   result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
355   SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
356   result -> waiting_tso = waiting_tso;
357   return result;
358 }
359
360 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
361   StgTRecChunk *result;
362   result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
363   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
364   result -> prev_chunk = END_STM_CHUNK_LIST;
365   result -> next_entry_idx = 0;
366   return result;
367 }
368
369 static StgTRecHeader *new_stg_trec_header(Capability *cap,
370                                           StgTRecHeader *enclosing_trec) {
371   StgTRecHeader *result;
372   result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
373   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
374
375   result -> enclosing_trec = enclosing_trec;
376   result -> current_chunk = new_stg_trec_chunk(cap);
377
378   if (enclosing_trec == NO_TREC) {
379     result -> state = TREC_ACTIVE;
380   } else {
381     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
382            enclosing_trec -> state == TREC_CONDEMNED);
383     result -> state = enclosing_trec -> state;
384   }
385
386   return result;  
387 }
388
389 static StgTVar *new_tvar(Capability *cap,
390                          StgClosure *new_value) {
391   StgTVar *result;
392   result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
393   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
394   result -> current_value = new_value;
395   result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
396 #if defined(SMP)
397   result -> last_update_by = NO_TREC;
398 #endif
399   return result;
400 }
401
402 /*......................................................................*/
403
404 // Helper functions for managing waiting lists
405
406 static void build_wait_queue_entries_for_trec(Capability *cap,
407                                       StgTSO *tso, 
408                                       StgTRecHeader *trec) {
409   ASSERT(trec != NO_TREC);
410   ASSERT(trec -> enclosing_trec == NO_TREC);
411   ASSERT(trec -> state == TREC_ACTIVE);
412
413   TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
414
415   FOR_EACH_ENTRY(trec, e, {
416     StgTVar *s;
417     StgTVarWaitQueue *q;
418     StgTVarWaitQueue *fq;
419     s = e -> tvar;
420     TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
421     ACQ_ASSERT(s -> current_value == trec);
422     NACQ_ASSERT(s -> current_value == e -> expected_value);
423     fq = s -> first_wait_queue_entry;
424     q = new_stg_tvar_wait_queue(cap, tso);
425     q -> next_queue_entry = fq;
426     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
427     if (fq != END_STM_WAIT_QUEUE) {
428       fq -> prev_queue_entry = q;
429     }
430     s -> first_wait_queue_entry = q;
431     e -> new_value = (StgClosure *) q;
432   });
433 }
434
435 static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
436   ASSERT(trec != NO_TREC);
437   ASSERT(trec -> enclosing_trec == NO_TREC);
438   ASSERT(trec -> state == TREC_WAITING ||
439          trec -> state == TREC_CONDEMNED);
440
441   TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
442
443   FOR_EACH_ENTRY(trec, e, {
444     StgTVar *s;
445     StgTVarWaitQueue *pq;
446     StgTVarWaitQueue *nq;
447     StgTVarWaitQueue *q;
448     s = e -> tvar;
449     StgClosure *saw = lock_tvar(trec, s);
450     q = (StgTVarWaitQueue *) (e -> new_value);
451     TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
452     ACQ_ASSERT(s -> current_value == trec);
453     nq = q -> next_queue_entry;
454     pq = q -> prev_queue_entry;
455     if (nq != END_STM_WAIT_QUEUE) {
456       nq -> prev_queue_entry = pq;
457     }
458     if (pq != END_STM_WAIT_QUEUE) {
459       pq -> next_queue_entry = nq;
460     } else {
461       ASSERT (s -> first_wait_queue_entry == q);
462       s -> first_wait_queue_entry = nq;
463     }
464     unlock_tvar(trec, s, saw, FALSE);
465   });
466 }
467  
468 /*......................................................................*/
469  
470 static TRecEntry *get_new_entry(Capability *cap,
471                                 StgTRecHeader *t) {
472   TRecEntry *result;
473   StgTRecChunk *c;
474   int i;
475
476   c = t -> current_chunk;
477   i = c -> next_entry_idx;
478   ASSERT(c != END_STM_CHUNK_LIST);
479
480   if (i < TREC_CHUNK_NUM_ENTRIES) {
481     // Continue to use current chunk
482     result = &(c -> entries[i]);
483     c -> next_entry_idx ++;
484   } else {
485     // Current chunk is full: allocate a fresh one
486     StgTRecChunk *nc;
487     nc = new_stg_trec_chunk(cap);
488     nc -> prev_chunk = c;
489     nc -> next_entry_idx = 1;
490     t -> current_chunk = nc;
491     result = &(nc -> entries[0]);
492   }
493
494   return result;
495 }
496
497 /*......................................................................*/
498
499 static void merge_update_into(Capability *cap,
500                               StgTRecHeader *t,
501                               StgTVar *tvar,
502                               StgClosure *expected_value,
503                               StgClosure *new_value) {
504   int found;
505   
506   // Look for an entry in this trec
507   found = FALSE;
508   FOR_EACH_ENTRY(t, e, {
509     StgTVar *s;
510     s = e -> tvar;
511     if (s == tvar) {
512       found = TRUE;
513       if (e -> expected_value != expected_value) {
514         // Must abort if the two entries start from different values
515         TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
516               t, tvar, e -> expected_value, expected_value);
517         t -> state = TREC_CONDEMNED;
518       } 
519       e -> new_value = new_value;
520       BREAK_FOR_EACH;
521     }
522   });
523
524   if (!found) {
525     // No entry so far in this trec
526     TRecEntry *ne;
527     ne = get_new_entry(cap, t);
528     ne -> tvar = tvar;
529     ne -> expected_value = expected_value;
530     ne -> new_value = new_value;
531   }
532 }
533
534 /*......................................................................*/
535
536 static StgBool entry_is_update(TRecEntry *e) {
537   StgBool result;
538   result = (e -> expected_value != e -> new_value);
539   return result;
540
541
542 static StgBool entry_is_read_only(TRecEntry *e) {
543   StgBool result;
544   result = (e -> expected_value == e -> new_value);
545   return result;
546
547
548 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
549   StgClosure *c;
550   StgBool result;
551   c = s -> current_value;
552   result = (c == (StgClosure *) h);
553   return result;  
554 }
555
556 // revert_ownership : release a lock on a TVar, storing back
557 // the value that it held when the lock was acquired.  "revert_all"
558 // is set in stmWait and stmReWait when we acquired locks on all of 
559 // the TVars involved.  "revert_all" is not set in commit operations
560 // where we don't lock TVars that have been read from but not updated.
561
562 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
563                              StgBool revert_all STG_UNUSED) {
564 #if defined(STM_FG_LOCKS) 
565   FOR_EACH_ENTRY(trec, e, {
566     if (revert_all || entry_is_update(e)) {
567       StgTVar *s;
568       s = e -> tvar;
569       if (tvar_is_locked(s, trec)) {
570         unlock_tvar(trec, s, e -> expected_value, TRUE);
571       }
572     }
573   });
574 #endif
575 }
576
577 /*......................................................................*/
578
579 // validate_and_acquire_ownership : this performs the twin functions
580 // of checking that the TVars referred to by entries in trec hold the
581 // expected values and:
582 // 
583 //   - locking the TVar (on updated TVars during commit, or all TVars
584 //     during wait)
585 //
586 //   - recording the identity of the TRec who wrote the value seen in the
587 //     TVar (on non-updated TVars during commit).  These values are 
588 //     stashed in the TRec entries and are then checked in check_read_only
589 //     to ensure that an atomic snapshot of all of these locations has been
590 //     seen.
591
592 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
593                                                int acquire_all,
594                                                int retain_ownership) {
595   StgBool result;
596
597   if (shake()) {
598     TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
599     return FALSE;
600   }
601
602   ASSERT ((trec -> state == TREC_ACTIVE) || 
603           (trec -> state == TREC_WAITING) ||
604           (trec -> state == TREC_CONDEMNED));
605   result = !((trec -> state) == TREC_CONDEMNED);
606   if (result) {
607     FOR_EACH_ENTRY(trec, e, {
608       StgTVar *s;
609       s = e -> tvar;
610       if (acquire_all || entry_is_update(e)) {
611         TRACE("%p : trying to acquire %p\n", trec, s);
612         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
613           TRACE("%p : failed to acquire %p\n", trec, s);
614           result = FALSE;
615           BREAK_FOR_EACH;
616         }
617       } else {
618         ASSERT(use_read_phase);
619         IF_STM_FG_LOCKS({
620           TRACE("%p : will need to check %p\n", trec, s);
621           if (s -> current_value != e -> expected_value) {
622             TRACE("%p : doesn't match\n", trec);
623             result = FALSE;
624             BREAK_FOR_EACH;
625           }
626           e -> saw_update_by = s -> last_update_by;
627           if (s -> current_value != e -> expected_value) {
628             TRACE("%p : doesn't match (race)\n", trec);
629             result = FALSE;
630             BREAK_FOR_EACH;
631           } else {
632             TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
633           }
634         });
635       }
636     });
637   }
638
639   if ((!result) || (!retain_ownership)) {
640     revert_ownership(trec, acquire_all);
641   }
642   
643   return result;
644 }
645
646 // check_read_only : check that we've seen an atomic snapshot of the
647 // non-updated TVars accessed by a trec.  This checks that the last TRec to
648 // commit an update to the TVar is unchanged since the value was stashed in
649 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
650 // all of them contained their expected values at the start of the call to
651 // check_read_only.
652 //
653 // The paper "Concurrent programming without locks" (under submission), or
654 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
655 // this kind of algorithm.
656
657 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
658   StgBool result = TRUE;
659
660   ASSERT (use_read_phase);
661   IF_STM_FG_LOCKS({
662     FOR_EACH_ENTRY(trec, e, {
663       StgTVar *s;
664       s = e -> tvar;
665       if (entry_is_read_only(e)) {
666         TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
667         if (s -> last_update_by != e -> saw_update_by) {
668           // ||s -> current_value != e -> expected_value) {
669           TRACE("%p : mismatch\n", trec);
670           result = FALSE;
671           BREAK_FOR_EACH;
672         }
673       }
674     });
675   });
676
677   return result;
678 }
679
680
681 /************************************************************************/
682
683 void stmPreGCHook() {
684   lock_stm(NO_TREC);
685   TRACE("stmPreGCHook\n");
686   unlock_stm(NO_TREC);
687 }
688
689 /************************************************************************/
690
691 void initSTM() {
692   TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
693 }
694
695 /*......................................................................*/
696
697 StgTRecHeader *stmStartTransaction(Capability *cap,
698                                    StgTRecHeader *outer) {
699   StgTRecHeader *t;
700   TRACE("%p : stmStartTransaction\n", outer);
701   t = new_stg_trec_header(cap, outer);
702   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
703   return t;
704 }
705
706 /*......................................................................*/
707
708 void stmAbortTransaction(StgTRecHeader *trec) {
709   TRACE("%p : stmAbortTransaction\n", trec);
710   ASSERT (trec != NO_TREC);
711   ASSERT ((trec -> state == TREC_ACTIVE) || 
712           (trec -> state == TREC_WAITING) ||
713           (trec -> state == TREC_CONDEMNED));
714
715   lock_stm(trec);
716   if (trec -> state == TREC_WAITING) {
717     ASSERT (trec -> enclosing_trec == NO_TREC);
718     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
719     remove_wait_queue_entries_for_trec(trec);
720   } 
721   trec -> state = TREC_ABORTED;
722   unlock_stm(trec);
723
724   TRACE("%p : stmAbortTransaction done\n", trec);
725 }
726
727 /*......................................................................*/
728
729 void stmCondemnTransaction(StgTRecHeader *trec) {
730   TRACE("%p : stmCondemnTransaction\n", trec);
731   ASSERT (trec != NO_TREC);
732   ASSERT ((trec -> state == TREC_ACTIVE) || 
733           (trec -> state == TREC_WAITING) ||
734           (trec -> state == TREC_CONDEMNED));
735
736   lock_stm(trec);
737   if (trec -> state == TREC_WAITING) {
738     ASSERT (trec -> enclosing_trec == NO_TREC);
739     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
740     remove_wait_queue_entries_for_trec(trec);
741   } 
742   trec -> state = TREC_CONDEMNED;
743   unlock_stm(trec);
744
745   TRACE("%p : stmCondemnTransaction done\n", trec);
746 }
747
748 /*......................................................................*/
749
750 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
751   StgTRecHeader *outer;
752   TRACE("%p : stmGetEnclosingTRec\n", trec);
753   outer = trec -> enclosing_trec;
754   TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
755   return outer;
756 }
757
758 /*......................................................................*/
759
760 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
761   StgTRecHeader *t;
762   StgBool result;
763
764   TRACE("%p : stmValidateNestOfTransactions\n", trec);
765   ASSERT(trec != NO_TREC);
766   ASSERT((trec -> state == TREC_ACTIVE) || 
767          (trec -> state == TREC_WAITING) ||
768          (trec -> state == TREC_CONDEMNED));
769
770   lock_stm(trec);
771
772   t = trec;
773   result = TRUE;
774   while (t != NO_TREC) {
775     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
776     t = t -> enclosing_trec;
777   }
778
779   if (!result && trec -> state != TREC_WAITING) {
780     trec -> state = TREC_CONDEMNED; 
781   }
782
783   unlock_stm(trec);
784
785   TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
786   return result;
787 }
788
789 /*......................................................................*/
790
791 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
792   int result;
793
794   TRACE("%p : stmCommitTransaction()\n", trec);
795   ASSERT (trec != NO_TREC);
796   ASSERT (trec -> enclosing_trec == NO_TREC);
797   ASSERT ((trec -> state == TREC_ACTIVE) || 
798           (trec -> state == TREC_CONDEMNED));
799
800   lock_stm(trec);
801   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
802   if (result) {
803     // We now know that all the updated locations hold their expected values.
804     ASSERT (trec -> state == TREC_ACTIVE);
805
806     if (use_read_phase) {
807       TRACE("%p : doing read check\n", trec);
808       result = check_read_only(trec);
809       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
810     }
811     
812     if (result) {
813       // We now know that all of the read-only locations held their exepcted values
814       // at the end of the call to validate_and_acquire_ownership.  This forms the
815       // linearization point of the commit.
816       
817       FOR_EACH_ENTRY(trec, e, {
818         StgTVar *s;
819         s = e -> tvar;
820         if (e -> new_value != e -> expected_value) {
821           // Entry is an update: write the value back to the TVar, unlocking it if
822           // necessary.
823
824           ACQ_ASSERT(tvar_is_locked(s, trec));
825           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
826           unpark_waiters_on(cap,s);
827           IF_STM_FG_LOCKS({
828             s -> last_update_by = trec;
829           });
830           unlock_tvar(trec, s, e -> new_value, TRUE);
831         } 
832         ACQ_ASSERT(!tvar_is_locked(s, trec));
833       });
834     } else {
835       revert_ownership(trec, FALSE);
836     }
837   } 
838
839   unlock_stm(trec);
840
841   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
842
843   return result;
844 }
845
846 /*......................................................................*/
847
848 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
849   StgTRecHeader *et;
850   int result;
851   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
852   TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
853   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
854
855   lock_stm(trec);
856
857   et = trec -> enclosing_trec;
858   result = validate_and_acquire_ownership(trec, FALSE, TRUE);
859   if (result) {
860     // We now know that all the updated locations hold their expected values.
861
862     if (use_read_phase) {
863       TRACE("%p : doing read check\n", trec);
864       result = check_read_only(trec);
865     }
866     if (result) {
867       // We now know that all of the read-only locations held their exepcted values
868       // at the end of the call to validate_and_acquire_ownership.  This forms the
869       // linearization point of the commit.
870
871       if (result) {
872         TRACE("%p : read-check succeeded\n", trec);
873         FOR_EACH_ENTRY(trec, e, {
874           // Merge each entry into the enclosing transaction record, release all
875           // locks.
876
877           StgTVar *s;
878           s = e -> tvar;
879           if (entry_is_update(e)) {
880             unlock_tvar(trec, s, e -> expected_value, FALSE);
881           }
882           merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
883           ACQ_ASSERT(s -> current_value != trec);
884         });
885       } else {
886         revert_ownership(trec, FALSE);
887       }
888     }
889   } 
890
891   unlock_stm(trec);
892
893   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
894
895   return result;
896 }
897
898 /*......................................................................*/
899
900 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
901   int result;
902   TRACE("%p : stmWait(%p)\n", trec, tso);
903   ASSERT (trec != NO_TREC);
904   ASSERT (trec -> enclosing_trec == NO_TREC);
905   ASSERT ((trec -> state == TREC_ACTIVE) || 
906           (trec -> state == TREC_CONDEMNED));
907
908   lock_stm(trec);
909   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
910   if (result) {
911     // The transaction is valid so far so we can actually start waiting.
912     // (Otherwise the transaction was not valid and the thread will have to
913     // retry it).
914
915     // Put ourselves to sleep.  We retain locks on all the TVars involved
916     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
917     // in the TSO, (c) TREC_WAITING in the Trec.  
918     build_wait_queue_entries_for_trec(cap, tso, trec);
919     park_tso(tso);
920     trec -> state = TREC_WAITING;
921
922     // As soon as we start releasing ownership, another thread may find us 
923     // and wake us up.  This may happen even before we have finished 
924     // releasing ownership.
925     revert_ownership(trec, TRUE);
926   }  
927
928   unlock_stm(trec);
929
930   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
931   return result;
932 }
933
934 /*......................................................................*/
935
936 StgBool stmReWait(StgTSO *tso) {
937   int result;
938   StgTRecHeader *trec = tso->trec;
939
940   TRACE("%p : stmReWait\n", trec);
941   ASSERT (trec != NO_TREC);
942   ASSERT (trec -> enclosing_trec == NO_TREC);
943   ASSERT ((trec -> state == TREC_WAITING) || 
944           (trec -> state == TREC_CONDEMNED));
945
946   lock_stm(trec);
947   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
948   TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
949   if (result) {
950     // The transaction remains valid -- do nothing because it is already on
951     // the wait queues
952     ASSERT (trec -> state == TREC_WAITING);
953     park_tso(tso);
954     revert_ownership(trec, TRUE);
955   } else {
956     // The transcation has become invalid.  We can now remove it from the wait
957     // queues.
958     if (trec -> state != TREC_CONDEMNED) {
959       remove_wait_queue_entries_for_trec (trec);
960     }
961
962   }
963   unlock_stm(trec);
964
965   TRACE("%p : stmReWait()=%d\n", trec, result);
966   return result;
967 }
968
969 /*......................................................................*/
970
971 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
972   TRecEntry *result = NULL;
973
974   TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
975   ASSERT(trec != NO_TREC);
976
977   do {
978     FOR_EACH_ENTRY(trec, e, {
979       if (e -> tvar == tvar) {
980         result = e;
981         if (in != NULL) {
982           *in = trec;
983         }
984         BREAK_FOR_EACH;
985       }
986     });
987     trec = trec -> enclosing_trec;
988   } while (result == NULL && trec != NO_TREC);
989
990   return result;    
991 }
992
993 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
994   StgClosure *result;
995   result = tvar -> current_value;
996
997 #if defined(STM_FG_LOCKS)
998   while (GET_INFO(result) == &stg_TREC_HEADER_info) {
999     TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
1000     result = tvar -> current_value;
1001   }
1002 #endif
1003
1004   TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
1005   return result;
1006 }
1007
1008 /*......................................................................*/
1009
1010 StgClosure *stmReadTVar(Capability *cap,
1011                         StgTRecHeader *trec, 
1012                         StgTVar *tvar) {
1013   StgTRecHeader *entry_in;
1014   StgClosure *result = NULL;
1015   TRecEntry *entry = NULL;
1016   TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
1017   ASSERT (trec != NO_TREC);
1018   ASSERT (trec -> state == TREC_ACTIVE || 
1019           trec -> state == TREC_CONDEMNED);
1020
1021   entry = get_entry_for(trec, tvar, &entry_in);
1022
1023   if (entry != NULL) {
1024     if (entry_in == trec) {
1025       // Entry found in our trec
1026       result = entry -> new_value;
1027     } else {
1028       // Entry found in another trec
1029       TRecEntry *new_entry = get_new_entry(cap, trec);
1030       new_entry -> tvar = tvar;
1031       new_entry -> expected_value = entry -> expected_value;
1032       new_entry -> new_value = entry -> new_value;
1033       result = new_entry -> new_value;
1034     } 
1035   } else {
1036     // No entry found
1037     StgClosure *current_value = read_current_value(trec, tvar);
1038     TRecEntry *new_entry = get_new_entry(cap, trec);
1039     new_entry -> tvar = tvar;
1040     new_entry -> expected_value = current_value;
1041     new_entry -> new_value = current_value;
1042     result = current_value;
1043   }
1044
1045   TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
1046   return result;
1047 }
1048
1049 /*......................................................................*/
1050
1051 void stmWriteTVar(Capability *cap,
1052                   StgTRecHeader *trec,
1053                   StgTVar *tvar, 
1054                   StgClosure *new_value) {
1055
1056   StgTRecHeader *entry_in;
1057   TRecEntry *entry = NULL;
1058   TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1059   ASSERT (trec != NO_TREC);
1060   ASSERT (trec -> state == TREC_ACTIVE || 
1061           trec -> state == TREC_CONDEMNED);
1062
1063   entry = get_entry_for(trec, tvar, &entry_in);
1064
1065   if (entry != NULL) {
1066     if (entry_in == trec) {
1067       // Entry found in our trec
1068       entry -> new_value = new_value;
1069     } else {
1070       // Entry found in another trec
1071       TRecEntry *new_entry = get_new_entry(cap, trec);
1072       new_entry -> tvar = tvar;
1073       new_entry -> expected_value = entry -> expected_value;
1074       new_entry -> new_value = new_value;
1075     } 
1076   } else {
1077     // No entry found
1078     StgClosure *current_value = read_current_value(trec, tvar);
1079     TRecEntry *new_entry = get_new_entry(cap, trec);
1080     new_entry -> tvar = tvar;
1081     new_entry -> expected_value = current_value;
1082     new_entry -> new_value = new_value;
1083   }
1084
1085   TRACE("%p : stmWriteTVar done\n", trec);
1086 }
1087
1088 /*......................................................................*/
1089
1090 StgTVar *stmNewTVar(Capability *cap,
1091                     StgClosure *new_value) {
1092   StgTVar *result;
1093   result = new_tvar(cap, new_value);
1094   return result;
1095 }
1096
1097 /*......................................................................*/
1098