Проблема оптимизации для большого объема данных: java.lang.OutOfMemoryError: превышен предел издержек GC - PullRequest
0 голосов
/ 15 апреля 2019

У меня проблема с оптимизацией кода, я работаю с очень большими объемами. Когда я запускаю код на небольшом объеме данных, запускаю код без проблем, но как только я достигаю реального размера, я получаю следующие ошибки: java.lang.OutOfMemoryError: Превышен лимит накладных расходов GC а также java.util.concurrent.TimeoutException: время ожидания фьючерса истекло после [300 секунд]

Я уже пытался оптимизировать это (удалить неиспользуемое действие ...), я пытался запустить свой код внутри цикла for, чтобы запустить код последовательно, я попытался внутри каждого для каждого.

Я работаю на "--num-executors 6 --executor-memory 22G --executor-cores 8 --driver-cores 6 --driver-memory 10G"

...

Это основной цикл кода

datatest
  .select("yearWeek", "cluster").distinct().collect().toList
  .map(cluster => (cluster.getString(0), cluster.getString(1)))
  .map(cluster => {
    TestModel(datatest.filter($"cluster" === cluster._2), cluster._1, cluster._2, ModelCcr_1, ModelErlang_1)
  })

...

А внутри функции:

...

def TestModel(Test_Filtered: DataFrame, yearWeek: String, Cluster: String, ModelCcr: List[(String, Array[loadModelWithId.DecisionTreeModelElsa])], ModelErlang: List[(String, Array[loadModelWithId.DecisionTreeModelElsa])] ): DataFrame = {

val sparkContext = sparkSession.sparkContext
val sqlContext = sparkSession.sqlContext
import sqlContext.implicits._

val initialSetReduce = List(0.0)
val addToSetReduce = (s: List[Double], v: Double) => if (s.head==0.0) List(v) else v :: s
val mergePartitionSetsReduce = (p1: List[Double], p2: List[Double]) => p1 ++ p2



val ModelCcr_Off = ModelCcr.filter(x => x._1 == Cluster)
val ModelErlang_Off = ModelErlang.filter(x => x._1 == Cluster)


val moyenne_ccr = Test_Filtered.select("ccr").agg(mean("ccr")).map(x => x.getDouble(0)).head()

val moyenne_erlang = Test_Filtered.select("erlang").agg(mean("erlang")).map(x => x.getDouble(0)).head()


val TrainForDeploy = Test_Filtered
  .select(
  col("erlang")
  , col("resource")
  ,col("resourceIndexCluster")
  , col("dayTime")
  , col("nbConnections").cast("Double")
  , col("ccr")
  , col("resourceType")
  , col("yearWeek")
  ).rdd.map(x => (
  (x.getDouble(0), Vectors.dense(x.getLong(2),x.getDouble(3), x.getDouble(4), 1D, 1D)),
  (x.getDouble(5), Vectors.dense(x.getLong(2),x.getDouble(3), x.getDouble(4), 1D, 1D))
  , x.getString(6)
  , x.getString(7)
  ,x.getString(1)
)).persist(StorageLevel.MEMORY_AND_DISK)



val TrainDeployCCR = TrainForDeploy
  .map(data =>
    ModelCcr_Off.map(x => x._2
      .map(tree =>
        //Calcul des prédictions pour le ccr
        ((data._2._2, data._2._1, data._3, data._4,data._5), Distrib.noeudPredictPredQuality(tree.decisionTreeModel, data._2._2))
      )))
  .flatMap(x => x.flatten(x=>x))



val TrainDeployErlang = TrainForDeploy
  .map(data =>
    ModelErlang_Off.map(x => x._2
      .map(tree =>
        //Calcul des prédictions pour le ccr
        ((data._1._2, data._1._1, data._3, data._4,data._5), Distrib.noeudPredictPredQuality(tree.decisionTreeModel, data._1._2))
      )))
  .flatMap(x => x.flatten(x=>x))


TrainForDeploy.unpersist()


val TrainNodes_erlang = TrainDeployErlang
  .aggregateByKey(initialSetReduce)(addToSetReduce, mergePartitionSetsReduce) 



val TrainNodes_ccr = TrainDeployCCR
  .aggregateByKey(initialSetReduce)(addToSetReduce, mergePartitionSetsReduce)




val quantile_ccr = TrainNodes_ccr.map(x => (x._1, x._2)) // enelever calcul des quantile mettre hors de la fonction
  .map(x => (x._1._5, x._1._1(1), x._1._2, x._1._3, x._1._4, x._2))
  .toDF("resource", "dayTime", "ccr", "resourceType", "yearWeek", "ListPredCCR")
  .withColumn("erreurdelamoyenneCCR", erreurDeLaMoyenne(col("ccr"), col("ListPredCCR"), lit(moyenne_ccr)))
  .withColumn("erreurAuCarreCCR", erreurAuCarre(col("ccr"), col("ListPredCCR")))




val quantile_erlang = TrainNodes_erlang.map(x => (x._1, x._2)) //a changer le .first
  .map(x => (x._1._5, x._1._1(1), x._1._2, x._1._3, x._1._4, x._2))
  .toDF("resource", "dayTime", "erlang", "resourceType", "yearWeek", "ListPredErlang")
  .withColumn("erreurdelamoyenneErlang", erreurDeLaMoyenne(col("erlang"), col("ListPredErlang"), lit(moyenne_erlang)))
  .withColumn("erreurAuCarreErlang", erreurAuCarre(col("erlang"), col("ListPredErlang")))




val cluster_Data = quantile_ccr.coalesce(1).join(quantile_erlang.coalesce(1), Seq("resource", "dayTime", "yearWeek", "resourceType"), "inner")



cluster_Data

}

...

На самом деле код выполняется 5 недель легко, но на 6-й неделе происходит сбой кода и выводится ошибка исключения GC или ошибка TimedOut.

Я думаю, что у исполнителей spark недостаточно памяти для запуска всех данных?

Я уже пытался установить функцию сохранения или кэширования, но безуспешно.

Можем ли мы освободить память после каждой итерации? Или, если у вас есть другой способ оптимизировать код.

Спасибо вам за помощь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...