C производитель / потребитель - вывод одинаковых данных и застревание до окончания - PullRequest
0 голосов
/ 10 января 2020

У нас есть задание, основанное на проблеме производителя / потребителя. По сути, назначение сводится к следующему:

  • Позвольте N_PROD, N_CON представлять количество потоков производителя / потребителя, соответственно, которые создаются, и пусть BUF_SIZE представляет размер буфера.

  • Каждый поток производителей должен выбрать 2 случайных простых числа, умножить их и добавить результат в буфер

  • Пусть TOTAL_PRIMES представляет, сколько раз всего элементов добавляются или удаляются из буфера. Очевидно, что некоторые потоки могут обращаться к буферу больше, чем другие (например, если TOTAL_PRIMES=5 и N_PROD=3, то вполне нормально, если producer1 добавляет три разных продукта в буфер, producer2 добавляет два, а producer3 не добавляет ни одного).

  • порожденные потоки должны начать работать только после того, как основной поток создал всех производителей / потребителей.

Каждый раз, когда я запускаю программу, кажется, что она никогда не превосходит 3 или 4 основных продукта. Кроме того, кажется, продолжает писать те же данные.

Мой главный вопрос: почему программа никогда не достигает конца? Я не могу понять, где он застрял. Я не знаю, является ли повторяющиеся данные следствием первой проблемы, или это две отдельные проблемы.

Вот то, что я считаю наиболее важной частью моего кода (поэтому не включая функции, которые просто выполняют некоторые вычисления или выводят что-то на консоль):

// Declaration of thread condition variable 
pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER; 
// declaring mutex 
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; 

lluint buf[BUF_SIZE];

int p_count = 0;
int c_count = 0;
int all_started = 0;
int pdone = 0;
int cdone = 0;

//////////////////////////////////
sem_t* sem_access;
sem_t* sem_occupy;  //how many elements currently in buffer
sem_t* sem_free;    //how much free space we have
sem_t* sem_all_started;

sem_t* sem_buf;

///////////////////////////////////////////

// int main()
int main(int argc, char* argv[])
{
    close_Semaphors();

    pthread_t p_threads[N_PROD];
    pthread_t c_threads[N_CONS];

    open_all_sem(); //helper function to opens all semaphores listed above

    int rc, t;
    for (t=0; t<N_PROD; t++)
    {
        rc = pthread_create(&p_threads[t], NULL, producer, (void*)t);
        if (rc)
        {
          printf("ERROR; return code from pthread_create() is %d\n", rc);
          exit(-1);
       }
    }


    sem_wait(sem_access);   
    printf("main thread created all producer threads\n");   
    sem_post(sem_access);   

    for (t=0; t<N_CONS; t++)
    { 
        rc = pthread_create(&c_threads[t], NULL, consumer, (void*)t);
        if (rc)
        {
          printf("ERROR; return code from pthread_create() is %d\n", rc);
          exit(-1);
       }
    }

    printf("main thread created all consumer threads\n");   
    broadcast_all_started();

    for (t=0; t<N_PROD; t++)
    {
        rc = pthread_join(p_threads[t], NULL);
        if (rc != 0)
        {
            printf("Error joining producer thread %d\n", t+1);
            exit(-1);
        }
    }

    for (t=0; t<N_CONS; t++)
    {
        rc = pthread_join(c_threads[t], NULL);
        if (rc != 0)
        {
            printf("Error joining consumer thread %d\n", t+1);
            exit(-1);
        }

    }

    printf("Back in main thread\n");

    sem_unlink("/sem_free");
    sem_unlink("/sem_access"); 
    sem_unlink("/sem_occupy");
    sem_unlink("/sem_all_started");
    sem_unlink("/sem_buf");

    printf("Goodbye!\n");
    close_Semaphors();
    return 0;
}



void producer(void* id)
{
    wait_in_threads_until_all_start();
    int p_id = (int)id + 1;

    while (!pdone)
    {
        lluint prime1, prime2, primeProd;

        prime1 = getPrimeNum();  
        prime2 = getPrimeNum();
        primeProd = prime1 * prime2;


        sem_wait(sem_free); 
        sem_wait(sem_access);   
        write_add_to_buf_msg(p_id, prime1, prime2, primeProd);   
        write_producer_is_done(p_id);  //simply outputs something 
        sem_post(sem_access);
        sem_post(sem_occupy);   

        p_count++;
        if (p_count == TOTAL_MSG)
        {
            printf("all producers terminated\n");
            pdone = 1;
        }
    }

    if (pdone == 1)
        pthread_exit(NULL);
}


void add_to_buf (lluint prod)
{

    sem_wait(sem_buf);
    int val;
    sem_getvalue(sem_occupy, &val);

    buf[val] = prod;
    sem_post(sem_buf);      
}


void consumer(void* id)
{
    wait_in_threads_until_all_start();
    int c_id = (int)id+1;

    while (!cdone)
    {       
        sem_wait(sem_occupy);

        lluint prod, factor1, factor2;      
        sem_wait(sem_access);   
        write_remove_from_buf_msg(c_id, &prod); //within this we call remove_from_buf
        find_two_factors(prod, &factor1, &factor2);     
        printf(" = %lli x %lli\n", factor1, factor2);
        write_consumer_is_done(c_id);
        sem_post(sem_access);   
        sem_post(sem_free);
        c_count++;  

        if (c_count == TOTAL_MSG)
        {
            printf("all consumers terminated\n");
            cdone = 1;
        }
    }
    if (cdone == 1)
        pthread_exit(NULL);

}


void remove_from_buf(lluint* prod)
{
    sem_wait(sem_buf);   

    int val;
    sem_getvalue(sem_occupy, &val);

    *prod = buf[val];
    sem_post(sem_buf);   
}


void wait_in_threads_until_all_start()
{
    sem_post(sem_all_started);
    pthread_mutex_lock(&lock);
    if (all_started == 0)
    {
        pthread_cond_wait(&start_cond, &lock); 
    }
    pthread_mutex_unlock(&lock);
}


void all_threads_ready()
{
    pthread_mutex_lock(&lock);
    all_started = 1;
    pthread_cond_broadcast(&start_cond);
    pthread_mutex_unlock(&lock);
}



void close_Semaphors(void)
{
    sem_close(sem_access);
    sem_close(sem_occupy);
    sem_close(sem_free);
    sem_close(sem_all_started);
    sem_close(sem_buf);
}

Вот вывод из 4 отдельных запусков программы (каждый раз, когда мне приходилось вводить `` cntl + c```, чтобы остановить его)

Output1.txt

main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1 is done
producer #2 going to add product: 15347 = 103 x 149
producer #2 is done
consumer #3 just removed: 15347 = 103 x 149
consumer #3 is done
consumer #1 just removed: 15347 = 103 x 149
consumer #1 is done
producer #1 going to add product: 19367 = 107 x 181
producer #1 

Output2.txt

main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1 

Output3.txt

main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1 is done
consumer #3 just removed: 15347 = 103 x 149
consumer #3 is done
producer #1 going to add product: 19367 = 107 x 181
producer #1 is done
consumer #2 just removed: 19367 = 107 x 181
consumer #2 is done
producer #1 going to add product: 27221 = 163 x 167
producer #1 is done
consumer #1 just removed: 27221 = 163 x 167
consumer #1 

Output4.txt

main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1 is done
consumer #3 just removed: 15347 = 103 x 149
consumer #3 is done
producer #1 going to add product: 19367 = 107 x 181
producer #1 is done
consumer #1 just removed: 19367 = 107 x 181
consumer #1 is done
producer #1 going to add product: 27221 = 163 x 167
producer #1 is done
consumer #2 just removed: 27221 = 163 x 167
consumer #2 is done
producer #1 going to add product: 20651 = 107 x 193
producer #1 is done
consumer #3 just removed: 20651 = 107 x 193
consumer #3 is done
producer #1 going to add product: 30967 = 173 x 179
producer #1 is done
consumer #1 just removed: 30967 = 173 x 179
consumer #1 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...