В приведенном ниже классе описан стек без блокировки uint32_t
последовательных значений (полный код здесь ). Например, LockFreeIndexStack stack(5);
объявляет стек, содержащий числа {0, 1, 2, 3, 4}
. Этот класс имеет бассейн семантического. Емкость стека фиксирована. Только значения, изначально введенные в стек, могут быть извлечены и повторно вставлены. Таким образом, в любой конкретный момент времени любое из этих значений может быть либо внутри стека, либо снаружи, но не одновременно. Поток может выдвинуть только индекс, который он ранее получил через всплывающее окно. Таким образом, правильное использование для потока:
auto index = stack.pop(); // get an index from the stack, if available
if(index.isValid()) {
// do something with 'index'
stack.push(index); // return index to the stack
}
Оба метода push
и pop
реализованы с атомарным циклом load
и CAS
.
Какую правильную семантику порядка памяти я должен использовать в атомарных операциях в pop
и push
(я написал свое закомментированное предположение)?
struct LockFreeIndexStack
{
typedef uint64_t bundle_t;
typedef uint32_t index_t;
private:
static const index_t s_null = ~index_t(0);
typedef std::atomic<bundle_t> atomic_bundle_t;
union Bundle {
Bundle(index_t index, index_t count)
{
m_value.m_index = index;
m_value.m_count = count;
}
Bundle(bundle_t bundle)
{
m_bundle = bundle;
}
struct {
index_t m_index;
index_t m_count;
} m_value;
bundle_t m_bundle;
};
public:
LockFreeIndexStack(index_t n)
: m_top(Bundle(0, 0).m_bundle)
, m_next(n, s_null)
{
for (index_t i = 1; i < n; ++i)
m_next[i - 1] = i;
}
index_t pop()
{
Bundle curtop(m_top.load()); // memory_order_acquire?
while(true) {
index_t candidate = curtop.m_value.m_index;
if (candidate != s_null) { // stack is not empty?
index_t next = m_next[candidate];
Bundle newtop(next, curtop.m_value.m_count);
// In the very remote eventuality that, between reading 'm_top' and
// the CAS operation other threads cause all the below circumstances occur simultaneously:
// - other threads execute exactly a multiple of 2^32 pop or push operations,
// so that 'm_count' assumes again the original value;
// - the value read as 'candidate' 2^32 transactions ago is again top of the stack;
// - the value 'm_next[candidate]' is no longer what it was 2^32 transactions ago
// then the stack will get corrupted
if (m_top.compare_exchange_weak(curtop.m_bundle, newtop.m_bundle)) {
return candidate;
}
}
else {
// stack was empty, no point in spinning
return s_null;
}
}
}
void push(index_t index)
{
Bundle curtop(m_top.load()); // memory_order_relaxed?
while (true) {
index_t current = curtop.m_value.m_index;
m_next[index] = current;
Bundle newtop = Bundle(index, curtop.m_value.m_count + 1);
if (m_top.compare_exchange_weak(curtop.m_bundle, newtop.m_bundle)) {
return;
}
};
}
private:
atomic_bundle_t m_top;
std::vector<index_t> m_next;
};