* are synchronised without a lock, based on a cas of the top
* position. One reader wins, the others return NULL for a failure.
*
- * Both popBottom and steal also return NULL when the queue is empty.
+ * Both popWSDeque and stealWSDeque also return NULL when the queue is empty.
+ *
+ * Testing: see testsuite/tests/ghc-regress/rts/testwsdeque.c. If
+ * there's anything wrong with the deque implementation, this test
+ * will probably catch it.
*
* ---------------------------------------------------------------------------*/
#include "WSDeque.h"
#include "SMP.h" // for cas
-#if defined(THREADED_RTS)
-
#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
/* -----------------------------------------------------------------------------
/* also a bit tricky, has to avoid concurrent steal() calls by
accessing top with cas, when there is only one element left */
StgWord t, b;
- void ** pos;
long currSize;
void * removed;
ASSERT_WSDEQUE_INVARIANTS(q);
b = q->bottom;
- /* "decrement b as a test, see what happens" */
- q->bottom = --b;
- pos = (q->elements) + (b & (q->moduloSize));
+
+ // "decrement b as a test, see what happens"
+ b--;
+ q->bottom = b;
+
+ // very important that the following read of q->top does not occur
+ // before the earlier write to q->bottom.
+ store_load_barrier();
+
t = q->top; /* using topBound would give an *upper* bound, we
need a lower bound. We use the real top here, but
can update the topBound value */
q->topBound = t;
- currSize = b - t;
+ currSize = (long)b - (long)t;
if (currSize < 0) { /* was empty before decrementing b, set b
consistently and abort */
q->bottom = t;
return NULL;
}
- removed = *pos;
+
+ // read the element at b
+ removed = q->elements[b & q->moduloSize];
+
if (currSize > 0) { /* no danger, still elements in buffer after b-- */
+ // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);
return removed;
}
/* otherwise, has someone meanwhile stolen the same (last) element?
q->topBound = t+1; /* ...and cached top value as well */
ASSERT_WSDEQUE_INVARIANTS(q);
+ ASSERT(q->bottom >= q->top);
+ // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);
+
return removed;
}
void *
stealWSDeque_ (WSDeque *q)
{
- void ** pos;
- void ** arraybase;
- StgWord sz;
void * stolen;
StgWord b,t;
// Can't do this on someone else's spark pool:
// ASSERT_WSDEQUE_INVARIANTS(q);
- b = q->bottom;
+ // NB. these loads must be ordered, otherwise there is a race
+ // between steal and pop.
t = q->top;
+ load_load_barrier();
+ b = q->bottom;
// NB. b and t are unsigned; we need a signed value for the test
- // below.
+ // below, because it is possible that t > b during a
+ // concurrent popWSQueue() operation.
if ((long)b - (long)t <= 0 ) {
return NULL; /* already looks empty, abort */
}
/* now access array, see pushBottom() */
- arraybase = q->elements;
- sz = q->moduloSize;
- pos = arraybase + (t & sz);
- stolen = *pos;
+ stolen = q->elements[t & q->moduloSize];
/* now decide whether we have won */
if ( !(CASTOP(&(q->top),t,t+1)) ) {
return NULL;
} /* else: OK, top has been incremented by the cas call */
+ // debugBelch("stealWSDeque_: t=%d b=%d\n", t, b);
+
// Can't do this on someone else's spark pool:
// ASSERT_WSDEQUE_INVARIANTS(q);
return stolen;
}
+/* -----------------------------------------------------------------------------
+ * pushWSQueue
+ * -------------------------------------------------------------------------- */
#define DISCARD_NEW
pushWSDeque (WSDeque* q, void * elem)
{
StgWord t;
- void ** pos;
+ StgWord b;
StgWord sz = q->moduloSize;
- StgWord b = q->bottom;
ASSERT_WSDEQUE_INVARIANTS(q);
q->topBound (accessed only by writer) instead.
This is why we do not just call empty(q) here.
*/
+ b = q->bottom;
t = q->topBound;
if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) {
/* NB. 1. sz == q->size - 1, thus ">="
#endif
}
}
- pos = (q->elements) + (b & sz);
- *pos = elem;
- (q->bottom)++;
+
+ q->elements[b & sz] = elem;
+ q->bottom = b + 1;
ASSERT_WSDEQUE_INVARIANTS(q);
return rtsTrue;
}
-
-#endif