вещание не работает с барьерами - PullRequest
1 голос
/ 13 декабря 2011

Я пытаюсь реализовать базовый рабочий пул, используя pthreads. Сценарий таков, что мне нужно определенное количество работников, которые будут жить на протяжении всей моей программы.

Мне никогда не понадобится сигнализировать отдельные потоки, но все потоки одновременно, поэтому я хочу сделать одну трансляцию.

Мне нужно дождаться завершения всех потоков, прежде чем продолжится основная программа, поэтому я решил использовать барьер_wit в каждом рабочем потоке.

Дело в том, что трансляция не работает, если мой поток вызывает барьер_wait.

Полный пример и компилируемый код приведены ниже. Это только для одного триггера вещания, в моей полной версии я зациклюсь на чем-то вроде

while(conditionMet){
  1.prepare data
  2.signal threads using data
  3.post processing of thread results (because of barrier all threads finished)
  4.modify conditionMet if needed
}

Спасибо

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
void checkResults(char *str,int i){
  fprintf(stdout,"%s:%d\n",str,i);
}
void checkResults(char *str,size_t n,int i){
  fprintf(stdout,"%s[%lu]:%d\n",str,n,i);
}

/* For safe condition variable usage, must use a boolean predicate and  */
/* a mutex with the condition.                                          */
int                 conditionMet = 0;
pthread_cond_t      cond  = PTHREAD_COND_INITIALIZER;
pthread_mutex_t     mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_barrier_t barr;

#define NTHREADS    3

void *threadfunc(void *parm)
{
  size_t i = (size_t) parm;
  int           rc;

  rc = pthread_mutex_lock(&mutex);
  checkResults("\tpthread_mutex_lock()",i, rc);

  while (0==conditionMet) {
    printf("\tThread blocked[%d]\n",(int)i);
    rc = pthread_cond_wait(&cond, &mutex);
    checkResults("\tpthread_cond_wait()",i, rc);
    checkResults("\tbefore barrier",i);
   rc = pthread_barrier_wait(&barr);//broadcast works if this is commented out
    if(rc)
      fprintf(stdout,"problems waiting for baarr\n");
    checkResults("\tafter  barrier",i);
  }

  rc = pthread_mutex_unlock(&mutex);
  checkResults("\tpthread_mutex_lock()",i, rc);
  return NULL;
}

int main(int argc, char **argv)
{
  int                   rc=0;
  int                   i;
  pthread_t             threadid[NTHREADS];

  if(pthread_barrier_init(&barr, NULL,NTHREADS))
    {
      printf("Could not create a barrier\n");
    }



  printf("Enter Testcase - %s\n", argv[0]);

  printf("Create %d threads\n", NTHREADS);
  for(i=0; i<NTHREADS; ++i) {
    rc = pthread_create(&threadid[i], NULL, threadfunc,(void *) i);
    if(rc)
      checkResults("pthread_create()", rc);
  }

  sleep(5);  /* Sleep isn't a very robust way to serialize threads */
  rc = pthread_mutex_lock(&mutex);
  checkResults("pthread_mutex_lock()", rc);

  /* The condition has occured. Set the flag and wake up any waiters */
  conditionMet = 1;
  printf("\nWake up all waiters...\n");
  rc = pthread_cond_broadcast(&cond);
  checkResults("pthread_cond_broadcast()", rc);

  rc = pthread_mutex_unlock(&mutex);
  checkResults("pthread_mutex_unlock()", rc);

  printf("Wait for threads and cleanup\n");
  for (i=0; i<NTHREADS; ++i) {
    rc = pthread_join(threadid[i], NULL);
    checkResults("pthread_join()", rc);
  }
  pthread_cond_destroy(&cond);
  pthread_mutex_destroy(&mutex);

  printf("Main completed\n");
  return 0;
}

Ответы [ 3 ]

3 голосов
/ 13 декабря 2011

Функция потока заблокирует mutex сразу после получения сигнала.Таким образом, только одна функция потока будет ожидать на барьере (с mutex все еще заблокированным), и критерий барьера никогда не будет удовлетворен.

Вы должны изменить логику своего приложения, чтобы использовать барьер.mutex должен быть разблокирован непосредственно перед ожиданием барьера.Кроме того, учитывая использование pthread_cond_wait() в вашем коде, в вашем приложении когда-либо будет активен только один поток, что полностью исключает необходимость многопоточности.

Редактировать:

Я бы хотел немного уточнить последнее предложение.Предположим, что мы модифицируем функцию потока следующим образом:

while (0==conditionMet) {     
    printf("\tThread blocked[%d]\n",(int)i);     
    rc = pthread_cond_wait(&cond, &mutex);     
    checkResults("\tpthread_cond_wait()",i, rc);     
    checkResults("\tbefore barrier",i);

    pthread_mutex_unlock(&mutex); //added    

    rc = pthread_barrier_wait(&barr);//broadcast works if this is commented out     
    if(rc)
        fprintf(stdout,"problems waiting for baarr\n");     
    checkResults("\tafter  barrier",i);   
}

Таким образом, мы можем устранить взаимоблокировку, когда только один поток может достичь барьерной причины блокировки mutex.Но все же только один поток в данный момент времени будет работать в критической секции: когда он pthread_cond_wait() возвращает, mutex заблокирован и будет оставаться заблокированным, пока функция потока не достигнет _unlock ();_Подождите();пара.Только после этого следующая отдельная нить сможет работать и достигнуть своего барьера.Вымойте, промойте, повторите ...

То, что требуется для OP, - это чтобы функции потоков работали одновременно (почему еще кому-то захочется иметь пул потоков?).В этом случае функция может выглядеть следующим образом:

void *threadfunc(void *parm)
{
/*...*/
struct ThreadRuntimeData {
} rtd;
while (0==conditionMet) {     
    printf("\tThread blocked[%d]\n",(int)i);     
    rc = pthread_cond_wait(&cond, &mutex);     
    checkResults("\tpthread_cond_wait()",i, rc);

    GetWorkData(&rtd); //Gets some data from critical section and places it in rtd
    pthread_mutex_unlock(&mutex);

    ProcessingOfData(&rtd); //here we do the thread's job 
    //without the modification of global data; this may take a while

    pthread_mutex_lock(&mutex);
    PublishProcessedData(&rtd); //Here we modify some global data 
    //with the results of thread's work. 
    //Other threads may do the same, so we had to enter critical section again
    pthread_mutex_unlock(&mutex);   
    checkResults("\tbefore barrier",i);
    rc = pthread_barrier_wait(&barr);//broadcast works if this is commented out     
    if(rc)
        fprintf(stdout,"problems waiting for baarr\n");     
    checkResults("\tafter  barrier",i);   
}
/*...*/
}

Это просто набросок курса.Оптимальный дизайн функции потока зависит от того, что OP хочет, чтобы поток выполнял.

В качестве примечания, код для проверки результата возврата pthread_barrier_wait() должен учитывать возвращение PTHREAD_BARRIER_SERIAL_THREAD.Также было бы безопаснее объявить conditionMet как volatile.

1 голос
/ 13 декабря 2011

Из вопроса неясно, что представляют собой входные данные и как они связаны с потоками и результатами. Я не могу сказать по опубликованному коду, потому что не вижу, где должна быть выполнена настоящая работа.

Предполагая, что у вас есть N (под) задачи, N потоков и вы хотите, чтобы основной поток ожидал N результатов: вам на самом деле не нужен барьер, вы можете просто сделать:

  • основная тема
    1. push N задач в очереди ввода
    2. дождитесь получения N результатов в очереди вывода
  • рабочий поток
    1. извлечение задачи из очереди ввода
    2. результат вычисления
    3. отправить результат в очередь вывода

Самая простая из синхронизированных очередей будет просто выдвигать / выгружать элементы по одному (нажатие сигналов, если очередь пуста, всплывающее окно ожидает, если очередь пуста, и все).

Вы можете легко добавить что-то вроде push_n(vector<task> const &input), который транслирует, и pop_n(int count, vector<result> &output), который ждет всех count результатов, в качестве оптимизации, но базовый шаблон такой же.

0 голосов
/ 14 декабря 2011

Вы делаете вещи намного сложнее для себя, чем должны.Избавьтесь от барьеров.

Если вы хотите подождать, пока все работы не будут выполнены, просто держите счет количества оставшихся заданий.Защити его мьютексом.Запустите его с помощью условной переменной.Затем просто используйте pthread_cond_wait, чтобы дождаться, пока он достигнет нуля.(Вы можете использовать ту же логику, которую вы уже использовали для обработки очереди заданий.)

В качестве альтернативы, закодируйте потоки, чтобы завершить их, когда у них больше нет работы.Затем просто подождите, пока все рабочие потоки не завершатся с pthread_join.

...