Один производитель, два потребителя действуют в одной «очереди», производимой производителем - PullRequest
1 голос
/ 22 марта 2012

Предисловие: я новичок в многопоточном программировании и немного разбираюсь в C ++.Мои требования - использовать один мьютекс и два условия mNotEmpty и mEmpty.Я также должен создать и заполнить векторы, как указано ниже.

У меня есть один поток производителя, создающий вектор случайных чисел размера n*2, и два потребителя вставляют эти значения в два отдельных вектора размера n.

В продюсере я делаю следующее:

  1. Блокировка мьютекса: pthread_mutex_lock(&mMutex1)
  2. Подождите, пока потребитель скажет, что вектор пуст: pthread_cond_wait(&mEmpty,&mMutex1)
  3. Отодвинуть значение в вектор
  4. Сигнализировать потребителю, что вектор больше не пуст: pthread_cond_signal(&mNotEmpty)
  5. Разблокировать мьютекс: pthread_mutex_unlock(&mMutex1)
  6. Возврат к шагу 1

У потребителя:

  1. Блокировка мьютекса: pthread_mutex_lock(&mMutex1)
  2. Проверьте, нет ли векторапусто, и если да, подайте сигнал производителю: pthread_cond_signal(&mEmpty)
  3. Иначе вставьте значение в один из двух новых векторов (в зависимости от того, какой поток) и удалите из исходного вектора
  4. Разблокируйте мьютекс: pthread_mutex_unlock(&mMutex1)
  5. Вернуться к шагу 1

Что не так с моим процессом?Я продолжаю получать ошибки сегментации или бесконечные циклы.

Редактировать: Вот код:

void Producer()
{
    srand(time(NULL));
    for(unsigned int i = 0; i < mTotalNumberOfValues; i++){
        pthread_mutex_lock(&mMutex1);

        pthread_cond_wait(&mEmpty,&mMutex1);
        mGeneratedNumber.push_back((rand() % 100) + 1);

        pthread_cond_signal(&mNotEmpty);
        pthread_mutex_unlock(&mMutex1);
    }
}

void Consumer(const unsigned int index)
{
    for(unsigned int i = 0; i < mNumberOfValuesPerVector; i++){
        pthread_mutex_lock(&mMutex1);
        if(mGeneratedNumber.empty()){
            pthread_cond_signal(&mEmpty);
        }else{
            mThreadVector.at(index).push_back[mGeneratedNumber.at(0)];
            mGeneratedNumber.pop_back();
        }

        pthread_mutex_unlock(&mMutex1);
    }
}

Ответы [ 4 ]

4 голосов
/ 22 марта 2012

Я не уверен, что понимаю обоснование того, как вы делаете вещи. В обычной идиоме «потребитель-провайдер» провайдер выдвигает как можно больше элементов в канал, ожидая, только если есть недостаточно места в канале; это не ждет пустого. Итак обычная идиома будет:

провайдер (чтобы нажать одну вещь):

pthread_mutex_lock( &mutex );
while ( ! spaceAvailable() ) {
    pthread_cond_wait( &spaceAvailableCondition, &mutex );
}
pushTheItem();
pthread_cond_signal( &itemAvailableCondition );
pthread_mutex_unlock( &mutex );

и на стороне потребителя, чтобы получить предмет:

pthread_mutex_lock( &mutex );
while ( ! itemAvailable() ) {
    pthread_cond_wait( &itemAvailableCondition, &mutex );
}
getTheItem();
pthread_cond_signal( &spaceAvailableCondition );
pthread_mutex_unlock( &mutex );

Обратите внимание, что для каждого условия одна сторона сигнализирует, а другая ждет. (Я не вижу ожидания у вашего потребителя.) И если есть более одного процесс с любой стороны, я бы рекомендовал использовать pthread_cond_broadcast, а не pthread_cond_signal.

В вашем коде есть ряд других проблем. Некоторые из них выглядят более как опечатки: вы должны скопировать / вставить реальный код, чтобы избежать этого. Вы действительно означает читать и выдавать mGeneratedValues, когда вы нажимаете в mGeneratedNumber и проверить, пусто ли это? (Если вы на самом деле есть две разные очереди, то вы выскочите из очереди, где нет один нажал.) И у вас нет никаких петель, ожидающих условия; вы продолжаете перебирать количество элементов, которые вы ожидать (увеличивая счетчик каждый раз, так что вы, вероятно, прорасти задолго до того, как ты это сделаешь) - я не вижу бесконечной петли, но я легко могу видеть бесконечное ожидание в pthread_cond_wait в режиссер. Я не вижу сброса ядра с рук, но что происходит, когда один из процессов заканчивается (вероятно, потребитель, потому что он никогда ждет чего угодно); если это в конечном итоге уничтожает мьютекс или условие переменных, вы можете получить дамп ядра, когда другой процесс пытается используйте их.

1 голос
/ 22 марта 2012

В продюсере вызывать pthread_cond_wait только тогда, когда очередь не пуста. В противном случае вы будете заблокированы навсегда из-за состояния гонки.

0 голосов
/ 17 июля 2014
Here is a solution to a similar problem like you. In this program producer produces a no and writes it to a array(buffer) and a maintains a file then update a status(status array) about it, while on getting data in the array(buffer) consumers start to consume(read and write to their file) and update a status that it has consumed. when producer looks that both the consumer has consumed the data it overrides the data with a new value and goes on. for convenience here i have restricted the code to run for 2000 nos.

// Producer-consumer //

#include <iostream>
#include <fstream>
#include <pthread.h>

#define MAX 100

using namespace std;

int dataCount = 2000;

int buffer_g[100];
int status_g[100];

void *producerFun(void *);
void *consumerFun1(void *);
void *consumerFun2(void *);

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t dataNotProduced = PTHREAD_COND_INITIALIZER;
pthread_cond_t dataNotConsumed = PTHREAD_COND_INITIALIZER;

int main()
{

  for(int i = 0; i < MAX; i++)
    status_g[i] = 0;

  pthread_t producerThread, consumerThread1, consumerThread2;

  int retProducer = pthread_create(&producerThread, NULL, producerFun, NULL);
  int retConsumer1 = pthread_create(&consumerThread1, NULL, consumerFun1, NULL);
  int retConsumer2 = pthread_create(&consumerThread2, NULL, consumerFun2, NULL);

  pthread_join(producerThread, NULL);
  pthread_join(consumerThread1, NULL);
  pthread_join(consumerThread2, NULL);

  return 0;

}

void *producerFun(void *)
{
    //file to write produced data by producer
    const char *producerFileName = "producer.txt";
    ofstream producerFile(producerFileName);
    int index = 0, producerCount = 0;

    while(1)
    {
      pthread_mutex_lock(&mutex);

      if(index == MAX)
      {
        index = 0;
      }
      if(status_g[index] == 0)
      {

        static int data = 0;
        data++;

        cout << "Produced:  " << data << endl;

        buffer_g[index] = data;

        producerFile << data << endl;

        status_g[index] = 5;

        index ++;
        producerCount ++;

        pthread_cond_broadcast(&dataNotProduced);
      }
      else
      {
        cout << ">> Producer is in wait.." << endl;
        pthread_cond_wait(&dataNotConsumed, &mutex);
      }
      pthread_mutex_unlock(&mutex);

      if(producerCount == dataCount)
      {
        producerFile.close();
        return NULL;
      }
    }
}

void *consumerFun1(void *)
{
  const char *consumerFileName = "consumer1.txt";
  ofstream consumerFile(consumerFileName);
  int index = 0, consumerCount = 0;

  while(1)
  {
    pthread_mutex_lock(&mutex);
    if(index == MAX)
    {
      index = 0;
    }

    if(status_g[index] != 0 && status_g[index] != 2)
    {
      int data = buffer_g[index];

      cout << "Cosumer1 consumed: " << data << endl;
      consumerFile << data << endl;

      status_g[index] -= 3;

      index ++;
      consumerCount ++;

      pthread_cond_signal(&dataNotConsumed);
    }
    else
    {
      cout << "Consumer1 is in wait.." << endl;
      pthread_cond_wait(&dataNotProduced, &mutex);
    }
      pthread_mutex_unlock(&mutex);
    if(consumerCount == dataCount)
    {
      consumerFile.close();
      return NULL;
    }
  }
}

void *consumerFun2(void *)
{
  const char *consumerFileName = "consumer2.txt";
  ofstream consumerFile(consumerFileName);
  int index = 0, consumerCount = 0;

  while(1)
  {
    pthread_mutex_lock(&mutex);
    if(index == MAX)
    {
      index = 0;
    }

    if(status_g[index] != 0 && status_g[index] != 3)
    {


      int data = buffer_g[index];
      cout << "Consumer2 consumed: " << data << endl;
      consumerFile << data << endl;

      status_g[index] -= 2;

      index ++;
      consumerCount ++;

      pthread_cond_signal(&dataNotConsumed);
    }
    else
    {
      cout << ">> Consumer2 is in wait.." << endl;
      pthread_cond_wait(&dataNotProduced, &mutex);
    }
    pthread_mutex_unlock(&mutex);

    if(consumerCount == dataCount)
    {
      consumerFile.close();
      return NULL;
    }
  }
} 

Here is only one problem that producer in not independent to produce, that is it needs to take lock on the whole array(buffer) before it produces new data, and if the mutex is locked by consumer it waits for that and vice versa, i am trying to look for it.
0 голосов
/ 22 марта 2012

Вы можете рассмотреть возможность использования мьютекса только после выполнения условия, например,

producer()
{
    while true
    {
        waitForEmpty();
        takeMutex();
        produce();
        releaseMutex();
    }
}

consumer()
{
    while true
    {
        waitForNotEmpty();
        takeMutex();
        consume();
        releaseMutex();
    }
}
...