Увеличьте параллельность чтения файла паркета - Spark оптимизирует самостоятельное соединение - PullRequest
2 голосов
/ 07 апреля 2020

Я хочу выполнить самостоятельное соединение, чтобы сгенерировать подходящие пары кандидатов. В настоящее время это не работает, так как эта операция слишком медленная. К сожалению, я не могу транслировать кадры данных, так как они слишком велики.

Сначала я собираю количество кортежей, чтобы уменьшить объем данных:

val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")

Это работает просто отлично и быстро. Затем я хочу выполнить самостоятельное объединение для генерации кандидатов. Я уже заметил, что мне нужно генерировать больше параллелизма:

--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \ 

Поэтому установлены увеличенные значения по умолчанию и случайный параллелизм. Кроме того, я попытался укоротить оба дискретных значения (т.е. увеличить количество элементов, попадающих в дискретный блок) и тем самым уменьшить количество кортежей. Все еще не повезло. Поэтому я дополнительно попытался форсировать разбиение большего числа задач:

val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff"      )
  .repartition(4000)
val selfB = materializedAggregated
  .withColumnRenamed("baz", "other_batz")
  .withColumnRenamed("value", "other_value")

val candidates = materializedMultiSTW
  .join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
  .filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))

Однако это также не работает и слишком медленно. Что еще можно сделать, чтобы этот запрос был быстрее вычислен? Я что-то упускаю?

Ниже вы увидите различные неудачные попытки увеличить параллелизм при чтении данных для самостоятельного объединения.

Я даже установил:

--conf spark.sql.files.maxPartitionBytes=16777216 \

до 1/8, т.е. 16 против 128 МБ, но число сгенерированных заданий слишком мало, то есть всего 250.

некоторые детали

План выполнения:

enter image description here

Даже без этого ручного перераспределения это слишком медленно, и я боюсь, что недостаточно разделов создано:

enter image description here

Выполняется еще меньше задач - что, скорее всего, сделает его медленнее:

enter image description here

Как я могу убедиться, что этот начальный шаг имеет более высокий параллелизм? Может ли ведро помочь? Но при чтении перемешанных данных только один раз - это на самом деле не приведет к ускорению - верно? Как насчет шага перераспределения при записи агрегированных файлов? Должен ли я установить большее число здесь? До сих пор, даже если его опустить (и, в основном, пересчитать агрегацию дважды), оно не превысит 260 задач.

environment

Я использую spark 2.3.x на HDP 3.1

1 Ответ

2 голосов
/ 07 апреля 2020

Максимальное количество задач из вашего внутреннего объединения будет равно количеству ключей объединения (т. Е. Их мощности), независимо от настроек для spark.sql.shuffle.partitions и spark.default.parallelism.

Это происходит потому, что в SortMergeJoin, данные будут перетасовываться с помощью ha sh ключа соединения. Все данные от каждого отдельного ключа соединения будут переданы go одному исполнителю.

Поэтому проблема в том, что у вас недостаточно корзин - они слишком грубые. Максимальное количество задач, которое вы увидите, будет равно количеству бинов.

Если вы скопируете свои данные с большей детализацией, вы увидите, что количество задач увеличится.

...