Избегайте перераспределения затрат при фильтрации, а затем объединении - PullRequest
0 голосов
/ 04 мая 2018

Я выполняю запрос диапазона в СДР точек (x, y) в pyspark. Я разделил пространство xy на сетку 16 * 16 (256 ячеек) и назначил каждую точку в моем RDD одной из этих ячеек. GridMappedRDD является паройRDD: (cell_id, Point object)

Я разделил этот СДР на 256 разделов, используя:

gridMappedRDD.partitionBy(256)

Запрос диапазона представляет собой прямоугольное поле. У меня есть метод для моего объекта Grid, который может возвращать список идентификаторов ячеек, которые перекрываются с диапазоном запросов. Итак, я использовал это как фильтр для удаления несвязанных ячеек:

filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)

Но проблема в том, что при выполнении запроса и последующем сборе результатов оцениваются все 256 разделов; Задача создается для каждого раздела.

Чтобы избежать этой проблемы, я попытался объединить отфильтрованный RDD с длиной списка потенциальных ячеек, и я надеялся, что это может решить проблему.

filteredRDD.coalesce(len(candidateCells))

На самом деле результирующий RDD имеет len(candidateCells) разделов, но разделы не совпадают с gridMappedRDD.

Как указано в документации на коалесцию, параметр shuffle имеет значение False и не следует выполнять перемешивание между разделами, но я вижу (с помощью glom ()), что это не так.

Например, после coalesce(4) с candidateCells=[62, 63, 78, 79] разделы выглядят так:

[[(62, P), (62, P) .... , (63, P)],
 [(78, P), (78, P) .... , (79, P)],
 [], []
]

На самом деле, объединяя, я получаю случайное чтение, равное размеру всего моего набора данных для каждой задачи, которая занимает значительное время. Что мне нужно, так это СДР с только разделами, связанными с ячейками в клетках-кандидатах, без каких-либо перемешиваний. Итак, мой вопрос: возможно ли отфильтровать только некоторые разделы без перестановок? В приведенном выше примере мой отфильтрованный RDD будет иметь 4 раздела с точно такими же данными, что и 62, 63, 78, 79-й разделы оригинального RDD. При этом запрос может быть направлен только на затрагивающие разделы.

1 Ответ

0 голосов
/ 04 мая 2018

Вы сделали здесь несколько неверных предположений:

  • Перемешивание не имеет отношения к coalesce (здесь не полезно coalesce). Это вызвано partitionBy. Разбиение по определению требует тасования.
  • Разделение нельзя использовать для оптимизации filter. Spark ничего не знает о функции, которую вы используете (это черный ящик).
  • Разделение не однозначно сопоставляет ключи с разделами. Несколько ключей могут быть размещены в одном разделе - Как работает HashPartitioner?

Что вы можете сделать:

  • Если результирующее подмножество является небольшим перераспределением и применяется lookup для каждой клавиши:

    from itertools import chain
    
    partitionedRDD = gridMappedRDD.partitionBy(256)
    
    chain.from_iterable(
        ((c, x) for x in partitionedRDD.lookup(c)) 
        for c in candidateCells
    )
    
  • Если данные большие, вы можете попытаться пропустить сканирование разделов (количество задач не изменится, но некоторые задачи могут быть закорочены):

    candidatePartitions = [
        partitionedRDD.partitioner.partitionFunc(c) for c in candidateCells
    ]
    
    partitionedRDD.mapPartitionsWithIndex(
        lambda i, xs: (x for x in xs if x[0] in candidateCells) if i in candidatePartitions else []
    )
    

Эти два метода имеют смысл, только если вы выполняете несколько «поисков». Если это разовая операция, лучше выполнить линейный фильтр:

  • Это дешевле, чем перемешать и перераспределить.
  • Если исходные данные равномерно распределены, последующая обработка сможет лучше использовать имеющиеся ресурсы.
...