Нужна помощь в реализации проблемы Producer-Consumer с pthread и семафором - PullRequest
0 голосов
/ 17 апреля 2019

Я пытаюсь реализовать проблему производителя и потребителя в C ++, используя pthread и семафор. У меня есть один производитель и два потребителя. Мой продюсер читает строку из файла и сохраняет ее в очереди символ за символом. Потребители читают из строки и сохраняют символ в виде символа также один за другим. Проблема в том, что только один мой потребитель читает из очереди, другой нет, а его массив остается пустым. Как мне исправить эту проблему. Вот моя программа:

#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>

// define queue size
#define QUEUE_SIZE 5

// declare and initialize semaphore and read/write counter
static sem_t mutex,mutex1;
//static int counter = 0;

// Queue for saving characters
static std::queue<char> charQueue;

// indicator for end of file
static bool endOfFile = false;

// save arrays
static char consumerArray1[100];
static char consumerArray2[100];


void *Producer(void *ptr)
{
    int i=0;
    std::ifstream input("string.txt");
    char temp;
    while(input>>temp)
    {
        sem_wait(&mutex);
        charQueue.push(temp);
        sem_post(&mutex1);
        sem_post(&mutex);
        //counter++;
        std::cout<<"Procuder Index: "<<i<<std::endl;
        i++;

        sleep(6);
    }
    endOfFile = true;
    pthread_exit(NULL);
}

void *Consumer1(void *ptr)
{
    std::cout<<"Entered consumer 1:"<<std::endl;
    int i = 0;
    sem_wait(&mutex1);
    //while(charQueue.empty());
    sem_post(&mutex1);
    while(!endOfFile)// || !charQueue.empty())
    {
        sem_wait(&mutex1);


        sem_wait(&mutex);

        std::cout<<"Consumer1 index:"<<i<<" char: "<<charQueue.front()<<std::endl;
        consumerArray1[i] = charQueue.front();
        charQueue.pop();
        //std::cout<<charQueue.size()<<std::endl;

        sem_post(&mutex1);
        i++;
        //counter--;

        sem_post(&mutex);
        sleep(2);
    }
    consumerArray1[i] = '\0';
    pthread_exit(NULL);
}

void *Consumer2(void *ptr)
{
    std::cout<<"Entered consumer 2:"<<std::endl;
    int i = 0;
    sem_wait(&mutex1);
    //while(charQueue.empty());
    sem_post(&mutex1);

    while(!endOfFile)//  || charQueue.empty())
    {
        sem_wait(&mutex1);


        sem_wait(&mutex);

        std::cout<<"Consumer2 index: "<<i<<" char: "<<charQueue.front()<<std::endl;
        consumerArray2[i] = charQueue.front();
        charQueue.pop();
        sem_post(&mutex1);
        i++;
        //counter--;

        sem_post(&mutex);
        sleep(4);
    }
    consumerArray2[i] = '\0';
    pthread_exit(NULL);
}

int main()
{
    pthread_t thread[3];
    sem_init(&mutex,0,1);
    sem_init(&mutex1,0,1);
    pthread_create(&thread[0],NULL,Producer,NULL);
    int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
    if(rc)
    {
        std::cout<<"Thread not created"<<std::endl;
    }
    pthread_create(&thread[2],NULL,Consumer2,NULL);
    pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
    std::cout<<"First array: "<<consumerArray1<<std::endl;
    std::cout<<"Second array: "<<consumerArray2<<std::endl;
    sem_destroy(&mutex);
    sem_destroy(&mutex1);
    pthread_exit(NULL);
}

Редактировать: Я добавил семафор вокруг доступа charQueue.empty() и charQueue.push(), но без изменений в выводе. что еще мне делать?

Ответы [ 2 ]

1 голос
/ 17 апреля 2019

У вас та же проблема, что и раньше. Ваша Consumer1 функция может вызывать charQueue.empty, в то время как ваша Producer функция может вызывать charQueue.push(temp);. Вы не можете получить доступ к объекту в одном потоке, в то время как другой поток изменяет или может изменить его. Вам нужно либо защитить charQueue мьютексом, семафором, либо другим примитивом синхронизации другого типа.

Опять же, компилятор может оптимизировать код следующим образом:

while(charQueue.empty());

Чтобы кодировать это так:

if (charQueue.empty()) while (1);

Почему? Потому что ваш код может обращаться к charQueue в любое время. И одному потоку категорически запрещено изменять объект, в то время как другой поток может обращаться к нему. Поэтому компилятору разрешается предполагать, что charQueue не будет изменен во время выполнения этого цикла, и поэтому нет необходимости проверять его на пустоту более одного раза.

У вас есть семпахоры. Используйте их, чтобы гарантировать, что только один поток может касаться charQueue одновременно.

0 голосов
/ 17 апреля 2019

Используя рекомендации @DavidSchwartz, я сделал этот код работающим. Пожалуйста, предложите мне лучший способ реализовать это, как будто есть лучшие и более безопасные способы сделать то, что я сделал. Извините за то, что я не получил большинство комментариев и ответов, так как это мой первый код, использующий pthreads и семафор. Поэтому, пожалуйста, потерпите меня:

#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>

// define queue size
#define QUEUE_SIZE 5

// declare and initialize semaphore and read/write counter
static sem_t mutex,mutex1;
//static int counter = 0;

// Queue for saving characters
static std::queue<char> charQueue;

// indicator for end of file
static bool endOfFile = false;

// save arrays
static char consumerArray1[100];
static char consumerArray2[100];


void *Producer(void *ptr)
{
    int i=0;
    std::ifstream input("string.txt");
    char temp;
    while(input>>temp)
    {
        sem_wait(&mutex);
        charQueue.push(temp);
        sem_post(&mutex1);
        sem_post(&mutex);

        i++;

        sleep(6);
    }

    endOfFile = true;
    sem_post(&mutex1);
    pthread_exit(NULL);
}

void *Consumer1(void *ptr)
{

    int i = 0;
    sem_wait(&mutex1);
    bool loopCond = endOfFile;
    while(!loopCond)
    {

        if(endOfFile)
        {

            loopCond = charQueue.empty();
            std::cout<<loopCond<<std::endl;
            sem_post(&mutex1);
        }
       sem_wait(&mutex1);


        sem_wait(&mutex);


        if(!charQueue.empty())
        {

            consumerArray1[i] = charQueue.front();
            charQueue.pop();
            i++;
        }        
        if(charQueue.empty()&&endOfFile)
        {

            sem_post(&mutex);
            sem_post(&mutex1);
            break;
        }  

        sem_post(&mutex);
        sleep(2);

    }

    consumerArray1[i] = '\0';
    pthread_exit(NULL);

}

void *Consumer2(void *ptr)
{

    int i = 0;
    sem_wait(&mutex1);
    bool loopCond = endOfFile;
    while(!loopCond)
    {

        if(endOfFile)
        {

            loopCond = charQueue.empty();
            std::cout<<loopCond<<std::endl;
            sem_post(&mutex1);
        }
        sem_wait(&mutex1);


        sem_wait(&mutex);


        if(!charQueue.empty())
        {

            consumerArray2[i] = charQueue.front();
            charQueue.pop();
            i++;
        }  
        if(charQueue.empty()&& endOfFile)
        {

            sem_post(&mutex);
            sem_post(&mutex1);
            break;
        }  



        sem_post(&mutex);
        sleep(4);

    }

    consumerArray2[i] = '\0';
    pthread_exit(NULL);

}

int main()
{
    pthread_t thread[3];
    sem_init(&mutex,0,1);
    sem_init(&mutex1,0,1);
    pthread_create(&thread[0],NULL,Producer,NULL);
    int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
    if(rc)
    {
        std::cout<<"Thread not created"<<std::endl;
    }
    pthread_create(&thread[2],NULL,Consumer2,NULL);
    pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
    std::cout<<"First array: "<<consumerArray1<<std::endl;
    std::cout<<"Second array: "<<consumerArray2<<std::endl;
    sem_destroy(&mutex);
    sem_destroy(&mutex1);
    pthread_exit(NULL);
}
...