Тайм-аут wait_until, приводящий к прекращению только одного потока, не может перехватить все потоки - PullRequest
0 голосов
/ 10 июля 2019

Я сделал простую поточно-безопасную реализацию Buffer, создав 10 потоков для работы с очередью buffer для случайного нажатия и выталкивания некоторых чисел.Моя реализация должна позволить потокам, которые ожидают всплывающих окон, ждать только 3 секунды, а затем завершать.Когда это происходит, я печатаю сообщение о тайм-ауте.

Проблема заключается в том, что печатается только одно сообщение о тайм-ауте, затем основное объединит все потоки и вернется.Почему?

Вот код, main.cpp

#include <thread>
#include <vector>
#include <iostream>
#include <sstream>
#include "Buffer.h"

int main() {

    std::vector<std::thread> workers;
    Buffer<std::string> buffer(3);

    srandom(time(NULL));

    for (int i = 0; i < 10; i++) {
        workers.emplace_back([&buffer]{
            long num = random();
            if(num%2==0) {
                std::stringstream msg;
                msg << std::this_thread::get_id() << " pushing " << num << std::endl;
                std::cout << msg.str();
                buffer.push(std::to_string(num));
            } else {
                std::stringstream msg1;
                msg1 << std::this_thread::get_id() << " waiting to pop" << std::endl;
                std::cout << msg1.str();
                std::string popped_string = buffer.pop();
                std::stringstream msg2;
                msg2 << std::this_thread::get_id() << " popped " << popped_string << std::endl;
                std::cout << msg2.str();
            }
        });
    }

    for (auto &w: workers) {
        if (w.joinable()) w.join();
    }

    return 0;
}

Buffer.h

#ifndef PDS_CPP_BUFFER_H
#define PDS_CPP_BUFFER_H

#include <queue>
#include <mutex>
#include <condition_variable>

template <class T>
class Buffer {
private:
    std::queue<T> queue;
    std::mutex mutex;
    std::condition_variable cv;
    std::chrono::seconds sec;
public:
    Buffer(int time) : sec(time), queue() {};

    void push(T object) {
        std::lock_guard lockGuard(mutex);
        this->queue.push(object);
        this->cv.notify_one();
    }

    T pop() {
        std::unique_lock uniqueLock(mutex);
//        this->cv.wait(uniqueLock, [this]{ return !this->queue.empty(); });
        if(this->cv.wait_for(uniqueLock, this->sec, [this]{ return !this->queue.empty(); })) {
        } else {
            std::stringstream msg;
            msg << std::this_thread::get_id() << " timeout" << std::endl;
            std::cout << msg.str();
        }

        T object = this->queue.front();
        this->queue.pop();
        uniqueLock.unlock();
        return object;
    }
};


#endif //PDS_CPP_BUFFER_H
...