Tensorflow Java использует слишком много памяти с искрой на YARN - PullRequest
0 голосов
/ 16 октября 2019

При использовании java tenorflow для вывода объем памяти для выполнения задания на YARN слишком велик. Работа отлично выполняется с моим компьютером (2 ядра по 16 ГБ ОЗУ) и занимает 35 минут. Но когда я пытаюсь запустить его на YARN с 10 исполнителями 16 Гб памяти и 16 Гб памяти, накладные расходы приводят к гибели исполнителей за использование слишком большого объема памяти.

Прогнозирование запускается в кластере Hortonworks с YARN 2.7.3 и Spark 2.2.1,Ранее мы использовали DL4J для вывода и все работало менее 3 минут. Тензор корректно закрывается после использования, и мы используем mapPartition для прогнозирования. Каждая задача содержит приблизительно 20 000 записей (1 МБ), поэтому входной тензор будет равен 2 000 000 x 14, а выходной тензор - 2 000 000 (5 МБ). Параметр

передается на искру при работе на YARN

--master yarn --deploy-mode cluster --driver-memory 16G --num-executors 10 --executor-memory 16G --executor-cores 2 --conf spark.driver.memoryOverhead=16G --conf spark.yarn.executor.memoryOverhead=16G --conf spark.sql.shuffle.partitions=200 --conf spark.tasks.cpu=2

Эта конфигурация может работать, если мы установим spark.sql.shuffle.partitions = 2000, но это займет 3 часа

ОБНОВЛЕНИЕ:

Разница между локальным и кластернымбыло на самом деле из-за отсутствия фильтра. мы на самом деле выполняем прогноз на большем количестве данных, чем мы.

1 Ответ

0 голосов
/ 22 октября 2019

Чтобы уменьшить объем памяти каждого раздела, вы должны создать пакет внутри каждого раздела (используйте grouped(batchSize)). Таким образом, вы выполняете прогнозирование для каждой строки быстрее и выделяете тензор заранее определенного размера (batchSize). Если вы исследуете код лога tenorflowOnSpark , то это то, что они сделали. Ниже вы найдете переработанный пример реализации, которую этот код может не компилировать, но вы получите представление о том, как это сделать.

    lazy val sess = SavedModelBundle.load(modelPath, "serve").session
    lazy val numberOfFeatures = 1
    lazy val laggedFeatures = Seq("cost_day1", "cost_day2", "cost_day3")
    lazy val numberOfOutputs = 1
    val predictionsRDD = preprocessedData.rdd.mapPartitions { partition =>
        partition.grouped(batchSize).flatMap { batchPreprocessed =>
          val numberOfLines = batchPreprocessed.size
          val featuresShape: Array[Long] = Array(numberOfLines, laggedFeatures.size / numberOfFeatures, numberOfFeatures)

          val featuresBuffer: FloatBuffer = FloatBuffer.allocate(numberOfLines)

          for (
            featuresWithKey <- batchPreprocessed;
            feature <- featuresWithKey.features
          ) {
            featuresBuffer.put(feature)
          }
          featuresBuffer.flip()
          val featuresTensor = Tensor.create(featuresShape, featuresBuffer)

          val results: Tensor[_] = sess.runner
            .feed("cost", featuresTensor)
            .fetch("prediction")
            .run.get(0)

          val output = Array.ofDim[Float](results.numElements(), numberOfOutputs)
          val outputArray: Array[Array[Float]] = results.copyTo(output)

          results.close()
          featuresTensor.close()
          outputArray
        }
    }
    spark.createDataFrame(predictionsRDD)

Мы используем FloatBuffer и Shape для создания Tensor, как рекомендовано в thisвыпуск

...