проблема pthread_cond_broadcast - PullRequest
       6

проблема pthread_cond_broadcast

5 голосов
/ 19 октября 2010

Использование pthreads в linux 2.6.30 Я пытаюсь отправить один сигнал, который заставит несколько потоков начать выполнение. Вещание, кажется, только получено одним потоком. Я пробовал и pthread_cond_signal и pthread cond_broadcast, и оба, кажется, ведут себя одинаково. Для мьютекса в pthread_cond_wait я пробовал как общие мьютексы, так и отдельные (локальные) мьютексы без видимой разницы.


worker_thread(void *p)
{
   // setup stuff here

   printf("Thread %d ready for action \n", p->thread_no);
   pthread_cond_wait(p->cond_var, p->mutex);
   printf("Thread %d off to work \n", p->thread_no);

   // work stuff
}

dispatch_thread(void *p)
{
   // setup stuff

   printf("Wakeup, everyone ");
   pthread_cond_broadcast(p->cond_var);
   printf("everyone should be working \n");

   // more stuff
}

main()
{
   pthread_cond_init(cond_var);

   for (i=0; i!=num_cores; i++) {
      pthread_create(worker_thread...);
   }

   pthread_create(dispatch_thread...);
}

Выход:


  Thread 0 ready for action
  Thread 1 ready for action
  Thread 2 ready for action
  Thread 3 ready for action
  Wakeup, everyone
  everyone should be working
  Thread 0 off to work

Какой хороший способ отправки сигналов всем потокам?

1 Ответ

9 голосов
/ 19 октября 2010

Во-первых, у вас должен быть заблокирован мьютекс в точке, где вы звоните pthread_cond_wait().Как правило, неплохо бы также удерживать мьютекс при вызове pthread_cond_broadcast().

Во-вторых, вы должны циклически вызывать pthread_cond_wait(), пока условие ожидания выполняется.Могут произойти ложные пробуждения, и вы должны быть в состоянии справиться с ними.

Наконец, ваша настоящая проблема: вы сигнализируете все потоки, но некоторые из них еще не ждут, когда сигнал будет отправлен.Ваш основной поток и поток диспетчеризации участвуют в ваших рабочих потоках: если основной поток может запустить поток диспетчеризации, а поток диспетчеризации может захватить мьютекс и транслировать его раньше, чем рабочие потоки, то эти рабочие потоки никогда не проснутся.

Вам нужна точка синхронизации перед передачей сигнала, когда вы ожидаете сигнала, пока все потоки, как известно, не ожидают сигнала.Это или вы можете продолжать сигнализацию, пока не узнаете, что все потоки были разбужены.

В этом случае вы можете использовать мьютекс для защиты числа спящих потоков.Каждый поток захватывает мьютекс и увеличивает счетчик.Если счетчик соответствует количеству рабочих потоков, то это последний поток, который увеличивает счетчик, и поэтому сигнализирует другой переменной условия, совместно использующей тот же мьютекс, к спящему потоку диспетчеризации, что все потоки готовы.Затем поток ожидает в исходном состоянии, что приводит к освобождению мьютекса.

Если поток диспетчеризации еще не спал, когда последний рабочий поток сигнализирует об этом условии, он обнаружит, что число уже соответствуетжелаемый счетчик и не надо ждать, но сразу же расскажет об общем состоянии, чтобы разбудить работников, которые теперь гарантированно спят.

В любом случае, вот некоторый рабочий исходный код, который конкретизирует ваш пример кода и включает мое решение:

#include <stdio.h>
#include <pthread.h>
#include <err.h>

static const int num_cores = 8;

struct sync {
  pthread_mutex_t *mutex;
  pthread_cond_t *cond_var;
  int thread_no;
};

static int sleeping_count = 0;
static pthread_cond_t all_sleeping_cond = PTHREAD_COND_INITIALIZER;

void *
worker_thread(void *p_)
{
  struct sync *p = p_;
   // setup stuff here

   pthread_mutex_lock(p->mutex);
   printf("Thread %d ready for action \n", p->thread_no);

   sleeping_count += 1;
   if (sleeping_count >= num_cores) {
       /* Last worker to go to sleep. */
       pthread_cond_signal(&all_sleeping_cond);
   }

   int err = pthread_cond_wait(p->cond_var, p->mutex);
   if (err) warnc(err, "pthread_cond_wait");
   printf("Thread %d off to work \n", p->thread_no);
   pthread_mutex_unlock(p->mutex);

   // work stuff
   return NULL;
}

void *
dispatch_thread(void *p_)
{
  struct sync *p = p_;
   // setup stuff

   pthread_mutex_lock(p->mutex);
   while (sleeping_count < num_cores) {
       pthread_cond_wait(&all_sleeping_cond, p->mutex);
   }
   printf("Wakeup, everyone ");
   int err = pthread_cond_broadcast(p->cond_var);
   if (err) warnc(err, "pthread_cond_broadcast");
   printf("everyone should be working \n");
   pthread_mutex_unlock(p->mutex);

   // more stuff
   return NULL;
}

int
main(void)
{
   pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
   pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;

   pthread_t worker[num_cores];
   struct sync info[num_cores];
   for (int i = 0; i < num_cores; i++) {
      struct sync *p = &info[i];
      p->mutex = &mutex;
      p->cond_var = &cond_var;
      p->thread_no = i;
      pthread_create(&worker[i], NULL, worker_thread, p);
   }

   pthread_t dispatcher;
   struct sync p = {&mutex, &cond_var, num_cores};
   pthread_create(&dispatcher, NULL, dispatch_thread, &p);

   pthread_exit(NULL);
   /* not reached */
   return 0;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...