лучший способ загрузки, обработки и преобразования в TFRecords с использованием dask - PullRequest
0 голосов
/ 29 января 2019

Мне нужно загрузить изображения, выполнить некоторую обработку изображений на них, а затем упаковать их в n TFRecords (например, 100 изображений на TFRecord).

Делая это с очередями Python, у меня было бы d потоков загрузки, p рабочих процессов, которые обрабатывают изображения, а затем w рабочих процессов, которые записывали бы изображения с помощью TFRecordWriter, когда онидоступны.

Я хотел бы попробовать это с dask, поэтому у меня есть что-то вроде:

urls = bag.from_sequence(images_urls)
processed = urls.map(download_image).map(process)
by2 = processed.repartition(2).map_partitions(packing)
by3 = processed.repartition(3).map_partitions(packing)
bag.concat([by2, by3]).compute()

Проблема с вышеизложенным заключается в том, что map_partitions, кажется, не получаетизображения в потоковом режиме.Похоже, весь раздел находится в памяти до вызова функции packing для раздела.

Еще одно раздражение в связи с вышесказанным заключается в том, что в dask я не вижу способа точно контролировать, какая часть dagгде запланированоЗагрузчик может происходить с планировщиком потоков, где части обработки и упаковщика должны выполняться в разных процессах.Это возможно, или вы застряли с типом планировщика по всему графику?

1 Ответ

0 голосов
/ 29 января 2019

Если вы не хотите, чтобы несколько изображений загружались в пакете с помощью bag, то вы можете создать свою сумку, чтобы иметь только один элемент на раздел (так как вы заранее знаете количество URL-адресов).Все входные данные для любой данной задачи действительно должны быть в памяти работника, прежде чем этот работник сможет выполнить данную задачу.

Возможно, более простой подход - вернуться к использованию синтаксиса delayed, что-то вроде

ims = [dask.delayed(download_image)(url) for url in images_urls]
processed = [dask.delayed(process)(im) for im in ims]
packs = [dask.delayed(packing)(processed[n:n+100]) for n in
         range(0, len(processed), 100)]
dask.compute(packs)

Действительно, у вас может быть только один планировщик для выполнения графика, но Dask будет пытаться выполнить задачи планировщика в месте, где были загружены данные.

...