C ++ - потоки и несколько очередей - PullRequest
5 голосов
/ 22 мая 2009

Мне нужно построить систему рабочих (представленных в виде потоков) и (нескольких) очередей. Отдельные задания ожидают в одной из очередей и ждут, когда рабочий поток их обработает. Каждый работник может обрабатывать задания только из некоторых очередей. Нет ожидания ожидания. C / C ++, pthreads, стандарт POSIX.

Проблема для меня заключается в "множественных очередях". Я знаю, как реализовать это с помощью одной очереди. Рабочие должны ждать во всех очередях, которые они могут обработать (ждать ЛЮБОЙ из них).

В Windows я бы использовал WaitForMultipleObjects, но это должно быть мультиплатформенным.

Мне не нужен какой-то конкретный код для этого, только подсказка или описание модели, которую я должен использовать. Заранее спасибо.

Ответы [ 8 ]

5 голосов
/ 22 мая 2009

Как насчет:

  • все рабочие потоки ожидают семафора
  • когда что-либо добавляется в очередь, семафор увеличивается, что пробуждает один поток
  • поток проверяет очереди, которые ему интересны, обрабатывает одну из них и возвращается к ожиданию на семафоре

Вам понадобится дополнительный мьютекс (ы) для управления фактическим чтением и записью в очереди.

4 голосов
/ 22 мая 2009

Что вы можете сделать, это использовать условную переменную . Пусть ваши рабочие потоки ждут условную переменную. Когда задание добавляется в любую из очередей заданий, сигнализируйте переменную условия. Затем, когда рабочий поток просыпается, он проверяет очереди, в которых он ожидает. Если у кого-то из них есть работа, она забирает эту работу из очереди. В противном случае он возвращается к ожиданию переменной условия. Ожидание условной переменной переводит поток в спящий режим, поэтому он не потребляет процессорного времени.

Конечно, само собой разумеется, что вы должны защищать все обращения к очередям заданий с помощью мьютекса (например, pthread_mutex_t).

1 голос
/ 22 мая 2009

Если для каждой очереди не слишком много работников, вы можете создать условную переменную для каждого работника.

0 голосов
/ 26 июня 2011

Я недавно думал об этом, и единственная идея, которую я мог придумать, - это (при условии, что у вас есть очереди, ориентированные на многопотоковое исполнение), когда-либо только несколько потоков обслуживают одну очередь.

После этого вы можете получить один или несколько рабочих процессов, создающих потоки, добавляющие задания в одну очередь, и один или несколько рабочих потоков, блокирующих очередь, до тех пор, пока они не найдут что-то для обработки.

Если у вас когда-либо есть несколько очередей, которые должны опрашивать несколько рабочих потоков, то решением может быть добавление одного дополнительного потока для каждой очереди и дополнительной очереди. Дополнительные потоки каждый блок в своей отдельной очереди, но просто перенаправляют задания в дополнительную очередь. Теперь существующий рабочий поток вернулся к блокировке в одной очереди.

0 голосов
/ 22 мая 2009

Вы можете сделать что-то вроде этого: с каждым заданием связана «очередь». Например:

Скажем, у вас есть 2 очереди. Ваша работа может сказать:

job[0].queue = 1; /* That job is in queue 1 */
job[1].queue = 1;
job[2].queue = 2; /* this job is in queue 2 */
... 
etc

Итак, у вас есть «мешок с нитями». Поток просто выбирает работу - скажем, работу [2]. Если этому потоку разрешено обрабатывать только задания из очереди 1, он помещает это задание обратно в очередь готовности и выбирает другое задание.

Таким образом, каждый поток знает, какие очереди ему разрешено обрабатывать, и когда он выбирает задание, он проверяет, совпадает ли поле «очередь» задания. Если это не так, он выбирает другую работу.

(так во многих случаях работает планирование процессов в linux на нескольких ядрах. Каждый процесс имеет битовую маску, сообщающую, на каких процессорах ему разрешено работать, а затем процессор проверяет, что ему «разрешено» запускать этот процесс до того, как при этом.)

0 голосов
/ 22 мая 2009

Вместо отдельной блокировки для каждой очереди, почему бы не иметь одну блокировку для всей очереди?

  1. Жди на замке
  2. Взять замок
  3. Отказ от очереди
  4. Снять замок
  5. Обработка снятого с производства предмета
  6. Перейти к шагу 1

Если предположить, что снятие очереди занимает незначительное время (поэтому блокировка удерживается только незначительное время), может не потребоваться более одной блокировки.

0 голосов
/ 22 мая 2009

То, что я хотел бы сделать, это использовать boost :: asio для постановки в очередь данных, чтобы ваши потоки могли проверить это. Вы можете передать ref в очередь с помощью команды post и соответственно обработать поток.

0 голосов
/ 22 мая 2009

Похоже, вы должны использовать boost :: thread , boost :: условие и std :: queue .

...