Оптимизация записи разделенных данных в S3 в spark sql - PullRequest
0 голосов
/ 16 января 2019

У меня около 700 ГБ данных, которые я читаю из HDFS при каждом запуске задания Spark. Моя работа читает эти данные, фильтрует около 60% данных, разделяет их следующим образом:

val toBePublishedSignals = hiveCtx.sql("some query")

toBePublishedSignals.write.partitionBy("A", "B", "C").format(JSON_DATA_FORMAT)
      .mode(SaveMode.Append).save(getS3DataPath())

val metadataFiles = hiveCtx.sql("some query")
metadataFiles.distinct().write.partitionBy("A", "C").format(JSON_DATA_FORMAT)
  .mode(SaveMode.Append).save(getS3MetadataPath())

Задание застревает у водителя. Я взял дамп драйвера, и он застрял в следующем:

    at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.retrieveObjectListing(AWSS3FileSystem.java:366)
    at com.a9.trafficvalidation.hadoop.utils.fs.AWSS3FileSystem.getFileStatus(AWSS3FileSystem.java:335)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:402)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:362)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    - locked <0x00000002d9b98288> (a org.apache.spark.sql.execution.command.ExecutedCommandExec)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    - locked <0x00000002d9b98330> (a org.apache.spark.sql.execution.QueryExecution)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:510)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)

Похоже, что листинг S3 является большим узким местом. Работа застревает на несколько часов и не завершается.

Или, в любом случае, я могу сохранить путь как S3: // bucket / A = dvfw / B = wfwef в фрейме данных, перераспределить фрейм данных по пути, а затем разбить только на «C» и записать в путь? Я не знаю, как я могу сделать это, не просматривая весь фрейм данных и сохраняя DF за один раз.

Занимался этим с утра! Нужны советы о том, как справиться с этим / избежать этого!

ТИА!

1 Ответ

0 голосов
/ 17 января 2019

Насколько я помню, такая ситуация возникает, когда вы пишете в режиме добавления и у вас много разделов в конечном местоположении. Spark извлекает существующие разделы и, возможно, схемы. Я бы предложил два возможных решения.

1) Если у вас нет много разделов для записи за исполнение, вы можете попробовать следующее:

// Prepare data and cache it
// There are a lot of data, so a part of it most probably will be written to disk
val toBePublishedSignals = hiveCtx.sql("some query").persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = toBePublishedSignals.selectExpr("A", "B", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val b = p.getAs[String]("B"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(new Path(getS3DataPath(), s"A=$a"), s"B=$b"), s"C=$c")
    toBePublishedSignals.filter(col("A") === a && col("B") === b && col("C") === c)
                       .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

И то же самое для метаданных.

// Prepare data and cache it
val metadataFiles = hiveCtx.sql("some query").distinct().persist(StorageLevel.MEMORY_AND_DISK_SER_2)

// Get all unique combinations of partitions columns
val partitions = metadataFiles.selectExpr("A", "C").distinct().collect()

// Write each combination as a separate partition
partitions.foreach { p =>
    val a = p.getAs[String]("A"))
    val c = p.getAs[String]("C"))
    val path = new Path(new Path(getS3MetadataPath(), s"A=$a"), s"C=$c")
    metadataFiles.filter(col("A") === a && col("C") === c)
                 .write.format(JSON_DATA_FORMAT).mode(SaveMode.Append).save(path.toUri.toString)
}

Я не знаю о типах данных столбцов раздела, поэтому в моем примере это строки. Код выше является только примером. Его можно переписать в более общий способ, используя операцию свертывания и извлекая типы данных из схемы DataFrame.

2) В качестве опции возможно чтение записей из разделов, к которым вы собираетесь прикоснуться, в существующих данных и объединение с входящими записями. Давайте представим, что A/B/C равны year/month/day соответственно. У нас есть некоторые новые данные, и df DataFrame является результатом обработки данных. После обработки мы получили следующие данные

2018|10|11|f1|f2|f3
2018|11|14|f1|f2|f3
2018|11|15|f1|f2|f3

Это означает, что нам нужно прочитать разделы из местоположения, которое содержит окончательные данные (местоположение, которое возвращается getS3DataPath())

year=2018/month=10/day=11
year=2018/month=11/day=14
year=2018/month=11/day=15

Для этого нам нужно создать функцию фильтра, которая является комбинацией нескольких других функций. Мы используем Reduce для их объединения, используя следующую логику:

year=2018 && month=10 && day=11
or
year=2018 && month=11 && day=14
or
year=2018 && month=11 && day=15
// Do processing
val toBePublishedSignalsNew = hiveCtx.sql("some query")

// Create a filter function for querying existing data
val partitions = toBePublishedSignalsNew.selectExpr("A", "B", "C").distinct().collect()
val filterFunction = partitions.map { partitionValues =>
    partitionColumns.map { columnName =>
        (input: Row) => input.getAs[String](columnName) == partitionValues.getAs[String](columnName)
    }.reduceOption((f1, f2) => (row: Row) => f1(row) && f2(row)).getOrElse((_: Row) => false)
}.reduceOption((f1, f2) => (row: Row) => f1(row) || f2(row)).getOrElse((_: Row) => false)

// Read existing partitions that match incoming data
val toBePublishedSignalsExisting = sparkSession.read.json(getS3DataPath()).filter(filterFunction)

// Combine new and existing data and write the result to a temporary location
toBePublishedSignalsExisting
    .union(toBePublishedSignalsNew)
    .write
    .partitionBy("A", "B", "C")
    .format(JSON_DATA_FORMAT)
    .mode(SaveMode.Overwrite)
    .save(temporaryLocationS3)

После этого вам нужно будет заменить разделы в расположении, которое возвращается getS3DataPath(), на разделы, расположенные в temporaryLocationS3. Приведенный выше пример будет работать, только если столбцы разделов содержат строки. Если они имеют другие типы данных, вам, вероятно, придется добавить некоторое сопоставление для функций фильтра. Например, для IntegerType это будет выглядеть как

(input: Row) => input.getAs[Int](columnName) == partitionValues.getAs[Int](columnName)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...