Реализация простого пула потоков - PullRequest
2 голосов
/ 06 марта 2012

В настоящее время мне нужна простая и эффективная реализация пула потоков.Я искал здесь, а также в Google и нашел множество интересных ссылок, но пока ничего не нашел подходящего.Большинство реализаций, которые я обнаружил в Интернете, либо слишком сложны, либо в них отсутствуют некоторые ключевые функции, которые мне нужны.

Также я не хочу использовать код, который мне не понятен, поэтому я решил написать кодэто сам (иногда переосмысление колеса помогает мне продвигаться вперед с точки зрения знаний и опыта).Я, конечно, понимаю основную идею пула потоков, но некоторые детали реализации мне все еще неясны.Это, вероятно, потому, что вид пула потоков, который мне нужен, немного особенный.Позвольте мне описать это.У меня есть задача, которая выполняется сотни тысяч раз на конкретном (большом) буфере.Я измерил, что производительность намного лучше, если я использую потоки для этой задачи - буфер разделяется на подбуферы, и каждый поток выполняет свою задачу на подбуфере и возвращает результат.Затем все результаты всех потоков суммируются, что дает мне окончательное решение.

Однако, поскольку это делается очень часто, я теряю драгоценное время из-за того, что создается так много потоков (из-за накладных расходов, которые возникаютс созданием темы).Поэтому я хотел бы иметь пул потоков, который будет выполнять эту задачу, вместо того, чтобы каждый раз создавать новый набор потоков.

Чтобы быть более понятным, это то, что у меня есть:

  • Разделить буфер на N подпуферов одинакового размера
  • Для каждого подпуфера создать поток и запустить его в подпуфере
  • Ожидать все потокидля завершения (WaitForMultipleObjects), сложите вместе результаты и уничтожьте нити
  • Repeat

Чего я хотел бы достичь, это:

  • Разделитьбуфер в N подпуферах одинакового размера
  • Назначение каждого подпуфера потоку из пула потоков (который имеет ровно N потоков)
  • Как только поток завершается, дайте ему спать до следующегозадание готово
  • Когда все потоки завершены (и находятся в спящем режиме), сложите полученные результаты
  • Повторите, проснув потоки и назначив им новые задачи

Как видите, это немного особенноеHread Pool, так как мне нужно ждать окончания потоков.По сути, я хочу избавиться от накладных расходов на создание потоков все время, так как программа проходит сотни тысяч итераций, поэтому она может создавать и уничтожать миллионы потоков в течение своего срока службы.Хорошая новость заключается в том, что мне вообще не нужна синхронизация между потоками, все они получают свои собственные данные и место для хранения результатов.Однако я должен подождать, пока все потоки не будут завершены, и у меня будет окончательное решение, потому что следующая задача зависит от результатов предыдущей задачи.

Моя основная проблема связана с управлением потоками:

  • как мне заставить мои потоки "заснуть" и разбудить их, когда новая задача будет готова?
  • как мне ждать завершения всех потоков?

Iбуду благодарен за любую помощь.Также не стесняйтесь задавать вопросы, если я не был достаточно ясен.Спасибо!

Ответы [ 4 ]

2 голосов
/ 06 марта 2012

Для меня предпочтительным способом общения с потоками является условные переменные .Потому что вы можете определить необходимое условие и сигнализировать о его изменении.В вашем случае вы можете объединить его с очередью , с которой передаются подбуферы, поэтому каждый поток ожидает, пока очередь пуста.Результат затем может быть помещен в другую очередь , где управляющая очередь ожидает, пока все потоки не отправят результат в очередь (ссылка на эту очередь передается как запрос вместе с подбуферами).

1 голос
/ 06 марта 2012

Вы смотрели на другие реализации пула потоков? Например, http://threadpool.sourceforge.net/. То, что вы хотите достичь, не совсем новое. Один из способов заставить потоки ждать новую задачу - заблокировать мьютекс и разблокировать этот мьютекс, когда другая задача будет готова. Вы также можете сделать так, чтобы потоки уведомляли об этом, используя какое-то уведомление от потока обратно к родителю.

В своей работе я активно использовал пулы / потоки потоков и использовал ØMQ для связи между потоками, что позволяет потоку блокировать запрос read() от ØMQ, когда он готов к новой работе.

С небольшими исследованиями и с небольшим количеством времени и усилий вы сможете понять, как создавать или использовать существующие фреймворки / инструменты для создания того, что вам нужно. Затем вы можете вернуться к SO, когда у вас есть код, с которым у вас проблемы.

0 голосов
/ 06 марта 2012

'Как видите, это особый пул потоков, так как мне нужно дождаться окончания потоков.' - не полностью. Вы хотите, чтобы поток, который обрабатывает последнюю задачу в вашей работе, предоставлял уведомление о завершении работы. Уведомление о завершении - это обычная функция theadPool, иначе исходящий поток не сможет обработать полный набор результатов. Пулы часто обрабатывают более одной иерархии задач / задач одновременно, поэтому метод уведомления о завершении должен быть независимым от потоков - нет join () или чего-либо подобного. Кроме того, нет WaitForMultipleObject () - используется массив объектов синхронизации, который сложен в управлении и ограничен 64 объектами.

Пулы потоков обычно имеют пул потоков, ожидающих в очереди производителя-потребителя для выполнения задач. Задачи обычно наследуются от некоторого класса 'Ctask', который предоставляет службы пула потоков. Механизм для рассмотрения завершения и уведомления является одним из них.

Очередь «производитель-получатель» - это, по сути, «обычный» класс очередей, защищенный от множественного доступа мьютексом и семафором для подсчета задач в очереди и ожидания потоков. Каждый из потоков пула проходит эту очередь, и они зацикливаются, ожидая семафор очереди, затем извлекают задачи из заблокированной очереди и вызывают метод run () полученных задач.

В каждой задаче пул потоков будет загружен как элемент данных, поскольку он передается в пул. Это позволяет задаче отправлять больше задач, если это необходимо.

Завершение каждой задачи обычно уведомляется где-то, вызывая метод события, являющийся членом задачи, загружаемый исходным потоком до того, как задача отправляется в пул.

Задача также должна иметь целое число обратного отсчета атомной под-задачи и событие для ожидания завершения других задач.

Как это может работать в вашем примере? У вас может быть «основная» задача, которая отправляет задачи обработки массива и ожидает их завершения.

В пуле должно быть больше потоков, чем ядер. Я предлагаю вдвое больше.

Массив необходимо разделить, чтобы для каждого раздела использовалась отдельная задача. Сколько задач - достаточно, чтобы все доступные ядра были израсходованы, но не так много, чтобы генерировались избыточные переключатели контекста. Для любого массива разумного размера предположим, что 64 задачи - это разумное разделение - больше, чем типичное число доступных процессоров. Кроме того, задачи не должны быть разделены последовательно, чтобы избежать ложного обмена.

Итак, это «первичная» задача обработки массива. Загрузите его со ссылкой на массив и установите его событие завершения, чтобы указать на какой-то метод, который сигнализирует о событии. Отправьте задачу в пул, дождитесь события.

Задачи загружаются в поток. Его метод run () использует два цикла и создает 32 задачи обработки массива, каждая со своим собственным начальным индексом и длиной в массиве, но с непоследовательными начальными индексами. Задача использует собственный унаследованный метод submit () для загрузки каждой из 32 новых задач в пул. Помимо фактической постановки в очередь задач для выполнения в потоках, этот метод submit () также атомарно увеличивает целое число счетчиков и устанавливает событие завершения задач в частное событие завершения перед постановкой в ​​очередь задачи. Частное событие завершения атомарно уменьшает счет завершения и сигнализирует событие, если ноль. После отправки всех 32 событий обработки массива основная задача ожидает отдельного события завершения.

Итак, 32 задачи обработки массива выполняются в потоках.По завершении каждого выполняющийся поток вызывает его событие завершения, которое уменьшает целое число счетчиков в основной задаче.В конце концов, последняя задача обработки массива завершается, и целое число счетчиков уменьшается до нуля, что сигнализирует о событии, которого ожидает поток, выполняющий основную задачу.Основная задача вызывает свое собственное событие завершения, поэтому сигнализирует о событии, от которого ожидает основной источник задач.

Основной источник задач запускается с полностью обработанным массивом.

.. или,Вы могли бы использовать класс threadPool, который уже работает, как предложено другими.

0 голосов
/ 06 марта 2012

как заставить мои потоки "спать" и разбудить их, когда новая задача будет готова?

Вы используете мьютекс или семафор, в зависимости от вашей ситуации (в некоторых случаях выможет потребоваться использовать условную переменную или автоматическое / ручное повторное обнаружение), чтобы заставить потоки ждать друг на друга или просыпаться, когда что-то происходит.

как мне ждать завершения всех потоков?

Вы должны использовать join() в каждом потоке, чтобы дождаться его завершения.Поэтому, если у вас есть коллекция потоков, вы можете захотеть пройти и вызвать объединение для каждого потока, который все еще работает.

В качестве отдельной заметки: Пулы потоков уже существуют, и выможно просто использовать такие вещи, как Boost.Threadpool, а не изобретать велосипед.

...