Параллельная очередь с использованием Qt является взаимоблокировкой - PullRequest
0 голосов
/ 18 августа 2011

Я пытаюсь создать параллельную очередь с параллельными конструкциями потоков Qt.

#ifndef CONCURRENTQUEUE_H
#define CONCURRENTQUEUE_H
#include <QMutex>
#include <QWaitCondition>
#include <queue>

template<typename Data>
class ConcurrentQueue
{
private:
    std::queue<Data> the_queue;
    QMutex the_mutex;
    QWaitCondition the_condition_variable;
    bool closed;

public:

    void setClosed(bool state)
    {
        QMutexLocker locker(&the_mutex);
        closed = state;    
    }

    bool getClosed()
    {
        QMutexLocker locker(&the_mutex);
        return closed;    
    }

    void push(Data const& data)
    {
        QMutexLocker locker(&the_mutex);
        the_queue.push(data);
        the_condition_variable.wakeOne();    
    }

    bool empty()
    {
        QMutexLocker locker(&the_mutex);
        return the_queue.empty();    
    }

    bool try_pop(Data& popped_value)
    {
        QMutexLocker locker(&the_mutex);
        if(the_queue.empty())
        {
            return false;
        }
        popped_value = the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        QMutexLocker locker(&the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(&the_mutex);
        }
        popped_value = the_queue.front();
        the_queue.pop();
        the_condition_variable.wakeOne();
    }

    //created to allow for a limited queue size
    void wait_and_push(Data const& data, const int max_size)
    {
        QMutexLocker locker(&the_mutex);
        while(the_queue.size() >= max_size)
        {
            the_condition_variable.wait(&the_mutex);
        }
        the_queue.push(data);
        the_condition_variable.wakeOne();
    }


};

#endif // CONCURRENTQUEUE_H

У меня есть поток-производитель, использующий метод wait_and_push для передачи данных в очередь, и я пытаюсь получить свою очередьпотребитель читает из очереди, используя try_pop

 while(!tiles->empty() || !tiles->getClosed())
{
             if(!tiles->try_pop(tile))
                    continue;
//do stuff with the tile
}

Однако иногда это происходит.Производитель устанавливает закрытое логическое значение в качестве флага для потоков потребителя, чтобы завершить загрузку очереди.У моего потребителя есть только способ узнать, загружается очередь, все еще выполняется или еще не запущена.

Причина, по которой производитель не может использовать обычный push-запрос "wait_and_push", заключается в том, что я хотел иметь возможность блокировать этот поток до тех пор, пока некоторые элементы не будут обработаны, чтобы избежать излишнего потребления памяти и ненужных операцийдисковый ввод / вывод.

Может кто-нибудь указать мне, что происходит не так?

1 Ответ

3 голосов
/ 18 августа 2011

Вы забыли добавить

the_condition_variable.wakeOne();

в try_pop.

Если к вашей очереди будут подключаться несколько производителей / потребителей, у вас должен быть отдельный QWaitCondition для производителей и потребителей, иначе wakeOne может не пробудить правильный поток.

EDIT:

Если будет несколько производителей / потребителей, то у вас должно быть notFullCondvar и notEmptyCondvar.

  • Метод try_pop пробуждает notFullCondvar.
  • Метод wait_and_pop ожидает notEmptyCondvar, но вызывает notFullCondvar.
  • Метод push пробуждает notEmptyCondvar.
  • Метод wait_and_push ожидает notFullCondvar, но вызывает notEmptyCondvar.

Надеюсь, это имеет смысл.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...