Многопоточное уничтожение очереди в C ++ - PullRequest
0 голосов
/ 05 февраля 2011

Итак, у меня есть общая параллельная очередь. Кажется, работает хорошо, за исключением разрушения.

Способ реализации очереди заключается в том, что она содержит условную переменную и пару мьютексов. Запущено несколько рабочих потоков, ожидающих эту переменную условия. Когда новые объекты доступны для работы, они помещаются в очередь и сигнализируют переменную условия.

Проблема в том, что при выходе из основного потока, уничтожающем очередь, переменная условия уничтожается, но это не удается, поскольку переменная условия используется. Это создает исключение, и все гадко взрывается.

Я хотел бы дать сигнал рабочим, разбудить их и заставить их выйти, дождаться их завершения и продолжить в главном потоке. Моя проблема решается, когда эти потоки завершены - мне понадобится дополнительный примитив синхронизации?

В любом случае, вот код для очереди:

// Based on code from http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
// Original version by Anthony Williams
// Modifications by Michael Anderson

#include "boost/thread.hpp"
#include <deque>

template<typename Data>
class concurrent_queue
{
private:
    std::deque<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
    bool is_canceled;

public:
    concurrent_queue() : the_queue(), the_mutex(), the_condition_variable(), is_canceled(false) {}
    struct Canceled{};
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if (is_canceled) throw Canceled();
        the_queue.push_back(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if (is_canceled) throw Canceled();
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if (is_canceled) throw Canceled();
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop_front();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);

        while(the_queue.empty() && !is_canceled)
        {
            the_condition_variable.wait(lock);
        }
        if (is_canceled) throw Canceled();

        popped_value=the_queue.front();
        the_queue.pop_front();
    }

    std::deque<Data> wait_and_take_all()
    {
        boost::mutex::scoped_lock lock(the_mutex);

        while(the_queue.empty() && !is_canceled)
        {
            the_condition_variable.wait(lock);
        }
        if (is_canceled) throw Canceled();

        std::deque<Data> retval;
        std::swap(retval, the_queue);
        return retval;
    }

    void cancel()
    {
       boost::mutex::scoped_lock lock(the_mutex);
       if (is_canceled) throw Canceled();
       is_canceled = true;
       lock.unlock();
       the_condition_variable.notify_all();
    }

};

Ответы [ 2 ]

2 голосов
/ 05 февраля 2011

Вы можете вызвать join() в каждом потоке, чтобы дождаться его завершения. Как то так:

void DoWork() {};

int main()
{
    boost::thread t(&DoWork);
    // signal for the thread to exit
    t.join();    // wait until it actually does exit

    // destroy the queue
}

Или вы можете использовать boost::thread_group для нескольких потоков.

int main()
{
    boost::thread_group tg;

    for(int i = 0 ; i < 10 ; ++i)
        tg.create_thread(&DoWork);

    // signal to stop work

    tg.join_all();

    // destroy the queue
}
1 голос
/ 05 февраля 2011

У вас есть два варианта.Либо, когда очередь выходит из области видимости, она на самом деле не уничтожается, в то время как другие потоки ссылаются на нее (т. Е. Используют shared_ptr, передают ее другим потокам; вызывают cancel () в конце main (); после того, как другие потокиthrow Отменено и, по-видимому, завершено, очередь будет уничтожена.)

Или, если вы хотите убедиться, что она действительно уничтожена к концу main (), вам придется подождать другие потоки.Вы можете делать то, что предлагает JaredC, если вы в порядке с обработкой ожидания вне деструктора.Чтобы сделать это внутри деструктора, кажется чище не хранить все потоки, а просто хранить счетчик и другой примитив синхронизации.В любом случае, вам нужно, чтобы очередь поддерживала некоторое состояние, чтобы позволить ей дождаться завершения всех потоков.

Мне кажется, что первое решение (с shared_ptr) чище.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...