Мне нужно загрузить изображения, выполнить некоторую обработку изображений на них, а затем упаковать их в n
TFRecords (например, 100 изображений на TFRecord).
Делая это с очередями Python, у меня было бы d
потоков загрузки, p
рабочих процессов, которые обрабатывают изображения, а затем w
рабочих процессов, которые записывали бы изображения с помощью TFRecordWriter, когда онидоступны.
Я хотел бы попробовать это с dask
, поэтому у меня есть что-то вроде:
urls = bag.from_sequence(images_urls)
processed = urls.map(download_image).map(process)
by2 = processed.repartition(2).map_partitions(packing)
by3 = processed.repartition(3).map_partitions(packing)
bag.concat([by2, by3]).compute()
Проблема с вышеизложенным заключается в том, что map_partitions
, кажется, не получаетизображения в потоковом режиме.Похоже, весь раздел находится в памяти до вызова функции packing
для раздела.
Еще одно раздражение в связи с вышесказанным заключается в том, что в dask
я не вижу способа точно контролировать, какая часть dagгде запланированоЗагрузчик может происходить с планировщиком потоков, где части обработки и упаковщика должны выполняться в разных процессах.Это возможно, или вы застряли с типом планировщика по всему графику?