Почему работник dask не работает из-за MemoryError при выполнении задачи «небольшого размера»?[Dask.bag] - PullRequest
2 голосов
/ 10 апреля 2019

Я запускаю конвейер для нескольких изображений.Конвейер состоит из чтения изображений из файловой системы, обработки каждого из них, а затем сохранения изображений в файловой системе.Однако работник dask не работает из-за MemoryError.Есть ли способ убедиться, что рабочие не загружают слишком много изображений в память?т.е. дождитесь, пока на рабочем месте будет достаточно места, прежде чем запускать конвейер обработки нового образа.

У меня есть один планировщик и 40 рабочих с 4 ядрами, 15 ГБ ОЗУ и запущенным Centos7.Я пытаюсь обработать 125 изображений в пакете;каждое изображение достаточно большое, но достаточно маленькое, чтобы поместиться на работнике;около 3 ГБ требуется для всего процесса.

Я попытался обработать меньшее количество изображений, и это прекрасно работает.

РЕДАКТИРОВАНИЕ

from dask.distributed import Client, LocalCluster

# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))

paths = ['list', 'of', 'paths']

# Read the file data from each path
data = client.map(read, path, resources={'process': 1)

# Apply foo to the data n times
for _ in range(n):
    data = client.map(foo, x, resources={'process': 1)

# Save the processed data
data.map(save, x, resources={'process': 1)

# Retrieve results
client.gather(data)

Я ожидал, что изображения будут обрабатываться, поскольку на рабочих местах было свободное место, но кажется, что все изображения загружаются одновременно на разных рабочих.

РЕДАКТИРОВАТЬ: Моя проблема заключается в том, что все задачи назначаются работникам, и им не хватает памяти.Я нашел, как ограничить количество задач, которые рабочий обрабатывает в один момент [https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process](see здесь).Однако с этим ограничением, когда я выполняю свою задачу, все они заканчивают этап чтения, затем этап процесса и, наконец, этап сохранения.Это является проблемой, поскольку образ выливается на диск.

Можно ли завершить каждую задачу перед началом новой?например, на Worker-1: чтение (img1) -> процесс (img1) -> сохранение (img1) -> чтение (img2) -> ...

1 Ответ

1 голос
/ 11 апреля 2019

Dask, как правило, не знает, сколько памяти потребуется для выполнения задачи, он может знать только размер выходных данных и только после их завершения. Это потому, что Dask просто выполняет функцию pthon, а затем ожидает ее завершения; но все вещи могут происходить внутри функции Python. Как правило, вы должны ожидать, что начнется столько задач, сколько у вас будет доступных рабочих ядер -

Если вам нужна меньшая общая загрузка памяти, тогда ваше решение должно быть простым: иметь достаточно малое количество рабочих, чтобы, если все они использовали максимальный объем памяти, который вы можете ожидать, у вас все еще есть запас в система справиться.

Для РЕДАКТИРОВАНИЯ: вы можете попробовать запустить оптимизацию на графике перед отправкой (хотя, я думаю, это должно произойти в любом случае), поскольку звучит так, как будто ваши линейные цепочки задач должны быть "объединены". http://docs.dask.org/en/latest/optimize.html

...