Как настроить сохранение операции с партией - PullRequest
1 голос
/ 11 апреля 2019

Мне нужно разделить данные набора данных на 6 столбцов: регион / год / месяц / день / id / quadkey Где на верхнем уровне у меня есть только бинарное состояние области, а на самом нижнем уровне это фактически то место, где оно попадает во многие разделы. Допустим, у нас есть 2 региона / обычно 1 год / обычно 1 месяц / 3-4 дня / 100-150 идентификаторов / 50-200 quadkeys Когда я выполняю это, я получаю действительно несбалансированную операцию случайного воспроизведения, и иногда исполнители терпят неудачу из-за превышения пределов памяти. Также из пользовательского интерфейса History я заметил, что некоторые задачи на этапе шляпы очень большие (~ 15 ГБ), тогда как другие намного меньше (~ 1 ГБ).

Я пытался играть с

sqlContext.setConf("spark.sql.shuffle.partitions", "3000")

Также я попытался увеличить количество исполнителей, но с теми же настройками памяти. Вот ошибки, которые я получаю:

19/04/10 09:47:36 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
    ...
// stage: DataFrame
val partitionColumns = List("region", "year", "month", "day", "id", "quadkey")
stage.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
      .write.partitionBy(partitionColumns:_*)
      .format("parquet")
      .option("compression", "gzip")
      .mode(SaveMode.Append)
      .save(destUrl)

Я ожидал, что на этапе сохранения будут сбалансированные задачи, какие настройки для перемешивания я должен установить для этого? Или у меня должны быть исполнители с памятью выше 20-25 Гб? Каким должен быть подход масштабирования в таком случае?

1 Ответ

1 голос
/ 11 апреля 2019

Одним из подходов может быть добавление дополнительных столбцов к repartition, и этот столбец будет иметь большую мощность (идентификатор записей или некоторые случайные значения)

если количество файлов становится большим, попробуйте установить numPartitions с последующим разделением столбцов.

df.repartition(numPartitions, partition_cols_including_high_cardinality_column:_*).write........

=============================================== ============================ Edit:

В сценариях, где данные искажаются, когда некоторые комбинации разделов содержат больше данных, чем другие, перераспределение их с одним и тем же столбцом может быть не очень хорошей идеей.
При перераспределении все комбинации ключей разделов, соответствующие данным, будут сначала собраны на одном исполнителе, и будет создан один файл, если ваши partitionBy и передел имеют одинаковые аргументы столбца. Таким образом, в этом случае несколько комбинаций разделов будут иметь файлы, такие как ~ 15 Гб, а некоторые, например, ~ 1 Гб, что не идеально подходит для источников данных, таких как HDFS

.

Так что я предлагаю здесь иметь колонки перераспределения, которые равномерно распределяют данные по исполнителям. Учитывая это, мы перераспределили данные в некоторой комбинации столбцов E, она выдает, скажем, 400 строк для каждого исполнителя, с которым нужно работать, затем каждый исполнитель запишет свои данные на основе спецификации partitionBy. И когда вы проверяете ваш окончательный вывод, у каждого раздела будет количество файлов, равное количеству исполнителей, которые получили строки с одинаковой спецификацией partitionBy. Количество исполнителей определяется спецификацией колонки перераспределения.

То, что я предложил выше, - это иметь другой набор столбцов для перераспределения, что поможет равномерно распределять данные по исполнителям. И если по каким-то причинам это невозможно для данных, добавьте несколько случайных столбцов (метод называется salting). Возможность добавления numPartitions фиксирует верхнюю границу числа исполнителей, работающих с данными, тем самым фиксируя количество файлов, записанных в каталог раздела. Установка numPartitions чрезвычайно полезна, когда ваш столбец перераспределения имеет большую мощность, поскольку это может создать много файлов в ваших выходных каталогах.

import org.apache.spark.sql.functions.rand

df.repartition(numPartitions, $"some_col_1", rand)
  .write.partitionBy("some_col")
  .parquet("partitioned_lake")

здесь, исправляя numPartitions, мы уверены, что выходные данные для каждой спецификации partitionBy будут содержать максимум файлов numPartitions.

полезная ссылка - http://tantusdata.com/spark-shuffle-case-2-repartitioning-skewed-data/
Надеюсь, это поможет

...