Это можно сделать чисто с помощью Ray , который является библиотекой для параллельного и распределенного Python.
Ресурсы в Ray
Когда выЗапустите Рэй, вы можете сказать, какие ресурсы доступны на этой машине.Ray автоматически попытается определить количество ядер ЦП и количество графических процессоров, но они могут быть указаны, и фактически произвольные пользовательские ресурсы также могут быть переданы, например, путем вызова
ray.init(num_cpus=4, resources={'Network': 2})
Это говорит Рэю, что на машине имеется 4 ядра ЦП и 2 пользовательских ресурса под названием Network
.
Каждая «задача» Рэя, которая является планируемым блоком работы,имеет определенные требования к ресурсам.По умолчанию для задачи требуется 1 ядро процессора и больше ничего.Однако произвольные требования к ресурсам можно указать, объявив соответствующую функцию с помощью
@ray.remote(resources={'Network': 1})
def f():
pass
. Это говорит Рэю, что для выполнения f
в «рабочем» процессе должно быть 1 ядро ЦП (значение по умолчанию) и ресурс 1 Network
.
Поскольку на компьютере установлено 2 ресурса Network
и 4 ядра ЦП, одновременно может выполняться не более 2 копий f
.С другой стороны, если есть другая функция g
, объявленная с
@ray.remote
def g():
pass
, то одновременно могут быть выполнены четыре копии g
или две копии f
и две копии g
быть выполненным одновременно.
Пример
Вот пример с заполнителями для фактических функций, используемых для загрузки контента и его обработки.
import ray
import time
max_concurrent_downloads = 2
ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})
@ray.remote(resources={'Network': 1})
def download_content(url):
# Download the file.
time.sleep(1)
return 'result from ' + url
@ray.remote
def process_result(result):
# Process the result.
time.sleep(1)
return 'processed ' + result
urls = ['url1', 'url2', 'url3', 'url4']
result_ids = [download_content.remote(url) for url in urls]
processed_ids = [process_result.remote(result_id) for result_id in result_ids]
# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)
Вот описание временной шкалы (которое вы можете получить, запустив ray timeline
из командной строки и открыв получившийся файл JSON в chrome: // tracing в веб-браузере Chrome).
В приведенном выше сценариимы представляем 4 download_content
задач.Это те, которые мы ограничиваем, указав, что им требуется ресурс Network
(в дополнение к ресурсу ЦП 1 по умолчанию).Затем мы отправляем 4 process_result
задач, каждая из которых требует по умолчанию 1 ресурс ЦП.Задачи выполняются в три этапа (просто посмотрите на поля blue ).
- Мы начинаем с выполнения 2
download_content
задач, что столько, сколько можно выполнить за одинвремя (из-за ограничения скорости).Мы пока не можем выполнить ни одну из задач process_result
, поскольку они зависят от результатов выполнения задач download_content
. - Они завершаются, поэтому мы начинаем выполнять оставшиеся две задачи
download_content
, а также двеprocess_result
задач, потому что мы не ограничиваем скорость process_result
задач. - Мы выполняем оставшиеся
process_result
задач.
Каждая "строка" - это один рабочий процесс.Время идет слева направо.
Подробнее о том, как это сделать, можно прочитать в документации Ray .