Создание многопоточной рабочей очереди (потребитель / производитель) в C ++ - PullRequest
7 голосов
/ 01 декабря 2009

У меня есть следующий сценарий: у меня есть один поток, который должен заполнить контейнер с парами целых чисел (по сути, описания задач), и у меня есть большой количество рабочих потоков (8-16), которые должны взять элементы из этого контейнера и выполнить некоторая работа.

Я думал, что проблему можно легко решить с помощью очереди блокировки - например, при удалении элементов потоки синхронизируют доступ к очереди и спят, если нет доступных данных.

Я (возможно, ошибочно) предположил, что нечто подобное должно существовать в STL или в boost, но я не смог ничего найти.

Должен ли я сам реализовать это? Кажется, такой распространенный сценарий ...

Ответы [ 6 ]

4 голосов
/ 02 декабря 2009

Если вы реализуете это самостоятельно, реализация должна быть довольно простой комбинацией семафора, мьютекса и объекта очереди.

Вот некоторый псевдокод:

Produce{
    pthread_mutex_lock(&mutex);
    queue.push_back(someObjectReference);
    pthread_mutex_unlock(&mutex);
    sem_post(&availabilitySem);
}

Consume{
    sem_wait(&availabilitySem);
    pthread_mutex_lock(&mutex);
    queue.pop_front(someObjectReference);
    pthread_mutext_unlock(&mutex);
}
2 голосов
/ 02 декабря 2009

Если вы находитесь в Windows, взгляните на библиотеку агентов в VS2010, это основной сценарий.

http://msdn.microsoft.com/en-us/library/dd492627(VS.100).aspx

т.е.

//an unbounded_buffer is like a queue
unbounded_buffer<int> buf;

//you can send messages into it with send or asend
send(buf,1);

//receive will block and wait for data
int result = receive(buf)

вы можете использовать потоки, «агенты» или «задачи» для вывода данных ... или вы можете связать буферы вместе и преобразовать проблему блокирующего семантического производителя / потребителя в сеть потока данных.

1 голос
/ 02 декабря 2009

Если вы работаете в Windows и хотите иметь эффективную очередь с точки зрения управления потоками, которые могут запускаться для обработки элементов из нее, посмотрите на порты завершения ввода-вывода (см. здесь ). Моя бесплатная серверная структура включает реализацию очереди задач, основанную на IOCP, и которая также может представлять интерес, если вы собираетесь пойти по этому пути; хотя, возможно, он слишком специализирован для того, что вы хотите.

0 голосов
/ 02 декабря 2009

Если вы используете OSX Snow Leopard, вы можете посмотреть Grand Central Dispatch .

0 голосов
/ 01 декабря 2009

Вам следует взглянуть на ACE (Adaptive Communication Environment) и ACE_Message_Queue. Всегда есть повышение message_queue , но ACE - это то, где он находится с точки зрения высокой производительности параллелизма.

0 голосов
/ 01 декабря 2009

Я думаю message_queue из boost::interprocess - это то, что вы хотите. Вторая ссылка имеет пример использования.

...