C ++ Threadsafe Queuing - PullRequest
       5

C ++ Threadsafe Queuing

0 голосов
/ 10 октября 2019

Я знаю, что здесь обсуждались сотни дискуссий о многопоточных очередях и о том, как это сделать. Я поймал кусок кода, и он компилируется, но теперь, как бы я ни пытался его использовать, он просто падает. Надеюсь, кто-нибудь может дать мне представление, почему? Я новичок здесь, так что я еще не эксперт по C ++:)

Теоретически мне нужно, чтобы только 1 поток проталкивался в очередь, но на одной машине будет несколько потоков, выталкивающих записи из очереди. Это не распределенная программа HPC.

//
// Threadsafe Queueing...
// Stolen: https://juanchopanzacpp.wordpress.com/2013/02/26/concurrent-queue-c11
//
template <typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue() { }
    ~ThreadSafeQueue() { }

    bool
    empty()
    {
        return(_queue.empty());
    }

    T
    pop()
    {
        std::unique_lock<std::mutex> mlock(_mutex);
        while (_queue.empty())
        {
            _cond.wait(mlock);
        }

        auto item = _queue.front();
        _queue.pop();

        return(item);
    }

    void
    pop(T& item)
    {
        std::unique_lock<std::mutex> mlock(_mutex);

        while (_queue.empty())
        {
          _cond.wait(mlock);
        }

        item = _queue.front();
        _queue.pop();
    }

    void
    push(const T& item)
    {
        std::unique_lock<std::mutex> mlock(_mutex);

        _queue.push(item);
        mlock.unlock();
        _cond.notify_one();
    }

    void
    push(T&& item)
    {
        std::unique_lock<std::mutex> mlock(_mutex);
        _queue.push(std::move(item));
        mlock.unlock();
        _cond.notify_one();
    }

private:
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _cond;
};

// CRASH just instantiating this?
// ThreadSafeQueue<std::string> dir_queue;

// CRASH as soon as I call dir_queue->push(std::string);
// ThreadSafeQueue<std::string> *dir_queue = new ThreadSafeQueue<std::string>();

Спасибо!

1 Ответ

0 голосов
/ 10 октября 2019

Условная переменная предназначена для синхронизации между двумя или более потоками, так что один поток ожидает другого. В вашем случае у вас есть один ресурс - очередь, которую вы хотите защитить от одновременного доступа нескольких потоков. В этом случае вам лучше использовать std :: lock_guard.

std :: lock_guard - это легкий вес блокировки по сравнению с std :: unique_lock. Он будет уничтожен, когда выйдет за рамки видимости (RAII). std :: unique_lock дает больший детальный контроль, включая возможность блокировки и разблокировки.

Я переработал ваш код для использования std :: lock_guard:

template <typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue() { }
    ~ThreadSafeQueue() { }

    bool
    empty()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    T pop()
    {
        //Caution: assuming queue is not empty
        std::lock_guard<std::mutex> lock(_mutex);
        auto item = _queue.front();
        _queue.pop();
        return item;
    }

    void pop(T& item)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_queue.empty())
        {
            item = _queue.front();
            _queue.pop();
        }
    }

    void push(const T& item)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(item);
    }

    void push(T&& item)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push(std::move(item));
    }

private:
    std::queue<T> _queue;
    std::mutex _mutex;
};

Обратите внимание на мой комментарий внутри pop(). Если очередь пуста, попытка вызвать front () или pop () вызовет исключение. Вам, вероятно, следует выполнить дополнительную проверку, чтобы убедиться, что очередь не пуста.

Если вы действительно хотите, чтобы один поток ожидал другого, прежде чем выдавать сообщение, вы можете использовать условную переменную. Однако как условная переменная, так и мьютекс должны быть объявлены вне ThreadSafeQueue и должны быть доступны различным экземплярам потока. Тогда у вас будет такое условие внутри pop ():

T pop()
{
    std::unique_lock<std::mutex> mlock(_mutex);
    _cond.wait(mlock, [] {return !_queue.empty(); });
    auto item = _queue.front();
    _queue.pop();
    return item;
}

и в вашем push ():

void push(const T& item)
{
    {
        std::lock_guard<std::mutex> mlock(_mutex);
        _queue.push(item);
    }
    _cond.notify_one();
}

Выше приведен только пример. В вашем случае это вам не нужно, потому что у вас есть только одна очередь, и вы хотите, чтобы потокобезопасный доступ к ней.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...