Поддельные потери в реализации очереди без блокировки C ++ - PullRequest
0 голосов
/ 30 апреля 2018

Я пытаюсь реализовать очередь без блокировки, которая использует линейный круговой буфер для хранения данных. В отличие от очереди без блокировки общего назначения, у меня есть следующие расслабляющие условия:

  • Я знаю наихудшее количество элементов, которые когда-либо будут храниться в очереди. Очередь является частью системы, которая работает с фиксированным набором элементов. Код никогда не будет пытаться сохранить больше элементов в очереди, так как в этом фиксированном наборе есть элементы.
  • Нет мультипроизводителя / мультипотребителя. Очередь будет использоваться либо в мультипроизводителе / ​​одном потребителе , либо в настройке одного производителя / мультипотребителя.

Концептуально очередь реализована следующим образом

Моя реализация очереди выглядит следующим образом. Обратите внимание на код отладки, который останавливает выполнение всякий раз, когда pop() пытается прочитать память, ранее записанную push(). Этого никогда не должно произойти, поскольку - по крайней мере, концептуально - pop() может продолжаться, только если в очереди есть элементы (не должно быть переполнений).

#include <atomic>
#include <cstdint>
#include <csignal> // XXX for debugging

template <typename T>
class Queue {
private:
    uint32_t m_data_size;   // Number of elements allocated
    std::atomic<T> *m_data; // Queue data, size is power of two
    uint32_t m_mask;        // Bitwise AND mask for m_rd_ptr and m_wr_ptr
    std::atomic<uint32_t> m_rd_ptr; // Circular buffer read pointer
    std::atomic<uint32_t> m_wr_ptr; // Circular buffer write pointer
    std::atomic<uint32_t> m_size;   // Number of elements in the queue

    static uint32_t upper_power_of_two(uint32_t v) {
        v--; // https://graphics.stanford.edu/~seander/bithacks.html
        v |= v >> 1; v |= v >> 2; v |= v >> 4; v |= v >> 8; v |= v >> 16;
        v++;
        return v;
    }

public:
    struct Optional { // Minimal replacement for std::optional
        bool good;
        T value;
        Optional() : good(false) {}
        Optional(T value) : good(true), value(std::move(value)) {}
        explicit operator bool() const { return good; }
    };

    Queue(uint32_t max_size)
        : // XXX Allocate 1 MiB of additional memory for debugging purposes
          m_data_size(upper_power_of_two(1024 * 1024 + max_size)),
          m_data(new std::atomic<T>[m_data_size]),
          m_mask(m_data_size - 1),
          m_rd_ptr(0),
          m_wr_ptr(0),
          m_size(0) {
        // XXX Debug code begin
        // Fill the memory with a marker so we can detect invalid reads
        for (uint32_t i = 0; i < m_data_size; i++) {
            m_data[i] = 0xDEADBEAF;
        }
        // XXX Debug code end
    }

    ~Queue() { delete[] m_data; }

    Optional pop() {
        // Atomically decrement the size variable
        uint32_t size = m_size.load();
        while (size != 0 && !m_size.compare_exchange_weak(size, size - 1)) {
        }

        // The queue is empty, abort
        if (size <= 0) {
            return Optional();
        }

        // Read the actual element, atomically increase the read pointer
        T res = m_data[(m_rd_ptr++) & m_mask].load();

        // XXX Debug code begin
        if (res == T(0xDEADBEAF)) {
            std::raise(SIGTRAP);
        }
        // XXX Debug code end
        return res;
    }

    void push(T t) {
        m_data[(m_wr_ptr++) & m_mask].store(t);
        m_size++;
    }

    bool empty() const { return m_size == 0; }
};

Тем не менее, переполнение происходит и может быть легко инициировано в многопоточном стресс-тесте. В этом конкретном тесте я поддерживаю две очереди q1 и q2. В основном потоке я передаю фиксированное количество элементов в q1. Два рабочих потока читают из q1 и нажимают на q2 в тесном цикле. Основной поток читает данные из q2 и возвращает их обратно в q1.

Это прекрасно работает, если существует только один рабочий поток (один производитель / один потребитель) или если все рабочие потоки находятся на одном процессоре с основным потоком. Однако он завершается ошибкой, как только два рабочих потока явно планируются на другой процессор, отличный от основного потока.

Следующий код реализует этот тест

#include <pthread.h>
#include <thread>
#include <vector>

static void queue_stress_test_main(std::atomic<uint32_t> &done_count,
                                   Queue<int> &queue_rd, Queue<int> &queue_wr) {
    for (size_t i = 0; i < (1UL << 24); i++) {
        auto res = queue_rd.pop();
        if (res) {
            queue_wr.push(res.value);
        }
    }
    done_count++;
}

static void set_thread_affinity(pthread_t thread, int cpu) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    if (pthread_setaffinity_np(thread, sizeof(cpu_set_t),
                               &cpuset) != 0) {
        throw "Error while calling pthread_setaffinity_np";
    }
}

int main() {
    static constexpr uint32_t n_threads{2U}; // Number of worker threads
    //static constexpr uint32_t n_threads{1U}; // < Works fine
    static constexpr uint32_t max_size{16U}; // Elements in the queue
    std::atomic<uint32_t> done_count{0};     // Number of finished threads
    Queue<int> queue1(max_size), queue2(max_size);

    // Launch n_threads threads, make sure the main thread and the two worker
    // threads are on different CPUs.
    std::vector<std::thread> threads;
    for (uint32_t i = 0; i < n_threads; i++) {
        threads.emplace_back(queue_stress_test_main, std::ref(done_count),
                             std::ref(queue1), std::ref(queue2));
        set_thread_affinity(threads.back().native_handle(), 0);
    }
    set_thread_affinity(pthread_self(), 1);
    //set_thread_affinity(pthread_self(), 0); // < Works fine

    // Pump data from queue2 into queue1
    uint32_t elems_written = 0;
    while (done_count < n_threads || !queue2.empty()) {
        // Initially fill queue1 with all values from 0..max_size-1
        if (elems_written < max_size) {
            queue1.push(elems_written++);
        }

        // Read elements from queue2 and put them into queue1
        auto res = queue2.pop();
        if (res) {
            queue1.push(res.value);
        }
    }

    // Wait for all threads to finish
    for (uint32_t i = 0; i < n_threads; i++) {
        threads[i].join();
    }
}

Большую часть времени эта программа вызывает ловушку в коде очереди, что означает, что pop() пытается прочитать память, которая никогда не была затронута push() - хотя pop() должен только успешно если push() вызывается хотя бы так же часто, как pop().

Вы можете скомпилировать и запустить вышеупомянутую программу с помощью GCC / clang в Linux, используя

c++ -std=c++11 queue.cpp -o queue -lpthread && ./queue

Либо просто объедините два вышеупомянутых блока кода, либо загрузите полную программу здесь .

Обратите внимание, что я новичок, когда дело доходит до структур данных без блокировки. Я прекрасно понимаю, что для C ++ существует множество проверенных реализаций очередей без блокировок. Однако я просто не могу понять, почему приведенный выше код не работает должным образом.

1 Ответ

0 голосов
/ 30 апреля 2018

У вас есть две ошибки, одна из которых может вызвать сбой, который вы наблюдаете.

Давайте посмотрим на ваш push-код, за исключением того, что мы допустим только одну операцию на оператор:

void push(T t)
{
    auto const claimed_index = m_wr_ptr++;               /* 1 */
    auto const claimed_offset = claimed_index & m_mask; /* 2 */
    auto& claimed_data = m_data[claimed_offset];         /* 3 */
    claimed_data.store(t);                               /* 4 */
    m_size++;                                            /* 5 */
}

Теперь для очереди с двумя производителями существует окно уязвимости к состоянию гонки между операциями 1 и 4:

До:

m_rd_ptr == 1
m_wr_ptr == 1
m_size == 0

Производитель A:

/* 1 */ claimed_index = 1; m_wr_ptr = 2;
/* 2 */ claimed_offset = 1;
  • Планировщик переводит сюда производителя А

Производитель B:

/* 1 */ claimed_index = 2; m_wr_ptr = 3;
/* 2 */ claimed_offset = 2;
/* 3 */ claimed_data = m_data[2];
/* 4 */ claimed_data.store(t);
/* 5 */ m_size = 1;

После того, как: * * тысяча двадцать-одна

m_size == 1
m_rd_ptr == 1
m_wr_ptr == 3
m_data[1] == 0xDEADBEAF
m_data[2] == value_produced_by_B

Потребитель теперь работает, видит m_size > 0 и читает с m_data[1], увеличивая m_rd_ptr с 1 до 2. Но m_data[1] еще не был записан производителем A, а производитель B написал m_data[2].

Вторая ошибка является дополнительным случаем в pop(), когда поток пользователя прерывается между действием m_rd_ptr++ и вызовом .load(). Это может привести к тому, что значения будут прочитаны не по порядку, возможно, настолько не по порядку, что очередь полностью обведена и перезапишет исходное значение.

Тот факт, что две операции в одном операторе источника являются атомарными, не делает весь оператор атомарным.

...