Программа C ++ для одного производителя с несколькими потребителями иногда падает - PullRequest
0 голосов
/ 16 октября 2018

В приведенном ниже коде я создаю producer thread и n consumer threads, которые читают с каждого из выделенных queue и печатают в stdout.Время от времени этот код вылетает при утверждении consumerQueues[id]->empty().Проходя через отладчик, я вижу, что consumerQueues[id] это 0x0, когда он падает.Теперь в функции init() я создаю ith потребитель queue до того, как будет создан ith worker thread.Я не уверен, почему consumerQueues[id] останется 0x0.Пожалуйста, помогите мне понять, что происходит.

#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>

class Test
{
private:
    void producer()
    {
        while(true)
        {
            std::string s = "abc";
            for(const auto& q : consumerQueues)
            {
                std::unique_lock<std::mutex> lock(mutex);
                q->push(s);
                condition_variable.notify_all();
            }
        }
    }

    void consumer(int id)
    {
        while (true)
        {
            std::string job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                while(consumerQueues[id]->empty())
                {
                    condition_variable.wait(lock);
                }
                job = consumerQueues[id]->front();
                consumerQueues[id]->pop();
            }
            std::cout << "ID "<< id << " job " << job << std::endl;
        }
    }

    std::mutex mutex;
    std::condition_variable condition_variable;
    std::vector<std::thread> workers;
    std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
    std::thread producerThread;

public:

    Test(const unsigned n_threads):
    workers(std::vector<std::thread>(n_threads))
    {}

    Test(const Test &) = delete;
    Test(Test &&) = delete;

    Test & operator=(const Test &) = delete;
    Test & operator=(Test &&) = delete;

    void init()
    {
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
        }
       producerThread  = std::thread(&Test::producer, this);
    }

    ~Test()
    {
        producerThread.join();
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            if(workers[i].joinable())
            {
                workers[i].join();
            }
        }
    }
};


int main()
{
    Test t(1000);
    t.init();
    return 0;
}

1 Ответ

0 голосов
/ 16 октября 2018

Ваша функция инициализации изменяет std :: vector без мьютекса.Это изменяет вектор в то же время, когда потоки запускаются один за другим.

Чтобы это работало, ваша функция инициализации должна выглядеть примерно так:

 void init() {
     for (unsigned i = 0; i < workers.size(); ++i) {
            std::unique_lock<std::mutex> lock(mutex);
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
     }
     producerThread  = std::thread(&Test::producer, this);
 }

От: http://www.cplusplus.com/reference/vector/vector/push_back/

Гонки данных

Контейнер изменен.Если происходит перераспределение, все содержащиеся элементы модифицируются.В противном случае доступ к существующему элементу невозможен, и одновременный доступ к ним или их изменение безопасны.

Перераспределение происходит довольно часто, когда оно начинается с 0 элементов и достигает 1000. Таким образом, вы также можете зарезервировать размервектор, чтобы убедиться, что перераспределение не происходит:

 void init() {
     consumerQueues.reserve(workers.size());
     for (unsigned i = 0; i < workers.size(); ++i) {
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
     }
     producerThread  = std::thread(&Test::producer, this);
 }
...