Я новичок в распределенной разработке Spark.Я пытаюсь оптимизировать существующее задание Spark, выполнение которого занимает до 1 часа.
Инфраструктура :
- EMR [10 экземпляров r4.8xlarge(32 ядра, 244 ГБ)]
- Исходные данные: 1000 GZ-файлов в S3 (~ 30 МБ каждый)
- Параметры выполнения Spark [Исполнители: 300, Память для исполнителей: 6 ГБ, Ядра: 1]
В общем, задание Spark выполняет следующее:
private def processLines(lines: RDD[String]): DataFrame = {
val updatedLines = lines.mapPartitions(row => ...)
spark.createDataFrame(updatedLines, schema)
}
// Read S3 files and repartition() and cache()
val lines: RDD[String] = spark.sparkContext
.textFile(pathToFiles, numFiles)
.repartition(2 * numFiles) // double the parallelism
.cache()
val numRawLines = lines.count()
// Custom process each line and cache table
val convertedLines: DataFrame = processLines(lines)
convertedRows.createOrReplaceTempView("temp_tbl")
spark.sqlContext.cacheTable("temp_tbl")
val numRows = spark.sql("select count(*) from temp_tbl").collect().head().getLong(0)
// Select a subset of the data
val myDataFrame = spark.sql("select a, b, c from temp_tbl where field = 'xxx' ")
// Define # of parquet files to write using coalesce
val numParquetFiles = numRows / 1000000
var lessParts = myDataFrame.rdd.coalesce(numParquetFiles)
var lessPartsDataFrame = spark.sqlContext.createDataFrame(lessParts, myDataFrame.schema)
lessPartsDataFrame.createOrReplaceTempView('my_view')
// Insert data from view into Hive parquet table
spark.sql("insert overwrite destination_tbl
select * from my_view")
lines.unpersist()
Приложение считывает все файлы S3 => переразбиения в два раза больше файлов => кэширует RDD =>пользовательский обрабатывает каждую строку => создает временное представление / кэш-таблицу => считает количество строк => выбирает подмножество данных => уменьшает количество разделов => создает представление подмножества данных => вставляет в место назначения ульяТаблица с использованием вида => не оперирует RDD.
Я не уверен, почему выполнение занимает много времени.Неправильно ли установлены параметры выполнения искры или что-то здесь неправильно вызывается?