Я хочу выполнить самостоятельное соединение, чтобы сгенерировать подходящие пары кандидатов. В настоящее время это не работает, так как эта операция слишком медленная. К сожалению, я не могу транслировать кадры данных, так как они слишком велики.
Сначала я собираю количество кортежей, чтобы уменьшить объем данных:
val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")
Это работает просто отлично и быстро. Затем я хочу выполнить самостоятельное объединение для генерации кандидатов. Я уже заметил, что мне нужно генерировать больше параллелизма:
--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \
Поэтому установлены увеличенные значения по умолчанию и случайный параллелизм. Кроме того, я попытался укоротить оба дискретных значения (т.е. увеличить количество элементов, попадающих в дискретный блок) и тем самым уменьшить количество кортежей. Все еще не повезло. Поэтому я дополнительно попытался форсировать разбиение большего числа задач:
val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff" )
.repartition(4000)
val selfB = materializedAggregated
.withColumnRenamed("baz", "other_batz")
.withColumnRenamed("value", "other_value")
val candidates = materializedMultiSTW
.join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
.filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))
Однако это также не работает и слишком медленно. Что еще можно сделать, чтобы этот запрос был быстрее вычислен? Я что-то упускаю?
Ниже вы увидите различные неудачные попытки увеличить параллелизм при чтении данных для самостоятельного объединения.
Я даже установил:
--conf spark.sql.files.maxPartitionBytes=16777216 \
до 1/8, т.е. 16 против 128 МБ, но число сгенерированных заданий слишком мало, то есть всего 250.
некоторые детали
План выполнения:
![enter image description here](https://i.stack.imgur.com/TSYXV.png)
Даже без этого ручного перераспределения это слишком медленно, и я боюсь, что недостаточно разделов создано:
Выполняется еще меньше задач - что, скорее всего, сделает его медленнее:
![enter image description here](https://i.stack.imgur.com/QuOqk.png)
Как я могу убедиться, что этот начальный шаг имеет более высокий параллелизм? Может ли ведро помочь? Но при чтении перемешанных данных только один раз - это на самом деле не приведет к ускорению - верно? Как насчет шага перераспределения при записи агрегированных файлов? Должен ли я установить большее число здесь? До сих пор, даже если его опустить (и, в основном, пересчитать агрегацию дважды), оно не превысит 260 задач.
environment
Я использую spark 2.3.x на HDP 3.1