C ++ многопоточная проблема производитель-потребитель - PullRequest
0 голосов
/ 05 октября 2019

Я написал код нескольких производителей и потребителей с условными переменными. Даже когда у меня только один производитель и один потребитель, это не работает. Как производители, так и потребители должны участвовать в процессе (верно). Когда я запускаю код, он застревает на 50% запусков. Я полагаю, это зашло в тупик из-за ожидания. Мне не удается отладить, где он застрял и как разблокировать conds. По запросу я должен создать код с ожиданием, сигналом и трансляцией.

Если очередь заполнена, производитель ждет. если очередь пуста, потребитель ожидает.

void WaitableQueue::enqueue(size_t a_item)
{
    (m_cond.getMutex()).lock();

    while(m_itemsCounter==m_capacity && !m_isBeingDestroyed)
    {
        ++m_numberOfWaiting;
        m_cond.wait();
        --m_numberOfWaiting;
    }

    std::cout<<"enqueue "<<a_item<<"\n";

    m_queue.push(a_item);
    ++m_itemsCounter;
    ++m_numbOfProduced;
    if(m_isBeingDestroyed)
    {
        m_cond.broadcast(); 
    }

    (m_cond.getMutex()).unlock();
    m_cond.broadcast();
}

void WaitableQueue::dequeue()
{
    (m_cond.getMutex()).lock();

    while(m_itemsCounter==0 && !m_isBeingDestroyed)
    {
        ++m_numberOfWaiting;
        std::cout<<"Waiting\n";
        m_cond.wait();
        std::cout<<"Done waiting\n";
        --m_numberOfWaiting;
    }

    if (m_isBeingDestroyed)
    {
        (m_cond.getMutex()).unlock();
        m_cond.broadcast();
        return;
    }
    std::cout<<"dequeue "<<m_queue.front()<<"\n";
    m_queue.pop();
    --m_itemsCounter;
    ++m_numbOfConsumed;
    (m_cond.getMutex()).unlock();
    m_cond.broadcast();
}

void WaitableQueue::destroy()
{
    (m_cond.getMutex()).lock();
    m_isBeingDestroyed=true;
    (m_cond.getMutex()).unlock();
}



void Producer::run()
{
    for(size_t i=0;i<m_numOfItemsToProduce;++i)
    {
        usleep(m_delay);
        size_t item=produce();
        m_wq.enqueue(item);
    }
}


Producer::produce() const
{
    return rand()%m_numOfItemsToProduce;
}

void Consumer::run()
{
    m_numOfProducersMutex.lock();
    while(m_numOfProducers>0)
    {
        m_numOfProducersMutex.unlock();
        usleep(m_delay);
        m_wq.dequeue();
        m_numOfProducersMutex.lock();
    }
    m_numOfProducersMutex.unlock();
}


int main()
{
    size_t numProducers=1;
    size_t numConsumers=3;
    Mutex mutex;
    ConditionalVariable cond(mutex);

    WaitableQueue<size_t> wq(NUM_OF_ITEMS,cond);
    std::vector<Producer<size_t>*> producerArray;
    std::vector<Consumer<size_t>*> consumerArray;
    Mutex numOfProducersMutex;

    for(size_t i=0;i<numProducers;++i)
    {
        Producer<size_t>* tempP=new Producer<size_t>(wq,NUM_OF_ITEMS,DELAY);
        producerArray.push_back(tempP);
    }

    for(size_t i=0;i<numConsumers;++i)
    {
        Consumer<size_t>* tempC=new Consumer<size_t>(wq,numProducers,numOfProducersMutex,DELAY);
        consumerArray.push_back(tempC);
    }

    for(size_t i=0;i<numProducers;++i)
    {
        producerArray[i]->start();
    }

    for(size_t i=0;i<numConsumers;++i)
    {
        consumerArray[i]->start();
    }

    for(size_t i=0;i<numProducers;++i)
    {
        producerArray[i]->join();
        numOfProducersMutex.lock();
        --numProducers;
        numOfProducersMutex.unlock();
    }
    usleep(100);

    //tell the consumers stop waiting
    wq.destroy();
   for(size_t i=0;i<numConsumers;++i)
    {
        consumerArray[i]->join();
    }

   for(size_t i=0;i<numProducers;++i)
   {
        delete producerArray[i];
   }

    for(size_t i=0;i<numConsumers;++i)
   {
        delete consumerArray[i];
   }
}

Это работает около 50% времени выполнения. В остальных 50% оно застревает.

Ответы [ 2 ]

0 голосов
/ 07 октября 2019

Чтобы решить проблему производителя-потребителя, используя условную переменную, сначала вам нужно понять проблему ограниченного буфера.

Проверьте здесь реализацию потокаобезопасной очереди буфера с использованием условной переменной в C ++: https://codeistry.wordpress.com/2018/03/08/buffer-queue-handling-in-multithreaded-environment/

Вы можете использовать эту очередь буферов в качестве строительного блока для решения проблемы многократного использования продукта. Пожалуйста, проверьте здесь, как потокобезопасная очередь буферов используется для решения проблемы производителя-потребителя в C ++: https://codeistry.wordpress.com/2018/03/09/unordered-producer-consumer/

0 голосов
/ 06 октября 2019

Вы обнаружили еще один пример того, как C ++ делает сложную проблему из концептуально простой проблемы.

Похоже, что вы хотите, чтобы один или несколько производителей производили одинаковое количество значений и имели набор потребителейчитать и обрабатывать эти значения. Также кажется, что вы хотите, чтобы количество производителей было равно количеству потребителей, но при этом позволяете настраивать это число (производителей и потребителей).

Эта проблема очень проста с использованием Ada, который был разработан с параллелизмом вmind.

Первый файл - это спецификация пакета Ada, определяющая типы задач нашего производителя и потребителя.

generic
   Items_To_Handle : Positive;
package Integer_Prod_Con is

   task type Producer;

   task type Consumer;

end Integer_Prod_Con;

Общий параметр очень похож на параметр шаблона. В этом случае значение, переданное в качестве универсального параметра, должно быть положительным целым числом. Реализация пакета следующая.

with Ada.Containers.Synchronized_Queue_Interfaces;
with Ada.Containers.Unbounded_Synchronized_Queues;
with Ada.Text_Io; use Ada.Text_IO;

package body Integer_Prod_Con is
   package Int_Interface is new Ada.Containers.Synchronized_Queue_Interfaces(Integer);
   package Int_Queue is new Ada.Containers.Unbounded_Synchronized_Queues(Queue_Interfaces =>Int_Interface);
   use Int_Queue;

   The_Queue : Queue;

   --------------
   -- Producer --
   --------------

   task body Producer is
   begin
      for Num in 1..Items_To_Handle loop
         The_Queue.Enqueue(Num);
         delay 0.010;
      end loop;
   end Producer;

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      Value : Integer;
   begin
      for Num in 1..Items_To_Handle loop
         The_Queue.Dequeue(Value);
         Put_Line(Value'Image);
         delay 0.010;
      end loop;
   end Consumer;

end Integer_Prod_Con;

Пакет использует предопределенный универсальный пакет, реализующий неограниченную очередь в качестве буфера. Это позволяет очереди увеличиваться и уменьшаться по мере необходимости программой. Каждая задача производителя ставит в очередь целочисленные значения от 1 до Items_To_Handle, и каждый потребитель снимает с очереди и выводит одинаковое количество элементов из очереди.

Основная процедура для этой программы:

with Integer_Prod_Con;

procedure Int_Queue_Main is
   PC_Count : constant := 3;

   package short_list is new Integer_Prod_Con(10);
   use short_List;

   Producers : Array(1..PC_Count) of Producer;
   Consumers : Array(1..PC_Count) of Consumer;
begin
   null;
end Int_Queue_Main;

вывод этой программы:

 1
 1
 1
 2
 2
 2
 3
 3
 3
 4
 4
 4
 5
 5
 5
 6
 6
 6
 7
 7
 7
 8
 8
 8
 9
 9
 9
 10
 10
 10
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...