У меня проблема с оптимизацией кода, я работаю с очень большими объемами. Когда я запускаю код на небольшом объеме данных, запускаю код без проблем, но как только я достигаю реального размера, я получаю следующие ошибки: java.lang.OutOfMemoryError: Превышен лимит накладных расходов GC
а также
java.util.concurrent.TimeoutException: время ожидания фьючерса истекло после [300 секунд]
Я уже пытался оптимизировать это (удалить неиспользуемое действие ...), я пытался запустить свой код внутри цикла for, чтобы запустить код последовательно, я попытался внутри каждого для каждого.
Я работаю на "--num-executors 6 --executor-memory 22G --executor-cores 8 --driver-cores 6 --driver-memory 10G"
...
Это основной цикл кода
datatest
.select("yearWeek", "cluster").distinct().collect().toList
.map(cluster => (cluster.getString(0), cluster.getString(1)))
.map(cluster => {
TestModel(datatest.filter($"cluster" === cluster._2), cluster._1, cluster._2, ModelCcr_1, ModelErlang_1)
})
...
А внутри функции:
...
def TestModel(Test_Filtered: DataFrame, yearWeek: String, Cluster: String, ModelCcr: List[(String, Array[loadModelWithId.DecisionTreeModelElsa])], ModelErlang: List[(String, Array[loadModelWithId.DecisionTreeModelElsa])] ): DataFrame = {
val sparkContext = sparkSession.sparkContext
val sqlContext = sparkSession.sqlContext
import sqlContext.implicits._
val initialSetReduce = List(0.0)
val addToSetReduce = (s: List[Double], v: Double) => if (s.head==0.0) List(v) else v :: s
val mergePartitionSetsReduce = (p1: List[Double], p2: List[Double]) => p1 ++ p2
val ModelCcr_Off = ModelCcr.filter(x => x._1 == Cluster)
val ModelErlang_Off = ModelErlang.filter(x => x._1 == Cluster)
val moyenne_ccr = Test_Filtered.select("ccr").agg(mean("ccr")).map(x => x.getDouble(0)).head()
val moyenne_erlang = Test_Filtered.select("erlang").agg(mean("erlang")).map(x => x.getDouble(0)).head()
val TrainForDeploy = Test_Filtered
.select(
col("erlang")
, col("resource")
,col("resourceIndexCluster")
, col("dayTime")
, col("nbConnections").cast("Double")
, col("ccr")
, col("resourceType")
, col("yearWeek")
).rdd.map(x => (
(x.getDouble(0), Vectors.dense(x.getLong(2),x.getDouble(3), x.getDouble(4), 1D, 1D)),
(x.getDouble(5), Vectors.dense(x.getLong(2),x.getDouble(3), x.getDouble(4), 1D, 1D))
, x.getString(6)
, x.getString(7)
,x.getString(1)
)).persist(StorageLevel.MEMORY_AND_DISK)
val TrainDeployCCR = TrainForDeploy
.map(data =>
ModelCcr_Off.map(x => x._2
.map(tree =>
//Calcul des prédictions pour le ccr
((data._2._2, data._2._1, data._3, data._4,data._5), Distrib.noeudPredictPredQuality(tree.decisionTreeModel, data._2._2))
)))
.flatMap(x => x.flatten(x=>x))
val TrainDeployErlang = TrainForDeploy
.map(data =>
ModelErlang_Off.map(x => x._2
.map(tree =>
//Calcul des prédictions pour le ccr
((data._1._2, data._1._1, data._3, data._4,data._5), Distrib.noeudPredictPredQuality(tree.decisionTreeModel, data._1._2))
)))
.flatMap(x => x.flatten(x=>x))
TrainForDeploy.unpersist()
val TrainNodes_erlang = TrainDeployErlang
.aggregateByKey(initialSetReduce)(addToSetReduce, mergePartitionSetsReduce)
val TrainNodes_ccr = TrainDeployCCR
.aggregateByKey(initialSetReduce)(addToSetReduce, mergePartitionSetsReduce)
val quantile_ccr = TrainNodes_ccr.map(x => (x._1, x._2)) // enelever calcul des quantile mettre hors de la fonction
.map(x => (x._1._5, x._1._1(1), x._1._2, x._1._3, x._1._4, x._2))
.toDF("resource", "dayTime", "ccr", "resourceType", "yearWeek", "ListPredCCR")
.withColumn("erreurdelamoyenneCCR", erreurDeLaMoyenne(col("ccr"), col("ListPredCCR"), lit(moyenne_ccr)))
.withColumn("erreurAuCarreCCR", erreurAuCarre(col("ccr"), col("ListPredCCR")))
val quantile_erlang = TrainNodes_erlang.map(x => (x._1, x._2)) //a changer le .first
.map(x => (x._1._5, x._1._1(1), x._1._2, x._1._3, x._1._4, x._2))
.toDF("resource", "dayTime", "erlang", "resourceType", "yearWeek", "ListPredErlang")
.withColumn("erreurdelamoyenneErlang", erreurDeLaMoyenne(col("erlang"), col("ListPredErlang"), lit(moyenne_erlang)))
.withColumn("erreurAuCarreErlang", erreurAuCarre(col("erlang"), col("ListPredErlang")))
val cluster_Data = quantile_ccr.coalesce(1).join(quantile_erlang.coalesce(1), Seq("resource", "dayTime", "yearWeek", "resourceType"), "inner")
cluster_Data
}
...
На самом деле код выполняется 5 недель легко, но на 6-й неделе происходит сбой кода и выводится ошибка исключения GC или ошибка TimedOut.
Я думаю, что у исполнителей spark недостаточно памяти для запуска всех данных?
Я уже пытался установить функцию сохранения или кэширования, но безуспешно.
Можем ли мы освободить память после каждой итерации? Или, если у вас есть другой способ оптимизировать код.
Спасибо вам за помощь.