Я пытаюсь реализовать очередь без блокировки, которая использует линейный круговой буфер для хранения данных. В отличие от очереди без блокировки общего назначения, у меня есть следующие расслабляющие условия:
- Я знаю наихудшее количество элементов, которые когда-либо будут храниться в очереди. Очередь является частью системы, которая работает с фиксированным набором элементов. Код никогда не будет пытаться сохранить больше элементов в очереди, так как в этом фиксированном наборе есть элементы.
- Нет мультипроизводителя / мультипотребителя. Очередь будет использоваться либо в мультипроизводителе / одном потребителе , либо в настройке одного производителя / мультипотребителя.
Концептуально очередь реализована следующим образом
Моя реализация очереди выглядит следующим образом. Обратите внимание на код отладки, который останавливает выполнение всякий раз, когда pop()
пытается прочитать память, ранее записанную push()
. Этого никогда не должно произойти, поскольку - по крайней мере, концептуально - pop()
может продолжаться, только если в очереди есть элементы (не должно быть переполнений).
#include <atomic>
#include <cstdint>
#include <csignal> // XXX for debugging
template <typename T>
class Queue {
private:
uint32_t m_data_size; // Number of elements allocated
std::atomic<T> *m_data; // Queue data, size is power of two
uint32_t m_mask; // Bitwise AND mask for m_rd_ptr and m_wr_ptr
std::atomic<uint32_t> m_rd_ptr; // Circular buffer read pointer
std::atomic<uint32_t> m_wr_ptr; // Circular buffer write pointer
std::atomic<uint32_t> m_size; // Number of elements in the queue
static uint32_t upper_power_of_two(uint32_t v) {
v--; // https://graphics.stanford.edu/~seander/bithacks.html
v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
v++;
return v;
}
public:
struct Optional { // Minimal replacement for std::optional
bool good;
T value;
Optional() : good(false) {}
Optional(T value) : good(true), value(std::move(value)) {}
explicit operator bool() const { return good; }
};
Queue(uint32_t max_size)
: // XXX Allocate 1 MiB of additional memory for debugging purposes
m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
m_data(new std::atomic<T>[m_data_size]),
m_mask(m_data_size - 1),
m_rd_ptr(0),
m_wr_ptr(0),
m_size(0) {
// XXX Debug code begin
// Fill the memory with a marker so we can detect invalid reads
for (uint32_t i = 0; i < m_data_size; i++) {
m_data[i] = 0xDEADBEAF;
}
// XXX Debug code end
}
~Queue() { delete[] m_data; }
Optional pop() {
// Atomically decrement the size variable
uint32_t size = m_size.load();
while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
}
// The queue is empty, abort
if (size <= 0) {
return Optional();
}
// Read the actual element, atomically increase the read pointer
T res = m_data[(m_rd_ptr++) & m_mask].load();
// XXX Debug code begin
if (res == T(0xDEADBEAF)) {
std::raise(SIGTRAP);
}
// XXX Debug code end
return res;
}
void push(T t) {
m_data[(m_wr_ptr++) & m_mask].store(t);
m_size++;
}
bool empty() const { return m_size == 0; }
};
Тем не менее, переполнение происходит и может быть легко инициировано в многопоточном стресс-тесте. В этом конкретном тесте я поддерживаю две очереди q1
и q2
. В основном потоке я передаю фиксированное количество элементов в q1
. Два рабочих потока читают из q1
и нажимают на q2
в тесном цикле. Основной поток читает данные из q2
и возвращает их обратно в q1
.
Это прекрасно работает, если существует только один рабочий поток (один производитель / один потребитель) или если все рабочие потоки находятся на одном процессоре с основным потоком. Однако он завершается ошибкой, как только два рабочих потока явно планируются на другой процессор, отличный от основного потока.
Следующий код реализует этот тест
#include <pthread.h>
#include <thread>
#include <vector>
static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
Queue<int> &queue_rd, Queue<int> &queue_wr) {
for (size_t i = 0; i < (1UL << 24); i++) {
auto res = queue_rd.pop();
if (res) {
queue_wr.push(res.value);
}
}
done_count++;
}
static void set_thread_affinity(pthread_t thread, int cpu) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
&cpuset) != 0) {
throw "Error while calling pthread_setaffinity_np";
}
}
int main() {
static constexpr uint32_t n_threads{2U}; // Number of worker threads
//static constexpr uint32_t n_threads{1U}; // < Works fine
static constexpr uint32_t max_size{16U}; // Elements in the queue
std::atomic<uint32_t> done_count{0}; // Number of finished threads
Queue<int> queue1(max_size), queue2(max_size);
// Launch n_threads threads, make sure the main thread and the two worker
// threads are on different CPUs.
std::vector<std::thread> threads;
for (uint32_t i = 0; i < n_threads; i++) {
threads.emplace_back(queue_stress_test_main, std::ref(done_count),
std::ref(queue1), std::ref(queue2));
set_thread_affinity(threads.back().native_handle(), 0);
}
set_thread_affinity(pthread_self(), 1);
//set_thread_affinity(pthread_self(), 0); // < Works fine
// Pump data from queue2 into queue1
uint32_t elems_written = 0;
while (done_count < n_threads || !queue2.empty()) {
// Initially fill queue1 with all values from 0..max_size-1
if (elems_written < max_size) {
queue1.push(elems_written++);
}
// Read elements from queue2 and put them into queue1
auto res = queue2.pop();
if (res) {
queue1.push(res.value);
}
}
// Wait for all threads to finish
for (uint32_t i = 0; i < n_threads; i++) {
threads[i].join();
}
}
Большую часть времени эта программа вызывает ловушку в коде очереди, что означает, что pop()
пытается прочитать память, которая никогда не была затронута push()
- хотя pop()
должен только успешно если push()
вызывается хотя бы так же часто, как pop()
.
Вы можете скомпилировать и запустить вышеупомянутую программу с помощью GCC / clang в Linux, используя
c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue
Либо просто объедините два вышеупомянутых блока кода, либо загрузите полную программу здесь .
Обратите внимание, что я новичок, когда дело доходит до структур данных без блокировки. Я прекрасно понимаю, что для C ++ существует множество проверенных реализаций очередей без блокировок. Однако я просто не могу понять, почему приведенный выше код не работает должным образом.