Потоки не просыпаются при вызове notify_all () - PullRequest
0 голосов
/ 20 мая 2019

Я пытаюсь написать простую потокобезопасную очередь блокировки для своего приложения.По какой-то причине в моем демонстрационном приложении не все потоки просыпаются, когда выполняется вызов close().Мой потребительский поток просто закрывается, но два других потока производителя никогда не получают сигнал.Возможно, состояние гонки?

Это мое демонстрационное приложение:

#include <iostream>
#include <string>
#include <sstream>
#include <mutex>
#include "blocking_queue.h"

rl::blocking_queue<std::string> queue(3);
std::string random_string(size_t);

int main()
{

    std::thread consumer([]() {
        std::string message;

        while (queue.pop(message)) {
            std::cout << "--> async_write(...) called with:" << message << std::endl;

            std::thread popper([]() {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                std::cout << "--> async_write(...) completed, queue size is now " << queue.size() << std::endl;
                queue.unblock();
            });

            popper.detach();

            if (!queue.block()) {
                break;
            }

        }

        std::cout << "consumer thread closed" << std::endl;
    });

    std::thread producer([]() {
        while (!queue.closed()) {
            std::string id = random_string(6);
            std::this_thread::sleep_for(std::chrono::milliseconds(50));

            std::cout << id << ": Pushing JSON" << std::endl;
            queue.push("a message");
            std::cout << id << ": Pushed JSON" << std::endl;

        }
        std::cout << "producer closed" << std::endl;
        });

    std::thread producer2([]() {
        while (!queue.closed()) {
            std::string id = random_string(6);
            std::this_thread::sleep_for(std::chrono::milliseconds(50));

            std::cout << id << ": Pushing heartbeat" << std::endl;
            queue.push("a heartbeat");
            std::cout << id << ": Pushed heartbeat" << std::endl;
        }

        std::cout << "producer2 closed" << std::endl;
        });

    std::thread cancel([]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(10000));
            std::cout << "%%%% CLOSING QUEUE %%%%" << std::endl;
            queue.close();
            std::cout << "%%%% QUEUE CLOSED %%%%" << std::endl;
        });

    cancel.join();

    std::this_thread::sleep_for(std::chrono::milliseconds(10000));

    std::cout << "Hello World!\n"; 

    return EXIT_SUCCESS;
}

std::string random_string(size_t length)
{
    auto randchar = []() -> char
    {
        const char charset[] =
            "0123456789"
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
        const size_t max_index = (sizeof(charset) - 1);
        return charset[rand() % max_index];
    };
    std::string str(length, 0);
    std::generate_n(str.begin(), length, randchar);
    return str;
}

Это моя очередь блокировки:

#include <queue>
#include <mutex>
#include <condition_variable>
#include <assert.h>

// Based on https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
namespace rl {
    template<typename T>
    class blocking_queue {
    private:
        std::queue<T> queue;
        const size_t queue_limit;
        bool is_closed = false;
        mutable std::mutex queue_mutex;
        std::condition_variable new_item_or_closed_event;
        std::condition_variable item_removed_event;
        std::condition_variable queue_blocked_event;

#ifndef NDEBUG
        size_t pushes_in_progress = 0;
#endif

    public:
        blocking_queue(size_t size_limit = 0) : queue_limit(size_limit) {}

        void push(const T& data) {
            std::unique_lock<std::mutex> lock(queue_mutex);
#ifndef NDEBUG
            ++pushes_in_progress;
#endif
            if (queue_limit > 0) {
                while (queue.size() >= queue_limit) {
                    item_removed_event.wait(lock);
                }
            }
            assert(!is_closed);
            queue.push(data);
#ifndef NDEBUG
            --pushes_in_progress;
#endif
            lock.unlock();

            new_item_or_closed_event.notify_one();
        }

        bool try_push(const T& data) {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (queue_limit > 0) {
                if (queue.size() >= queue_limit) {
                    return false;
                }
            }
            assert(!is_closed);
            queue.push(data);
            lock.unlock();

            new_item_or_closed_event.notify_one();
            return true;
        }

        void close() {
            std::unique_lock<std::mutex> lock(queue_mutex);
            assert(!is_closed);
#ifndef NDEBUG
            assert(pushes_in_progress == 0);
#endif
            is_closed = true;
            lock.unlock();

            //item_removed_event.notify_all();
            //queue_blocked_event.notify_all();
            new_item_or_closed_event.notify_all();
        }

        void open() {
            is_closed = false;
        }

        bool block() {
            std::unique_lock<std::mutex> lock(queue_mutex);

            queue_blocked_event.wait(lock);

            return !is_closed;
        }

        void unblock() {
            queue_blocked_event.notify_one();
        }

        bool pop(T & popped_value) {
            std::unique_lock<std::mutex> lock(queue_mutex);
            while (queue.empty()) {
                if (is_closed) {
                    return false;
                }
                new_item_or_closed_event.wait(lock);
            }

            popped_value = queue.front();
            queue.pop();
            item_removed_event.notify_one();
            return true;
        }

        bool try_pop(T & popped_value) {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (queue.empty()) {
                return false;
            }

            popped_value = queue.front();
            queue.pop();
            item_removed_event.notify_one();
            return true;
        }

        bool empty() const {
            std::unique_lock<std::mutex> lock(queue_mutex);
            return queue.empty();
        }

        bool closed() const {
            std::unique_lock<std::mutex> lock(queue_mutex);
            return is_closed;
        }

        size_t limit() const {
            return queue_limit;
        }

        size_t size() const {
            std::unique_lock<std::mutex> lock(queue_mutex);
            return queue.size();
        }

    };
}
...