Попадание в тупик при доступе к одной и той же очереди из 2 разных потоков. - PullRequest
0 голосов
/ 14 мая 2019

Я использую следующую реализацию очереди блокировки, но я захожу в тупик при попытке доступа к push() из двух разных потоков.Я имею опыт работы с Java, поэтому мой опыт работы с потоками C ++ ограничен.

У меня нет буста, и я бы хотел использовать STL, поэтому я изменил ссылки на буст с помощью их аналога STL.

template<typename D>
class blocking_queue {
private:
    std::queue<D> queue;
    mutable std::mutex queue_mutex;
    const size_t queue_limit;
    bool is_closed = false;
    std::condition_variable new_item_or_closed_event;
    std::condition_variable item_removed_event;

#ifndef NDEBUG
    size_t pushes_in_progress = 0;
#endif

public:
    blocking_queue(size_t size_limit = 0) : queue_limit(size_limit) {}

    void push(const D& data) {
        std::lock_guard lock(queue_mutex);
#ifndef NDEBUG
        ++pushes_in_progress;
#endif
        if (queue_limit > 0) {
            while (queue.size() >= queue_limit) {
                item_removed_event.wait(lock);
            }
        }
        assert(!is_closed);
        queue.push(data);
#ifndef NDEBUG
        --pushes_in_progress;
#endif
        lock.unlock();

        new_item_or_closed_event.notify_one();
    }

    bool try_push(const D& data) {
        std::unique_lock lock(queue_mutex);
        if (queue_limit > 0) {
            if (queue.size() >= queue_limit) {
                return false;
            }
        }
        assert(!is_closed);
        queue.push(data);
        lock.unlock();

        new_item_or_closed_event.notify_one();
        return true;
    }

    void close() {
        std::unique_lock lock(queue_mutex);
        assert(!is_closed);
#ifndef NDEBUG
        assert(pushes_in_progress == 0);
#endif
        is_closed = true;
        lock.unlock();

        new_item_or_closed_event.notify_all();
    }

    D front() {
        if (queue.empty()) {
            return {};
        }

        return queue.front();
    }

    bool pop(D & popped_value) {
        std::unique_lock lock(queue_mutex);
        while (queue.empty()) {
            if (is_closed) {
                return false;
            }
            new_item_or_closed_event.wait(lock);
        }

        popped_value = queue.front();
        queue.pop();
        item_removed_event.notify_one();
        return true;
    }

    bool try_pop(D & popped_value) {
        std::unique_lock lock(queue_mutex);
        if (queue.empty()) {
            return false;
        }

        popped_value = queue.front();
        queue.pop();
        item_removed_event.notify_one();
        return true;
    }

    bool empty() const {
        std::unique_lock lock(queue_mutex);
        return queue.empty();
    }

    bool closed() const {
        std::unique_lock lock(queue_mutex);
        return is_closed;
    }

    size_t limit() const {
        return queue_limit;
    }

    size_t size() const
    {
        std::unique_lock lock(queue_mutex);
        return queue.size();
    }

};

Повторное использование одного и того же мьютекса для двух разных потоков - моя проблема?Я немного запутался в том, как процессор знает, какой из потоков должен быть сигнализирован первым.

...