[project @ 2005-03-02 11:06:58 by simonmar]
[ghc-hetmet.git] / ghc / rts / STM.c
1 /* -----------------------------------------------------------------------------
2  *
3  * (c) The GHC Team 1998-2004
4  * 
5  * STM implementation.
6  *
7  * This implementation is designed for a many-threads, few-CPUs case.  This leads
8  * to a number of design choices:
9  *
10  *  - We use a simple design which does not aim to be lock-free -- SMP builds use
11  *    a mutex to protect all the TVars and STM datastructures, non-SMP builds 
12  *    do not require any locking.  The goal is to make fast-path uncontended 
13  *    operations fast because, with few CPUs, contention betwen operations on the 
14  *    STM interface is expected rarely.
15  *
16  *  - Each thread is responsible for adding/removing itself to/from the queues
17  *    associated with tvars.  This reduces the work that is necessary when a
18  *    large number of threads are waiting on a single tvar and where the update
19  *    to that tvar is really only releasing a single thread.
20  *
21  * Ideas for future experimentation:
22  *
23  *  - Read/write operations here involve a linear search of the trec.  Consider
24  *    adding a cache to map tvars to existing entries in the trec.
25  *
26  *  - Consider whether to defer unparking more than one thread.  On a uniprocessor
27  *    the deferment could be made until a thread switch from the first thread
28  *    released in the hope that it restores the location to a value on which
29  *    other threads were waiting.  That would avoid a stampede on e.g. multiple
30  *    threads blocked reading from a single-cell shared buffer.
31  *
32  *  - Consider whether to provide a link from a StgTVarWaitQueue to the TRecEntry
33  *    associated with the waiter.  This would allow unpark_waiters_on to be
34  *    more selective and avoid unparking threads whose expected value for that 
35  *    tvar co-incides with the value now stored there.  Does this happen often?
36  *    
37  *
38  * ---------------------------------------------------------------------------*/
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "RtsFlags.h"
43 #include "RtsUtils.h"
44 #include "Schedule.h"
45 #include "STM.h"
46 #include "Storage.h"
47
48 #include <stdlib.h>
49 #include <stdio.h>
50
51 #define FALSE 0
52 #define TRUE  1
53
54 #if defined(DEBUG)
55 #define SHAKE
56 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
57 #else
58 #define TRACE(_x...) /*Nothing*/
59 #endif
60
61 // If SHAKE is defined then validation will sometime spuriously fail.  They helps test
62 // unusualy code paths if genuine contention is rare
63
64 #ifdef SHAKE
65 static const int do_shake = TRUE;
66 #else
67 static const int do_shake = FALSE;
68 #endif
69 static int shake_ctr = 0;
70
71 /*......................................................................*/
72
73 static int shake(void) {
74   if (do_shake) {
75     if (((shake_ctr++) % 47) == 0) {
76       return TRUE;
77     } 
78     return FALSE;
79   } else {
80     return FALSE;
81   }
82 }
83
84 /*......................................................................*/
85
86 // Helper macros for iterating over entries within a transaction
87 // record
88
89 #define FOR_EACH_ENTRY(_t,_x,CODE) do {         \
90   StgTRecHeader *__t = (_t);                    \
91   StgTRecChunk *__c = __t -> current_chunk;             \
92   StgWord __limit = __c -> next_entry_idx;              \
93   TRACE("trec=%p chunk=%p limit=%d\n", __t, __c, __limit); \
94   while (__c != END_STM_CHUNK_LIST) {           \
95     StgWord __i;                                  \
96     for (__i = 0; __i < __limit; __i ++) {              \
97       TRecEntry *_x = &(__c -> entries[__i]);   \
98       do { CODE } while (0);                    \
99     }                                           \
100     __c = __c -> prev_chunk;                    \
101     __limit = TREC_CHUNK_NUM_ENTRIES;           \
102   }                                             \
103  exit_for_each:                                 \
104   if (FALSE) goto exit_for_each;                \
105 } while (0)
106
107 #define BREAK_FOR_EACH goto exit_for_each
108      
109 /*......................................................................*/
110
111 // Private cache of must-be-unreachable trec headers and chunks
112
113 static StgTRecHeader *cached_trec_headers = NO_TREC;
114 static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
115 static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
116
117 static void recycle_tvar_wait_queue(StgTVarWaitQueue *q STG_UNUSED) {
118 #if 0
119   if (shake()) {
120     TRACE("Shake: not re-using wait queue %p\n", q);
121     return;
122   }
123
124   q -> next_queue_entry = cached_tvar_wait_queues;
125   cached_tvar_wait_queues = q;
126 #endif
127 }
128
129 static void recycle_closures_from_trec (StgTRecHeader *t STG_UNUSED) {
130 #if 0
131   if (shake()) {
132     TRACE("Shake: not re-using closures from %p\n", t);
133     return;
134   }
135
136   t -> enclosing_trec = cached_trec_headers;
137   cached_trec_headers = t;
138   t -> enclosing_trec = NO_TREC;
139
140   while (t -> current_chunk != END_STM_CHUNK_LIST) {
141     StgTRecChunk *c = t -> current_chunk;
142     t -> current_chunk = c -> prev_chunk;
143     c -> prev_chunk = cached_trec_chunks;
144     cached_trec_chunks = c;
145   }
146 #endif
147 }
148
149 /*......................................................................*/
150
151 // Helper functions for managing internal STM state.  This lock is only held
152 // for a 'short' time, in the sense that it is never held when any of the 
153 // external functions returns.
154
155 static void lock_stm(void) {
156   // Nothing
157 }
158
159 static void unlock_stm(void) {
160   // Nothing
161 }
162
163 /*......................................................................*/
164
165 // Helper functions for thread blocking and unblocking
166
167 static void park_tso(StgTSO *tso) {
168   ASSERT(tso -> why_blocked == NotBlocked);
169   tso -> why_blocked = BlockedOnSTM;
170   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
171   TRACE("park_tso on tso=%p\n", tso);
172 }
173
174 static void unpark_tso(StgTSO *tso) {
175   // We will continue unparking threads while they remain on one of the wait
176   // queues: it's up to the thread itself to remove it from the wait queues
177   // if it decides to do so when it is scheduled.
178   if (tso -> why_blocked == BlockedOnSTM) {
179     TRACE("unpark_tso on tso=%p\n", tso);
180     tso -> why_blocked = NotBlocked;
181     PUSH_ON_RUN_QUEUE(tso);
182   } else {
183     TRACE("spurious unpark_tso on tso=%p\n", tso);
184   }
185 }
186
187 static void unpark_waiters_on(StgTVar *s) {
188   StgTVarWaitQueue *q;
189   TRACE("unpark_waiters_on tvar=%p\n", s);
190   for (q = s -> first_wait_queue_entry; 
191        q != END_STM_WAIT_QUEUE; 
192        q = q -> next_queue_entry) {
193     unpark_tso(q -> waiting_tso);
194   }
195 }
196
197 /*......................................................................*/
198
199 // Helper functions for allocation and initialization
200
201 static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgTSO *waiting_tso) {
202   StgTVarWaitQueue *result;
203   if (cached_tvar_wait_queues != END_STM_WAIT_QUEUE) {
204     result = cached_tvar_wait_queues;
205     cached_tvar_wait_queues = result -> next_queue_entry;
206   } else {
207     result = (StgTVarWaitQueue *)allocate(sizeofW(StgTVarWaitQueue));
208     SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
209   }
210   result -> waiting_tso = waiting_tso;
211   return result;
212 }
213
214 static StgTRecChunk *new_stg_trec_chunk(void) {
215   StgTRecChunk *result;
216   if (cached_trec_chunks != END_STM_CHUNK_LIST) {
217     result = cached_trec_chunks;
218     cached_trec_chunks = result -> prev_chunk;
219   } else {
220     result = (StgTRecChunk *)allocate(sizeofW(StgTRecChunk));
221     SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
222   }
223   result -> prev_chunk = END_STM_CHUNK_LIST;
224   result -> next_entry_idx = 0;
225   TRACE("prev from %p is %p\n", result, result -> prev_chunk);
226   return result;
227 }
228
229 static StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) {
230   StgTRecHeader *result;
231   if (cached_trec_headers != NO_TREC) {
232     result = cached_trec_headers;
233     cached_trec_headers = result -> enclosing_trec;
234   } else {
235     result = (StgTRecHeader *) allocate(sizeofW(StgTRecHeader));
236     SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
237   }
238   result -> enclosing_trec = enclosing_trec;
239   result -> current_chunk = new_stg_trec_chunk();
240
241   if (enclosing_trec == NO_TREC) {
242     result -> state = TREC_ACTIVE;
243   } else {
244     ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
245            enclosing_trec -> state == TREC_MUST_ABORT ||
246            enclosing_trec -> state == TREC_CANNOT_COMMIT);
247     result -> state = enclosing_trec -> state;
248   }
249
250   TRACE("new_stg_trec_header creating %p nidx=%d chunk=%p enclosing_trec=%p state=%d\n",
251         result, result->current_chunk->next_entry_idx, result -> current_chunk, enclosing_trec, result->state);
252   return result;  
253 }
254
255 /*......................................................................*/
256
257 // Helper functions for managing waiting lists
258
259 static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
260   ASSERT(trec != NO_TREC);
261   ASSERT(trec -> enclosing_trec == NO_TREC);
262   ASSERT(trec -> state == TREC_ACTIVE || trec -> state == TREC_CANNOT_COMMIT);
263   FOR_EACH_ENTRY(trec, e, {
264     StgTVar *s;
265     StgTVarWaitQueue *q;
266     StgTVarWaitQueue *fq;
267     s = e -> tvar;
268     TRACE("Adding tso=%p to wait queue for tvar=%p\n", tso, s);
269     ASSERT(s -> current_value == e -> expected_value);
270     fq = s -> first_wait_queue_entry;
271     q = new_stg_tvar_wait_queue(tso);
272     q -> next_queue_entry = fq;
273     q -> prev_queue_entry = END_STM_WAIT_QUEUE;
274     if (fq != END_STM_WAIT_QUEUE) {
275       fq -> prev_queue_entry = q;
276     }
277     s -> first_wait_queue_entry = q;
278     e -> new_value = (StgClosure *) q;
279   });
280 }
281
282 static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
283   ASSERT(trec != NO_TREC);
284   ASSERT(trec -> enclosing_trec == NO_TREC);
285   ASSERT(trec -> state == TREC_WAITING ||
286          trec -> state == TREC_MUST_ABORT);
287   TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
288   FOR_EACH_ENTRY(trec, e, {
289     StgTVar *s;
290     StgTVarWaitQueue *pq;
291     StgTVarWaitQueue *nq;
292     StgTVarWaitQueue *q;
293     s = e -> tvar;
294     q = (StgTVarWaitQueue *) (e -> new_value);
295     TRACE("Removing tso=%p from wait queue for tvar=%p\n", q -> waiting_tso, s);
296     nq = q -> next_queue_entry;
297     pq = q -> prev_queue_entry;
298     TRACE("pq=%p nq=%p q=%p\n", pq, nq, q);
299     if (nq != END_STM_WAIT_QUEUE) {
300       nq -> prev_queue_entry = pq;
301     }
302     if (pq != END_STM_WAIT_QUEUE) {
303       pq -> next_queue_entry = nq;
304     } else {
305       ASSERT (s -> first_wait_queue_entry == q);
306       s -> first_wait_queue_entry = nq;
307     }
308     recycle_tvar_wait_queue(q);
309   });
310 }
311  
312 /*......................................................................*/
313  
314 static TRecEntry *get_new_entry(StgTRecHeader *t) {
315   TRecEntry *result;
316   StgTRecChunk *c;
317   int i;
318
319   c = t -> current_chunk;
320   i = c -> next_entry_idx;
321   ASSERT(c != END_STM_CHUNK_LIST);
322
323   if (i < TREC_CHUNK_NUM_ENTRIES) {
324     // Continue to use current chunk
325     result = &(c -> entries[i]);
326     c -> next_entry_idx ++;
327   } else {
328     // Current chunk is full: allocate a fresh one
329     StgTRecChunk *nc;
330     nc = new_stg_trec_chunk();
331     nc -> prev_chunk = c;
332     nc -> next_entry_idx = 1;
333     t -> current_chunk = nc;
334     result = &(nc -> entries[0]);
335   }
336
337   return result;
338 }
339
340 /*......................................................................*/
341
342 static void merge_update_into(StgTRecHeader *t,
343                               StgTVar *tvar,
344                               StgClosure *expected_value,
345                               StgClosure *new_value,
346                               int merging_sibling) {
347   int found;
348   
349   // Look for an entry in this trec
350   found = FALSE;
351   FOR_EACH_ENTRY(t, e, {
352     StgTVar *s;
353     s = e -> tvar;
354     if (s == tvar) {
355       found = TRUE;
356       if (merging_sibling) {
357         if (e -> expected_value != expected_value) {
358           // Must abort if the two entries start from different values
359           TRACE("Siblings inconsistent at %p (%p vs %p)\n", 
360                 tvar, e -> expected_value, expected_value);
361           t -> state = TREC_MUST_ABORT;
362         } else if (e -> new_value != new_value) {
363           // Cannot commit if the two entries lead to different values (wait still OK)
364           TRACE("Siblings trying conflicting writes to %p (%p vs %p)\n", 
365                 tvar, e -> new_value, new_value);
366           t -> state = TREC_CANNOT_COMMIT;
367         }
368       } else {
369         // Otherwise merging child back into parent
370         ASSERT (e -> new_value == expected_value);
371       }
372       TRACE("        trec=%p exp=%p new=%p\n", t, e->expected_value, e->new_value);
373       e -> new_value = new_value;
374       BREAK_FOR_EACH;
375     }
376   });
377
378   if (!found) {
379     // No entry so far in this trec
380     TRecEntry *ne;
381     ne = get_new_entry(t);
382     ne -> tvar = tvar;
383     ne -> expected_value = expected_value;
384     ne -> new_value = new_value;
385   }
386 }
387
388 /*......................................................................*/
389
390 static StgClosure *read_current_value_seen_from(StgTRecHeader *t,
391                                                 StgTVar *tvar) {
392   int found;
393   StgClosure *result = NULL;
394
395   // Look for any relevent trec entries
396   found = FALSE;
397   while (t != NO_TREC) {
398     FOR_EACH_ENTRY(t, e, {
399       StgTVar *s;
400       s = e -> tvar;
401       if (s == tvar) {
402         found = TRUE;
403         result = e -> new_value;
404         BREAK_FOR_EACH;
405       }
406     });
407     if (found) break;
408     t = t -> enclosing_trec;
409   }
410
411   if (!found) {
412     // Value not yet held in a trec
413     result = tvar -> current_value;
414   }
415
416   return result;
417 }
418  
419 /*......................................................................*/
420
421 static int transaction_is_valid (StgTRecHeader *t) {
422   StgTRecHeader *et;
423   int result;
424
425   if (shake()) {
426     TRACE("Shake: pretending transaction trec=%p is invalid when it may not be\n", t);
427     return FALSE;
428   }
429
430   et = t -> enclosing_trec;
431   ASSERT ((t -> state == TREC_ACTIVE) || 
432           (t -> state == TREC_WAITING) ||
433           (t -> state == TREC_MUST_ABORT) ||
434           (t -> state == TREC_CANNOT_COMMIT));
435   result = !((t -> state) == TREC_MUST_ABORT);
436   if (result) {
437     FOR_EACH_ENTRY(t, e, {
438       StgTVar *s;
439       s = e -> tvar;
440       if (e -> expected_value != read_current_value_seen_from(et, s)) {
441         result = FALSE;
442         BREAK_FOR_EACH;
443       }
444     });
445   }
446   return result;
447 }
448
449 /************************************************************************/
450
451 /* 
452  * External functions below this point are repsonsible for:
453  *
454  * - acquiring/releasing the STM lock 
455  *
456  * - all updates to the trec status field
457  *  ASSERT(t != NO_TREC);
458
459  * By convention we increment entry_count when starting a new
460  * transaction and we decrement it at the point where we can discard
461  * the contents of the trec when exiting the outermost transaction.
462  * This means that stmWait and stmRewait decrement the count whenever
463  * they return FALSE (they do so exactly once for each transaction
464  * that doesn't remain blocked forever).
465  */
466
467 /************************************************************************/
468
469 void stmPreGCHook() {
470   TRACE("stmPreGCHook\n");
471   cached_trec_headers = NO_TREC;
472   cached_trec_chunks = END_STM_CHUNK_LIST;
473   cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
474 }
475
476 /************************************************************************/
477
478 void initSTM() {
479   TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
480   /* Nothing */
481 }
482
483 /*......................................................................*/
484
485 StgTRecHeader *stmStartTransaction(StgTRecHeader *outer) {
486   StgTRecHeader *t;
487   TRACE("stmStartTransaction current-trec=%p\n", outer);
488   t = new_stg_trec_header(outer);
489   TRACE("stmStartTransaction new-trec=%p\n", t);
490   return t;
491 }
492
493 /*......................................................................*/
494
495 void stmAbortTransaction(StgTRecHeader *trec) {
496   TRACE("stmAbortTransaction trec=%p\n", trec);
497   ASSERT (trec != NO_TREC);
498   ASSERT ((trec -> state == TREC_ACTIVE) || 
499           (trec -> state == TREC_MUST_ABORT) ||
500           (trec -> state == TREC_WAITING) ||
501           (trec -> state == TREC_CANNOT_COMMIT));
502   if (trec -> state == TREC_WAITING) {
503     ASSERT (trec -> enclosing_trec == NO_TREC);
504     TRACE("stmAbortTransaction aborting waiting transaction\n");
505     stop_tsos_waiting_on_trec(trec);
506   } 
507   trec -> state = TREC_ABORTED;
508
509   // Outcome now reflected by status field; no need for log
510   recycle_closures_from_trec(trec);
511
512   TRACE("stmAbortTransaction trec=%p done\n", trec);
513 }
514
515 /*......................................................................*/
516
517 void stmCondemnTransaction(StgTRecHeader *trec) {
518   TRACE("stmCondemnTransaction trec=%p\n", trec);
519   ASSERT (trec != NO_TREC);
520   ASSERT ((trec -> state == TREC_ACTIVE) || 
521           (trec -> state == TREC_MUST_ABORT) ||
522           (trec -> state == TREC_WAITING) ||
523           (trec -> state == TREC_CANNOT_COMMIT));
524
525   if (trec -> state == TREC_WAITING) {
526     ASSERT (trec -> enclosing_trec == NO_TREC);
527     TRACE("stmCondemnTransaction condemning waiting transaction\n");
528     stop_tsos_waiting_on_trec(trec);
529   } 
530
531   trec -> state = TREC_MUST_ABORT;
532
533   TRACE("stmCondemnTransaction trec=%p done\n", trec);
534 }
535
536 /*......................................................................*/
537
538 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
539   StgTRecHeader *outer;
540   TRACE("stmGetEnclosingTRec trec=%p\n", trec);
541   outer = trec -> enclosing_trec;
542   TRACE("stmGetEnclosingTRec outer=%p\n", outer);
543   return outer;
544 }
545
546 /*......................................................................*/
547
548 StgBool stmValidateTransaction(StgTRecHeader *trec) {
549   int result;
550   TRACE("stmValidateTransaction trec=%p\n", trec);
551   ASSERT(trec != NO_TREC);
552   ASSERT((trec -> state == TREC_ACTIVE) || 
553          (trec -> state == TREC_MUST_ABORT) ||
554          (trec -> state == TREC_CANNOT_COMMIT) ||
555          (trec -> state == TREC_WAITING));
556
557   lock_stm();
558   result = transaction_is_valid(trec);
559
560   if (!result && trec -> state != TREC_WAITING) {
561     trec -> state = TREC_MUST_ABORT; 
562   }
563
564   unlock_stm();
565
566   TRACE("stmValidateTransaction trec=%p result=%d\n", trec, result);
567   return result;
568 }
569
570 /*......................................................................*/
571
572 StgBool stmCommitTransaction(StgTRecHeader *trec) {
573   StgTRecHeader *et;
574   int result;
575   TRACE("stmCommitTransaction trec=%p trec->enclosing_trec=%p\n", trec, trec->enclosing_trec);
576   ASSERT (trec != NO_TREC);
577   ASSERT ((trec -> state == TREC_ACTIVE) || 
578           (trec -> state == TREC_MUST_ABORT) ||
579           (trec -> state == TREC_CANNOT_COMMIT));
580
581   lock_stm();
582   result = transaction_is_valid(trec);
583   if (result) {
584     et = trec -> enclosing_trec;
585     if (trec -> state == TREC_CANNOT_COMMIT && et == NO_TREC) {
586       TRACE("Cannot commit trec=%p at top level\n", trec);
587       trec -> state = TREC_MUST_ABORT;
588       result = FALSE;
589     } else {
590       if (et == NO_TREC) {
591         TRACE("Non-nesting commit, NO_TREC=%p\n", NO_TREC);
592       } else {
593         TRACE("Nested commit into %p, NO_TREC=%p\n", et, NO_TREC);
594       }
595       
596       FOR_EACH_ENTRY(trec, e, {
597         StgTVar *s;
598         s = e -> tvar;
599         if (et == NO_TREC) {
600           s -> current_value = e -> new_value;
601           unpark_waiters_on(s);
602         } else {
603           merge_update_into(et, s, e -> expected_value, e -> new_value, FALSE);
604         }
605       });
606
607
608       if (trec->state == TREC_CANNOT_COMMIT && et -> state == TREC_ACTIVE) {
609         TRACE("Propagating TREC_CANNOT_COMMIT into %p\n", et);
610         et -> state = TREC_CANNOT_COMMIT;
611       }
612     }
613   } 
614
615   // Outcome now reflected by status field; no need for log
616   recycle_closures_from_trec(trec);
617   
618   unlock_stm();
619
620   TRACE("stmCommitTransaction trec=%p result=%d\n", trec, result);
621
622   return result;
623 }
624
625 /*......................................................................*/
626
627 StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other) {
628   int result;
629   TRACE("stmMergeForWaiting trec=%p (%d) other=%p (%d)\n", trec, trec -> state, other, other->state);
630   ASSERT(trec != NO_TREC);
631   ASSERT(other != NO_TREC);
632   ASSERT((trec -> state == TREC_ACTIVE) || 
633          (trec -> state == TREC_MUST_ABORT) ||
634          (trec -> state == TREC_CANNOT_COMMIT));
635   ASSERT((other -> state == TREC_ACTIVE) || 
636          (other -> state == TREC_MUST_ABORT) ||
637          (other -> state == TREC_CANNOT_COMMIT));
638
639   lock_stm();
640   result = (transaction_is_valid(trec));
641   TRACE("stmMergeForWaiting initial result=%d\n", result);
642   if (result) {
643     result = transaction_is_valid(other);
644     TRACE("stmMergeForWaiting after both result=%d\n", result);
645     if (result) {
646       // Individually the two transactions may be valid.  Now copy entries from
647       // "other" into "trec".  This may cause "trec" to become invalid if it
648       // contains an update that conflicts with one from "other"
649       FOR_EACH_ENTRY(other, e, {
650         StgTVar *s = e -> tvar;
651         TRACE("Merging trec=%p exp=%p new=%p\n", other, e->expected_value, e->new_value);
652         merge_update_into(trec, s, e-> expected_value, e -> new_value, TRUE);
653       });
654       result = (trec -> state != TREC_MUST_ABORT);
655     } 
656   }
657
658   if (!result) {
659     trec -> state = TREC_MUST_ABORT;
660   }
661
662   unlock_stm();
663
664   TRACE("stmMergeForWaiting result=%d\n", result);
665   return result;
666 }
667
668 /*......................................................................*/
669
670 StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
671   int result;
672   TRACE("stmWait tso=%p trec=%p\n", tso, trec);
673   ASSERT (trec != NO_TREC);
674   ASSERT (trec -> enclosing_trec == NO_TREC);
675   ASSERT ((trec -> state == TREC_ACTIVE) || 
676           (trec -> state == TREC_MUST_ABORT) ||
677           (trec -> state == TREC_CANNOT_COMMIT));
678
679   lock_stm();
680   result = transaction_is_valid(trec);
681   if (result) {
682     // The transaction is valid so far so we can actually start waiting.
683     // (Otherwise the transaction was not valid and the thread will have to
684     // retry it).
685     start_tso_waiting_on_trec(tso, trec);
686     park_tso(tso);
687     trec -> state = TREC_WAITING;
688   }  else {
689     // Outcome now reflected by status field; no need for log
690     recycle_closures_from_trec(trec);
691   }
692   unlock_stm();
693
694   TRACE("stmWait trec=%p result=%d\n", trec, result);
695   return result;
696 }
697
698 /*......................................................................*/
699
700 StgBool stmReWait(StgTSO *tso) {
701   int result;
702   StgTRecHeader *trec = tso->trec;
703
704   TRACE("stmReWait trec=%p\n", trec);
705   ASSERT (trec != NO_TREC);
706   ASSERT (trec -> enclosing_trec == NO_TREC);
707   ASSERT ((trec -> state == TREC_WAITING) || 
708           (trec -> state == TREC_MUST_ABORT));
709
710   lock_stm();
711   result = transaction_is_valid(trec);
712   TRACE("stmReWait trec=%p result=%d\n", trec, result);
713   if (result) {
714     // The transaction remains valid -- do nothing because it is already on
715     // the wait queues
716     ASSERT (trec -> state == TREC_WAITING);
717     park_tso(tso);
718   } else {
719     // The transcation has become invalid.  We can now remove it from the wait
720     // queues.
721     if (trec -> state != TREC_MUST_ABORT) {
722           stop_tsos_waiting_on_trec (trec);
723
724           // Outcome now reflected by status field; no need for log
725           recycle_closures_from_trec(trec);
726     }
727
728   }
729   unlock_stm();
730
731   TRACE("stmReWait trec=%p result=%d\n", trec, result);
732   return result;
733 }
734
735 /*......................................................................*/
736
737 StgClosure *stmReadTVar(StgTRecHeader *trec, 
738                         StgTVar *tvar) {
739   StgTRecHeader *et;
740   StgClosure *result = NULL; // Suppress unassignment warning
741   int found = FALSE;
742   TRecEntry *ne = NULL;
743
744   TRACE("stmReadTVar trec=%p tvar=%p\n", trec, tvar);
745   ASSERT (trec != NO_TREC);
746   ASSERT (trec -> state == TREC_ACTIVE || 
747           trec -> state == TREC_MUST_ABORT ||
748           trec -> state == TREC_CANNOT_COMMIT);
749
750   lock_stm();
751   found = FALSE;
752
753   // Look for an existing entry in our trec or in an enclosing trec
754   et = trec;
755   while (et != NO_TREC) {
756     FOR_EACH_ENTRY(et, e, {
757       TRACE("testing e=%p\n", e);
758       if (e -> tvar == tvar) {
759         found = TRUE;
760         result = e -> new_value;
761         BREAK_FOR_EACH;
762       }
763     });
764     if (found) break;
765     et = et -> enclosing_trec;
766   }
767
768   if (found && et != trec) {
769     // Entry found in another trec
770     ASSERT (result != NULL);
771     TRACE("duplicating entry\n");
772     ne = get_new_entry(trec);
773     ne -> tvar = tvar;
774     ne -> expected_value = result;
775     ne -> new_value = result;
776   } else if (!found) {
777     // No entry found
778     ASSERT (result == NULL);
779     TRACE("need new entry\n");
780     ne = get_new_entry(trec);
781     TRACE("got ne=%p\n", ne);
782     result = tvar -> current_value;
783     ne -> tvar = tvar;
784     ne -> expected_value = result;
785     ne -> new_value = result;
786   }
787
788   unlock_stm();
789   ASSERT (result != NULL);
790   TRACE("stmReadTVar trec=%p result=%p\n", trec, result);
791
792   return result;
793 }
794
795 /*......................................................................*/
796
797 void stmWriteTVar(StgTRecHeader *trec,
798                   StgTVar *tvar, 
799                   StgClosure *new_value) {
800   StgTRecHeader *et;
801   TRecEntry *ne;
802   TRecEntry *entry = NULL;
803   int found;
804   TRACE("stmWriteTVar trec=%p tvar=%p new_value=%p\n", trec, tvar, new_value);
805   ASSERT (trec != NO_TREC);
806   ASSERT (trec -> state == TREC_ACTIVE || 
807           trec -> state == TREC_MUST_ABORT ||
808           trec -> state == TREC_CANNOT_COMMIT);
809
810   lock_stm();
811   found = FALSE;
812
813   // Look for an existing entry in our trec or in an enclosing trec
814   et = trec;
815   while (et != NO_TREC) {
816     FOR_EACH_ENTRY(et, e, {
817       if (e -> tvar == tvar) {
818         found = TRUE;
819         entry = e;
820         BREAK_FOR_EACH;
821       }
822     });
823     if (found) break;
824     et = et -> enclosing_trec;
825   }
826
827   if (found && et == trec) {
828     // Entry found in our trec
829     entry -> new_value = new_value;
830   } else if (found) {
831     // Entry found in another trec
832     ne = get_new_entry(trec);
833     ne -> tvar = tvar;
834     ne -> expected_value = entry -> new_value;
835     ne -> new_value = new_value;
836   } else {
837     // No entry found
838     ne = get_new_entry(trec);
839     ne -> tvar = tvar;
840     ne -> expected_value = tvar -> current_value;
841     ne -> new_value = new_value;
842   }
843
844   unlock_stm();
845   TRACE("stmWriteTVar trec=%p done\n", trec);
846 }
847
848
849 /*......................................................................*/
850