Как обрабатывать одно и то же изображение параллельно со Spark, избегая ненужных копий? - PullRequest
0 голосов
/ 27 апреля 2018

Я использую PySpark для обработки данных изображения. Мои изображения хранятся в Amazon S3, и у меня есть структура данных, которая связывает каждое изображение со списком полигонов (например, может иметь до 500 полигонов, и это источник искажений данных):

image_path_poly_list = [(ImagePath1, [poly1_1, poly1_2, ..., poly1_N]), ..., (ImagePathM, [polyM_1, polyM_2, ..., polyM_K])]

Каждое изображение может содержать один или несколько каналов (например, только красный или красный, зеленый, синий), и каждый канал может иметь размер до 11000 x 11000 пикселей. Другими словами, каждый канал может содержать до 500 МБ данных. Следовательно, изображение может занимать более 1,5 ГБ пространства (с учетом 3-х каналов, «общего» случая). Изображения в формате .jp2 (на самом деле очень сжатые), декомпрессия довольно медленная и требует много ресурсов процессора.

Я бы хотел обрезать каждое изображение соответствующими полигонами. Так как изображения слишком велики, чтобы поместиться в Spark Driver, я бы лучше загрузил / открыл их в Spark Slaves. Поэтому я создаю RDD из image_path_poly_list, а затем выполняю операцию карты для загрузки / открытия изображений.

rdd1 -> [(Image1, [poly1_1, poly1_2, ..., poly1_N]), ..., (ImageM, [polyM_1, polyM_2, ..., polyM_K])]

Затем я делаю flatMap для получения нового числа пар:

rdd2 -> (Image1, poly1_1), (Image1, poly1_2), ..., (ImageM, polyM_K)

Затем я делаю карту, чтобы обрезать каждую пару изображений и полигонов. Ниже идея моего кода:

def openImage(elem):
    imagePath, list_of_poly = elem 
    image = downloadAndOpen(imagePath) # download from S3 and load into memory  
    return image, list_of_poly 

def expand(elem):
    image, list_of_poly = elem
    return [(image, poly) for poly in list_of_poly]

def process(elem):
    image, poly = elem
    return clip(image, elem) # clip returns the clipped image

rdd1 = sc.parallelize(image_path_poly_list).map(openImage)
rdd2 = rdd1.flatmap(expand).map(process).collect()

Однако я заметил, что кластер, который я использую, практически простаивает, и через некоторое время используется только несколько ядер, например, если данные искажены. Единственный источник искажений, который я могу найти в данных, - это переменное число полигонов, которое может иметь каждое изображение. Я думал, что flatMap перебалансирует рабочую нагрузку, так как время обработки пары (Image, Poly) почти постоянно.

Затем, после некоторого веб-исследования, я думаю, что все пары (Image, poly) все еще лежат в одном разделе после flatMap, и каждый раздел (насколько я понимаю) обрабатывается последовательно одним ядром. Поэтому я попытался сделать перераспределение с огромными numPartitions . Это, однако, очень медленно, так как изображения кэшируются на диск:

rdd2 = rdd1.flatmap(expand).repartition(hugeNumPartitions).map(process).collect()

Есть ли способ сделать что-то вроде параллели для, чтобы я мог использовать одно и то же изображение для обрезки параллельно со списком полигонов? Я думаю, что использование многопроцессорных пулов внутри карты может помочь, но я пытаюсь избежать этого и ищу решение с использованием только Spark.

Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...