Одним из подходов может быть добавление дополнительных столбцов к repartition
, и этот столбец будет иметь большую мощность (идентификатор записей или некоторые случайные значения)
если количество файлов становится большим, попробуйте установить numPartitions
с последующим разделением столбцов.
df.repartition(numPartitions, partition_cols_including_high_cardinality_column:_*).write........
=============================================== ============================
Edit:
В сценариях, где данные искажаются, когда некоторые комбинации разделов содержат больше данных, чем другие, перераспределение их с одним и тем же столбцом может быть не очень хорошей идеей.
При перераспределении все комбинации ключей разделов, соответствующие данным, будут сначала собраны на одном исполнителе, и будет создан один файл, если ваши partitionBy и передел имеют одинаковые аргументы столбца. Таким образом, в этом случае несколько комбинаций разделов будут иметь файлы, такие как ~ 15 Гб, а некоторые, например, ~ 1 Гб, что не идеально подходит для источников данных, таких как HDFS
.
Так что я предлагаю здесь иметь колонки перераспределения, которые равномерно распределяют данные по исполнителям. Учитывая это, мы перераспределили данные в некоторой комбинации столбцов E, она выдает, скажем, 400 строк для каждого исполнителя, с которым нужно работать, затем каждый исполнитель запишет свои данные на основе спецификации partitionBy. И когда вы проверяете ваш окончательный вывод, у каждого раздела будет количество файлов, равное количеству исполнителей, которые получили строки с одинаковой спецификацией partitionBy. Количество исполнителей определяется спецификацией колонки перераспределения.
То, что я предложил выше, - это иметь другой набор столбцов для перераспределения, что поможет равномерно распределять данные по исполнителям. И если по каким-то причинам это невозможно для данных, добавьте несколько случайных столбцов (метод называется salting
). Возможность добавления numPartitions
фиксирует верхнюю границу числа исполнителей, работающих с данными, тем самым фиксируя количество файлов, записанных в каталог раздела. Установка numPartitions чрезвычайно полезна, когда ваш столбец перераспределения имеет большую мощность, поскольку это может создать много файлов в ваших выходных каталогах.
import org.apache.spark.sql.functions.rand
df.repartition(numPartitions, $"some_col_1", rand)
.write.partitionBy("some_col")
.parquet("partitioned_lake")
здесь, исправляя numPartitions, мы уверены, что выходные данные для каждой спецификации partitionBy будут содержать максимум файлов numPartitions.
полезная ссылка - http://tantusdata.com/spark-shuffle-case-2-repartitioning-skewed-data/
Надеюсь, это поможет