Я использую PySpark для обработки данных изображения. Мои изображения хранятся в Amazon S3, и у меня есть структура данных, которая связывает каждое изображение со списком полигонов (например, может иметь до 500 полигонов, и это источник искажений данных):
image_path_poly_list = [(ImagePath1, [poly1_1, poly1_2, ..., poly1_N]), ..., (ImagePathM, [polyM_1, polyM_2, ..., polyM_K])]
Каждое изображение может содержать один или несколько каналов (например, только красный или красный, зеленый, синий), и каждый канал может иметь размер до 11000 x 11000 пикселей. Другими словами, каждый канал может содержать до 500 МБ данных. Следовательно, изображение может занимать более 1,5 ГБ пространства (с учетом 3-х каналов, «общего» случая). Изображения в формате .jp2 (на самом деле очень сжатые), декомпрессия довольно медленная и требует много ресурсов процессора.
Я бы хотел обрезать каждое изображение соответствующими полигонами. Так как изображения слишком велики, чтобы поместиться в Spark Driver, я бы лучше загрузил / открыл их в Spark Slaves. Поэтому я создаю RDD из image_path_poly_list
, а затем выполняю операцию карты для загрузки / открытия изображений.
rdd1 -> [(Image1, [poly1_1, poly1_2, ..., poly1_N]), ..., (ImageM, [polyM_1, polyM_2, ..., polyM_K])]
Затем я делаю flatMap для получения нового числа пар:
rdd2 -> (Image1, poly1_1), (Image1, poly1_2), ..., (ImageM, polyM_K)
Затем я делаю карту, чтобы обрезать каждую пару изображений и полигонов. Ниже идея моего кода:
def openImage(elem):
imagePath, list_of_poly = elem
image = downloadAndOpen(imagePath) # download from S3 and load into memory
return image, list_of_poly
def expand(elem):
image, list_of_poly = elem
return [(image, poly) for poly in list_of_poly]
def process(elem):
image, poly = elem
return clip(image, elem) # clip returns the clipped image
rdd1 = sc.parallelize(image_path_poly_list).map(openImage)
rdd2 = rdd1.flatmap(expand).map(process).collect()
Однако я заметил, что кластер, который я использую, практически простаивает, и через некоторое время используется только несколько ядер, например, если данные искажены. Единственный источник искажений, который я могу найти в данных, - это переменное число полигонов, которое может иметь каждое изображение. Я думал, что flatMap перебалансирует рабочую нагрузку, так как время обработки пары (Image, Poly) почти постоянно.
Затем, после некоторого веб-исследования, я думаю, что все пары (Image, poly) все еще лежат в одном разделе после flatMap, и каждый раздел (насколько я понимаю) обрабатывается последовательно одним ядром. Поэтому я попытался сделать перераспределение с огромными numPartitions . Это, однако, очень медленно, так как изображения кэшируются на диск:
rdd2 = rdd1.flatmap(expand).repartition(hugeNumPartitions).map(process).collect()
Есть ли способ сделать что-то вроде параллели для, чтобы я мог использовать одно и то же изображение для обрезки параллельно со списком полигонов? Я думаю, что использование многопроцессорных пулов внутри карты может помочь, но я пытаюсь избежать этого и ищу решение с использованием только Spark.
Спасибо!