У нас есть задание, основанное на проблеме производителя / потребителя. По сути, назначение сводится к следующему:
Позвольте N_PROD, N_CON
представлять количество потоков производителя / потребителя, соответственно, которые создаются, и пусть BUF_SIZE
представляет размер буфера.
Каждый поток производителей должен выбрать 2 случайных простых числа, умножить их и добавить результат в буфер
Пусть TOTAL_PRIMES
представляет, сколько раз всего элементов добавляются или удаляются из буфера. Очевидно, что некоторые потоки могут обращаться к буферу больше, чем другие (например, если TOTAL_PRIMES=5
и N_PROD=3
, то вполне нормально, если producer1
добавляет три разных продукта в буфер, producer2
добавляет два, а producer3
не добавляет ни одного).
порожденные потоки должны начать работать только после того, как основной поток создал всех производителей / потребителей.
Каждый раз, когда я запускаю программу, кажется, что она никогда не превосходит 3 или 4 основных продукта. Кроме того, кажется, продолжает писать те же данные.
Мой главный вопрос: почему программа никогда не достигает конца? Я не могу понять, где он застрял. Я не знаю, является ли повторяющиеся данные следствием первой проблемы, или это две отдельные проблемы.
Вот то, что я считаю наиболее важной частью моего кода (поэтому не включая функции, которые просто выполняют некоторые вычисления или выводят что-то на консоль):
// Declaration of thread condition variable
pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER;
// declaring mutex
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
lluint buf[BUF_SIZE];
int p_count = 0;
int c_count = 0;
int all_started = 0;
int pdone = 0;
int cdone = 0;
//////////////////////////////////
sem_t* sem_access;
sem_t* sem_occupy; //how many elements currently in buffer
sem_t* sem_free; //how much free space we have
sem_t* sem_all_started;
sem_t* sem_buf;
///////////////////////////////////////////
// int main()
int main(int argc, char* argv[])
{
close_Semaphors();
pthread_t p_threads[N_PROD];
pthread_t c_threads[N_CONS];
open_all_sem(); //helper function to opens all semaphores listed above
int rc, t;
for (t=0; t<N_PROD; t++)
{
rc = pthread_create(&p_threads[t], NULL, producer, (void*)t);
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
sem_wait(sem_access);
printf("main thread created all producer threads\n");
sem_post(sem_access);
for (t=0; t<N_CONS; t++)
{
rc = pthread_create(&c_threads[t], NULL, consumer, (void*)t);
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
printf("main thread created all consumer threads\n");
broadcast_all_started();
for (t=0; t<N_PROD; t++)
{
rc = pthread_join(p_threads[t], NULL);
if (rc != 0)
{
printf("Error joining producer thread %d\n", t+1);
exit(-1);
}
}
for (t=0; t<N_CONS; t++)
{
rc = pthread_join(c_threads[t], NULL);
if (rc != 0)
{
printf("Error joining consumer thread %d\n", t+1);
exit(-1);
}
}
printf("Back in main thread\n");
sem_unlink("/sem_free");
sem_unlink("/sem_access");
sem_unlink("/sem_occupy");
sem_unlink("/sem_all_started");
sem_unlink("/sem_buf");
printf("Goodbye!\n");
close_Semaphors();
return 0;
}
void producer(void* id)
{
wait_in_threads_until_all_start();
int p_id = (int)id + 1;
while (!pdone)
{
lluint prime1, prime2, primeProd;
prime1 = getPrimeNum();
prime2 = getPrimeNum();
primeProd = prime1 * prime2;
sem_wait(sem_free);
sem_wait(sem_access);
write_add_to_buf_msg(p_id, prime1, prime2, primeProd);
write_producer_is_done(p_id); //simply outputs something
sem_post(sem_access);
sem_post(sem_occupy);
p_count++;
if (p_count == TOTAL_MSG)
{
printf("all producers terminated\n");
pdone = 1;
}
}
if (pdone == 1)
pthread_exit(NULL);
}
void add_to_buf (lluint prod)
{
sem_wait(sem_buf);
int val;
sem_getvalue(sem_occupy, &val);
buf[val] = prod;
sem_post(sem_buf);
}
void consumer(void* id)
{
wait_in_threads_until_all_start();
int c_id = (int)id+1;
while (!cdone)
{
sem_wait(sem_occupy);
lluint prod, factor1, factor2;
sem_wait(sem_access);
write_remove_from_buf_msg(c_id, &prod); //within this we call remove_from_buf
find_two_factors(prod, &factor1, &factor2);
printf(" = %lli x %lli\n", factor1, factor2);
write_consumer_is_done(c_id);
sem_post(sem_access);
sem_post(sem_free);
c_count++;
if (c_count == TOTAL_MSG)
{
printf("all consumers terminated\n");
cdone = 1;
}
}
if (cdone == 1)
pthread_exit(NULL);
}
void remove_from_buf(lluint* prod)
{
sem_wait(sem_buf);
int val;
sem_getvalue(sem_occupy, &val);
*prod = buf[val];
sem_post(sem_buf);
}
void wait_in_threads_until_all_start()
{
sem_post(sem_all_started);
pthread_mutex_lock(&lock);
if (all_started == 0)
{
pthread_cond_wait(&start_cond, &lock);
}
pthread_mutex_unlock(&lock);
}
void all_threads_ready()
{
pthread_mutex_lock(&lock);
all_started = 1;
pthread_cond_broadcast(&start_cond);
pthread_mutex_unlock(&lock);
}
void close_Semaphors(void)
{
sem_close(sem_access);
sem_close(sem_occupy);
sem_close(sem_free);
sem_close(sem_all_started);
sem_close(sem_buf);
}
Вот вывод из 4 отдельных запусков программы (каждый раз, когда мне приходилось вводить `` cntl + c```, чтобы остановить его)
Output1.txt
main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1 is done
producer #2 going to add product: 15347 = 103 x 149
producer #2 is done
consumer #3 just removed: 15347 = 103 x 149
consumer #3 is done
consumer #1 just removed: 15347 = 103 x 149
consumer #1 is done
producer #1 going to add product: 19367 = 107 x 181
producer #1
Output2.txt
main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1
Output3.txt
main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1 is done
consumer #3 just removed: 15347 = 103 x 149
consumer #3 is done
producer #1 going to add product: 19367 = 107 x 181
producer #1 is done
consumer #2 just removed: 19367 = 107 x 181
consumer #2 is done
producer #1 going to add product: 27221 = 163 x 167
producer #1 is done
consumer #1 just removed: 27221 = 163 x 167
consumer #1
Output4.txt
main thread created all producer threads
main thread created all consumer threads
producer #1 going to add product: 15347 = 103 x 149
producer #1 is done
consumer #3 just removed: 15347 = 103 x 149
consumer #3 is done
producer #1 going to add product: 19367 = 107 x 181
producer #1 is done
consumer #1 just removed: 19367 = 107 x 181
consumer #1 is done
producer #1 going to add product: 27221 = 163 x 167
producer #1 is done
consumer #2 just removed: 27221 = 163 x 167
consumer #2 is done
producer #1 going to add product: 20651 = 107 x 193
producer #1 is done
consumer #3 just removed: 20651 = 107 x 193
consumer #3 is done
producer #1 going to add product: 30967 = 173 x 179
producer #1 is done
consumer #1 just removed: 30967 = 173 x 179
consumer #1