1 /* -----------------------------------------------------------------------------
3 * (c) The GHC Team 1998-2004
7 * This implementation is designed for a many-threads, few-CPUs case. This leads
8 * to a number of design choices:
10 * - We use a simple design which does not aim to be lock-free -- SMP builds use
11 * a mutex to protect all the TVars and STM datastructures, non-SMP builds
12 * do not require any locking. The goal is to make fast-path uncontended
13 * operations fast because, with few CPUs, contention between operations on the
14 * STM interface is expected to be rare.
16 * - Each thread is responsible for adding/removing itself to/from the queues
17 * associated with tvars. This reduces the work that is necessary when a
18 * large number of threads are waiting on a single tvar and where the update
19 * to that tvar is really only releasing a single thread.
21 * Ideas for future experimentation:
23 * - Read/write operations here involve a linear search of the trec. Consider
24 * adding a cache to map tvars to existing entries in the trec.
26 * - Consider whether to defer unparking more than one thread. On a uniprocessor
27 * the deferment could be made until a thread switch from the first thread
28 * released in the hope that it restores the location to a value on which
29 * other threads were waiting. That would avoid a stampede on e.g. multiple
30 * threads blocked reading from a single-cell shared buffer.
32 * - Consider whether to provide a link from a StgTVarWaitQueue to the TRecEntry
33 * associated with the waiter. This would allow unpark_waiters_on to be
34 * more selective and avoid unparking threads whose expected value for that
35 * tvar coincides with the value now stored there. Does this happen often?
38 * ---------------------------------------------------------------------------*/
40 #include "PosixSource.h"
// Debug tracing macro: with STM debug tracing enabled, TRACE forwards its
// printf-style arguments to debugBelch; otherwise it expands to nothing.
// (The #if/#else/#endif lines selecting between these two definitions are
// elided from this view.)
56 #define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
58 #define TRACE(_x...) /*Nothing*/
61 // If SHAKE is defined then validation will sometimes spuriously fail. This helps test
62 // unusual code paths if genuine contention is rare
// do_shake selects whether SHAKE testing mode is active; the two definitions
// correspond to the #ifdef SHAKE / #else branches (guards elided from this
// view).
65 static const int do_shake = TRUE;
67 static const int do_shake = FALSE;
// Counter used by shake() to decide when to inject a spurious failure.
69 static int shake_ctr = 0;
71 /*......................................................................*/
// shake(): when shaking is enabled, returns non-zero roughly once every 47
// calls so that validation fails spuriously and rare code paths get
// exercised.  (Remainder of the body is elided from this view.)
73 static int shake(void) {
75 if (((shake_ctr++) % 47) == 0) {
84 /*......................................................................*/
86 // Helper macros for iterating over entries within a transaction
// FOR_EACH_ENTRY(_t,_x,CODE): execute CODE once with _x bound to each
// TRecEntry in transaction record _t, walking from current_chunk backwards
// through the prev_chunk links.  The first (current) chunk is only used up
// to next_entry_idx; all earlier chunks are full, so the limit is reset to
// TREC_CHUNK_NUM_ENTRIES after the first chunk.  Several interior lines
// (the declaration of __i, closing braces, and the exit_for_each label) are
// elided from this view.  No comments may be inserted between the
// backslash-continued lines below without breaking the macro.
89 #define FOR_EACH_ENTRY(_t,_x,CODE) do { \
90 StgTRecHeader *__t = (_t); \
91 StgTRecChunk *__c = __t -> current_chunk; \
92 StgWord __limit = __c -> next_entry_idx; \
93 TRACE("trec=%p chunk=%p limit=%ld\n", __t, __c, __limit); \
94 while (__c != END_STM_CHUNK_LIST) { \
96 for (__i = 0; __i < __limit; __i ++) { \
97 TRecEntry *_x = &(__c -> entries[__i]); \
98 do { CODE } while (0); \
100 __c = __c -> prev_chunk; \
101 __limit = TREC_CHUNK_NUM_ENTRIES; \
104 if (FALSE) goto exit_for_each; \
// BREAK_FOR_EACH may be used inside CODE to leave the iteration early.
107 #define BREAK_FOR_EACH goto exit_for_each
109 /*......................................................................*/
111 // Private cache of must-be-unreachable trec headers and chunks
// Free lists of recycled STM structures.  Headers are linked through their
// enclosing_trec field, chunks through prev_chunk, and wait-queue entries
// through next_queue_entry.  All three lists are discarded by stmPreGCHook
// (see below) since their contents may be unreachable to the GC.
113 static StgTRecHeader *cached_trec_headers = NO_TREC;
114 static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
115 static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
// Return a wait-queue entry to the private cache for later reuse.  When
// shaking is active the entry is deliberately not reused (the enclosing
// if/else around the TRACE line is elided from this view).
117 static void recycle_tvar_wait_queue(StgTVarWaitQueue *q STG_UNUSED) {
120 TRACE("Shake: not re-using wait queue %p\n", q);
// Push q on the front of the cache list.
124 q -> next_queue_entry = cached_tvar_wait_queues;
125 cached_tvar_wait_queues = q;
// Return a trec header and all of its chunks to the private caches.  When
// shaking is active the closures are deliberately not reused (the enclosing
// if/else around the TRACE line is elided from this view).
129 static void recycle_closures_from_trec (StgTRecHeader *t STG_UNUSED) {
132 TRACE("Shake: not re-using closures from %p\n", t);
// Link t onto the header cache via its enclosing_trec field.
136 t -> enclosing_trec = cached_trec_headers;
137 cached_trec_headers = t;
// NOTE(review): resetting enclosing_trec to NO_TREC immediately after using
// it as the cache link appears to orphan any previously cached headers --
// confirm against the full file whether an elided #if branch separates
// these lines.
138 t -> enclosing_trec = NO_TREC;
// Move every chunk of t onto the chunk cache, linked via prev_chunk.
140 while (t -> current_chunk != END_STM_CHUNK_LIST) {
141 StgTRecChunk *c = t -> current_chunk;
142 t -> current_chunk = c -> prev_chunk;
143 c -> prev_chunk = cached_trec_chunks;
144 cached_trec_chunks = c;
149 /*......................................................................*/
151 // Helper functions for managing internal STM state. This lock is only held
152 // for a 'short' time, in the sense that it is never held when any of the
153 // external functions returns.
// Acquire the global STM lock (presumably a mutex in SMP builds and a no-op
// otherwise, per the file header comment -- bodies are elided from this
// view).
155 static void lock_stm(void) {
// Release the global STM lock; counterpart of lock_stm (body elided).
159 static void unlock_stm(void) {
163 /*......................................................................*/
165 // Helper functions for thread blocking and unblocking
// Mark tso as blocked on STM.  The thread must not already be blocked; the
// actual descheduling is handled by the caller/scheduler, not here.
167 static void park_tso(StgTSO *tso) {
168 ASSERT(tso -> why_blocked == NotBlocked);
169 tso -> why_blocked = BlockedOnSTM;
// No particular closure to block on -- use END_TSO_QUEUE as a placeholder.
170 tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
171 TRACE("park_tso on tso=%p\n", tso);
// Wake a thread that was parked by park_tso.  A TSO may be woken more than
// once while it remains on tvar wait queues; a second wake-up finds it
// already NotBlocked and is reported as spurious.
174 static void unpark_tso(StgTSO *tso) {
175 // We will continue unparking threads while they remain on one of the wait
176 // queues: it's up to the thread itself to remove it from the wait queues
177 // if it decides to do so when it is scheduled.
178 if (tso -> why_blocked == BlockedOnSTM) {
179 TRACE("unpark_tso on tso=%p\n", tso);
180 tso -> why_blocked = NotBlocked;
181 PUSH_ON_RUN_QUEUE(tso);
// else-branch: the thread was already runnable (elided "} else {" line).
183 TRACE("spurious unpark_tso on tso=%p\n", tso);
// Wake every thread on the wait queue of tvar s.  (The declaration of the
// cursor q is elided from this view.)  See the file header: waking all
// waiters rather than one is a deliberate simple-design choice.
187 static void unpark_waiters_on(StgTVar *s) {
189 TRACE("unpark_waiters_on tvar=%p\n", s);
190 for (q = s -> first_wait_queue_entry;
191 q != END_STM_WAIT_QUEUE;
192 q = q -> next_queue_entry) {
193 unpark_tso(q -> waiting_tso);
197 /*......................................................................*/
199 // Helper functions for allocation and initialization
// Obtain a wait-queue entry for waiting_tso, reusing one from the cache if
// possible, otherwise allocating a fresh one.  (The "} else {" between the
// two paths and the return are elided from this view.)
201 static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgTSO *waiting_tso) {
202 StgTVarWaitQueue *result;
203 if (cached_tvar_wait_queues != END_STM_WAIT_QUEUE) {
// Pop the head of the cache list.
204 result = cached_tvar_wait_queues;
205 cached_tvar_wait_queues = result -> next_queue_entry;
207 result = (StgTVarWaitQueue *)allocate(sizeofW(StgTVarWaitQueue));
208 SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
210 result -> waiting_tso = waiting_tso;
// Obtain an empty trec chunk, reusing one from the cache if possible,
// otherwise allocating a fresh one.  (The "} else {" between the two paths
// and the return are elided from this view.)
214 static StgTRecChunk *new_stg_trec_chunk(void) {
215 StgTRecChunk *result;
216 if (cached_trec_chunks != END_STM_CHUNK_LIST) {
// Pop the head of the cache list.
217 result = cached_trec_chunks;
218 cached_trec_chunks = result -> prev_chunk;
220 result = (StgTRecChunk *)allocate(sizeofW(StgTRecChunk));
221 SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
// Reset chunk state regardless of where it came from.
223 result -> prev_chunk = END_STM_CHUNK_LIST;
224 result -> next_entry_idx = 0;
225 TRACE("prev from %p is %p\n", result, result -> prev_chunk);
// Obtain a trec header for a transaction nested inside enclosing_trec (or
// NO_TREC for a top-level transaction), reusing a cached header when
// possible.  A nested transaction inherits its parent's state so that a
// doomed parent dooms the child too.  (Several "} else {" lines and the
// return are elided from this view.)
229 static StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) {
230 StgTRecHeader *result;
231 if (cached_trec_headers != NO_TREC) {
// Pop the head of the cache list (linked via enclosing_trec).
232 result = cached_trec_headers;
233 cached_trec_headers = result -> enclosing_trec;
235 result = (StgTRecHeader *) allocate(sizeofW(StgTRecHeader));
236 SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
238 result -> enclosing_trec = enclosing_trec;
239 result -> current_chunk = new_stg_trec_chunk();
241 if (enclosing_trec == NO_TREC) {
242 result -> state = TREC_ACTIVE;
// Nested: parent must be in a state that permits starting a child.
244 ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
245 enclosing_trec -> state == TREC_MUST_ABORT ||
246 enclosing_trec -> state == TREC_CANNOT_COMMIT);
247 result -> state = enclosing_trec -> state;
250 TRACE("new_stg_trec_header creating %p nidx=%ld chunk=%p enclosing_trec=%p state=%d\n",
251 result, result->current_chunk->next_entry_idx, result -> current_chunk, enclosing_trec, result->state);
255 /*......................................................................*/
257 // Helper functions for managing waiting lists
// Enqueue tso on the wait queue of every tvar read by the (outermost,
// validated) transaction trec.  Each new queue entry is pushed on the front
// of the tvar's doubly-linked wait list.  (Declarations of s and q are
// elided from this view.)
259 static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
260 ASSERT(trec != NO_TREC);
261 ASSERT(trec -> enclosing_trec == NO_TREC);
262 ASSERT(trec -> state == TREC_ACTIVE || trec -> state == TREC_CANNOT_COMMIT);
263 FOR_EACH_ENTRY(trec, e, {
266 StgTVarWaitQueue *fq;
268 TRACE("Adding tso=%p to wait queue for tvar=%p\n", tso, s);
// The trec was just validated, so the tvar still holds the expected value.
269 ASSERT(s -> current_value == e -> expected_value);
270 fq = s -> first_wait_queue_entry;
271 q = new_stg_tvar_wait_queue(tso);
272 q -> next_queue_entry = fq;
273 q -> prev_queue_entry = END_STM_WAIT_QUEUE;
274 if (fq != END_STM_WAIT_QUEUE) {
275 fq -> prev_queue_entry = q;
277 s -> first_wait_queue_entry = q;
// Reuse the entry's new_value slot to remember our queue entry so that
// stop_tsos_waiting_on_trec can find and unlink it later.
278 e -> new_value = (StgClosure *) q;
// Remove this transaction's wait-queue entries (stashed in each entry's
// new_value slot by start_tso_waiting_on_trec) from every tvar it read,
// unlinking from the doubly-linked lists and recycling the queue nodes.
// (Declarations of s and q, and an elided else-branch updating the list
// head when the entry has no predecessor, are not visible in this view.)
282 static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
283 ASSERT(trec != NO_TREC);
284 ASSERT(trec -> enclosing_trec == NO_TREC);
285 ASSERT(trec -> state == TREC_WAITING ||
286 trec -> state == TREC_MUST_ABORT);
287 TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
288 FOR_EACH_ENTRY(trec, e, {
290 StgTVarWaitQueue *pq;
291 StgTVarWaitQueue *nq;
// Recover the queue entry stashed in new_value.
294 q = (StgTVarWaitQueue *) (e -> new_value);
295 TRACE("Removing tso=%p from wait queue for tvar=%p\n", q -> waiting_tso, s);
296 nq = q -> next_queue_entry;
297 pq = q -> prev_queue_entry;
298 TRACE("pq=%p nq=%p q=%p\n", pq, nq, q);
299 if (nq != END_STM_WAIT_QUEUE) {
300 nq -> prev_queue_entry = pq;
302 if (pq != END_STM_WAIT_QUEUE) {
303 pq -> next_queue_entry = nq;
// No predecessor: q was the head of the tvar's wait list.
305 ASSERT (s -> first_wait_queue_entry == q);
306 s -> first_wait_queue_entry = nq;
308 recycle_tvar_wait_queue(q);
312 /*......................................................................*/
// Hand out the next free TRecEntry slot in transaction record t, growing
// the chunk list with a fresh chunk when the current chunk is full.
// (Declarations of result, c, i, nc and the return are elided from this
// view.)
314 static TRecEntry *get_new_entry(StgTRecHeader *t) {
319 c = t -> current_chunk;
320 i = c -> next_entry_idx;
321 ASSERT(c != END_STM_CHUNK_LIST);
323 if (i < TREC_CHUNK_NUM_ENTRIES) {
324 // Continue to use current chunk
325 result = &(c -> entries[i]);
326 c -> next_entry_idx ++;
328 // Current chunk is full: allocate a fresh one
330 nc = new_stg_trec_chunk();
331 nc -> prev_chunk = c;
// Index 1, because entries[0] is handed out immediately below.
332 nc -> next_entry_idx = 1;
333 t -> current_chunk = nc;
334 result = &(nc -> entries[0]);
340 /*......................................................................*/
// Merge one (expected_value -> new_value) update for a tvar into trec t.
// merging_sibling distinguishes two callers: stmMergeForWaiting combines
// two sibling transactions (conflicts doom t via TREC_MUST_ABORT or
// TREC_CANNOT_COMMIT), whereas stmCommitTransaction folds a child back
// into its parent (where the child's expected value must equal the
// parent's current new value).  If t has no entry for the tvar yet, a new
// one is created.  (The tvar parameter line, the tvar-match test inside
// the loop, BREAK_FOR_EACH/"found" plumbing, and several closing braces
// are elided from this view.)
342 static void merge_update_into(StgTRecHeader *t,
344 StgClosure *expected_value,
345 StgClosure *new_value,
346 int merging_sibling) {
349 // Look for an entry in this trec
351 FOR_EACH_ENTRY(t, e, {
356 if (merging_sibling) {
357 if (e -> expected_value != expected_value) {
358 // Must abort if the two entries start from different values
359 TRACE("Siblings inconsistent at %p (%p vs %p)\n",
360 tvar, e -> expected_value, expected_value);
361 t -> state = TREC_MUST_ABORT;
362 } else if (e -> new_value != new_value) {
363 // Cannot commit if the two entries lead to different values (wait still OK)
364 TRACE("Siblings trying conflicting writes to %p (%p vs %p)\n",
365 tvar, e -> new_value, new_value);
366 t -> state = TREC_CANNOT_COMMIT;
369 // Otherwise merging child back into parent
370 ASSERT (e -> new_value == expected_value);
372 TRACE(" trec=%p exp=%p new=%p\n", t, e->expected_value, e->new_value);
373 e -> new_value = new_value;
379 // No entry so far in this trec
381 ne = get_new_entry(t);
383 ne -> expected_value = expected_value;
384 ne -> new_value = new_value;
388 /*......................................................................*/
// Return the value of tvar as seen from transaction t: the new_value of
// the innermost enclosing trec entry for the tvar, or the tvar's actual
// current_value if no trec in the chain mentions it.  (The tvar parameter
// line, the tvar-match test, BREAK_FOR_EACH plumbing, and the return are
// elided from this view.)
390 static StgClosure *read_current_value_seen_from(StgTRecHeader *t,
393 StgClosure *result = NULL;
395 // Look for any relevent trec entries
397 while (t != NO_TREC) {
398 FOR_EACH_ENTRY(t, e, {
403 result = e -> new_value;
// Walk outwards to the next enclosing transaction.
408 t = t -> enclosing_trec;
412 // Value not yet held in a trec
413 result = tvar -> current_value;
419 /*......................................................................*/
// Check that every entry of t still matches the values visible from its
// enclosing transaction (or the tvars themselves at top level), i.e. that
// t could still commit.  A trec already marked TREC_MUST_ABORT is invalid
// outright.  When shaking, validation may be made to fail spuriously.
// (Declarations of et/result, the shake branch structure, the loop body's
// invalidation of result, and the return are elided from this view.)
421 static int transaction_is_valid (StgTRecHeader *t) {
426 TRACE("Shake: pretending transaction trec=%p is invalid when it may not be\n", t);
430 et = t -> enclosing_trec;
431 ASSERT ((t -> state == TREC_ACTIVE) ||
432 (t -> state == TREC_WAITING) ||
433 (t -> state == TREC_MUST_ABORT) ||
434 (t -> state == TREC_CANNOT_COMMIT));
435 result = !((t -> state) == TREC_MUST_ABORT);
437 FOR_EACH_ENTRY(t, e, {
// An entry is stale if its expected value no longer matches what the
// enclosing context sees for that tvar.
440 if (e -> expected_value != read_current_value_seen_from(et, s)) {
449 /************************************************************************/
452 * External functions below this point are responsible for:
454 * - acquiring/releasing the STM lock
456 * - all updates to the trec status field
457 * ASSERT(t != NO_TREC);
459 * By convention we increment entry_count when starting a new
460 * transaction and we decrement it at the point where we can discard
461 * the contents of the trec when exiting the outermost transaction.
462 * This means that stmWait and stmRewait decrement the count whenever
463 * they return FALSE (they do so exactly once for each transaction
464 * that doesn't remain blocked forever).
467 /************************************************************************/
// Called before garbage collection: drop all three private caches, since
// the recycled closures they hold are treated as unreachable and would not
// survive GC.
469 void stmPreGCHook() {
470 TRACE("stmPreGCHook\n");
471 cached_trec_headers = NO_TREC;
472 cached_trec_chunks = END_STM_CHUNK_LIST;
473 cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
476 /************************************************************************/
479 TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
483 /*......................................................................*/
// Begin a new transaction nested inside outer (NO_TREC for top level) and
// return its fresh trec.  (The declaration of t and the return are elided
// from this view.)
485 StgTRecHeader *stmStartTransaction(StgTRecHeader *outer) {
487 TRACE("stmStartTransaction current-trec=%p\n", outer);
488 t = new_stg_trec_header(outer);
489 TRACE("stmStartTransaction new-trec=%p\n", t);
493 /*......................................................................*/
// Abandon trec: if it was waiting, first remove its TSO from all tvar wait
// queues; then mark it TREC_ABORTED and recycle its log, since the outcome
// is now fully described by the status field.
495 void stmAbortTransaction(StgTRecHeader *trec) {
496 TRACE("stmAbortTransaction trec=%p\n", trec);
497 ASSERT (trec != NO_TREC);
498 ASSERT ((trec -> state == TREC_ACTIVE) ||
499 (trec -> state == TREC_MUST_ABORT) ||
500 (trec -> state == TREC_WAITING) ||
501 (trec -> state == TREC_CANNOT_COMMIT));
502 if (trec -> state == TREC_WAITING) {
// Only outermost transactions can be in the WAITING state.
503 ASSERT (trec -> enclosing_trec == NO_TREC);
504 TRACE("stmAbortTransaction aborting waiting transaction\n");
505 stop_tsos_waiting_on_trec(trec);
507 trec -> state = TREC_ABORTED;
509 // Outcome now reflected by status field; no need for log
510 recycle_closures_from_trec(trec);
512 TRACE("stmAbortTransaction trec=%p done\n", trec);
515 /*......................................................................*/
// Doom trec so that it must eventually abort (TREC_MUST_ABORT), without
// discarding it yet.  A waiting transaction is first removed from the tvar
// wait queues.  Unlike stmAbortTransaction, the log is kept.
517 void stmCondemnTransaction(StgTRecHeader *trec) {
518 TRACE("stmCondemnTransaction trec=%p\n", trec);
519 ASSERT (trec != NO_TREC);
520 ASSERT ((trec -> state == TREC_ACTIVE) ||
521 (trec -> state == TREC_MUST_ABORT) ||
522 (trec -> state == TREC_WAITING) ||
523 (trec -> state == TREC_CANNOT_COMMIT));
525 if (trec -> state == TREC_WAITING) {
// Only outermost transactions can be in the WAITING state.
526 ASSERT (trec -> enclosing_trec == NO_TREC);
527 TRACE("stmCondemnTransaction condemning waiting transaction\n");
528 stop_tsos_waiting_on_trec(trec);
531 trec -> state = TREC_MUST_ABORT;
533 TRACE("stmCondemnTransaction trec=%p done\n", trec);
536 /*......................................................................*/
// Return the transaction record enclosing trec (NO_TREC at top level).
// (The return statement is elided from this view.)
538 StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
539 StgTRecHeader *outer;
540 TRACE("stmGetEnclosingTRec trec=%p\n", trec);
541 outer = trec -> enclosing_trec;
542 TRACE("stmGetEnclosingTRec outer=%p\n", outer);
546 /*......................................................................*/
// Re-check trec against the current tvar contents.  If it has become
// invalid (and is not merely waiting), doom it with TREC_MUST_ABORT.
// Returns whether the transaction is still valid.  (The declaration of
// result, lock_stm/unlock_stm calls, closing braces, and the return are
// elided from this view.)
548 StgBool stmValidateTransaction(StgTRecHeader *trec) {
550 TRACE("stmValidateTransaction trec=%p\n", trec);
551 ASSERT(trec != NO_TREC);
552 ASSERT((trec -> state == TREC_ACTIVE) ||
553 (trec -> state == TREC_MUST_ABORT) ||
554 (trec -> state == TREC_CANNOT_COMMIT) ||
555 (trec -> state == TREC_WAITING));
558 result = transaction_is_valid(trec);
// A waiting transaction keeps its WAITING state even if now invalid; the
// wake-up path (stmReWait) deals with it.
560 if (!result && trec -> state != TREC_WAITING) {
561 trec -> state = TREC_MUST_ABORT;
566 TRACE("stmValidateTransaction trec=%p result=%d\n", trec, result);
570 /*......................................................................*/
// Attempt to commit trec.  If valid: at top level, write each entry's
// new_value into its tvar and wake that tvar's waiters; for a nested
// transaction, merge each entry into the parent (propagating
// TREC_CANNOT_COMMIT to an active parent).  A TREC_CANNOT_COMMIT trec at
// top level is demoted to TREC_MUST_ABORT and fails.  The log is recycled
// once the outcome is reflected in the status field.  Returns whether the
// commit succeeded.  (Declarations of result/et, lock/unlock calls, the
// if(result) structure, per-entry updates inside the loop other than those
// shown, closing braces, and the return are elided from this view.)
572 StgBool stmCommitTransaction(StgTRecHeader *trec) {
575 TRACE("stmCommitTransaction trec=%p trec->enclosing_trec=%p\n", trec, trec->enclosing_trec);
576 ASSERT (trec != NO_TREC);
577 ASSERT ((trec -> state == TREC_ACTIVE) ||
578 (trec -> state == TREC_MUST_ABORT) ||
579 (trec -> state == TREC_CANNOT_COMMIT));
582 result = transaction_is_valid(trec);
584 et = trec -> enclosing_trec;
585 if (trec -> state == TREC_CANNOT_COMMIT && et == NO_TREC) {
586 TRACE("Cannot commit trec=%p at top level\n", trec);
587 trec -> state = TREC_MUST_ABORT;
591 TRACE("Non-nesting commit, NO_TREC=%p\n", NO_TREC);
593 TRACE("Nested commit into %p, NO_TREC=%p\n", et, NO_TREC);
596 FOR_EACH_ENTRY(trec, e, {
// Top-level path: publish the new value and wake waiters on this tvar.
600 s -> current_value = e -> new_value;
601 unpark_waiters_on(s);
// Nested path: fold the update into the parent transaction.
603 merge_update_into(et, s, e -> expected_value, e -> new_value, FALSE);
608 if (trec->state == TREC_CANNOT_COMMIT && et -> state == TREC_ACTIVE) {
609 TRACE("Propagating TREC_CANNOT_COMMIT into %p\n", et);
610 et -> state = TREC_CANNOT_COMMIT;
615 // Outcome now reflected by status field; no need for log
616 recycle_closures_from_trec(trec);
620 TRACE("stmCommitTransaction trec=%p result=%d\n", trec, result);
625 /*......................................................................*/
// Merge the entries of sibling transaction "other" into "trec" so that a
// single combined trec can wait on the union of their tvars (used for
// orElse-style waiting).  Both siblings are validated first; copying may
// then invalidate trec if the two contain conflicting updates.  Returns
// whether the merged trec is usable for waiting.  (Declaration of result,
// lock/unlock calls, the short-circuiting between the two validations, the
// failure branch, closing braces, and the return are elided from this
// view.)
627 StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other) {
629 TRACE("stmMergeForWaiting trec=%p (%d) other=%p (%d)\n", trec, trec -> state, other, other->state);
630 ASSERT(trec != NO_TREC);
631 ASSERT(other != NO_TREC);
632 ASSERT((trec -> state == TREC_ACTIVE) ||
633 (trec -> state == TREC_MUST_ABORT) ||
634 (trec -> state == TREC_CANNOT_COMMIT));
635 ASSERT((other -> state == TREC_ACTIVE) ||
636 (other -> state == TREC_MUST_ABORT) ||
637 (other -> state == TREC_CANNOT_COMMIT));
640 result = (transaction_is_valid(trec));
641 TRACE("stmMergeForWaiting initial result=%d\n", result);
643 result = transaction_is_valid(other);
644 TRACE("stmMergeForWaiting after both result=%d\n", result);
646 // Individually the two transactions may be valid. Now copy entries from
647 // "other" into "trec". This may cause "trec" to become invalid if it
648 // contains an update that conflicts with one from "other"
649 FOR_EACH_ENTRY(other, e, {
650 StgTVar *s = e -> tvar;
651 TRACE("Merging trec=%p exp=%p new=%p\n", other, e->expected_value, e->new_value);
// merging_sibling=TRUE: conflicts set trec's state rather than asserting.
652 merge_update_into(trec, s, e-> expected_value, e -> new_value, TRUE);
654 result = (trec -> state != TREC_MUST_ABORT);
// Failure path (elided branch): ensure trec is doomed.
659 trec -> state = TREC_MUST_ABORT;
664 TRACE("stmMergeForWaiting result=%d\n", result);
668 /*......................................................................*/
// Put tso to sleep until one of the tvars read by outermost transaction
// trec is updated.  If the trec validates, the thread is queued on every
// tvar it read, the trec enters TREC_WAITING, and its log is recycled;
// if validation fails the caller must retry instead.  Returns whether the
// wait was entered.  (Declaration of result, lock/unlock calls, the
// if(result) structure, park_tso call, closing braces, and the return are
// elided from this view.)
670 StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
672 TRACE("stmWait tso=%p trec=%p\n", tso, trec);
673 ASSERT (trec != NO_TREC);
674 ASSERT (trec -> enclosing_trec == NO_TREC);
675 ASSERT ((trec -> state == TREC_ACTIVE) ||
676 (trec -> state == TREC_MUST_ABORT) ||
677 (trec -> state == TREC_CANNOT_COMMIT));
680 result = transaction_is_valid(trec);
682 // The transaction is valid so far so we can actually start waiting.
683 // (Otherwise the transaction was not valid and the thread will have to
685 start_tso_waiting_on_trec(tso, trec);
687 trec -> state = TREC_WAITING;
689 // Outcome now reflected by status field; no need for log
690 recycle_closures_from_trec(trec);
694 TRACE("stmWait trec=%p result=%d\n", trec, result);
698 /*......................................................................*/
// Called when a waiting thread has been woken: decide whether it should go
// back to sleep.  If the trec is still valid it simply stays on the wait
// queues; otherwise it is removed from them (unless already condemned via
// TREC_MUST_ABORT, which implies it was taken off the queues) and its log
// is recycled.  Returns whether the thread should continue waiting.
// (Declaration of result, lock/unlock calls, the if(result)/else
// structure, closing braces, and the return are elided from this view.)
700 StgBool stmReWait(StgTSO *tso) {
702 StgTRecHeader *trec = tso->trec;
704 TRACE("stmReWait trec=%p\n", trec);
705 ASSERT (trec != NO_TREC);
706 ASSERT (trec -> enclosing_trec == NO_TREC);
707 ASSERT ((trec -> state == TREC_WAITING) ||
708 (trec -> state == TREC_MUST_ABORT));
711 result = transaction_is_valid(trec);
712 TRACE("stmReWait trec=%p result=%d\n", trec, result);
714 // The transaction remains valid -- do nothing because it is already on
716 ASSERT (trec -> state == TREC_WAITING);
719 // The transaction has become invalid. We can now remove it from the wait
721 if (trec -> state != TREC_MUST_ABORT) {
722 stop_tsos_waiting_on_trec (trec);
724 // Outcome now reflected by status field; no need for log
725 recycle_closures_from_trec(trec);
731 TRACE("stmReWait trec=%p result=%d\n", trec, result);
735 /*......................................................................*/
// Transactional read of tvar within trec.  Searches trec and its enclosing
// transactions (linear scan of each log -- see the file header) for an
// existing entry.  A hit in an enclosing trec is duplicated into trec; a
// miss creates a fresh entry recording the tvar's current value as both
// expected and new.  Returns the value the transaction sees.  (The tvar
// parameter line, declarations of et/found, BREAK_FOR_EACH/"found"
// plumbing, the branch selecting the hit-in-our-own-trec case, lock/unlock
// calls, ne->tvar assignments, closing braces, and the return are elided
// from this view.)
737 StgClosure *stmReadTVar(StgTRecHeader *trec,
740 StgClosure *result = NULL; // Suppress unassignment warning
742 TRecEntry *ne = NULL;
744 TRACE("stmReadTVar trec=%p tvar=%p\n", trec, tvar);
745 ASSERT (trec != NO_TREC);
746 ASSERT (trec -> state == TREC_ACTIVE ||
747 trec -> state == TREC_MUST_ABORT ||
748 trec -> state == TREC_CANNOT_COMMIT);
753 // Look for an existing entry in our trec or in an enclosing trec
755 while (et != NO_TREC) {
756 FOR_EACH_ENTRY(et, e, {
757 TRACE("testing e=%p\n", e);
758 if (e -> tvar == tvar) {
760 result = e -> new_value;
765 et = et -> enclosing_trec;
768 if (found && et != trec) {
769 // Entry found in another trec
770 ASSERT (result != NULL);
771 TRACE("duplicating entry\n");
// Copy the enclosing trec's view of the tvar into our own log.
772 ne = get_new_entry(trec);
774 ne -> expected_value = result;
775 ne -> new_value = result;
// Elided branch: no entry anywhere -- read the tvar directly.
778 ASSERT (result == NULL);
779 TRACE("need new entry\n");
780 ne = get_new_entry(trec);
781 TRACE("got ne=%p\n", ne);
782 result = tvar -> current_value;
784 ne -> expected_value = result;
785 ne -> new_value = result;
789 ASSERT (result != NULL);
790 TRACE("stmReadTVar trec=%p result=%p\n", trec, result);
795 /*......................................................................*/
// Transactional write of new_value to tvar within trec.  Searches trec and
// its enclosing transactions for an existing entry: a hit in our own trec
// just overwrites its new_value; a hit in an enclosing trec is copied into
// a new local entry (expecting the enclosing trec's new_value); a miss
// creates a fresh entry expecting the tvar's current value.  The tvar
// itself is not modified until commit.  (The tvar parameter line,
// declarations of et/found/ne, "found"/entry capture inside the loop,
// lock/unlock calls, ne->tvar assignments, closing braces are elided from
// this view.)
797 void stmWriteTVar(StgTRecHeader *trec,
799 StgClosure *new_value) {
802 TRecEntry *entry = NULL;
804 TRACE("stmWriteTVar trec=%p tvar=%p new_value=%p\n", trec, tvar, new_value);
805 ASSERT (trec != NO_TREC);
806 ASSERT (trec -> state == TREC_ACTIVE ||
807 trec -> state == TREC_MUST_ABORT ||
808 trec -> state == TREC_CANNOT_COMMIT);
813 // Look for an existing entry in our trec or in an enclosing trec
815 while (et != NO_TREC) {
816 FOR_EACH_ENTRY(et, e, {
817 if (e -> tvar == tvar) {
824 et = et -> enclosing_trec;
827 if (found && et == trec) {
828 // Entry found in our trec
829 entry -> new_value = new_value;
831 // Entry found in another trec
// New local entry whose expected value is the enclosing trec's view.
832 ne = get_new_entry(trec);
834 ne -> expected_value = entry -> new_value;
835 ne -> new_value = new_value;
// Elided branch: no entry anywhere -- expect the tvar's current value.
838 ne = get_new_entry(trec);
840 ne -> expected_value = tvar -> current_value;
841 ne -> new_value = new_value;
845 TRACE("stmWriteTVar trec=%p done\n", trec);
849 /*......................................................................*/