Проблема с синхронизацией потока и условными переменными в C - PullRequest
0 голосов
/ 28 июля 2011

У меня три потока, один из которых является основным, а два других являются рабочими.Первый поток, когда есть работа, которую нужно выполнить, пробуждает один из двух потоков.Каждый поток при пробуждении выполняет некоторые вычисления и, выполняя это, если находит больше работы, может разбудить другой рабочий поток или просто решить выполнить работу самостоятельно (например, добавив работу в локальную очередь).В то время как рабочие потоки имеют работу, основной поток должен ждать, пока работа не будет выполнена.Я реализовал это с помощью условных переменных следующим образом (код, представленный здесь, скрывает много деталей, пожалуйста, спросите, есть ли что-то непонятное):

MAIN THREAD (псевдокод):

//this function can be called from the main several time. It blocks the main thread till the work is done.
void new_work(){

//signaling to worker threads if work is available

    //Now, the threads have been awakened, it's time to sleep till they have finished.
    pthread_mutex_lock(&main_lock);
    while (work > 0)    //work is a shared atomic integer, incremented each time there's work to do and decremented when finished executing some work unit
       pthread_cond_wait(&main_cond);
    pthread_mutex_unlock(&main_lock);

}

РАБОЧИЕ РЕЗЬБЫ:

while (1){

   pthread_mutex_lock(&main_lock);
    if (work == 0)
       pthread_cond_signal(&main_cond);
    pthread_mutex_unlock(&main_lock);  

    //code to let the worker thread wait again -- PROBLEM!

   while (I have work to do, in my queue){
       do_work()
   }

}

Вот проблема: когда рабочий поток пробуждает основной поток, я не уверен, что рабочий поток вызывает ожидание, чтобы перевести себя в состояние ожидания новой работы.Даже если я реализую это ожидание с другой условной переменной, может случиться так, что основной поток не активен, выполняет некоторую работу, пока не достигнет точки, в которой он должен пробудить поток, который еще не вызвал ожидание ... и это можетпривести к плохим результатам.Я пробовал несколько способов решить эту проблему, но я не мог найти решение, может быть, есть очевидный способ решить его, но я его упускаю.

Можете ли вы предоставить схему для решения этой проблемыпроблемы?Я использую язык C и могу использовать любой механизм синхронизации, который вы считаете подходящим, например pthreads или posix семафоры.

Спасибо

Ответы [ 4 ]

2 голосов
/ 04 августа 2011

Обычный способ справиться с этим - создать одну рабочую очередь и защитить ее от переполнения и переполнения.Примерно так (где я остановил префиксы "pthread_"):

mutex queue_mutex;
cond_t queue_not_full, queue_not_empty;

void enqueue_work(Work w) {
    mutex_lock(&queue_mutex);
    while (queue_full())
        cond_wait(&queue_not_full, &queue_mutex);
    add_work_to_queue(w);
    cond_signal(&queue_not_empty);
    mutex_unlock(&queue_mutex);
}

Work dequeue_work() {
    mutex_lock(&queue_mutex);
    while (queue_empty())
        cond_wait(&queue_not_empty, &queue_mutex);
    Work w = remove_work_from_queue();
    cond_signal(&queue_not_full);
    mutex_unlock(&queue_mutex);
}

Обратите внимание на симметрию между этими функциями: enqueue <-> dequeue, empty <-> full, not_empty <-> not full.

Это обеспечивает потокобезопасную очередь ограниченного размера для любого количества потоков, выполняющих работу, и любого количества потоков, потребляющих работу.(На самом деле, это своего рода канонический пример использования условных переменных.) Если ваше решение не выглядит точно так, оно, вероятно, должно быть довольно близко ...

1 голос
/ 04 августа 2011

Если вы хотите, чтобы основной поток распределял работу между двумя другими, то подождите, пока оба потока не завершат свою работу, прежде чем продолжить, возможно, вы сможете сделать это с барьером.

Барьер - это конструкция синхронизации, которую вы можете использовать, чтобы заставить потоки ждать в определенной точке вашего кода, пока все количество потоков не будет готово двигаться дальше. По сути, вы инициализируете барьер pthread, говоря, что x число потоков должно ожидать его, прежде чем любой из них сможет продолжить работу. Когда каждый поток завершает свою работу и готов к продолжению, он будет ждать на барьере, и как только число потоков x достигнет барьера, им всем будет разрешено продолжить.

В вашем случае вы можете сделать что-то вроде:

pthread_barrier_t barrier;
pthread_barrier_init(&barrier, 3);

master()
{
  while (work_to_do) {
    put_work_on_worker_queues();
    pthread_barrier_wait(&barrier);
  }
}

worker()
{
  while(1) {
    while (work_on_my_queue()) {
      do_work();
    }
    pthread_barrier_wait(&barrier);
  }
}

Это должно заставить ваш главный поток выдавать работу, затем дождаться, пока оба рабочих потока завершат работу, которую им дали (если таковые имеются), прежде чем двигаться дальше.

0 голосов
/ 30 июля 2011

Я полагаю, что у вас есть вариация проблемы производитель-потребитель . То, что вы делаете, - это пишите специальную реализацию счетного семафора (который используется не только для взаимного исключения).

Если я правильно прочитал ваш вопрос, то вы пытаетесь сделать блок рабочих потоков, пока не появится доступная единица работы, а затем выполнить единицу работы, как только она станет доступной. Ваша проблема связана со случаем, когда доступно слишком много работы, и основной поток пытается разблокировать работника, который уже работает. Я бы структурировал ваш код следующим образом.

sem_t main_sem;
sem_init(&main_sem, 0, 0);

void new_work() {
    sem_post(&main_sem);
    pthread_cond_wait(&main_cond);
}

void do_work() {
    while (1) {
        sem_wait(&main_sem);
        // do stuff
        // do more stuff
        pthread_cond_signal(&main_sem);
    }
}

Теперь, если рабочие потоки генерируют больше работы, они могут просто sem_post перейти к семафору и просто отложить pthread_cond_signal до тех пор, пока вся работа не будет выполнена.

Обратите внимание, однако, что если вам действительно нужен главный поток, чтобы он всегда блокировался, когда работник работает, бесполезно переносить работу в другой поток, когда вы можете просто вызвать функцию, которая выполняет эту работу.

0 голосов
/ 28 июля 2011

Не могли бы вы иметь очередь "новая работа", которая управляется основным потоком?Основной поток может распределять по 1 заданию за один рабочий поток.Основной поток также будет слушать о выполненных работах рабочих.Если рабочий поток находит новое задание, которое нужно выполнить, просто добавьте его в очередь «новое задание» и основной поток распространит его.

Псевдокод:

JobQueue NewJobs;
Job JobForWorker[NUM_WORKERS];

workerthread()
{
  while(wait for new job)
  {
    do job (this may include adding new jobs to NewJobs queue)
    signal job complete to main thread
  }
}

main thread()
{
  while(whatever)
  {
    wait for job completion on any worker thread
    now a worker thread is free put a new job on it
  }
}
...