C ++ Templated Producer-Consumer BlockingQueue, неограниченный буфер: как мне закончить элегантно? - PullRequest
3 голосов
/ 13 июля 2011

Я написал BlockingQueue для связи двух потоков.Можно сказать, что он следует шаблону «производитель-потребитель» с неограниченным буфером.Поэтому я реализовал его с помощью критического сечения и семафора, например:

#pragma once

#include "Semaphore.h"
#include "Guard.h"
#include <queue>

namespace DRA{
namespace CommonCpp{
template<class Element>
class BlockingQueue{
    CCriticalSection    m_csQueue;
    CSemaphore          m_semElementCount;
    std::queue<Element> m_Queue;
//Forbid copy and assignment
    BlockingQueue( const BlockingQueue& );
    BlockingQueue& operator=( const BlockingQueue& );
public:
    BlockingQueue( unsigned int maxSize );
    ~BlockingQueue();
    Element Pop();
    void Push( Element newElement );
};
}
}

//Template definitions
template<class Element>
DRA::CommonCpp::BlockingQueue<Element>::BlockingQueue( unsigned int maxSize ):
    m_csQueue( "BlockingQueue::m_csQueue" ),
    m_semElementCount( 0, maxSize ){
}

template<class Element>
DRA::CommonCpp::BlockingQueue<Element>::~BlockingQueue(){
    //TODO What can I do here?
}

template<class Element>
void DRA::CommonCpp::BlockingQueue<Element>::Push( Element newElement ){
    {//RAII block
        CGuard g( m_csQueue );
        m_Queue.push( newElement );
    }
    m_semElementCount.Signal();
}

template<class Element>
Element DRA::CommonCpp::BlockingQueue<Element>::Pop(){
    m_semElementCount.Wait();
    Element popped;
    {//RAII block
        CGuard g( m_csQueue );
        popped = m_Queue.front();
        m_Queue.pop();
    }
    return popped;
}

CGuard - это обертка RAII для CCriticalSection, она входит в нее при создании и оставляет ее при уничтожении.CSemaphore - это оболочка для семафора Windows.

Пока все хорошо, потоки отлично взаимодействуют.Однако, когда поток производителя прекращает производство и завершает работу, а поток потребителя потребляет все, поток потребителя остается навсегда зависшим от вызова Pop ().

Как я могу сказать, чтобы потребитель элегантно завершил работу?Я думал об отправке специального пустого элемента, но он кажется слишком небрежным.

Ответы [ 4 ]

1 голос
/ 14 июля 2011

Вам лучше использовать события вместо семафора.При добавлении возьмите блокировку на CS и проверьте количество элементов (сохраните в локальной переменной bIsEmpty).Добавьте в очередь, затем проверьте, пусто ли количество элементов, SetEvent!

В методе pop сначала заблокируйте, затем проверьте, не пусто ли оно, а затем WaitForSingleObject - как только WFSO вернется, вы получитев очереди есть хотя бы один элемент.

Отметьте это статья

1 голос
/ 13 июля 2011

Имеется ли в вашей реализации Semaphore функция ожидания по времени?В Windows это будет WaitForSingleObject() с указанием времени ожидания.Если это так, Pop () может быть реализован так:

// Pseudo code
bool Pop(Element& r, timeout)
{
   if(sem.wait(timeout))
   {
      r = m_Queue.front();
      m_Queue.pop();
   }

   return false;
}

Таким образом, Pop() все еще блокируется, хотя его можно легко прервать.Даже при очень коротких таймаутах это не потребляет значительных ресурсов ЦП (больше, чем абсолютно необходимо, да - и потенциально может привести к дополнительному переключению контекста - так что учтите эти предостережения).

1 голос
/ 13 июля 2011

Вам нужен способ сказать потребителю остановиться.Это может быть специальный элемент в очереди, скажем, простая структура-оболочка вокруг Element, или флаг - переменная-член класса очереди (в этом случае вы хотите убедиться, что с флагом обращаются атомарно - окна поиска «заблокированные» функции ).Затем вам нужно проверять это состояние у потребителя каждый раз, когда он просыпается.Наконец, в деструкторе задайте это условие остановки и подайте сигнал семафору.

Остается одна проблема - что вернуть от потребителя pop().Я бы взял логическое возвращаемое значение и аргумент типа Element&, чтобы скопировать результат в случае успеха.

Редактировать:

Примерно так:

bool Queue::Pop( Element& result ) {
    sema.Wait();
    if ( stop_condition ) return false;
    critical_section.Enter();
    result = m_queue.front();
    m_queue.pop;
    critical_section.Leave();
    return true;
}
0 голосов
/ 13 июля 2011

Измените pop, чтобы вернуть опциональное повышение (или сделайте это, как стандартная библиотека с top / pop для разделения задач), а затем подайте сигнал m_semElementCount в последний раз при уничтожении.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...