[project @ 2004-11-18 09:56:07 by tharris]
[ghc-hetmet.git] / ghc / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1998-2004
4  * 
5  * STM implementation.
6  *
7  * This implementation is designed for a many-threads, few-CPUs case.  This leads
8  * to a number of design choices:
9  *
10  *  - We use a simple design which does not aim to be lock-free -- SMP builds use
11  *    a mutex to protect all the TVars and STM datastructures, non-SMP builds 
12  *    do not require any locking.  The goal is to make fast-path uncontended 
13  *    operations fast because, with few CPUs, contention betwen operations on the 
14  *    STM interface is expected rarely.
15  *
16  *  - Each thread is responsible for adding/removing itself to/from the queues
17  *    associated with tvars.  This reduces the work that is necessary when a
18  *    large number of threads are waiting on a single tvar and where the update
19  *    to that tvar is really only releasing a single thread.
20  *
21  * Ideas for future experimentation:
22  *
23  *  - Read/write operations here involve a linear search of the trec.  Consider
24  *    adding a cache to map tvars to existing entries in the trec.
25  *
26  *  - Consider whether to defer unparking more than one thread.  On a uniprocessor
27  *    the deferment could be made until a thread switch from the first thread
28  *    released in the hope that it restores the location to a value on which
29  *    other threads were waiting.  That would avoid a stampede on e.g. multiple
30  *    threads blocked reading from a single-cell shared buffer.
31  *
32  *  - Consider whether to provide a link from a StgTVarWaitQueue to the TRecEntry
33  *    associated with the waiter.  This would allow unpark_waiters_on to be
34  *    more selective and avoid unparking threads whose expected value for that 
35  *    tvar co-incides with the value now stored there.  Does this happen often?
36  *    
37  *
38  * ---------------------------------------------------------------------------*/
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "RtsFlags.h"
43 #include "RtsUtils.h"
44 #include "Schedule.h"
45 #include "STM.h"
46 #include "Storage.h"
47
48 #include <stdlib.h>
49 #include <stdio.h>
50
51 #define FALSE 0
52 #define TRUE  1
53
54 #if defined(DEBUG)
55 #define SHAKE
56 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
57 #else
58 #define TRACE(_x...) /*Nothing*/
59 #endif
60
61 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
62 // unusualy code paths if genuine contention is rare
63
64 #ifdef SHAKE
65 static const int do_shake = TRUE;
66 #else
67 static const int do_shake = FALSE;
68 #endif
69 static int shake_ctr = 0;
70
71 /*......................................................................*/
72
73 static int shake(void) {
74   if (do_shake) {
75     if (((shake_ctr++) % 47) == 0) {
76       return TRUE;
77     } 
78     return FALSE;
79   } else {
80     return FALSE;
81   }
82 }
83
84 /*......................................................................*/
85
86 // Helper macros for iterating over entries within a transaction
87 // record
88
89 #define FOR_EACH_ENTRY(_t,_x,CODE) do {         \
90   StgTRecHeader *__t = (_t);                    \
91   StgTRecChunk *__c = __t -> current_chunk;             \
92   StgWord __limit = __c -> next_entry_idx;              \
93   TRACE("trec=%p chunk=%p limit=%d\n", __t, __c, __limit); \
94   while (__c != END_STM_CHUNK_LIST) {           \
95     StgWord __i;                                  \
96     for (__i = 0; __i < __limit; __i ++) {              \
97       TRecEntry *_x = &(__c -> entries[__i]);   \
98       do { CODE } while (0);                    \
99     }                                           \
100     __c = __c -> prev_chunk;                    \
101     __limit = TREC_CHUNK_NUM_ENTRIES;           \
102   }                                             \
103  exit_for_each:                                 \
104   if (FALSE) goto exit_for_each;                \
105 } while (0)
106
107 #define BREAK_FOR_EACH goto exit_for_each
108      
109 /*......................................................................*/
110
111 // Private cache of must-be-unreachable trec headers and chunks
112
113 static StgTRecHeader *cached_trec_headers = NO_TREC;
114 static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
115 static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
116
117 static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
118   if (shake()) {
119     TRACE("Shake: not re-using wait queue %p\n", q);
120     return;
121   }
122
123   q -> next_queue_entry = cached_tvar_wait_queues;
124   cached_tvar_wait_queues = q;
125 }
126
127 static void recycle_closures_from_trec (StgTRecHeader *t) {
128   if (shake()) {
129     TRACE("Shake: not re-using closures from %p\n", t);
130     return;
131   }
132
133   t -> enclosing_trec = cached_trec_headers;
134   cached_trec_headers = t;
135   t -> enclosing_trec = NO_TREC;
136
137   while (t -> current_chunk != END_STM_CHUNK_LIST) {
138     StgTRecChunk *c = t -> current_chunk;
139     t -> current_chunk = c -> prev_chunk;
140     c -> prev_chunk = cached_trec_chunks;
141     cached_trec_chunks = c;
142   }
143 }
144
145 /*......................................................................*/
146
147 // Helper functions for managing internal STM state.  This lock is only held
148 // for a 'short' time, in the sense that it is never held when any of the 
149 // external functions returns.
150
151 static void lock_stm(void) {
152   // Nothing
153 }
154
155 static void unlock_stm(void) {
156   // Nothing
157 }
158
159 /*......................................................................*/
160
161 // Helper functions for thread blocking and unblocking
162
163 static void park_tso(StgTSO *tso) {
164   ASSERT(tso -> why_blocked == NotBlocked);
165   tso -> why_blocked = BlockedOnSTM;
166   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
167   TRACE("park_tso on tso=%p\n", tso);
168 }
169
170 static void unpark_tso(StgTSO *tso) {
171   // We will continue unparking threads while they remain on one of the wait
172   // queues: it's up to the thread itself to remove it from the wait queues
173   // if it decides to do so when it is scheduled.
174   if (tso -> why_blocked == BlockedOnSTM) {
175     TRACE("unpark_tso on tso=%p\n", tso);
176     tso -> why_blocked = NotBlocked;
177     PUSH_ON_RUN_QUEUE(tso);
178   } else {
179     TRACE("spurious unpark_tso on tso=%p\n", tso);
180   }
181 }
182
183 static void unpark_waiters_on(StgTVar *s) {
184   StgTVarWaitQueue *q;
185   TRACE("unpark_waiters_on tvar=%p\n", s);
186   for (q = s -> first_wait_queue_entry; 
187        q != END_STM_WAIT_QUEUE; 
188        q = q -> next_queue_entry) {
189     unpark_tso(q -> waiting_tso);
190   }
191 }
192
193 /*......................................................................*/
194
195 // Helper functions for allocation and initialization
196
197 static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgTSO *waiting_tso) {
198   StgTVarWaitQueue *result;
199   if (cached_tvar_wait_queues != END_STM_WAIT_QUEUE) {
200     result = cached_tvar_wait_queues;
201     cached_tvar_wait_queues = result -> next_queue_entry;
202   } else {
203     result = (StgTVarWaitQueue *)allocate(sizeofW(StgTVarWaitQueue));
204     SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
205   }
206   result -> waiting_tso = waiting_tso;
207   return result;
208 }
209
210 static StgTRecChunk *new_stg_trec_chunk(void) {
211   StgTRecChunk *result;
212   if (cached_trec_chunks != END_STM_CHUNK_LIST) {
213     result = cached_trec_chunks;
214     cached_trec_chunks = result -> prev_chunk;
215   } else {
216     result = (StgTRecChunk *)allocate(sizeofW(StgTRecChunk));
217     SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
218   }
219   result -> prev_chunk = END_STM_CHUNK_LIST;
220   result -> next_entry_idx = 0;
221   TRACE("prev from %p is %p\n", result, result -> prev_chunk);
222   return result;
223 }
224
225 static StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) {
226   StgTRecHeader *result;
227   if (cached_trec_headers != NO_TREC) {
228     result = cached_trec_headers;
229     cached_trec_headers = result -> enclosing_trec;
230   } else {
231     result = (StgTRecHeader *) allocate(sizeofW(StgTRecHeader));
232     SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
233   }
234   result -> enclosing_trec = enclosing_trec;
235   result -> current_chunk = new_stg_trec_chunk();
236
237   if (enclosing_trec == NO_TREC) {
238     result -> state = TREC_ACTIVE;
239   } else {
240     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
241            enclosing_trec -> state == TREC_MUST_ABORT ||
242            enclosing_trec -> state == TREC_CANNOT_COMMIT);
243     result -> state = enclosing_trec -> state;
244   }
245
246   TRACE("new_stg_trec_header creating %p nidx=%d chunk=%p enclosing_trec=%p state=%d\n",
247         result, result->current_chunk->next_entry_idx, result -> current_chunk, enclosing_trec, result->state);
248   return result;  
249 }
250
251 /*......................................................................*/
252
253 // Helper functions for managing waiting lists
254
255 static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
256   ASSERT(trec != NO_TREC);
257   ASSERT(trec -> enclosing_trec == NO_TREC);
258   ASSERT(trec -> state == TREC_ACTIVE || trec -> state == TREC_CANNOT_COMMIT);
259   FOR_EACH_ENTRY(trec, e, {
260     StgTVar *s;
261     StgTVarWaitQueue *q;
262     StgTVarWaitQueue *fq;
263     s = e -> tvar;
264     TRACE("Adding tso=%p to wait queue for tvar=%p\n", tso, s);
265     ASSERT(s -> current_value == e -> expected_value);
266     fq = s -> first_wait_queue_entry;
267     q = new_stg_tvar_wait_queue(tso);
268     q -> next_queue_entry = fq;
269     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
270     if (fq != END_STM_WAIT_QUEUE) {
271       fq -> prev_queue_entry = q;
272     }
273     s -> first_wait_queue_entry = q;
274     e -> new_value = (StgClosure *) q;
275   });
276 }
277
278 static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
279   ASSERT(trec != NO_TREC);
280   ASSERT(trec -> enclosing_trec == NO_TREC);
281   ASSERT(trec -> state == TREC_WAITING);
282   TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
283   FOR_EACH_ENTRY(trec, e, {
284     StgTVar *s;
285     StgTVarWaitQueue *pq;
286     StgTVarWaitQueue *nq;
287     StgTVarWaitQueue *q;
288     s = e -> tvar;
289     q = (StgTVarWaitQueue *) (e -> new_value);
290     TRACE("Removing tso=%p from wait queue for tvar=%p\n", q -> waiting_tso, s);
291     nq = q -> next_queue_entry;
292     pq = q -> prev_queue_entry;
293     TRACE("pq=%p nq=%p q=%p\n", pq, nq, q);
294     if (nq != END_STM_WAIT_QUEUE) {
295       nq -> prev_queue_entry = pq;
296     }
297     if (pq != END_STM_WAIT_QUEUE) {
298       pq -> next_queue_entry = nq;
299     } else {
300       ASSERT (s -> first_wait_queue_entry == q);
301       s -> first_wait_queue_entry = nq;
302     }
303     recycle_tvar_wait_queue(q);
304   });
305 }
306  
307 /*......................................................................*/
308  
309 static TRecEntry *get_new_entry(StgTRecHeader *t) {
310   TRecEntry *result;
311   StgTRecChunk *c;
312   int i;
313
314   c = t -> current_chunk;
315   i = c -> next_entry_idx;
316   ASSERT(c != END_STM_CHUNK_LIST);
317
318   if (i < TREC_CHUNK_NUM_ENTRIES) {
319     // Continue to use current chunk
320     result = &(c -> entries[i]);
321     c -> next_entry_idx ++;
322   } else {
323     // Current chunk is full: allocate a fresh one
324     StgTRecChunk *nc;
325     nc = new_stg_trec_chunk();
326     nc -> prev_chunk = c;
327     nc -> next_entry_idx = 1;
328     t -> current_chunk = nc;
329     result = &(nc -> entries[0]);
330   }
331
332   return result;
333 }
334
335 /*......................................................................*/
336
337 static void merge_update_into(StgTRecHeader *t,
338                               StgTVar *tvar,
339                               StgClosure *expected_value,
340                               StgClosure *new_value,
341                               int merging_sibling) {
342   int found;
343   
344   // Look for an entry in this trec
345   found = FALSE;
346   FOR_EACH_ENTRY(t, e, {
347     StgTVar *s;
348     s = e -> tvar;
349     if (s == tvar) {
350       found = TRUE;
351       if (merging_sibling) {
352         if (e -> expected_value != expected_value) {
353           // Must abort if the two entries start from different values
354           TRACE("Siblings inconsistent at %p (%p vs %p)\n", 
355                 tvar, e -> expected_value, expected_value);
356           t -> state = TREC_MUST_ABORT;
357         } else if (e -> new_value != new_value) {
358           // Cannot commit if the two entries lead to different values (wait still OK)
359           TRACE("Siblings trying conflicting writes to %p (%p vs %p)\n", 
360                 tvar, e -> new_value, new_value);
361           t -> state = TREC_CANNOT_COMMIT;
362         }
363       } else {
364         // Otherwise merging child back into parent
365         ASSERT (e -> new_value == expected_value);
366       }
367       TRACE("        trec=%p exp=%p new=%p\n", t, e->expected_value, e->new_value);
368       e -> new_value = new_value;
369       BREAK_FOR_EACH;
370     }
371   });
372
373   if (!found) {
374     // No entry so far in this trec
375     TRecEntry *ne;
376     ne = get_new_entry(t);
377     ne -> tvar = tvar;
378     ne -> expected_value = expected_value;
379     ne -> new_value = new_value;
380   }
381 }
382
383 /*......................................................................*/
384
385 static StgClosure *read_current_value_seen_from(StgTRecHeader *t,
386                                                 StgTVar *tvar) {
387   int found;
388   StgClosure *result = NULL;
389
390   // Look for any relevent trec entries
391   found = FALSE;
392   while (t != NO_TREC) {
393     FOR_EACH_ENTRY(t, e, {
394       StgTVar *s;
395       s = e -> tvar;
396       if (s == tvar) {
397         found = TRUE;
398         result = e -> new_value;
399         BREAK_FOR_EACH;
400       }
401     });
402     if (found) break;
403     t = t -> enclosing_trec;
404   }
405
406   if (!found) {
407     // Value not yet held in a trec
408     result = tvar -> current_value;
409   }
410
411   return result;
412 }
413  
414 /*......................................................................*/
415
416 static int transaction_is_valid (StgTRecHeader *t) {
417   StgTRecHeader *et;
418   int result;
419
420   if (shake()) {
421     TRACE("Shake: pretending transaction trec=%p is invalid when it may not be\n", t);
422     return FALSE;
423   }
424
425   et = t -> enclosing_trec;
426   ASSERT ((t -> state == TREC_ACTIVE) || 
427           (t -> state == TREC_WAITING) ||
428           (t -> state == TREC_MUST_ABORT) ||
429           (t -> state == TREC_CANNOT_COMMIT));
430   result = !((t -> state) == TREC_MUST_ABORT);
431   if (result) {
432     FOR_EACH_ENTRY(t, e, {
433       StgTVar *s;
434       s = e -> tvar;
435       if (e -> expected_value != read_current_value_seen_from(et, s)) {
436         result = FALSE;
437         BREAK_FOR_EACH;
438       }
439     });
440   }
441   return result;
442 }
443
444 /************************************************************************/
445
446 /* 
447  * External functions below this point are repsonsible for:
448  *
449  * - acquiring/releasing the STM lock 
450  *
451  * - all updates to the trec status field
452  *  ASSERT(t != NO_TREC);
453
454  * By convention we increment entry_count when starting a new
455  * transaction and we decrement it at the point where we can discard
456  * the contents of the trec when exiting the outermost transaction.
457  * This means that stmWait and stmRewait decrement the count whenever
458  * they return FALSE (they do so exactly once for each transaction
459  * that doesn't remain blocked forever).
460  */
461
462 /************************************************************************/
463
464 void stmPreGCHook() {
465   TRACE("stmPreGCHook\n");
466   cached_trec_headers = NO_TREC;
467   cached_trec_chunks = END_STM_CHUNK_LIST;
468   cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
469 }
470
471 /************************************************************************/
472
473 void initSTM() {
474   TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
475   /* Nothing */
476 }
477
478 /*......................................................................*/
479
480 StgTRecHeader *stmStartTransaction(StgTRecHeader *outer) {
481   StgTRecHeader *t;
482   TRACE("stmStartTransaction current-trec=%p\n", outer);
483   t = new_stg_trec_header(outer);
484   TRACE("stmStartTransaction new-trec=%p\n", t);
485   return t;
486 }
487
488 /*......................................................................*/
489
490 void stmAbortTransaction(StgTRecHeader *trec) {
491   TRACE("stmAbortTransaction trec=%p\n", trec);
492   ASSERT (trec != NO_TREC);
493   ASSERT ((trec -> state == TREC_ACTIVE) || 
494           (trec -> state == TREC_MUST_ABORT) ||
495           (trec -> state == TREC_WAITING) ||
496           (trec -> state == TREC_CANNOT_COMMIT));
497   if (trec -> state == TREC_WAITING) {
498     ASSERT (trec -> enclosing_trec == NO_TREC);
499     TRACE("stmAbortTransaction aborting waiting transaction\n");
500     stop_tsos_waiting_on_trec(trec);
501   } 
502   trec -> state = TREC_ABORTED;
503
504   // Outcome now reflected by status field; no need for log
505   recycle_closures_from_trec(trec);
506
507   TRACE("stmAbortTransaction trec=%p done\n", trec);
508 }
509
510 /*......................................................................*/
511
512 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
513   StgTRecHeader *outer;
514   TRACE("stmGetEnclosingTRec trec=%p\n", trec);
515   outer = trec -> enclosing_trec;
516   TRACE("stmGetEnclosingTRec outer=%p\n", outer);
517   return outer;
518 }
519
520 /*......................................................................*/
521
522 StgBool stmValidateTransaction(StgTRecHeader *trec) {
523   int result;
524   TRACE("stmValidateTransaction trec=%p\n", trec);
525   ASSERT(trec != NO_TREC);
526   ASSERT((trec -> state == TREC_ACTIVE) || 
527          (trec -> state == TREC_MUST_ABORT) ||
528          (trec -> state == TREC_CANNOT_COMMIT) ||
529          (trec -> state == TREC_WAITING));
530
531   lock_stm();
532   result = transaction_is_valid(trec);
533
534   if (!result && trec -> state != TREC_WAITING) {
535     trec -> state = TREC_MUST_ABORT; 
536   }
537
538   unlock_stm();
539
540   TRACE("stmValidateTransaction trec=%p result=%d\n", trec, result);
541   return result;
542 }
543
544 /*......................................................................*/
545
546 StgBool stmCommitTransaction(StgTRecHeader *trec) {
547   StgTRecHeader *et;
548   int result;
549   TRACE("stmCommitTransaction trec=%p trec->enclosing_trec=%p\n", trec, trec->enclosing_trec);
550   ASSERT (trec != NO_TREC);
551   ASSERT ((trec -> state == TREC_ACTIVE) || 
552           (trec -> state == TREC_MUST_ABORT) ||
553           (trec -> state == TREC_CANNOT_COMMIT));
554
555   lock_stm();
556   result = transaction_is_valid(trec);
557   if (result) {
558     et = trec -> enclosing_trec;
559     if (trec -> state == TREC_CANNOT_COMMIT && et == NO_TREC) {
560       TRACE("Cannot commit trec=%p at top level\n", trec);
561       trec -> state = TREC_MUST_ABORT;
562       result = FALSE;
563     } else {
564       if (et == NO_TREC) {
565         TRACE("Non-nesting commit, NO_TREC=%p\n", NO_TREC);
566       } else {
567         TRACE("Nested commit into %p, NO_TREC=%p\n", et, NO_TREC);
568       }
569       
570       FOR_EACH_ENTRY(trec, e, {
571         StgTVar *s;
572         s = e -> tvar;
573         if (et == NO_TREC) {
574           s -> current_value = e -> new_value;
575           unpark_waiters_on(s);
576         } else {
577           merge_update_into(et, s, e -> expected_value, e -> new_value, FALSE);
578         }
579       });
580
581
582       if (trec->state == TREC_CANNOT_COMMIT && et -> state == TREC_ACTIVE) {
583         TRACE("Propagating TREC_CANNOT_COMMIT into %p\n", et);
584         et -> state = TREC_CANNOT_COMMIT;
585       }
586     }
587   } 
588
589   // Outcome now reflected by status field; no need for log
590   recycle_closures_from_trec(trec);
591   
592   unlock_stm();
593
594   TRACE("stmCommitTransaction trec=%p result=%d\n", trec, result);
595
596   return result;
597 }
598
599 /*......................................................................*/
600
601 StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other) {
602   int result;
603   TRACE("stmMergeForWaiting trec=%p (%d) other=%p (%d)\n", trec, trec -> state, other, other->state);
604   ASSERT(trec != NO_TREC);
605   ASSERT(other != NO_TREC);
606   ASSERT((trec -> state == TREC_ACTIVE) || 
607          (trec -> state == TREC_MUST_ABORT) ||
608          (trec -> state == TREC_CANNOT_COMMIT));
609   ASSERT((other -> state == TREC_ACTIVE) || 
610          (other -> state == TREC_MUST_ABORT) ||
611          (other -> state == TREC_CANNOT_COMMIT));
612
613   lock_stm();
614   result = (transaction_is_valid(trec));
615   TRACE("stmMergeForWaiting initial result=%d\n", result);
616   if (result) {
617     result = transaction_is_valid(other);
618     TRACE("stmMergeForWaiting after both result=%d\n", result);
619     if (result) {
620       // Individually the two transactions may be valid.  Now copy entries from
621       // "other" into "trec".  This may cause "trec" to become invalid if it
622       // contains an update that conflicts with one from "other"
623       FOR_EACH_ENTRY(other, e, {
624         StgTVar *s = e -> tvar;
625         TRACE("Merging trec=%p exp=%p new=%p\n", other, e->expected_value, e->new_value);
626         merge_update_into(trec, s, e-> expected_value, e -> new_value, TRUE);
627       });
628       result = (trec -> state != TREC_MUST_ABORT);
629     } 
630   }
631
632   if (!result) {
633     trec -> state = TREC_MUST_ABORT;
634   }
635
636   unlock_stm();
637
638   TRACE("stmMergeForWaiting result=%d\n", result);
639   return result;
640 }
641
642 /*......................................................................*/
643
644 StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
645   int result;
646   TRACE("stmWait tso=%p trec=%p\n", tso, trec);
647   ASSERT (trec != NO_TREC);
648   ASSERT (trec -> enclosing_trec == NO_TREC);
649   ASSERT ((trec -> state == TREC_ACTIVE) || 
650           (trec -> state == TREC_MUST_ABORT) ||
651           (trec -> state == TREC_CANNOT_COMMIT));
652
653   lock_stm();
654   result = transaction_is_valid(trec);
655   if (result) {
656     // The transaction is valid so far so we can actually start waiting.
657     // (Otherwise the transaction was not valid and the thread will have to
658     // retry it).
659     start_tso_waiting_on_trec(tso, trec);
660     park_tso(tso);
661     trec -> state = TREC_WAITING;
662   }  else {
663     // Outcome now reflected by status field; no need for log
664     recycle_closures_from_trec(trec);
665   }
666   unlock_stm();
667
668   TRACE("stmWait trec=%p result=%d\n", trec, result);
669   return result;
670 }
671
672 /*......................................................................*/
673
674 StgBool stmReWait(StgTRecHeader *trec) {
675   int result;
676   TRACE("stmReWait trec=%p\n", trec);
677   ASSERT (trec != NO_TREC);
678   ASSERT (trec -> enclosing_trec == NO_TREC);
679   ASSERT (trec -> state == TREC_WAITING);
680
681   lock_stm();
682   result = transaction_is_valid(trec);
683   TRACE("stmReWait trec=%p result=%d\n", trec, result);
684   if (result) {
685     // The transaction remains valid -- do nothing because it is already on
686     // the wait queues
687     ASSERT (trec -> state == TREC_WAITING);
688   } else {
689     // The transcation has become invalid.  We can now remove it from the wait
690     // queues.
691     stop_tsos_waiting_on_trec (trec);
692
693     // Outcome now reflected by status field; no need for log
694     recycle_closures_from_trec(trec);
695   }
696   unlock_stm();
697
698   TRACE("stmReWait trec=%p result=%d\n", trec, result);
699   return result;
700 }
701
702 /*......................................................................*/
703
704 StgClosure *stmReadTVar(StgTRecHeader *trec, 
705                         StgTVar *tvar) {
706   StgTRecHeader *et;
707   StgClosure *result = NULL; // Suppress unassignment warning
708   int found = FALSE;
709   TRecEntry *ne = NULL;
710
711   TRACE("stmReadTVar trec=%p tvar=%p\n", trec, tvar);
712   ASSERT (trec != NO_TREC);
713   ASSERT (trec -> state == TREC_ACTIVE || 
714           trec -> state == TREC_MUST_ABORT ||
715           trec -> state == TREC_CANNOT_COMMIT);
716
717   lock_stm();
718   found = FALSE;
719
720   // Look for an existing entry in our trec or in an enclosing trec
721   et = trec;
722   while (et != NO_TREC) {
723     FOR_EACH_ENTRY(et, e, {
724       TRACE("testing e=%p\n", e);
725       if (e -> tvar == tvar) {
726         found = TRUE;
727         result = e -> new_value;
728         BREAK_FOR_EACH;
729       }
730     });
731     if (found) break;
732     et = et -> enclosing_trec;
733   }
734
735   if (found && et != trec) {
736     // Entry found in another trec
737     ASSERT (result != NULL);
738     TRACE("duplicating entry\n");
739     ne = get_new_entry(trec);
740     ne -> tvar = tvar;
741     ne -> expected_value = result;
742     ne -> new_value = result;
743   } else if (!found) {
744     // No entry found
745     ASSERT (result == NULL);
746     TRACE("need new entry\n");
747     ne = get_new_entry(trec);
748     TRACE("got ne=%p\n", ne);
749     result = tvar -> current_value;
750     ne -> tvar = tvar;
751     ne -> expected_value = result;
752     ne -> new_value = result;
753   }
754
755   unlock_stm();
756   ASSERT (result != NULL);
757   TRACE("stmReadTVar trec=%p result=%p\n", trec, result);
758
759   return result;
760 }
761
762 /*......................................................................*/
763
764 void stmWriteTVar(StgTRecHeader *trec,
765                   StgTVar *tvar, 
766                   StgClosure *new_value) {
767   StgTRecHeader *et;
768   TRecEntry *ne;
769   TRecEntry *entry = NULL;
770   int found;
771   TRACE("stmWriteTVar trec=%p tvar=%p new_value=%p\n", trec, tvar, new_value);
772   ASSERT (trec != NO_TREC);
773   ASSERT (trec -> state == TREC_ACTIVE || 
774           trec -> state == TREC_MUST_ABORT ||
775           trec -> state == TREC_CANNOT_COMMIT);
776
777   lock_stm();
778   found = FALSE;
779
780   // Look for an existing entry in our trec or in an enclosing trec
781   et = trec;
782   while (et != NO_TREC) {
783     FOR_EACH_ENTRY(et, e, {
784       if (e -> tvar == tvar) {
785         found = TRUE;
786         entry = e;
787         BREAK_FOR_EACH;
788       }
789     });
790     if (found) break;
791     et = et -> enclosing_trec;
792   }
793
794   if (found && et == trec) {
795     // Entry found in our trec
796     entry -> new_value = new_value;
797   } else if (found) {
798     // Entry found in another trec
799     ne = get_new_entry(trec);
800     ne -> tvar = tvar;
801     ne -> expected_value = entry -> new_value;
802     ne -> new_value = new_value;
803   } else {
804     // No entry found
805     ne = get_new_entry(trec);
806     ne -> tvar = tvar;
807     ne -> expected_value = tvar -> current_value;
808     ne -> new_value = new_value;
809   }
810
811   unlock_stm();
812   TRACE("stmWriteTVar trec=%p done\n", trec);
813 }
814
815
816 /*......................................................................*/
817