Я нашел решение здесь . Для меня немного сложнее понять проблему:
- Производители и потребители должны иметь возможность общаться в обоих направлениях. В любом случае недостаточно.
- Эта двусторонняя связь может быть упакована в одно условие.
Чтобы проиллюстрировать, упомянутое выше сообщение в блоге продемонстрировало, что это действительно значимое и желательное поведение:
pthread_mutex_lock(&cond_mutex);
pthread_cond_broadcast(&cond):
pthread_cond_wait(&cond, &cond_mutex);
pthread_mutex_unlock(&cond_mutex);
Идея состоит в том, что если и производители, и потребители используют эту логику, то будет безопасно, чтобы кто-то из них спал первым, поскольку каждый из них сможет разбудить другую роль. Другими словами, в типичном сценарии «производитель-потребитель» - если потребителю нужно спать, это потому, что производителю нужно проснуться, и наоборот. Упаковка этой логики в одно условие pthread имеет смысл.
Конечно, приведенный выше код имеет непреднамеренное поведение, когда рабочий поток также пробуждает другой спящий рабочий поток, когда он на самом деле просто хочет разбудить производителя. Это можно решить с помощью простой проверки переменных, как подсказывает @Borealid:
while(!work_available) pthread_cond_wait(&cond, &cond_mutex);
При рабочей трансляции все рабочие потоки будут пробуждены, но один за другим (из-за неявной блокировки мьютекса в pthread_cond_wait
). Поскольку один из рабочих потоков будет потреблять работу (установив work_available
обратно на false
), когда другие рабочие потоки проснутся и фактически приступят к работе, работа будет недоступна, поэтому рабочий снова будет спать.
Вот некоторый прокомментированный код, который я тестировал, для всех, кто интересуется:
// gcc -Wall -pthread threads.c -lpthread
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>
pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;
int * next_work = NULL;
int all_work_done = 0;
void * worker(void * arg)
{
int * my_work = NULL;
while(!all_work_done)
{
pthread_mutex_lock(&my_cond_m);
if(next_work == NULL)
{
// Signal producer to give work
pthread_cond_broadcast(&my_cond);
// Wait for work to arrive
// It is wrapped in a while loop because the condition
// might be triggered by another worker thread intended
// to wake up the producer
while(!next_work && !all_work_done)
pthread_cond_wait(&my_cond, &my_cond_m);
}
// Work has arrived, cache it locally so producer can
// put in next work ASAP
my_work = next_work;
next_work = NULL;
pthread_mutex_unlock(&my_cond_m);
if(my_work)
{
printf("Worker %d consuming work: %d\n", (int)(pthread_self() % 100), *my_work);
free(my_work);
}
}
return NULL;
}
int * create_work()
{
int * ret = (int *)malloc(sizeof(int));
assert(ret);
*ret = rand() % 100;
return ret;
}
void * producer(void * arg)
{
int i;
for(i = 0; i < 10; i++)
{
pthread_mutex_lock(&my_cond_m);
while(next_work != NULL)
{
// There's still work, signal a worker to pick it up
pthread_cond_broadcast(&my_cond);
// Wait for work to be picked up
pthread_cond_wait(&my_cond, &my_cond_m);
}
// No work is available now, let's put work on the queue
next_work = create_work();
printf("Producer: Created work %d\n", *next_work);
pthread_mutex_unlock(&my_cond_m);
}
// Some workers might still be waiting, release them
pthread_cond_broadcast(&my_cond);
all_work_done = 1;
return NULL;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, worker, NULL);
pthread_create(&t2, NULL, worker, NULL);
pthread_create(&t3, NULL, worker, NULL);
pthread_create(&t4, NULL, worker, NULL);
producer(NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}