У нас есть 3-узловый искровой EMR-кластер (m3Xlarge). Мы пытаемся объединить несколько больших таблиц размером 4 ГБ (250+ столбцов), а несколько небольших ссылочных таблиц (15) имеют 2-3 столбца в каждой. Поскольку мы используем искровое динамическое расположение, которое по умолчанию включено в EMR.
Таким образом, при записи в HDFS для сохранения результатов требуется более 1 часа (это потому, что мы используем coalesce (1) в конечном DataFrame).
Даже мы пытались использовать широковещательные соединения, но пока не повезло. Как мы можем улучшить производительность для вышеупомянутого?
Каково будет окончательное время оптимизации для вышеуказанного Процесса?
Какими могут быть возможные способы улучшения производительности?
Любая помощь будет оценена!
Вот моя функция соединения
def multiJoins(MasterTablesDF: DataFrame, tmpReferenceTablesDF_List: MutableList[DataFrame], tmpReferenceTableJoinDetailsList: MutableList[Array[String]], DrivingTable: String): DataFrame = {
// Define final output of Driving Table
var final_df: DataFrame = null
if (MasterTablesDF != null) {
if (!MasterTablesDF.head(1).isEmpty && tmpReferenceTablesDF_List.length >= 1) {
for (i <- 0 until tmpReferenceTablesDF_List.length) {
val eachReferenceTableDF = tmpReferenceTablesDF_List(i)
var eachJoinDetails = tmpReferenceTableJoinDetailsList(i)
//for first ref table Join
if (i == 0) {
println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
if (eachJoinDetails(0).equals(eachJoinDetails(1))) {
println("############## Driving table and Ref table Joining columns are same joining first Drive table ==>" + DrivingTable + "With Ref table ==>" + eachJoinDetails(3))
//if reftable and Driving table have same join columns using seq() to remove duplicate columns after Joins
final_df = MasterTablesDF.join(broadcast(eachReferenceTableDF), Seq(eachJoinDetails(0)), eachJoinDetails(2)) //.select(ReqCols.head, ReqCols.tail: _*)
} else {
//if the joining column names of the driving and ref tables are not same then
//using driving table join col and reftable join cols
println("############### Driving table and Ref table joining columns are not same joining first Drive table ==>" + DrivingTable + "With Ref table ==>" + eachJoinDetails(3) + "\n")
final_df = MasterTablesDF.join(broadcast(eachReferenceTableDF), MasterTablesDF(eachJoinDetails(0)) === eachReferenceTableDF(eachJoinDetails(1)), eachJoinDetails(2))
}
} //Joining Next reference table dataframes with final DF
else {
if (eachJoinDetails(0).equals(eachJoinDetails(1))) {
println("###### drive table and another ref table join cols are same joining driving table ==>" + DrivingTable + "With RefTable" + eachJoinDetails(3))
final_df = final_df.join(broadcast(eachReferenceTableDF), Seq(eachJoinDetails(0)), eachJoinDetails(2)) //.select(ReqCols.head, ReqCols.tail: _*)
// final_df.unpersist()
} else {
println("###### drive table and another ref table join cols are not same joining driving table ==>" + DrivingTable + "With RefTable" + eachJoinDetails(3) + "\n")
final_df = final_df.join(broadcast(eachReferenceTableDF), MasterTablesDF(eachJoinDetails(0)) === eachReferenceTableDF(eachJoinDetails(1)), eachJoinDetails(2))
}
}
}
}
}
return final_df
//Writing is too slow
//final_df.coalesce(1).write.format("com.databricks.spark.csv").option("delimiter", "|").option("header", "true")
.csv(hdfsPath)
}
Это нормально? это из-за зацикливания?