Я пытаюсь реализовать базовый рабочий пул, используя pthreads.
Сценарий таков, что мне нужно определенное количество работников, которые будут жить на протяжении всей моей программы.
Мне никогда не понадобится сигнализировать отдельные потоки, но все потоки одновременно, поэтому я хочу сделать одну трансляцию.
Мне нужно дождаться завершения всех потоков, прежде чем продолжится основная программа, поэтому я решил использовать барьер_wit в каждом рабочем потоке.
Дело в том, что трансляция не работает, если мой поток вызывает барьер_wait.
Полный пример и компилируемый код приведены ниже. Это только для одного триггера вещания, в моей полной версии я зациклюсь на чем-то вроде
while(conditionMet){
1.prepare data
2.signal threads using data
3.post processing of thread results (because of barrier all threads finished)
4.modify conditionMet if needed
}
Спасибо
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
void checkResults(char *str,int i){
fprintf(stdout,"%s:%d\n",str,i);
}
void checkResults(char *str,size_t n,int i){
fprintf(stdout,"%s[%lu]:%d\n",str,n,i);
}
/* For safe condition variable usage, must use a boolean predicate and */
/* a mutex with the condition. */
int conditionMet = 0;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_barrier_t barr;
#define NTHREADS 3
void *threadfunc(void *parm)
{
size_t i = (size_t) parm;
int rc;
rc = pthread_mutex_lock(&mutex);
checkResults("\tpthread_mutex_lock()",i, rc);
while (0==conditionMet) {
printf("\tThread blocked[%d]\n",(int)i);
rc = pthread_cond_wait(&cond, &mutex);
checkResults("\tpthread_cond_wait()",i, rc);
checkResults("\tbefore barrier",i);
rc = pthread_barrier_wait(&barr);//broadcast works if this is commented out
if(rc)
fprintf(stdout,"problems waiting for baarr\n");
checkResults("\tafter barrier",i);
}
rc = pthread_mutex_unlock(&mutex);
checkResults("\tpthread_mutex_lock()",i, rc);
return NULL;
}
int main(int argc, char **argv)
{
int rc=0;
int i;
pthread_t threadid[NTHREADS];
if(pthread_barrier_init(&barr, NULL,NTHREADS))
{
printf("Could not create a barrier\n");
}
printf("Enter Testcase - %s\n", argv[0]);
printf("Create %d threads\n", NTHREADS);
for(i=0; i<NTHREADS; ++i) {
rc = pthread_create(&threadid[i], NULL, threadfunc,(void *) i);
if(rc)
checkResults("pthread_create()", rc);
}
sleep(5); /* Sleep isn't a very robust way to serialize threads */
rc = pthread_mutex_lock(&mutex);
checkResults("pthread_mutex_lock()", rc);
/* The condition has occured. Set the flag and wake up any waiters */
conditionMet = 1;
printf("\nWake up all waiters...\n");
rc = pthread_cond_broadcast(&cond);
checkResults("pthread_cond_broadcast()", rc);
rc = pthread_mutex_unlock(&mutex);
checkResults("pthread_mutex_unlock()", rc);
printf("Wait for threads and cleanup\n");
for (i=0; i<NTHREADS; ++i) {
rc = pthread_join(threadid[i], NULL);
checkResults("pthread_join()", rc);
}
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
printf("Main completed\n");
return 0;
}