1 /* -----------------------------------------------------------------------------
3 * (c) The GHC Team 1998-2004
7 * This implementation is designed for a many-threads, few-CPUs case. This leads
8 * to a number of design choices:
10 * - We use a simple design which does not aim to be lock-free -- SMP builds use
11 * a mutex to protect all the TVars and STM datastructures, non-SMP builds
12 * do not require any locking. The goal is to make fast-path uncontended
 *   operations fast because, with few CPUs, contention between operations on
 *   the STM interface is expected to be rare.
16 * - Each thread is responsible for adding/removing itself to/from the queues
17 * associated with tvars. This reduces the work that is necessary when a
18 * large number of threads are waiting on a single tvar and where the update
19 * to that tvar is really only releasing a single thread.
21 * Ideas for future experimentation:
23 * - Read/write operations here involve a linear search of the trec. Consider
24 * adding a cache to map tvars to existing entries in the trec.
26 * - Consider whether to defer unparking more than one thread. On a uniprocessor
27 * the deferment could be made until a thread switch from the first thread
28 * released in the hope that it restores the location to a value on which
29 * other threads were waiting. That would avoid a stampede on e.g. multiple
30 * threads blocked reading from a single-cell shared buffer.
32 * - Consider whether to provide a link from a StgTVarWaitQueue to the TRecEntry
33 * associated with the waiter. This would allow unpark_waiters_on to be
34 * more selective and avoid unparking threads whose expected value for that
 *   tvar coincides with the value now stored there.  Does this happen often?
38 * ---------------------------------------------------------------------------*/
40 #include "PosixSource.h"
/* Debug tracing: when STM debugging is enabled TRACE forwards to debugBelch;
 * otherwise it expands to nothing.
 * NOTE(review): the #ifdef DEBUG / #else / #endif lines selecting between the
 * two definitions are elided in this extracted view; as shown, the second
 * #define would redefine the first. */
#define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
#define TRACE(_x...) /*Nothing*/
// If SHAKE is defined then validation will sometimes spuriously fail.  This
// helps test unusual code paths if genuine contention is rare.
// NOTE(review): the #ifdef SHAKE / #else / #endif lines selecting between the
// two do_shake definitions are elided in this extracted view.
static const int do_shake = TRUE;
static const int do_shake = FALSE;
// Counter driving shake(): incremented on every call so spurious failures are
// injected periodically rather than randomly.
static int shake_ctr = 0;
71 /*......................................................................*/
// Decide whether to inject a spurious failure at this call site: when shaking
// is enabled, roughly one call in 47 reports TRUE.
// NOTE(review): the rest of the body (the do_shake test, return statements
// and closing braces) is elided in this extracted view.
static int shake(void) {
  if (((shake_ctr++) % 47) == 0) {
84 /*......................................................................*/
// Helper macros for iterating over entries within a transaction.
//
// FOR_EACH_ENTRY(_t,_x,CODE) walks every TRecEntry in trec _t, binding each
// entry to _x and running CODE on it.  Chunks are visited starting from
// current_chunk and following the prev_chunk list; the first (current) chunk
// is only partially full, so its next_entry_idx bounds the scan, while older
// chunks are scanned to TREC_CHUNK_NUM_ENTRIES.  BREAK_FOR_EACH exits the
// whole iteration early by jumping to the exit_for_each label that the
// FOR_EACH_ENTRY expansion provides.
// NOTE(review): several macro lines are elided in this extracted view -- the
// declaration of __i, the closers for the for/while loops and the do-while
// wrapper, and the `exit_for_each:` label itself.
#define FOR_EACH_ENTRY(_t,_x,CODE) do { \
  StgTRecHeader *__t = (_t); \
  StgTRecChunk *__c = __t -> current_chunk; \
  StgWord __limit = __c -> next_entry_idx; \
  TRACE("trec=%p chunk=%p limit=%d\n", __t, __c, __limit); \
  while (__c != END_STM_CHUNK_LIST) { \
    for (__i = 0; __i < __limit; __i ++) { \
      TRecEntry *_x = &(__c -> entries[__i]); \
      do { CODE } while (0); \
    __c = __c -> prev_chunk; \
    __limit = TREC_CHUNK_NUM_ENTRIES; \
  if (FALSE) goto exit_for_each; \

#define BREAK_FOR_EACH goto exit_for_each
109 /*......................................................................*/
// Private cache of must-be-unreachable trec headers and chunks.
// These are free lists threaded through the enclosing_trec / prev_chunk /
// next_queue_entry fields of the recycled closures.  They are flushed by
// stmPreGCHook (below) so the GC never treats recycled closures as live.
static StgTRecHeader *cached_trec_headers = NO_TREC;
static StgTRecChunk *cached_trec_chunks = END_STM_CHUNK_LIST;
static StgTVarWaitQueue *cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
// Return a wait-queue entry to the private free list for later re-use by
// new_stg_tvar_wait_queue.  The caller must guarantee q is unreachable.
// NOTE(review): the `if (shake()) { ... return; }` guard that the TRACE line
// belongs to, and the closing braces, are elided in this extracted view.
static void recycle_tvar_wait_queue(StgTVarWaitQueue *q) {
  TRACE("Shake: not re-using wait queue %p\n", q);
  // Push onto the cached_tvar_wait_queues free list.
  q -> next_queue_entry = cached_tvar_wait_queues;
  cached_tvar_wait_queues = q;
// Return a trec header and all of its chunks to the private free lists.  The
// caller must guarantee t is unreachable (its outcome is already reflected in
// its status field, so the log contents are no longer needed).
// NOTE(review): the shake() guard around the TRACE and the closing braces are
// elided in this extracted view.
static void recycle_closures_from_trec (StgTRecHeader *t) {
  TRACE("Shake: not re-using closures from %p\n", t);
  // Push the header onto the cached_trec_headers free list (linked through
  // enclosing_trec, matching the pop in new_stg_trec_header).
  t -> enclosing_trec = cached_trec_headers;
  cached_trec_headers = t;
  // NOTE(review): this overwrites the free-list link installed two lines
  // above, which would truncate the cached-header chain to a single element
  // -- verify against the full source whether intervening lines are elided
  // here or whether this is a genuine defect.
  t -> enclosing_trec = NO_TREC;
  // Unchain every chunk from the header onto the cached_trec_chunks list.
  while (t -> current_chunk != END_STM_CHUNK_LIST) {
    StgTRecChunk *c = t -> current_chunk;
    t -> current_chunk = c -> prev_chunk;
    c -> prev_chunk = cached_trec_chunks;
    cached_trec_chunks = c;
145 /*......................................................................*/
// Helper functions for managing internal STM state. This lock is only held
// for a 'short' time, in the sense that it is never held when any of the
// external functions returns.
// NOTE(review): the bodies of lock_stm/unlock_stm and their closing braces
// are elided in this extracted view (per the header comment, SMP builds take
// a mutex here and non-SMP builds do nothing -- confirm against full source).
static void lock_stm(void) {
static void unlock_stm(void) {
159 /*......................................................................*/
// Helper functions for thread blocking and unblocking

// Mark tso as blocked on STM.  The TSO must not currently be blocked on
// anything else; its block_info is cleared to the end-of-queue sentinel.
// NOTE(review): the closing brace is elided in this extracted view.
static void park_tso(StgTSO *tso) {
  ASSERT(tso -> why_blocked == NotBlocked);
  tso -> why_blocked = BlockedOnSTM;
  tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
  TRACE("park_tso on tso=%p\n", tso);
// Wake a TSO parked by park_tso, pushing it back on the run queue.  A TSO
// that is no longer BlockedOnSTM has already been woken (it may still sit on
// several tvar wait queues) and is left alone.
static void unpark_tso(StgTSO *tso) {
  // We will continue unparking threads while they remain on one of the wait
  // queues: it's up to the thread itself to remove it from the wait queues
  // if it decides to do so when it is scheduled.
  if (tso -> why_blocked == BlockedOnSTM) {
    TRACE("unpark_tso on tso=%p\n", tso);
    tso -> why_blocked = NotBlocked;
    PUSH_ON_RUN_QUEUE(tso);
    // NOTE(review): the `} else {` separating the two branches is elided in
    // this extracted view.
    TRACE("spurious unpark_tso on tso=%p\n", tso);
// Wake every TSO on tvar s's wait queue (called after s has been updated so
// that waiters can re-validate their transactions).
// NOTE(review): the declaration of q (StgTVarWaitQueue *) and the closing
// braces are elided in this extracted view.
static void unpark_waiters_on(StgTVar *s) {
  TRACE("unpark_waiters_on tvar=%p\n", s);
  for (q = s -> first_wait_queue_entry;
       q != END_STM_WAIT_QUEUE;
       q = q -> next_queue_entry) {
    unpark_tso(q -> waiting_tso);
193 /*......................................................................*/
// Helper functions for allocation and initialization

// Obtain a wait-queue entry for waiting_tso, re-using one from the private
// cache when possible and heap-allocating otherwise.
// NOTE(review): the `} else {` between the cache-pop and allocate paths, the
// closing braces and the `return result;` are elided in this extracted view.
static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgTSO *waiting_tso) {
  StgTVarWaitQueue *result;
  if (cached_tvar_wait_queues != END_STM_WAIT_QUEUE) {
    // Fast path: pop a recycled entry off the free list.
    result = cached_tvar_wait_queues;
    cached_tvar_wait_queues = result -> next_queue_entry;
    // Slow path: allocate and stamp the closure header.
    result = (StgTVarWaitQueue *)allocate(sizeofW(StgTVarWaitQueue));
    SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
  result -> waiting_tso = waiting_tso;
// Obtain an empty trec chunk, re-using one from the private cache when
// possible and heap-allocating otherwise.  The chunk starts with no entries
// (next_entry_idx == 0) and no predecessor chunk.
// NOTE(review): the `} else {` between the cache-pop and allocate paths, the
// closing braces and the `return result;` are elided in this extracted view.
static StgTRecChunk *new_stg_trec_chunk(void) {
  StgTRecChunk *result;
  if (cached_trec_chunks != END_STM_CHUNK_LIST) {
    // Fast path: pop a recycled chunk off the free list.
    result = cached_trec_chunks;
    cached_trec_chunks = result -> prev_chunk;
    // Slow path: allocate and stamp the closure header.
    result = (StgTRecChunk *)allocate(sizeofW(StgTRecChunk));
    SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
  result -> prev_chunk = END_STM_CHUNK_LIST;
  result -> next_entry_idx = 0;
  TRACE("prev from %p is %p\n", result, result -> prev_chunk);
// Obtain a trec header for a transaction nested inside enclosing_trec (or a
// top-level one when enclosing_trec == NO_TREC), re-using a cached header
// when possible.  A fresh empty chunk is attached.  Top-level trecs start
// TREC_ACTIVE; nested trecs inherit the enclosing trec's state so that a
// doomed parent cannot spawn a healthy child.
// NOTE(review): the `} else {` lines of both if statements, the closing
// braces and the `return result;` are elided in this extracted view.
static StgTRecHeader *new_stg_trec_header(StgTRecHeader *enclosing_trec) {
  StgTRecHeader *result;
  if (cached_trec_headers != NO_TREC) {
    // Fast path: pop a recycled header off the free list.
    result = cached_trec_headers;
    cached_trec_headers = result -> enclosing_trec;
    // Slow path: allocate and stamp the closure header.
    result = (StgTRecHeader *) allocate(sizeofW(StgTRecHeader));
    SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);
  result -> enclosing_trec = enclosing_trec;
  result -> current_chunk = new_stg_trec_chunk();
  if (enclosing_trec == NO_TREC) {
    result -> state = TREC_ACTIVE;
    // Nested: inherit the (sane) state of the enclosing transaction.
    ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
           enclosing_trec -> state == TREC_MUST_ABORT ||
           enclosing_trec -> state == TREC_CANNOT_COMMIT);
    result -> state = enclosing_trec -> state;
  TRACE("new_stg_trec_header creating %p nidx=%d chunk=%p enclosing_trec=%p state=%d\n",
        result, result->current_chunk->next_entry_idx, result -> current_chunk, enclosing_trec, result->state);
251 /*......................................................................*/
// Helper functions for managing waiting lists

// Add tso to the wait queue of every tvar read by the (top-level) trec.  Each
// new queue entry is pushed on the front of the tvar's doubly-linked wait
// list, and a pointer to it is stashed in the trec entry's new_value field so
// stop_tsos_waiting_on_trec can find it again without searching.
// NOTE(review): the declarations of s (StgTVar *, = e -> tvar) and q
// (StgTVarWaitQueue *), and the closing braces, are elided in this view.
static void start_tso_waiting_on_trec(StgTSO *tso, StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE || trec -> state == TREC_CANNOT_COMMIT);
  FOR_EACH_ENTRY(trec, e, {
    StgTVarWaitQueue *fq;
    TRACE("Adding tso=%p to wait queue for tvar=%p\n", tso, s);
    // Waiting is only legitimate while the trec's snapshot is still current.
    ASSERT(s -> current_value == e -> expected_value);
    fq = s -> first_wait_queue_entry;
    q = new_stg_tvar_wait_queue(tso);
    // Link the new entry at the head of the tvar's wait list.
    q -> next_queue_entry = fq;
    q -> prev_queue_entry = END_STM_WAIT_QUEUE;
    if (fq != END_STM_WAIT_QUEUE) {
      fq -> prev_queue_entry = q;
    s -> first_wait_queue_entry = q;
    // Remember where this TSO's queue entry lives for later removal.
    e -> new_value = (StgClosure *) q;
// Undo start_tso_waiting_on_trec: for every entry of a TREC_WAITING trec,
// unlink the wait-queue entry (recorded in e -> new_value) from the tvar's
// doubly-linked wait list and recycle it.
// NOTE(review): the declarations of s (StgTVar *) and q (StgTVarWaitQueue *),
// the else-branch structure around the head-of-list case, and the closing
// braces are elided in this extracted view.
static void stop_tsos_waiting_on_trec(StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_WAITING);
  TRACE("stop_tsos_waiting in state=%d\n", trec -> state);
  FOR_EACH_ENTRY(trec, e, {
    StgTVarWaitQueue *pq;
    StgTVarWaitQueue *nq;
    // Recover the queue entry stashed by start_tso_waiting_on_trec.
    q = (StgTVarWaitQueue *) (e -> new_value);
    TRACE("Removing tso=%p from wait queue for tvar=%p\n", q -> waiting_tso, s);
    nq = q -> next_queue_entry;
    pq = q -> prev_queue_entry;
    TRACE("pq=%p nq=%p q=%p\n", pq, nq, q);
    // Splice q out of the doubly-linked list.
    if (nq != END_STM_WAIT_QUEUE) {
      nq -> prev_queue_entry = pq;
    if (pq != END_STM_WAIT_QUEUE) {
      pq -> next_queue_entry = nq;
      // q was at the head of the list: advance the tvar's head pointer.
      ASSERT (s -> first_wait_queue_entry == q);
      s -> first_wait_queue_entry = nq;
    recycle_tvar_wait_queue(q);
307 /*......................................................................*/
// Hand out the next free TRecEntry slot in trec t, growing the chunk list
// with a fresh chunk when the current one is full.
// NOTE(review): the declarations of result, c, i and nc, the `} else {`, the
// closing braces and the `return result;` are elided in this extracted view.
static TRecEntry *get_new_entry(StgTRecHeader *t) {
  c = t -> current_chunk;
  i = c -> next_entry_idx;
  ASSERT(c != END_STM_CHUNK_LIST);

  if (i < TREC_CHUNK_NUM_ENTRIES) {
    // Continue to use current chunk
    result = &(c -> entries[i]);
    c -> next_entry_idx ++;
    // Current chunk is full: allocate a fresh one
    nc = new_stg_trec_chunk();
    nc -> prev_chunk = c;
    nc -> next_entry_idx = 1;
    t -> current_chunk = nc;
    result = &(nc -> entries[0]);
335 /*......................................................................*/
// Fold an (expected_value -> new_value) update for a tvar into trec t.  If t
// already has an entry for the tvar the two updates are reconciled: when
// merging a sibling trec (merging_sibling, used by stmMergeForWaiting),
// differing expected values force TREC_MUST_ABORT and differing new values
// force TREC_CANNOT_COMMIT; when merging a child back into its parent the
// child's expected value must equal the parent's new value.  If t has no
// entry yet, a fresh one is appended.
// NOTE(review): the StgTVar *tvar parameter line, the entry-match test
// (e -> tvar == tvar) with its found flag and BREAK_FOR_EACH, the else
// branches, the declaration of ne and the line setting ne -> tvar are elided
// in this extracted view.
static void merge_update_into(StgTRecHeader *t,
                              StgClosure *expected_value,
                              StgClosure *new_value,
                              int merging_sibling) {
  // Look for an entry in this trec
  FOR_EACH_ENTRY(t, e, {
    if (merging_sibling) {
      if (e -> expected_value != expected_value) {
        // Must abort if the two entries start from different values
        TRACE("Siblings inconsistent at %p (%p vs %p)\n",
              tvar, e -> expected_value, expected_value);
        t -> state = TREC_MUST_ABORT;
      } else if (e -> new_value != new_value) {
        // Cannot commit if the two entries lead to different values (wait still OK)
        TRACE("Siblings trying conflicting writes to %p (%p vs %p)\n",
              tvar, e -> new_value, new_value);
        t -> state = TREC_CANNOT_COMMIT;
      // Otherwise merging child back into parent
      ASSERT (e -> new_value == expected_value);
    TRACE("  trec=%p exp=%p new=%p\n", t, e->expected_value, e->new_value);
    e -> new_value = new_value;

  // No entry so far in this trec
  ne = get_new_entry(t);
  ne -> expected_value = expected_value;
  ne -> new_value = new_value;
383 /*......................................................................*/
// Return the value of a tvar as seen from trec t: the new_value of the
// innermost entry for that tvar in t or any enclosing trec, falling back to
// the tvar's current_value when no trec in the chain mentions it.
// NOTE(review): the StgTVar *tvar parameter line, the entry-match test with
// its found flag and BREAK_FOR_EACH, the found checks, the closing braces and
// the `return result;` are elided in this extracted view.
static StgClosure *read_current_value_seen_from(StgTRecHeader *t,
  StgClosure *result = NULL;

  // Look for any relevent trec entries
  while (t != NO_TREC) {
    FOR_EACH_ENTRY(t, e, {
      result = e -> new_value;
    t = t -> enclosing_trec;

  // Value not yet held in a trec
  result = tvar -> current_value;
414 /*......................................................................*/
// Check whether trec t is still consistent: TRUE unless t is already marked
// TREC_MUST_ABORT or some entry's expected value no longer matches the value
// of its tvar as seen from the enclosing trec.  With SHAKE enabled, shake()
// can force a spurious FALSE to exercise retry paths.
// NOTE(review): the declarations of et and result, the shake() guard around
// the TRACE, the mismatch handling inside the loop (setting result FALSE and
// BREAK_FOR_EACH), the closing braces and the return are elided in this view.
static int transaction_is_valid (StgTRecHeader *t) {
  TRACE("Shake: pretending transaction trec=%p is invalid when it may not be\n", t);
  et = t -> enclosing_trec;
  ASSERT ((t -> state == TREC_ACTIVE) ||
          (t -> state == TREC_WAITING) ||
          (t -> state == TREC_MUST_ABORT) ||
          (t -> state == TREC_CANNOT_COMMIT));
  result = !((t -> state) == TREC_MUST_ABORT);
  FOR_EACH_ENTRY(t, e, {
    if (e -> expected_value != read_current_value_seen_from(et, s)) {
444 /************************************************************************/
 * External functions below this point are responsible for:
449 * - acquiring/releasing the STM lock
451 * - all updates to the trec status field
452 * ASSERT(t != NO_TREC);
454 * By convention we increment entry_count when starting a new
455 * transaction and we decrement it at the point where we can discard
456 * the contents of the trec when exiting the outermost transaction.
457 * This means that stmWait and stmRewait decrement the count whenever
458 * they return FALSE (they do so exactly once for each transaction
459 * that doesn't remain blocked forever).
462 /************************************************************************/
// Called before garbage collection: drop the private free lists so the GC
// does not see recycled (must-be-unreachable) closures through these roots.
// NOTE(review): the closing brace is elided in this extracted view.
void stmPreGCHook() {
  TRACE("stmPreGCHook\n");
  cached_trec_headers = NO_TREC;
  cached_trec_chunks = END_STM_CHUNK_LIST;
  cached_tvar_wait_queues = END_STM_WAIT_QUEUE;
471 /************************************************************************/
474 TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
478 /*......................................................................*/
// Begin a transaction nested inside `outer` (NO_TREC for top level) and
// return its fresh trec.
// NOTE(review): the declaration of t, the `return t;` and the closing brace
// are elided in this extracted view.
StgTRecHeader *stmStartTransaction(StgTRecHeader *outer) {
  TRACE("stmStartTransaction current-trec=%p\n", outer);
  t = new_stg_trec_header(outer);
  TRACE("stmStartTransaction new-trec=%p\n", t);
488 /*......................................................................*/
// Abandon trec: if it was waiting, first remove its TSO from all tvar wait
// queues; then mark it TREC_ABORTED and recycle its log storage.
// NOTE(review): the closing braces are elided in this extracted view; the
// header comment above says external functions also take the STM lock --
// any lock_stm()/unlock_stm() calls are not visible here.
void stmAbortTransaction(StgTRecHeader *trec) {
  TRACE("stmAbortTransaction trec=%p\n", trec);
  ASSERT (trec != NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) ||
          (trec -> state == TREC_MUST_ABORT) ||
          (trec -> state == TREC_WAITING) ||
          (trec -> state == TREC_CANNOT_COMMIT));
  if (trec -> state == TREC_WAITING) {
    // Only top-level transactions can be waiting.
    ASSERT (trec -> enclosing_trec == NO_TREC);
    TRACE("stmAbortTransaction aborting waiting transaction\n");
    stop_tsos_waiting_on_trec(trec);
  trec -> state = TREC_ABORTED;

  // Outcome now reflected by status field; no need for log
  recycle_closures_from_trec(trec);

  TRACE("stmAbortTransaction trec=%p done\n", trec);
510 /*......................................................................*/
// Return the trec enclosing `trec` (NO_TREC when trec is top level).
// NOTE(review): the `return outer;` and closing brace are elided in this
// extracted view.
StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
  StgTRecHeader *outer;
  TRACE("stmGetEnclosingTRec trec=%p\n", trec);
  outer = trec -> enclosing_trec;
  TRACE("stmGetEnclosingTRec outer=%p\n", outer);
520 /*......................................................................*/
// Re-validate trec against the current tvar values.  An invalid non-waiting
// trec is demoted to TREC_MUST_ABORT; a waiting trec's state is left alone
// (the waiter will notice on wake-up).  Returns the validity result.
// NOTE(review): the declaration of result, the lock_stm()/unlock_stm() calls,
// the `return result;` and closing braces are elided in this extracted view.
StgBool stmValidateTransaction(StgTRecHeader *trec) {
  TRACE("stmValidateTransaction trec=%p\n", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_MUST_ABORT) ||
         (trec -> state == TREC_CANNOT_COMMIT) ||
         (trec -> state == TREC_WAITING));
  result = transaction_is_valid(trec);
  if (!result && trec -> state != TREC_WAITING) {
    trec -> state = TREC_MUST_ABORT;
  TRACE("stmValidateTransaction trec=%p result=%d\n", trec, result);
544 /*......................................................................*/
// Attempt to commit trec.  After validation: a TREC_CANNOT_COMMIT trec at top
// level is forced to TREC_MUST_ABORT; otherwise, a top-level commit writes
// each entry's new_value into its tvar and wakes the tvar's waiters, while a
// nested commit merges each entry into the enclosing trec (propagating
// TREC_CANNOT_COMMIT upward).  The log is recycled afterwards.
// NOTE(review): the declarations of result and et, the lock/unlock calls, the
// valid/invalid branching around the entry loop, the `StgTVar *s = e -> tvar;`
// line, the if/else between the top-level write-back and the nested
// merge_update_into path, the closing braces and the return are elided in
// this extracted view.
StgBool stmCommitTransaction(StgTRecHeader *trec) {
  TRACE("stmCommitTransaction trec=%p trec->enclosing_trec=%p\n", trec, trec->enclosing_trec);
  ASSERT (trec != NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) ||
          (trec -> state == TREC_MUST_ABORT) ||
          (trec -> state == TREC_CANNOT_COMMIT));
  result = transaction_is_valid(trec);
  et = trec -> enclosing_trec;
  if (trec -> state == TREC_CANNOT_COMMIT && et == NO_TREC) {
    // A cannot-commit outcome is final at top level.
    TRACE("Cannot commit trec=%p at top level\n", trec);
    trec -> state = TREC_MUST_ABORT;
  TRACE("Non-nesting commit, NO_TREC=%p\n", NO_TREC);
  TRACE("Nested commit into %p, NO_TREC=%p\n", et, NO_TREC);
  FOR_EACH_ENTRY(trec, e, {
    // Top-level: publish the new value and wake the tvar's waiters.
    s -> current_value = e -> new_value;
    unpark_waiters_on(s);
    // Nested: fold the update into the enclosing trec's log instead.
    merge_update_into(et, s, e -> expected_value, e -> new_value, FALSE);
  if (trec->state == TREC_CANNOT_COMMIT && et -> state == TREC_ACTIVE) {
    TRACE("Propagating TREC_CANNOT_COMMIT into %p\n", et);
    et -> state = TREC_CANNOT_COMMIT;

  // Outcome now reflected by status field; no need for log
  recycle_closures_from_trec(trec);

  TRACE("stmCommitTransaction trec=%p result=%d\n", trec, result);
599 /*......................................................................*/
// Merge sibling trec `other` into `trec` in preparation for waiting (e.g. the
// two arms of an orElse).  Both trecs are validated first; then every entry
// of `other` is folded into `trec` with merging_sibling = TRUE, which may
// drive `trec` to TREC_MUST_ABORT on inconsistent expected values.  Returns
// whether the merged trec is still usable for waiting.
// NOTE(review): the declaration of result, the lock/unlock calls, the
// short-circuiting between the two validations and around the merge loop, the
// else branch that marks trec TREC_MUST_ABORT on failure, the closing braces
// and the return are elided in this extracted view.
StgBool stmMergeForWaiting(StgTRecHeader *trec, StgTRecHeader *other) {
  TRACE("stmMergeForWaiting trec=%p (%d) other=%p (%d)\n", trec, trec -> state, other, other->state);
  ASSERT(trec != NO_TREC);
  ASSERT(other != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) ||
         (trec -> state == TREC_MUST_ABORT) ||
         (trec -> state == TREC_CANNOT_COMMIT));
  ASSERT((other -> state == TREC_ACTIVE) ||
         (other -> state == TREC_MUST_ABORT) ||
         (other -> state == TREC_CANNOT_COMMIT));
  result = (transaction_is_valid(trec));
  TRACE("stmMergeForWaiting initial result=%d\n", result);
  result = transaction_is_valid(other);
  TRACE("stmMergeForWaiting after both result=%d\n", result);
  // Individually the two transactions may be valid.  Now copy entries from
  // "other" into "trec".  This may cause "trec" to become invalid if it
  // contains an update that conflicts with one from "other"
  FOR_EACH_ENTRY(other, e, {
    StgTVar *s = e -> tvar;
    TRACE("Merging trec=%p exp=%p new=%p\n", other, e->expected_value, e->new_value);
    merge_update_into(trec, s, e-> expected_value, e -> new_value, TRUE);
  result = (trec -> state != TREC_MUST_ABORT);
  trec -> state = TREC_MUST_ABORT;

  TRACE("stmMergeForWaiting result=%d\n", result);
642 /*......................................................................*/
// Put tso to sleep until one of the tvars read by the (top-level) trec is
// updated.  If the trec still validates, the TSO is queued on every tvar, the
// trec enters TREC_WAITING and its log is recycled; returns whether the wait
// was actually started (FALSE means the caller must retry immediately).
// NOTE(review): the declaration of result, the lock/unlock calls, the
// park_tso call, the if/else around the valid case, the closing braces and
// the return are elided in this extracted view.
StgBool stmWait(StgTSO *tso, StgTRecHeader *trec) {
  TRACE("stmWait tso=%p trec=%p\n", tso, trec);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> enclosing_trec == NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) ||
          (trec -> state == TREC_MUST_ABORT) ||
          (trec -> state == TREC_CANNOT_COMMIT));
  result = transaction_is_valid(trec);
  // The transaction is valid so far so we can actually start waiting.
  // (Otherwise the transaction was not valid and the thread will have to
  start_tso_waiting_on_trec(tso, trec);
  trec -> state = TREC_WAITING;

  // Outcome now reflected by status field; no need for log
  recycle_closures_from_trec(trec);

  TRACE("stmWait trec=%p result=%d\n", trec, result);
672 /*......................................................................*/
// Called when a waiting TSO is scheduled: decide whether it should go back to
// sleep.  If the trec still validates it stays TREC_WAITING (and remains on
// the tvar wait queues); otherwise the TSO is removed from all wait queues
// and the log is recycled so the transaction can be re-run.
// NOTE(review): the declaration of result, the lock/unlock calls, the if/else
// around the valid case, the state update on the invalid path, the closing
// braces and the return are elided in this extracted view.
StgBool stmReWait(StgTRecHeader *trec) {
  TRACE("stmReWait trec=%p\n", trec);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> enclosing_trec == NO_TREC);
  ASSERT (trec -> state == TREC_WAITING);
  result = transaction_is_valid(trec);
  TRACE("stmReWait trec=%p result=%d\n", trec, result);
  // The transaction remains valid -- do nothing because it is already on
  ASSERT (trec -> state == TREC_WAITING);
  // The transaction has become invalid.  We can now remove it from the wait
  stop_tsos_waiting_on_trec (trec);

  // Outcome now reflected by status field; no need for log
  recycle_closures_from_trec(trec);

  TRACE("stmReWait trec=%p result=%d\n", trec, result);
702 /*......................................................................*/
// Transactional read of tvar within trec.  The trec chain is searched
// innermost-first for an existing entry: a hit in trec itself reuses that
// entry's new_value; a hit in an enclosing trec is duplicated into trec (so
// later writes stay local); a miss records a fresh read-only entry
// (expected == new == tvar's current value).  Returns the value read.
// NOTE(review): the StgTVar *tvar parameter line, the declarations of et and
// found, the found/BREAK_FOR_EACH bookkeeping in the loop, the if/else
// structure joining the three cases, the `ne -> tvar = tvar;` lines, the
// lock/unlock calls, the closing braces and the return are elided in this
// extracted view.
StgClosure *stmReadTVar(StgTRecHeader *trec,
  StgClosure *result = NULL; // Suppress unassignment warning
  TRecEntry *ne = NULL;
  TRACE("stmReadTVar trec=%p tvar=%p\n", trec, tvar);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> state == TREC_ACTIVE ||
          trec -> state == TREC_MUST_ABORT ||
          trec -> state == TREC_CANNOT_COMMIT);

  // Look for an existing entry in our trec or in an enclosing trec
  while (et != NO_TREC) {
    FOR_EACH_ENTRY(et, e, {
      TRACE("testing e=%p\n", e);
      if (e -> tvar == tvar) {
        result = e -> new_value;
    et = et -> enclosing_trec;

  if (found && et != trec) {
    // Entry found in another trec
    ASSERT (result != NULL);
    TRACE("duplicating entry\n");
    ne = get_new_entry(trec);
    ne -> expected_value = result;
    ne -> new_value = result;
    // No entry anywhere: snapshot the tvar's current value.
    ASSERT (result == NULL);
    TRACE("need new entry\n");
    ne = get_new_entry(trec);
    TRACE("got ne=%p\n", ne);
    result = tvar -> current_value;
    ne -> expected_value = result;
    ne -> new_value = result;

  ASSERT (result != NULL);
  TRACE("stmReadTVar trec=%p result=%p\n", trec, result);
762 /*......................................................................*/
// Transactional write of new_value to tvar within trec.  The trec chain is
// searched innermost-first: a hit in trec itself just overwrites that entry's
// new_value; a hit in an enclosing trec creates a local entry whose expected
// value is the enclosing entry's new_value; a miss creates a fresh entry with
// the tvar's current value as the expected value.
// NOTE(review): the StgTVar *tvar parameter line, the declarations of et, ne
// and found, the found/entry bookkeeping and BREAK_FOR_EACH in the loop, the
// else-if/else joins between the three cases, the `ne -> tvar = tvar;` lines,
// the lock/unlock calls and the closing braces are elided in this view.
void stmWriteTVar(StgTRecHeader *trec,
                  StgClosure *new_value) {
  TRecEntry *entry = NULL;
  TRACE("stmWriteTVar trec=%p tvar=%p new_value=%p\n", trec, tvar, new_value);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> state == TREC_ACTIVE ||
          trec -> state == TREC_MUST_ABORT ||
          trec -> state == TREC_CANNOT_COMMIT);

  // Look for an existing entry in our trec or in an enclosing trec
  while (et != NO_TREC) {
    FOR_EACH_ENTRY(et, e, {
      if (e -> tvar == tvar) {
    et = et -> enclosing_trec;

  if (found && et == trec) {
    // Entry found in our trec
    entry -> new_value = new_value;
    // Entry found in another trec
    ne = get_new_entry(trec);
    ne -> expected_value = entry -> new_value;
    ne -> new_value = new_value;
    // No entry anywhere: expected value is the tvar's current value.
    ne = get_new_entry(trec);
    ne -> expected_value = tvar -> current_value;
    ne -> new_value = new_value;

  TRACE("stmWriteTVar trec=%p done\n", trec);
816 /*......................................................................*/