Я запускаю конвейер для нескольких изображений.Конвейер состоит из чтения изображений из файловой системы, обработки каждого из них, а затем сохранения изображений в файловой системе.Однако работник dask не работает из-за MemoryError.Есть ли способ убедиться, что рабочие не загружают слишком много изображений в память?т.е. дождитесь, пока на рабочем месте будет достаточно места, прежде чем запускать конвейер обработки нового образа.
У меня есть один планировщик и 40 рабочих с 4 ядрами, 15 ГБ ОЗУ и запущенным Centos7.Я пытаюсь обработать 125 изображений в пакете;каждое изображение достаточно большое, но достаточно маленькое, чтобы поместиться на работнике;около 3 ГБ требуется для всего процесса.
Я попытался обработать меньшее количество изображений, и это прекрасно работает.
РЕДАКТИРОВАНИЕ
from dask.distributed import Client, LocalCluster
# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))
paths = ['list', 'of', 'paths']
# Read the file data from each path
data = client.map(read, path, resources={'process': 1)
# Apply foo to the data n times
for _ in range(n):
data = client.map(foo, x, resources={'process': 1)
# Save the processed data
data.map(save, x, resources={'process': 1)
# Retrieve results
client.gather(data)
Я ожидал, что изображения будут обрабатываться, поскольку на рабочих местах было свободное место, но кажется, что все изображения загружаются одновременно на разных рабочих.
РЕДАКТИРОВАТЬ: Моя проблема заключается в том, что все задачи назначаются работникам, и им не хватает памяти.Я нашел, как ограничить количество задач, которые рабочий обрабатывает в один момент [https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process](see здесь).Однако с этим ограничением, когда я выполняю свою задачу, все они заканчивают этап чтения, затем этап процесса и, наконец, этап сохранения.Это является проблемой, поскольку образ выливается на диск.
Можно ли завершить каждую задачу перед началом новой?например, на Worker-1: чтение (img1) -> процесс (img1) -> сохранение (img1) -> чтение (img2) -> ...