Разбиение большого перекошенного набора данных в S3 методом Spark's partitionBy - PullRequest
0 голосов
/ 29 октября 2018

Я пытаюсь записать большой набор многораздельных данных на диск с помощью Spark, и алгоритм partitionBy борется с обоими подходами, которые я пробовал.

Разделы сильно перекошены - некоторые изразделы являются массивными, а другие - крошечными.

Задача # 1 :

Когда я использую перераспределение перед repartitionBy, Spark записывает все разделы как один файл, дажеогромные

val df = spark.read.parquet("some_data_lake")
df
  .repartition('some_col).write.partitionBy("some_col")
  .parquet("partitioned_lake")

Это выполняется вечно, потому что Spark не записывает большие разделы параллельно.Если один из разделов имеет 1 ТБ данных, Spark попытается записать все 1 ТБ данных в один файл.

Задача № 2 :

Когда я надеваюне использовать repartition, Spark выдает слишком много файлов.

Этот код запишет безумное количество файлов.

df.write.partitionBy("some_col").parquet("partitioned_lake")

Я запустил это на крошечных 8 ГБ данныхподмножество и Spark записали более 85 000 файлов!

Когда я попытался запустить это для набора производственных данных, один раздел с 1,3 ГБ данных был записан в виде 3100 файлов.

Что бы я хотел

Я бы хотел, чтобы каждый раздел записывался в виде файлов размером 1 ГБ.Таким образом, раздел с 7 ГБ данных будет записан в виде 7 файлов, а раздел с 0,3 ГБ данных будет записан в виде одного файла.

Какой мой лучший путь вперед?

1 Ответ

0 голосов
/ 29 октября 2018

Самое простое решение - добавить один или несколько столбцов в repartition и явно указать количество разделов.

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
 .write.partitionBy("some_col")
 .parquet("partitioned_lake")

где:

  • numPartitions - должна быть верхняя граница (фактическое число может быть меньше) желаемого количества файлов, записанных в каталог раздела.
  • $"some_other_col" (и необязательные дополнительные столбцы) должны иметь большую мощность и быть независимыми от $"some_column (между этими двумя должна быть функциональная зависимость, и они не должны быть сильно коррелированными).

    Если данные не содержат такого столбца, вы можете использовать o.a.s.sql.functions.rand.

    import org.apache.spark.sql.functions.rand
    
    df.repartition(numPartitions, $"some_col", rand)
      .write.partitionBy("some_col")
      .parquet("partitioned_lake")
    
...