Можно ли сделать перераспределение после использования partitionBy в spark DF? - PullRequest
0 голосов
/ 08 февраля 2019

Я задаю этот вопрос, потому что, если я укажу перераспределение как 5, тогда все мои данные (> 200Gigs) будут перемещены в 5 разных исполнителей и 98% ресурсов не будут использованы.и затем происходит раздел B, который снова создает много случайных действий.Есть ли способ, которым сначала происходит разбиениеBy, а затем перераспределение данных?

Ответы [ 2 ]

0 голосов
/ 08 февраля 2019

Хотя вопрос не совсем легок для понимания, следующее совпадает с другим ответом, и этот подход должен избежать проблем, упомянутых при ненужной перетасовке:

val n = [... some calculation for number of partitions / executors based on cluster config and volume of data to process ...]

df.repartition(n, $"field_1", $"field_2", ...)
  .sortWithinPartitions("fieldx", "field_y")
  .write.partitionBy("field_1", "field_2", ...)
  .format("location")

whereby [field_1, field_2, ...] - это один и тот же набор полей для repartition и partitionBy.

0 голосов
/ 08 февраля 2019

Вы можете использовать repartition(5, col("$colName")).
Таким образом, когда вы сделаете partitionBy("$colName"), вы пропустите shuffle для '$colName', так как он уже был перераспределен.

Также рассмотрите возможность иметь столько разделов, сколькопроизведение числа исполнителей на количество используемых ядер на 3 (хотя это может варьироваться от 2 до 4).
Итак, как мы знаем, Spark может запустить только 1 параллельную задачу для каждого раздела СДР.Предполагая, что у вас есть 8 ядер на каждого исполнителя и 5 исполнителей:
Вам необходимо: 8 * 5 * 3 = 120 разделов

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...