[project @ 2005-11-10 16:14:01 by simonmar]
[ghc-hetmet.git] / ghc / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1998-2005
4  * 
5  * STM implementation.
6  *
7  * Overview
8  * --------
9  *
10  * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
11  * each transcation has a TRec (transaction record) holding entries for each of the
12  * TVars (transactional variables) that it has accessed.  Each entry records
13  * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
14  * the transaction wants to write to the TVar, (d) during commit, the identity of
15  * the TRec that wrote the expected value.  
16  *
17  * Separate TRecs are used for each level in a nest of transactions.  This allows
18  * a nested transaction to be aborted without condemning its enclosing transactions.
19  * This is needed in the implementation of catchRetry.  Note that the "expected value"
20  * in a nested transaction's TRec is the value expected to be *held in memory* if
21  * the transaction commits -- not the "new value" stored in one of the enclosing
22  * transactions.  This means that validation can be done without searching through
23  * a nest of TRecs.
24  *
25  * Concurrency control
26  * -------------------
27  *
28  * Three different concurrency control schemes can be built according to the settings
29  * in STM.h:
30  * 
31  * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
32  * In the Haskell RTS this means it is suitable only for non-SMP builds.
33  *
34  * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
35  * an invocation on the STM interface.  Note that this does not mean that 
36  * transactions are simply serialized -- the lock is only held *within* the 
37  * implementation of stmCommitTransaction, stmWait etc.
38  *
39  * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
40  * and, when committing a transaction, no locks are acquired for TVars that have
41  * been read but not updated.
42  *
43  * Concurrency control is implemented in the functions:
44  *
45  *    lock_stm
46  *    unlock_stm
47  *    lock_tvar / cond_lock_tvar
48  *    unlock_tvar
49  *
50  * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
51  * implementation of these functions.  
52  *
53  * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
54  * using STM_CG_LOCK, and otherwise they are no-ops.
55  *
56  * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
57  * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
58  * as the actual business of maniupultaing a lock (present only in STM_FG_LOCKS
59  * builds).  This is because locking a TVar is implemented by writing the lock
60  * holder's TRec into the TVar's current_value field:
61  *
62  *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
63  *               it contained.
64  *
65  *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
66  *               contains a specified value.  Return TRUE if this succeeds,
67  *               FALSE otherwise.
68  *
69  *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
70  *               storing a specified value in place of the lock entry.
71  *
72  * Using these operations, the typcial pattern of a commit/validate/wait operation
73  * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
74  * the TVars that were only read from still contain their expected values, 
75  * (d) release the locks on the TVars, writing updates to them in the case of a 
76  * commit, (e) unlock the STM.
77  *
78  * Queues of waiting threads hang off the first_wait_queue_entry field of each
79  * TVar.  This may only be manipulated when holding that TVar's lock.  In
80  * particular, when a thread is putting itself to sleep, it mustn't release
81  * the TVar's lock until it has added itself to the wait queue and marked its
82  * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
83  *
84  * ---------------------------------------------------------------------------*/
85
86 #include "PosixSource.h"
87 #include "Rts.h"
88 #include "RtsFlags.h"
89 #include "RtsUtils.h"
90 #include "Schedule.h"
91 #include "SMP.h"
92 #include "STM.h"
93 #include "Storage.h"
94
95 #include <stdlib.h>
96 #include <stdio.h>
97
98 #define TRUE 1
99 #define FALSE 0
100
101 // ACQ_ASSERT is used for assertions which are only required for SMP builds with
102 // fine-grained locking. 
103
104 #if defined(STM_FG_LOCKS)
105 #define ACQ_ASSERT(_X) ASSERT(_X)
106 #define NACQ_ASSERT(_X) /*Nothing*/
107 #else
108 #define ACQ_ASSERT(_X) /*Nothing*/
109 #define NACQ_ASSERT(_X) ASSERT(_X)
110 #endif
111
112 /*......................................................................*/
113
114 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
115 // unusualy code paths if genuine contention is rare
116
117 #if defined(DEBUG)
118 #define SHAKE
119 #if defined(THREADED_RTS)
120 #define TRACE(_x...) IF_DEBUG(stm, debugBelch("STM  (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()); debugBelch ( _x ))
121 #else
122 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
123 #endif
124 #else
125 #define TRACE(_x...) /*Nothing*/
126 #endif
127
128 #ifdef SHAKE
129 static const int do_shake = TRUE;
130 #else
131 static const int do_shake = FALSE;
132 #endif
133 static int shake_ctr = 0;
134 static int shake_lim = 1;
135
136 static int shake(void) {
137   if (do_shake) {
138     if (((shake_ctr++) % shake_lim) == 0) {
139       shake_ctr = 1;
140       shake_lim ++;
141       return TRUE;
142     } 
143     return FALSE;
144   } else {
145     return FALSE;
146   }
147 }
148
149 /*......................................................................*/
150
151 // Helper macros for iterating over entries within a transaction
152 // record
153
154 #define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
155   StgTRecHeader *__t = (_t);                                                    \
156   StgTRecChunk *__c = __t -> current_chunk;                                     \
157   StgWord __limit = __c -> next_entry_idx;                                      \
158   TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%ld\n", __t, __c, __limit); \
159   while (__c != END_STM_CHUNK_LIST) {                                           \
160     StgWord __i;                                                                \
161     for (__i = 0; __i < __limit; __i ++) {                                      \
162       TRecEntry *_x = &(__c -> entries[__i]);                                   \
163       do { CODE } while (0);                                                    \
164     }                                                                           \
165     __c = __c -> prev_chunk;                                                    \
166     __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
167   }                                                                             \
168  exit_for_each:                                                                 \
169   if (FALSE) goto exit_for_each;                                                \
170 } while (0)
171
172 #define BREAK_FOR_EACH goto exit_for_each
173      
174 /*......................................................................*/
175
176 #define IF_STM_UNIPROC(__X)  do { } while (0)
177 #define IF_STM_CG_LOCK(__X)  do { } while (0)
178 #define IF_STM_FG_LOCKS(__X) do { } while (0)
179
180 #if defined(STM_UNIPROC)
181 #undef IF_STM_UNIPROC
182 #define IF_STM_UNIPROC(__X)  do { __X } while (0)
183 static const StgBool use_read_phase = FALSE;
184
185 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
186   TRACE("%p : lock_stm()\n", trec);
187 }
188
189 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
190   TRACE("%p : unlock_stm()\n", trec);
191 }
192
193 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
194                              StgTVar *s STG_UNUSED) {
195   StgClosure *result;
196   TRACE("%p : lock_tvar(%p)\n", trec, s);
197   result = s -> current_value;
198   return result;
199 }
200
201 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
202                         StgTVar *s STG_UNUSED,
203                         StgClosure *c,
204                         StgBool force_update) {
205   TRACE("%p : unlock_tvar(%p)\n", trec, s);
206   if (force_update) {
207     s -> current_value = c;
208   }
209 }
210
211 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
212                               StgTVar *s STG_UNUSED,
213                               StgClosure *expected) {
214   StgClosure *result;
215   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
216   result = s -> current_value;
217   TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
218   return (result == expected);
219 }
220 #endif
221
222 #if defined(STM_CG_LOCK) /*........................................*/
223
224 #undef IF_STM_CG_LOCK
225 #define IF_STM_CG_LOCK(__X)  do { __X } while (0)
226 static const StgBool use_read_phase = FALSE;
227 static volatile StgTRecHeader *smp_locked = NULL;
228
229 static void lock_stm(StgTRecHeader *trec) {
230   while (cas(&smp_locked, NULL, trec) != NULL) { }
231   TRACE("%p : lock_stm()\n", trec);
232 }
233
234 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
235   TRACE("%p : unlock_stm()\n", trec);
236   ASSERT (smp_locked == trec);
237   smp_locked = 0;
238 }
239
240 static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
241                              StgTVar *s STG_UNUSED) {
242   StgClosure *result;
243   TRACE("%p : lock_tvar(%p)\n", trec, s);
244   ASSERT (smp_locked == trec);
245   result = s -> current_value;
246   return result;
247 }
248
249 static void *unlock_tvar(StgTRecHeader *trec STG_UNUSED,
250                          StgTVar *s STG_UNUSED,
251                          StgClosure *c,
252                          StgBool force_update) {
253   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
254   ASSERT (smp_locked == trec);
255   if (force_update) {
256     s -> current_value = c;
257   }
258 }
259
260 static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
261                                StgTVar *s STG_UNUSED,
262                                StgClosure *expected) {
263   StgClosure *result;
264   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
265   ASSERT (smp_locked == trec);
266   result = s -> current_value;
267   TRACE("%p : %d\n", result ? "success" : "failure");
268   return (result == expected);
269 }
270 #endif
271
272 #if defined(STM_FG_LOCKS) /*...................................*/
273
274 #undef IF_STM_FG_LOCKS
275 #define IF_STM_FG_LOCKS(__X) do { __X } while (0)
276 static const StgBool use_read_phase = TRUE;
277
278 static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
279   TRACE("%p : lock_stm()\n", trec);
280 }
281
282 static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
283   TRACE("%p : unlock_stm()\n", trec);
284 }
285
286 static StgClosure *lock_tvar(StgTRecHeader *trec, 
287                              StgTVar *s STG_UNUSED) {
288   StgClosure *result;
289   TRACE("%p : lock_tvar(%p)\n", trec, s);
290   do {
291     do {
292       result = s -> current_value;
293     } while (GET_INFO(result) == &stg_TREC_HEADER_info);
294   } while (cas(&(s -> current_value), result, trec) != result);
295   return result;
296 }
297
298 static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
299                         StgTVar *s,
300                         StgClosure *c,
301                         StgBool force_update STG_UNUSED) {
302   TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
303   ASSERT(s -> current_value == trec);
304   s -> current_value = c;
305 }
306
307 static StgBool cond_lock_tvar(StgTRecHeader *trec, 
308                               StgTVar *s,
309                               StgClosure *expected) {
310   StgClosure *result;
311   TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
312   result = cas(&(s -> current_value), expected, trec);
313   TRACE("%p : %s\n", trec, result ? "success" : "failure");
314   return (result == expected);
315 }
316 #endif
317
318 /*......................................................................*/
319
320 // Helper functions for thread blocking and unblocking
321
322 static void park_tso(StgTSO *tso) {
323   ASSERT(tso -> why_blocked == NotBlocked);
324   tso -> why_blocked = BlockedOnSTM;
325   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
326   TRACE("park_tso on tso=%p\n", tso);
327 }
328
329 static void unpark_tso(Capability *cap, StgTSO *tso) {
330   // We will continue unparking threads while they remain on one of the wait
331   // queues: it's up to the thread itself to remove it from the wait queues
332   // if it decides to do so when it is scheduled.
333   if (tso -> why_blocked == BlockedOnSTM) {
334     TRACE("unpark_tso on tso=%p\n", tso);
335     unblockOne(cap,tso);
336   } else {
337     TRACE("spurious unpark_tso on tso=%p\n", tso);
338   }
339 }
340
341 static void unpark_waiters_on(Capability *cap, StgTVar *s) {
342   StgTVarWaitQueue *q;
343   TRACE("unpark_waiters_on tvar=%p\n", s);
344   for (q = s -> first_wait_queue_entry; 
345        q != END_STM_WAIT_QUEUE; 
346        q = q -> next_queue_entry) {
347     unpark_tso(cap, q -> waiting_tso);
348   }
349 }
350
351 /*......................................................................*/
352
353 // Helper functions for allocation and initialization
354
355 static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
356                                                  StgTSO *waiting_tso) {
357   StgTVarWaitQueue *result;
358   result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
359   SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
360   result -> waiting_tso = waiting_tso;
361   return result;
362 }
363
364 static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
365   StgTRecChunk *result;
366   result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
367   SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
368   result -> prev_chunk = END_STM_CHUNK_LIST;
369   result -> next_entry_idx = 0;
370   return result;
371 }
372
373 static StgTRecHeader *new_stg_trec_header(Capability *cap,
374                                           StgTRecHeader *enclosing_trec) {
375   StgTRecHeader *result;
376   result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
377   SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
378
379   result -> enclosing_trec = enclosing_trec;
380   result -> current_chunk = new_stg_trec_chunk(cap);
381
382   if (enclosing_trec == NO_TREC) {
383     result -> state = TREC_ACTIVE;
384   } else {
385     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
386            enclosing_trec -> state == TREC_CONDEMNED);
387     result -> state = enclosing_trec -> state;
388   }
389
390   return result;  
391 }
392
393 /*......................................................................*/
394
395 // Helper functions for managing waiting lists
396
397 static void build_wait_queue_entries_for_trec(Capability *cap,
398                                       StgTSO *tso, 
399                                       StgTRecHeader *trec) {
400   ASSERT(trec != NO_TREC);
401   ASSERT(trec -> enclosing_trec == NO_TREC);
402   ASSERT(trec -> state == TREC_ACTIVE);
403
404   TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);
405
406   FOR_EACH_ENTRY(trec, e, {
407     StgTVar *s;
408     StgTVarWaitQueue *q;
409     StgTVarWaitQueue *fq;
410     s = e -> tvar;
411     TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
412     ACQ_ASSERT(s -> current_value == trec);
413     NACQ_ASSERT(s -> current_value == e -> expected_value);
414     fq = s -> first_wait_queue_entry;
415     q = new_stg_tvar_wait_queue(cap, tso);
416     q -> next_queue_entry = fq;
417     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
418     if (fq != END_STM_WAIT_QUEUE) {
419       fq -> prev_queue_entry = q;
420     }
421     s -> first_wait_queue_entry = q;
422     e -> new_value = (StgClosure *) q;
423   });
424 }
425
426 static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
427   ASSERT(trec != NO_TREC);
428   ASSERT(trec -> enclosing_trec == NO_TREC);
429   ASSERT(trec -> state == TREC_WAITING ||
430          trec -> state == TREC_CONDEMNED);
431
432   TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);
433
434   FOR_EACH_ENTRY(trec, e, {
435     StgTVar *s;
436     StgTVarWaitQueue *pq;
437     StgTVarWaitQueue *nq;
438     StgTVarWaitQueue *q;
439     s = e -> tvar;
440     StgClosure *saw = lock_tvar(trec, s);
441     q = (StgTVarWaitQueue *) (e -> new_value);
442     TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
443     ACQ_ASSERT(s -> current_value == trec);
444     nq = q -> next_queue_entry;
445     pq = q -> prev_queue_entry;
446     if (nq != END_STM_WAIT_QUEUE) {
447       nq -> prev_queue_entry = pq;
448     }
449     if (pq != END_STM_WAIT_QUEUE) {
450       pq -> next_queue_entry = nq;
451     } else {
452       ASSERT (s -> first_wait_queue_entry == q);
453       s -> first_wait_queue_entry = nq;
454     }
455     unlock_tvar(trec, s, saw, FALSE);
456   });
457 }
458  
459 /*......................................................................*/
460  
461 static TRecEntry *get_new_entry(Capability *cap,
462                                 StgTRecHeader *t) {
463   TRecEntry *result;
464   StgTRecChunk *c;
465   int i;
466
467   c = t -> current_chunk;
468   i = c -> next_entry_idx;
469   ASSERT(c != END_STM_CHUNK_LIST);
470
471   if (i < TREC_CHUNK_NUM_ENTRIES) {
472     // Continue to use current chunk
473     result = &(c -> entries[i]);
474     c -> next_entry_idx ++;
475   } else {
476     // Current chunk is full: allocate a fresh one
477     StgTRecChunk *nc;
478     nc = new_stg_trec_chunk(cap);
479     nc -> prev_chunk = c;
480     nc -> next_entry_idx = 1;
481     t -> current_chunk = nc;
482     result = &(nc -> entries[0]);
483   }
484
485   return result;
486 }
487
488 /*......................................................................*/
489
490 static void merge_update_into(Capability *cap,
491                               StgTRecHeader *t,
492                               StgTVar *tvar,
493                               StgClosure *expected_value,
494                               StgClosure *new_value) {
495   int found;
496   
497   // Look for an entry in this trec
498   found = FALSE;
499   FOR_EACH_ENTRY(t, e, {
500     StgTVar *s;
501     s = e -> tvar;
502     if (s == tvar) {
503       found = TRUE;
504       if (e -> expected_value != expected_value) {
505         // Must abort if the two entries start from different values
506         TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
507               t, tvar, e -> expected_value, expected_value);
508         t -> state = TREC_CONDEMNED;
509       } 
510       e -> new_value = new_value;
511       BREAK_FOR_EACH;
512     }
513   });
514
515   if (!found) {
516     // No entry so far in this trec
517     TRecEntry *ne;
518     ne = get_new_entry(cap, t);
519     ne -> tvar = tvar;
520     ne -> expected_value = expected_value;
521     ne -> new_value = new_value;
522   }
523 }
524
525 /*......................................................................*/
526
527 static StgBool entry_is_update(TRecEntry *e) {
528   StgBool result;
529   result = (e -> expected_value != e -> new_value);
530   return result;
531
532
533 static StgBool entry_is_read_only(TRecEntry *e) {
534   StgBool result;
535   result = (e -> expected_value == e -> new_value);
536   return result;
537
538
539 static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
540   StgClosure *c;
541   StgBool result;
542   c = s -> current_value;
543   result = (c == (StgClosure *) h);
544   return result;  
545 }
546
547 // revert_ownership : release a lock on a TVar, storing back
548 // the value that it held when the lock was acquired.  "revert_all"
549 // is set in stmWait and stmReWait when we acquired locks on all of 
550 // the TVars involved.  "revert_all" is not set in commit operations
551 // where we don't lock TVars that have been read from but not updated.
552
553 static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
554                              StgBool revert_all STG_UNUSED) {
555 #if defined(STM_FG_LOCKS) 
556   FOR_EACH_ENTRY(trec, e, {
557     if (revert_all || entry_is_update(e)) {
558       StgTVar *s;
559       s = e -> tvar;
560       if (tvar_is_locked(s, trec)) {
561         unlock_tvar(trec, s, e -> expected_value, TRUE);
562       }
563     }
564   });
565 #endif
566 }
567
568 /*......................................................................*/
569
570 // validate_and_acquire_ownership : this performs the twin functions
571 // of checking that the TVars referred to by entries in trec hold the
572 // expected values and:
573 // 
574 //   - locking the TVar (on updated TVars during commit, or all TVars
575 //     during wait)
576 //
577 //   - recording the identity of the TRec who wrote the value seen in the
578 //     TVar (on non-updated TVars during commit).  These values are 
579 //     stashed in the TRec entries and are then checked in check_read_only
580 //     to ensure that an atomic snapshot of all of these locations has been
581 //     seen.
582
583 static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
584                                                int acquire_all,
585                                                int retain_ownership) {
586   StgBool result;
587
588   if (shake()) {
589     TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
590     return FALSE;
591   }
592
593   ASSERT ((trec -> state == TREC_ACTIVE) || 
594           (trec -> state == TREC_WAITING) ||
595           (trec -> state == TREC_CONDEMNED));
596   result = !((trec -> state) == TREC_CONDEMNED);
597   if (result) {
598     FOR_EACH_ENTRY(trec, e, {
599       StgTVar *s;
600       s = e -> tvar;
601       if (acquire_all || entry_is_update(e)) {
602         TRACE("%p : trying to acquire %p\n", trec, s);
603         if (!cond_lock_tvar(trec, s, e -> expected_value)) {
604           TRACE("%p : failed to acquire %p\n", trec, s);
605           result = FALSE;
606           BREAK_FOR_EACH;
607         }
608       } else {
609         ASSERT(use_read_phase);
610         IF_STM_FG_LOCKS({
611           TRACE("%p : will need to check %p\n", trec, s);
612           if (s -> current_value != e -> expected_value) {
613             TRACE("%p : doesn't match\n", trec);
614             result = FALSE;
615             BREAK_FOR_EACH;
616           }
617           e -> saw_update_by = s -> last_update_by;
618           if (s -> current_value != e -> expected_value) {
619             TRACE("%p : doesn't match (race)\n", trec);
620             result = FALSE;
621             BREAK_FOR_EACH;
622           } else {
623             TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
624           }
625         });
626       }
627     });
628   }
629
630   if ((!result) || (!retain_ownership)) {
631     revert_ownership(trec, acquire_all);
632   }
633   
634   return result;
635 }
636
637 // check_read_only : check that we've seen an atomic snapshot of the
638 // non-updated TVars accessed by a trec.  This checks that the last TRec to
639 // commit an update to the TVar is unchanged since the value was stashed in
640 // validate_and_acquire_ownership.  If no udpate is seen to any TVar than
641 // all of them contained their expected values at the start of the call to
642 // check_read_only.
643 //
644 // The paper "Concurrent programming without locks" (under submission), or
645 // Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
646 // this kind of algorithm.
647
648 static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
649   StgBool result = TRUE;
650
651   ASSERT (use_read_phase);
652   IF_STM_FG_LOCKS({
653     FOR_EACH_ENTRY(trec, e, {
654       StgTVar *s;
655       s = e -> tvar;
656       if (entry_is_read_only(e)) {
657         TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
658         if (s -> last_update_by != e -> saw_update_by) {
659           // ||s -> current_value != e -> expected_value) {
660           TRACE("%p : mismatch\n", trec);
661           result = FALSE;
662           BREAK_FOR_EACH;
663         }
664       }
665     });
666   });
667
668   return result;
669 }
670
671
672 /************************************************************************/
673
674 void stmPreGCHook() {
675   lock_stm(NO_TREC);
676   TRACE("stmPreGCHook\n");
677   unlock_stm(NO_TREC);
678 }
679
680 /************************************************************************/
681
682 void initSTM() {
683   TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
684 }
685
686 /*......................................................................*/
687
688 StgTRecHeader *stmStartTransaction(Capability *cap,
689                                    StgTRecHeader *outer) {
690   StgTRecHeader *t;
691   TRACE("%p : stmStartTransaction\n", outer);
692   t = new_stg_trec_header(cap, outer);
693   TRACE("%p : stmStartTransaction()=%p\n", outer, t);
694   return t;
695 }
696
697 /*......................................................................*/
698
699 void stmAbortTransaction(StgTRecHeader *trec) {
700   TRACE("%p : stmAbortTransaction\n", trec);
701   ASSERT (trec != NO_TREC);
702   ASSERT ((trec -> state == TREC_ACTIVE) || 
703           (trec -> state == TREC_WAITING) ||
704           (trec -> state == TREC_CONDEMNED));
705
706   lock_stm(trec);
707   if (trec -> state == TREC_WAITING) {
708     ASSERT (trec -> enclosing_trec == NO_TREC);
709     TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
710     remove_wait_queue_entries_for_trec(trec);
711   } 
712   trec -> state = TREC_ABORTED;
713   unlock_stm(trec);
714
715   TRACE("%p : stmAbortTransaction done\n", trec);
716 }
717
718 /*......................................................................*/
719
720 void stmCondemnTransaction(StgTRecHeader *trec) {
721   TRACE("%p : stmCondemnTransaction\n", trec);
722   ASSERT (trec != NO_TREC);
723   ASSERT ((trec -> state == TREC_ACTIVE) || 
724           (trec -> state == TREC_WAITING) ||
725           (trec -> state == TREC_CONDEMNED));
726
727   lock_stm(trec);
728   if (trec -> state == TREC_WAITING) {
729     ASSERT (trec -> enclosing_trec == NO_TREC);
730     TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
731     remove_wait_queue_entries_for_trec(trec);
732   } 
733   trec -> state = TREC_CONDEMNED;
734   unlock_stm(trec);
735
736   TRACE("%p : stmCondemnTransaction done\n", trec);
737 }
738
739 /*......................................................................*/
740
741 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
742   StgTRecHeader *outer;
743   TRACE("%p : stmGetEnclosingTRec\n", trec);
744   outer = trec -> enclosing_trec;
745   TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
746   return outer;
747 }
748
749 /*......................................................................*/
750
751 StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
752   StgTRecHeader *t;
753   StgBool result;
754
755   TRACE("%p : stmValidateNestOfTransactions\n", trec);
756   ASSERT(trec != NO_TREC);
757   ASSERT((trec -> state == TREC_ACTIVE) || 
758          (trec -> state == TREC_WAITING) ||
759          (trec -> state == TREC_CONDEMNED));
760
761   lock_stm(trec);
762
763   t = trec;
764   result = TRUE;
765   while (t != NO_TREC) {
766     result &= validate_and_acquire_ownership(t, TRUE, FALSE);
767     t = t -> enclosing_trec;
768   }
769
770   if (!result && trec -> state != TREC_WAITING) {
771     trec -> state = TREC_CONDEMNED; 
772   }
773
774   unlock_stm(trec);
775
776   TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
777   return result;
778 }
779
780 /*......................................................................*/
781
782 StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
783   int result;
784
785   TRACE("%p : stmCommitTransaction()\n", trec);
786   ASSERT (trec != NO_TREC);
787
788   lock_stm(trec);
789
790   ASSERT (trec -> enclosing_trec == NO_TREC);
791   ASSERT ((trec -> state == TREC_ACTIVE) || 
792           (trec -> state == TREC_CONDEMNED));
793
794   result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
795   if (result) {
796     // We now know that all the updated locations hold their expected values.
797     ASSERT (trec -> state == TREC_ACTIVE);
798
799     if (use_read_phase) {
800       TRACE("%p : doing read check\n", trec);
801       result = check_read_only(trec);
802       TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
803     }
804     
805     if (result) {
806       // We now know that all of the read-only locations held their exepcted values
807       // at the end of the call to validate_and_acquire_ownership.  This forms the
808       // linearization point of the commit.
809       
810       FOR_EACH_ENTRY(trec, e, {
811         StgTVar *s;
812         s = e -> tvar;
813         if (e -> new_value != e -> expected_value) {
814           // Entry is an update: write the value back to the TVar, unlocking it if
815           // necessary.
816
817           ACQ_ASSERT(tvar_is_locked(s, trec));
818           TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
819           unpark_waiters_on(cap,s);
820           IF_STM_FG_LOCKS({
821             s -> last_update_by = trec;
822           });
823           unlock_tvar(trec, s, e -> new_value, TRUE);
824         } 
825         ACQ_ASSERT(!tvar_is_locked(s, trec));
826       });
827     } else {
828       revert_ownership(trec, FALSE);
829     }
830   } 
831
832   unlock_stm(trec);
833
834   TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
835
836   return result;
837 }
838
839 /*......................................................................*/
840
841 StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
842   StgTRecHeader *et;
843   int result;
844   ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
845   TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
846   ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
847
848   lock_stm(trec);
849
850   et = trec -> enclosing_trec;
851   result = validate_and_acquire_ownership(trec, FALSE, TRUE);
852   if (result) {
853     // We now know that all the updated locations hold their expected values.
854
855     if (use_read_phase) {
856       TRACE("%p : doing read check\n", trec);
857       result = check_read_only(trec);
858     }
859     if (result) {
860       // We now know that all of the read-only locations held their exepcted values
861       // at the end of the call to validate_and_acquire_ownership.  This forms the
862       // linearization point of the commit.
863
864       if (result) {
865         TRACE("%p : read-check succeeded\n", trec);
866         FOR_EACH_ENTRY(trec, e, {
867           // Merge each entry into the enclosing transaction record, release all
868           // locks.
869
870           StgTVar *s;
871           s = e -> tvar;
872           if (entry_is_update(e)) {
873             unlock_tvar(trec, s, e -> expected_value, FALSE);
874           }
875           merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
876           ACQ_ASSERT(s -> current_value != trec);
877         });
878       } else {
879         revert_ownership(trec, FALSE);
880       }
881     }
882   } 
883
884   unlock_stm(trec);
885
886   TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
887
888   return result;
889 }
890
891 /*......................................................................*/
892
893 StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
894   int result;
895   TRACE("%p : stmWait(%p)\n", trec, tso);
896   ASSERT (trec != NO_TREC);
897   ASSERT (trec -> enclosing_trec == NO_TREC);
898   ASSERT ((trec -> state == TREC_ACTIVE) || 
899           (trec -> state == TREC_CONDEMNED));
900
901   lock_stm(trec);
902   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
903   if (result) {
904     // The transaction is valid so far so we can actually start waiting.
905     // (Otherwise the transaction was not valid and the thread will have to
906     // retry it).
907
908     // Put ourselves to sleep.  We retain locks on all the TVars involved
909     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
910     // in the TSO, (c) TREC_WAITING in the Trec.  
911     build_wait_queue_entries_for_trec(cap, tso, trec);
912     park_tso(tso);
913     trec -> state = TREC_WAITING;
914
915     // We haven't released ownership of the transaction yet.  The TSO
916     // has been put on the wait queue for the TVars it is waiting for,
917     // but we haven't yet tidied up the TSO's stack and made it safe
918     // to wake up the TSO.  Therefore, we must wait until the TSO is
919     // safe to wake up before we release ownership - when all is well,
920     // the runtime will call stmWaitUnlock() below, with the same
921     // TRec.
922
923   } else {
924     unlock_stm(trec);
925   }
926
927   TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
928   return result;
929 }
930
931
932 void
933 stmWaitUnlock(Capability *cap, StgTRecHeader *trec) {
934     revert_ownership(trec, TRUE);
935     unlock_stm(trec);
936 }
937
938 /*......................................................................*/
939
940 StgBool stmReWait(StgTSO *tso) {
941   int result;
942   StgTRecHeader *trec = tso->trec;
943
944   TRACE("%p : stmReWait\n", trec);
945   ASSERT (trec != NO_TREC);
946   ASSERT (trec -> enclosing_trec == NO_TREC);
947   ASSERT ((trec -> state == TREC_WAITING) || 
948           (trec -> state == TREC_CONDEMNED));
949
950   lock_stm(trec);
951   result = validate_and_acquire_ownership(trec, TRUE, TRUE);
952   TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
953   if (result) {
954     // The transaction remains valid -- do nothing because it is already on
955     // the wait queues
956     ASSERT (trec -> state == TREC_WAITING);
957     park_tso(tso);
958     revert_ownership(trec, TRUE);
959   } else {
960     // The transcation has become invalid.  We can now remove it from the wait
961     // queues.
962     if (trec -> state != TREC_CONDEMNED) {
963       remove_wait_queue_entries_for_trec (trec);
964     }
965
966   }
967   unlock_stm(trec);
968
969   TRACE("%p : stmReWait()=%d\n", trec, result);
970   return result;
971 }
972
973 /*......................................................................*/
974
975 static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
976   TRecEntry *result = NULL;
977
978   TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
979   ASSERT(trec != NO_TREC);
980
981   do {
982     FOR_EACH_ENTRY(trec, e, {
983       if (e -> tvar == tvar) {
984         result = e;
985         if (in != NULL) {
986           *in = trec;
987         }
988         BREAK_FOR_EACH;
989       }
990     });
991     trec = trec -> enclosing_trec;
992   } while (result == NULL && trec != NO_TREC);
993
994   return result;    
995 }
996
997 static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
998   StgClosure *result;
999   result = tvar -> current_value;
1000
1001 #if defined(STM_FG_LOCKS)
1002   while (GET_INFO(result) == &stg_TREC_HEADER_info) {
1003     TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, result);
1004     result = tvar -> current_value;
1005   }
1006 #endif
1007
1008   TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, result);
1009   return result;
1010 }
1011
1012 /*......................................................................*/
1013
1014 StgClosure *stmReadTVar(Capability *cap,
1015                         StgTRecHeader *trec, 
1016                         StgTVar *tvar) {
1017   StgTRecHeader *entry_in;
1018   StgClosure *result = NULL;
1019   TRecEntry *entry = NULL;
1020   TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
1021   ASSERT (trec != NO_TREC);
1022   ASSERT (trec -> state == TREC_ACTIVE || 
1023           trec -> state == TREC_CONDEMNED);
1024
1025   entry = get_entry_for(trec, tvar, &entry_in);
1026
1027   if (entry != NULL) {
1028     if (entry_in == trec) {
1029       // Entry found in our trec
1030       result = entry -> new_value;
1031     } else {
1032       // Entry found in another trec
1033       TRecEntry *new_entry = get_new_entry(cap, trec);
1034       new_entry -> tvar = tvar;
1035       new_entry -> expected_value = entry -> expected_value;
1036       new_entry -> new_value = entry -> new_value;
1037       result = new_entry -> new_value;
1038     } 
1039   } else {
1040     // No entry found
1041     StgClosure *current_value = read_current_value(trec, tvar);
1042     TRecEntry *new_entry = get_new_entry(cap, trec);
1043     new_entry -> tvar = tvar;
1044     new_entry -> expected_value = current_value;
1045     new_entry -> new_value = current_value;
1046     result = current_value;
1047   }
1048
1049   TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
1050   return result;
1051 }
1052
1053 /*......................................................................*/
1054
1055 void stmWriteTVar(Capability *cap,
1056                   StgTRecHeader *trec,
1057                   StgTVar *tvar, 
1058                   StgClosure *new_value) {
1059
1060   StgTRecHeader *entry_in;
1061   TRecEntry *entry = NULL;
1062   TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1063   ASSERT (trec != NO_TREC);
1064   ASSERT (trec -> state == TREC_ACTIVE || 
1065           trec -> state == TREC_CONDEMNED);
1066
1067   entry = get_entry_for(trec, tvar, &entry_in);
1068
1069   if (entry != NULL) {
1070     if (entry_in == trec) {
1071       // Entry found in our trec
1072       entry -> new_value = new_value;
1073     } else {
1074       // Entry found in another trec
1075       TRecEntry *new_entry = get_new_entry(cap, trec);
1076       new_entry -> tvar = tvar;
1077       new_entry -> expected_value = entry -> expected_value;
1078       new_entry -> new_value = new_value;
1079     } 
1080   } else {
1081     // No entry found
1082     StgClosure *current_value = read_current_value(trec, tvar);
1083     TRecEntry *new_entry = get_new_entry(cap, trec);
1084     new_entry -> tvar = tvar;
1085     new_entry -> expected_value = current_value;
1086     new_entry -> new_value = new_value;
1087   }
1088
1089   TRACE("%p : stmWriteTVar done\n", trec);
1090 }
1091
1092 /*......................................................................*/
1093
1094 StgTVar *stmNewTVar(Capability *cap,
1095                     StgClosure *new_value) {
1096   StgTVar *result;
1097   result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
1098   SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
1099   result -> current_value = new_value;
1100   result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
1101 #if defined(SMP)
1102   result -> last_update_by = NO_TREC;
1103 #endif
1104   return result;
1105 }
1106
1107 /*......................................................................*/