Как оптимизировать обработку файлов S3 Spark Job в Hive Parquet Table - PullRequest
0 голосов
/ 21 января 2019

Я новичок в распределенной разработке Spark.Я пытаюсь оптимизировать существующее задание Spark, выполнение которого занимает до 1 часа.

Инфраструктура :

  • EMR [10 экземпляров r4.8xlarge(32 ядра, 244 ГБ)]
  • Исходные данные: 1000 GZ-файлов в S3 (~ 30 МБ каждый)
  • Параметры выполнения Spark [Исполнители: 300, Память для исполнителей: 6 ГБ, Ядра: 1]

В общем, задание Spark выполняет следующее:

private def processLines(lines: RDD[String]): DataFrame = {
    val updatedLines = lines.mapPartitions(row => ...)
    spark.createDataFrame(updatedLines, schema)
}

// Read S3 files and repartition() and cache()
val lines: RDD[String] = spark.sparkContext
    .textFile(pathToFiles, numFiles) 
    .repartition(2 * numFiles) // double the parallelism
    .cache()

val numRawLines = lines.count()

// Custom process each line and cache table
val convertedLines: DataFrame = processLines(lines)
convertedRows.createOrReplaceTempView("temp_tbl")
spark.sqlContext.cacheTable("temp_tbl")
val numRows = spark.sql("select count(*) from temp_tbl").collect().head().getLong(0)

// Select a subset of the data
val myDataFrame = spark.sql("select a, b, c from temp_tbl where field = 'xxx' ")

// Define # of parquet files to write using coalesce
val numParquetFiles = numRows / 1000000
var lessParts = myDataFrame.rdd.coalesce(numParquetFiles)
var lessPartsDataFrame = spark.sqlContext.createDataFrame(lessParts, myDataFrame.schema)
lessPartsDataFrame.createOrReplaceTempView('my_view')

// Insert data from view into Hive parquet table
spark.sql("insert overwrite destination_tbl 
           select * from my_view")    
lines.unpersist()

Приложение считывает все файлы S3 => переразбиения в два раза больше файлов => кэширует RDD =>пользовательский обрабатывает каждую строку => создает временное представление / кэш-таблицу => считает количество строк => выбирает подмножество данных => уменьшает количество разделов => создает представление подмножества данных => вставляет в место назначения ульяТаблица с использованием вида => не оперирует RDD.

Я не уверен, почему выполнение занимает много времени.Неправильно ли установлены параметры выполнения искры или что-то здесь неправильно вызывается?

1 Ответ

0 голосов
/ 21 января 2019

Прежде чем смотреть на метрики, я бы попробовал следующее изменение в вашем коде.

private def processLines(lines: DataFrame): DataFrame = {
  lines.mapPartitions(row => ...)
}

val convertedLinesDf = spark.read.text(pathToFiles)
    .filter("field = 'xxx'")
    .cache()

val numLines = convertedLinesDf.count() //dataset get in memory here, it takes time        
// Select a subset of the data, but it will be fast if you have enough memory
// Just use Dataframe API
val myDataFrame = convertedLinesDf.transform(processLines).select("a","b","c")

//coalesce here without converting to RDD, experiment what best
myDataFrame.coalesce(<desired_output_files_number>)
  .write.option(SaveMode.Overwrite)
  .saveAsTable("destination_tbl")
  • Кэширование бесполезно, если не считать количество строк. И это займет немного памяти и добавит немного давления ГХ
  • Таблица кэширования может занимать больше памяти и увеличивать давление GC
  • Преобразование Dataframe в RDD является дорогостоящим, поскольку подразумевает операции ser / deser
  • Не уверен, что вы пытаетесь сделать: val numParquetFiles = numRows / 1000000 и перераспределение (2 * numFiles). С вашей настройкой 1000 файлов по 30 МБ каждый даст вам 1000 разделов. Это может быть хорошо, как это. Вызов перераспределения и объединения может вызвать операцию перетасовки, которая является дорогостоящей. (Объединение может не вызывать случайное перемешивание)

Скажите, если у вас есть какие-либо улучшения!

...