c ++ 11 очередь без блокировки с 2-мя потоками - PullRequest
0 голосов
/ 17 апреля 2020

Наряду с основным потоком у меня есть еще один поток, который получает данные для записи их в файл.

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;

void setData(const std::vector<int>& data) {
    std::lock_guard<std::mutex> lock(mutex);
    dataQueue.push(data);
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::vector<int>& data= dataQueue.front();

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

setData используется основным потоком, а write фактически является записывающим потоком. Я использую std::lock_quard, чтобы избежать конфликта памяти, но при блокировке записывающего потока он замедляет основной поток, так как должен ждать, пока очередь не будет разблокирована. Но я думаю, что могу избежать этого, поскольку потоки никогда не действуют на один и тот же элемент очереди в одно и то же время.

Так что я хотел бы сделать это без блокировки, но я не совсем понимаю, как мне это реализовать. Я имею в виду, как я могу сделать это, ничего не блокируя? более того, если записывающий поток работает быстрее, чем основной, большую часть времени очередь может быть пустой, поэтому он должен каким-то образом ждать новых данных, а не бесконечно циклически проверять наличие непустой очереди.

EDIT : я изменил простой std::lock_guard на std::cond_variable, чтобы он мог ждать, когда очередь пуста. Но основной поток все еще может быть заблокирован, так как, когда разрешен cvQeue.wait(.), он снова получает блокировку. более того, что, если основной поток выполняет cvQueue.notify_one(), но поток записи не ожидает?

std::queue<std::vector<int>> dataQueue;
std::mutex mutex;
std::condition_variable cvQueue;

void setData(const std::vector<int>& data) {
    std::unique_lock<std::mutex> lock(mutex);
    dataQueue.push(data);
    cvQueue.notify_one();
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store) {
        std::lock_guard<std::mutex> lock(mutex);

        while (!dataQueue.empty()) {
            std::unique_lock<std::mutex> lock(mutex);
            cvQueue.wait(lock);

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

Ответы [ 2 ]

2 голосов
/ 18 апреля 2020

Если у вас только два потока, вы можете использовать очередь без единого производителя (SPS C) без блокировки.
Ограниченную версию можно найти здесь: https://github.com/rigtor/SPSCQueue
Дмитрий Вьюков представил здесь неограниченную версию: http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue (Следует отметить, однако, что этот код должен быть адаптирован для использования атомики.)

Относительно блокирующего всплывающего окна Операция - это то, что структуры данных без блокировки не предоставляют, поскольку такая операция, очевидно, не без блокировки. Тем не менее, должно быть относительно просто адаптировать связанные реализации таким образом, чтобы операция pu sh уведомляла переменную условия, если очередь была пустой до операции pu sh.

.
0 голосов
/ 19 апреля 2020

Я думаю, у меня есть что-то, что соответствует моим потребностям. Я сделал LockFreeQueue, который использует std::atomic. Таким образом, я могу управлять состоянием головы / хвоста очереди атомарно.

template<typename T>
class LockFreeQueue {
public:
    void push(const T& newElement) {
        fifo.push(newElement);
        tail.fetch_add(1);
        cvQueue.notify_one();
    }

    void pop() {
        size_t oldTail = tail.load();
        size_t oldHead = head.load();

        if (oldTail == oldHead) {
            return;
        }

        fifo.pop();
        head.store(++oldHead);
    }

    bool isEmpty() {
        return head.load() == tail.load();
    }

    T& getFront() {
        return fifo.front();
    }

    void waitForNewElements() {
        if (tail.load() == head.load()) {
            std::mutex m;
            std::unique_lock<std::mutex> lock(m);
            cvQueue.wait_for(lock, std::chrono::milliseconds(TIMEOUT_VALUE));
        }
    }

private:
    std::queue<T> fifo;
    std::atomic<size_t> head = { 0 };
    std::atomic<size_t> tail = { 0 };
    std::condition_variable cvQueue;
};

LockFreeQueue<std::vector<int>> dataQueue;
std::atomic<bool> store(true);

void setData(const std::vector<int>& data) {
    dataQueue.push(data);
    // do other things
}

void write(const std::string& fileName) {
    std::unique_ptr<std::ostream> ofs = std::unique_ptr<std::ostream>(new zstr::ofstream(fileName));

    while (store.load()) {

        dataQueue.waitForNewElements();

        while (!dataQueue.isEmpty()) {
            std::vector<int>& data= dataQueue.getFront();

            ofs->write(reinterpret_cast<char*>(data.data()), sizeof(data[0])*data.size());

            dataQueue.pop();
            }
        }
    }
}

У меня все еще есть одна блокировка в waitForNewElements, но она не блокирует весь процесс, так как ждет, когда что-то будет сделано , Но большое улучшение заключается в том, что производитель может сделать sh, в то время как потребитель лопнет. Запрещено, когда LockFreQueue::tail и LockFreeQueue::head одинаковы. Это означает, что очередь пуста и она переходит в состояние ожидания.

То, чем я не очень доволен, это cvQueue.wait_for(lock, TIMEOUT_VALUE). Я хотел сделать простой cvQueue.wait(lock), но проблема в том, что когда дело доходит до конца потока, я делаю store.store(false) в основном потоке. Поэтому, если поток записи ожидает, он никогда не закончится без тайм-аута. Итак, я установил достаточно большой тайм-аут, чтобы большую часть времени condition_variable разрешался блокировкой, а когда поток завершался, он разрешался тайм-аутом.

Если вы чувствуете, что что-то должно быть не так или должны быть улучшены, не стесняйтесь комментировать.

...