Я выполняю запрос диапазона в СДР точек (x, y) в pyspark. Я разделил пространство xy на сетку 16 * 16 (256 ячеек) и назначил каждую точку в моем RDD одной из этих ячеек.
GridMappedRDD является паройRDD: (cell_id, Point object)
Я разделил этот СДР на 256 разделов, используя:
gridMappedRDD.partitionBy(256)
Запрос диапазона представляет собой прямоугольное поле. У меня есть метод для моего объекта Grid, который может возвращать список идентификаторов ячеек, которые перекрываются с диапазоном запросов. Итак, я использовал это как фильтр для удаления несвязанных ячеек:
filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)
Но проблема в том, что при выполнении запроса и последующем сборе результатов оцениваются все 256 разделов; Задача создается для каждого раздела.
Чтобы избежать этой проблемы, я попытался объединить отфильтрованный RDD с длиной списка потенциальных ячеек, и я надеялся, что это может решить проблему.
filteredRDD.coalesce(len(candidateCells))
На самом деле результирующий RDD имеет len(candidateCells)
разделов, но разделы не совпадают с gridMappedRDD
.
Как указано в документации на коалесцию, параметр shuffle
имеет значение False и не следует выполнять перемешивание между разделами, но я вижу (с помощью glom ()), что это не так.
Например, после coalesce(4)
с candidateCells=[62, 63, 78, 79]
разделы выглядят так:
[[(62, P), (62, P) .... , (63, P)],
[(78, P), (78, P) .... , (79, P)],
[], []
]
На самом деле, объединяя, я получаю случайное чтение, равное размеру всего моего набора данных для каждой задачи, которая занимает значительное время. Что мне нужно, так это СДР с только разделами, связанными с ячейками в клетках-кандидатах, без каких-либо перемешиваний.
Итак, мой вопрос: возможно ли отфильтровать только некоторые разделы без перестановок? В приведенном выше примере мой отфильтрованный RDD будет иметь 4 раздела с точно такими же данными, что и 62, 63, 78, 79-й разделы оригинального RDD. При этом запрос может быть направлен только на затрагивающие разделы.