Как правильно синхронизировать потоки в pthreads? - PullRequest
0 голосов
/ 18 апреля 2019

Я реализую проблему производитель-потребитель, используя pthreads и семафор. У меня есть 1 производитель и 2 потребителя. Мой производитель читает символы один за другим из файла и помещает их в круговую очередь. Я хочу, чтобы потребители читали из очереди и хранили в отдельных массивах. Я хочу, чтобы чтение было таким, чтобы первый потребитель читал 2 символа, а второй читал каждый третий символ. Я пытаюсь сделать это, используя pthread_cond_wait(), но это не работает. Это мой код:

#include<iostream>
#include<pthread.h>
#include<fstream>
#include<unistd.h>
#include<semaphore.h>
#include<queue>
#include "circular_queue"

// define queue size
#define QUEUE_SIZE 5

// declare and initialize semaphore and read/write counter
static sem_t mutex,queueEmptyMutex;
//static int counter = 0;

// Queue for saving characters
static Queue charQueue(QUEUE_SIZE);
//static std::queue<char> charQueue;

// indicator for end of file
static bool endOfFile = false;

// save arrays
static char consumerArray1[100];
static char consumerArray2[100];

static pthread_cond_t cond;
static pthread_mutex_t cond_mutex; 
static bool thirdCharToRead = false;

void *Producer(void *ptr)
{
    int i=0;
    std::ifstream input("string.txt");
    char temp;
    while(input>>temp)
    {
        std::cout<<"reached here a"<<std::endl;
        sem_wait(&mutex);
        std::cout<<"reached here b"<<std::endl;
        if(!charQueue.full())
        {
            charQueue.enQueue(temp);
        }
        sem_post(&queueEmptyMutex);
        sem_post(&mutex);

        i++;

        sleep(4);
    }

    endOfFile = true;
    sem_post(&queueEmptyMutex);
    pthread_exit(NULL);
}

void *Consumer1(void *ptr)
{

    int i = 0;
    sem_wait(&queueEmptyMutex);
    bool loopCond = endOfFile;
    while(!loopCond)
    {
        std::cout<<"consumer 1 loop"<<std::endl;
        if(endOfFile)
        {

            loopCond = charQueue.empty();
            std::cout<<loopCond<<std::endl;
            sem_post(&queueEmptyMutex);
        }
       sem_wait(&queueEmptyMutex);


        sem_wait(&mutex);


        if(!charQueue.empty())
        {

            consumerArray1[i] = charQueue.deQueue();
            i++;
            if(i%2==0)
            {
                pthread_mutex_lock(&cond_mutex);
                std::cout<<"Signal cond. i = "<<i<<std::endl;
                thirdCharToRead = true;
                pthread_mutex_unlock(&cond_mutex);
                pthread_cond_signal(&cond);

            }
        }        
        if(charQueue.empty()&&endOfFile)
        {

            sem_post(&mutex);
            sem_post(&queueEmptyMutex);

            break;
        }  
        sem_post(&mutex);

        sleep(2);
        std::cout<<"consumer 1 loop end"<<std::endl;
    }

    consumerArray1[i] = '\0';
    pthread_exit(NULL);

}

void *Consumer2(void *ptr)
{

    int i = 0;
    sem_wait(&queueEmptyMutex);
    bool loopCond = endOfFile;
    while(!loopCond)
    {
        std::cout<<"consumer 2 loop"<<std::endl;
        if(endOfFile)
        {

            loopCond = charQueue.empty();
            std::cout<<loopCond<<std::endl;
            sem_post(&queueEmptyMutex);
        }
        sem_wait(&queueEmptyMutex);


        sem_wait(&mutex);


        if(!charQueue.empty())
        {
            pthread_mutex_lock(&cond_mutex);

            while(!thirdCharToRead)
            {
                std::cout<<"Waiting for condition"<<std::endl;
                pthread_cond_wait(&cond,&cond_mutex);
            }
            std::cout<<"Wait over"<<std::endl;
            thirdCharToRead = false;
            pthread_mutex_unlock(&cond_mutex);

            consumerArray2[i] = charQueue.deQueue();

            i++;
        }  
        if(charQueue.empty()&& endOfFile)
        {
            sem_post(&mutex);
            sem_post(&queueEmptyMutex);
            break;
        }  

        sem_post(&mutex);
        std::cout<<"consumer 2 loop end"<<std::endl;
        sleep(2);

    }

    consumerArray2[i] = '\0';
    pthread_exit(NULL);

}

int main()
{
    pthread_t thread[3];
    sem_init(&mutex,0,1);
    sem_init(&queueEmptyMutex,0,1);
    pthread_mutex_init(&cond_mutex,NULL);
    pthread_cond_init(&cond,NULL);
    pthread_create(&thread[0],NULL,Producer,NULL);
    int rc = pthread_create(&thread[1],NULL,Consumer1,NULL);
    if(rc)
    {
        std::cout<<"Thread not created"<<std::endl;
    }
    pthread_create(&thread[2],NULL,Consumer2,NULL);
    pthread_join(thread[0],NULL);pthread_join(thread[1],NULL);pthread_join(thread[2],NULL);
    std::cout<<"First array: "<<consumerArray1<<std::endl;
    std::cout<<"Second array: "<<consumerArray2<<std::endl;
    sem_destroy(&mutex);
    sem_destroy(&queueEmptyMutex);
    pthread_exit(NULL);
}

У меня проблема в том, что после одного чтения потребитель 2 заходит в бесконечный цикл в while(!thirdCharToRead). Есть ли лучший способ реализовать это?

1 Ответ

2 голосов
/ 18 апреля 2019

Хорошо, давайте начнем с этого кода:

        std::cout<<"Wait over"<<std::endl;
        pthread_mutex_unlock(&cond_mutex);
        thirdCharToRead = false;

Этот код говорит, что cond_mutex не защищает thirdCharToRead от одновременного доступа. Зачем? Потому что он изменяет thirdCharToRead без удержания этого мьютекса.

Теперь посмотрите на этот код:

        pthread_mutex_lock(&cond_mutex);

        while(!thirdCharToRead)
        {
            std::cout<<"Waiting for condition"<<std::endl;
            pthread_cond_wait(&cond,&cond_mutex);
        }

Теперь цикл while проверяет thirdCharToRead, поэтому мы должны хранить любую блокировку, защищающую thirdCharToRead от одновременного доступа, когда мы ее тестируем. Но цикл while будет зацикливаться вечно, если thirdCharToRead останется заблокированным для всего цикла, поскольку никакой другой поток не сможет его изменить. Таким образом, этот код имеет смысл, только если где-то в цикле мы снимаем блокировку, которая защищает thirdCharToRead, и единственная блокировка, которую мы снимаем в цикле, это cond_mutex при вызове pthread_cond_wait.

Так что этот код имеет смысл, только если cond_mutex защищает thirdCharToRead.

Хьюстон, у нас проблема. Один фрагмент кода говорит, что cond_mutex не защищает thirdCharToRead, а один фрагмент кода говорит: cond_mutex защищает thirdCharToRead.

...