Как избежать OutOfMemoryError в маленьком ArrayBuffer в рамках небольшой функции? - PullRequest
0 голосов
/ 16 октября 2019

Функция scanFolder() работала, но иногда выдается исключение ниже

object MyClass{
    // ... etc
    val fs = FileSystem.get(new Configuration())
    // .. etc
    val dbs = scanFolder(warehouse)
    val dbs_prod = dbs.filter( s => db_regex.findFirstIn(s).isDefined )
    for (db <- dbs_prod)
      for (t <- scanFolder(db) {
            var parts = 0; var size = 0L
            fs.listStatus( new Path(t) ).foreach( p => {
                parts = parts + 1
                try { // can lost partition-file during loop, producing "file not found"
                    size  = size  + fs.getContentSummary(p.getPath).getLength
                } catch { case _: Throwable => }
            }) // p loop, partitions
            allVals.append(  s"('${cutPath(db)}','${cutPath(t)}',${parts},${size},'$dbHoje')"  )
            if (trStrange_count>0) contaEstranhos += 1
      }

    def scanFolder(thePath: String, toCut: Boolean = false) : ArrayBuffer[String] = {
        val lst = ArrayBuffer[String]()
        fs.listStatus( new Path(thePath) ).foreach(
            x => lst.append(  cutPath(x.getPath.toString,toCut)  )
        )
        lst.sorted
    }
}

Ошибка:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at org.apache.hadoop.fs.Path.<init>(Path.java:109)
        at org.apache.hadoop.fs.Path.<init>(Path.java:93)
        ...

(редактировать после дополнительных тестов)

Я использую Scala v2.11 (и сохраняю данные после циклов, используя Spark v2.2).

Я изменяю код, исключая любое использование вызовов функции scanFolder(), чтобы избежать использования ArrayBuffer. Сейчас напрямую использует итератор fs.listStatus( new Path(x) ).foreach( ...code... ) во втором цикле.

... Программа работает в течение ~ 30 минут ... во время некоторых сообщений:

Exception in thread "LeaseRenewer:spdq@TLVBRPRDK" java.lang.OutOfMemoryError: Java heap space
Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dispatcher-event-loop-23" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dispatcher-event-loop-39" java.lang.OutOfMemoryError: Java heap space

Окончательное сообщение об ошибке,остановка программы:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SparkListenerBus"
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
19/10/16 18:42:14 WARN DefaultPromise: An exception was thrown by org.apache.spark.network.server.TransportRequestHandler$$Lambda$16/773004452.operationComplete()
java.lang.OutOfMemoryError: Java heap space
19/10/16 18:42:14 WARN DefaultChannelPipeline: An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.OutOfMemoryError: Java heap space

Ответы [ 2 ]

1 голос
/ 17 октября 2019

У меня нет ответа на ваш конкретный вопрос. Но я хотел бы дать некоторые рекомендации, которые могут быть полезны:

  • предпочитают потоковый стиль обработки, то есть избегают циклов. Используйте map, filter, fold и т. Д. Не используйте глобальные состояния, такие как lst и allVars, в своем фрагменте кода.
  • избегайте ненужной сортировки. scanFolder заканчивается сортировкой без веской причины.
  • рассмотрите возможность добавления кучи памяти для вашей задачи, это уменьшит давление GC.
  • используйте профилировщик JVM для сужения определенного фрагмента кодажадный для выделения памяти. В вашем случае вам это не понадобится, поскольку это довольно очевидно. Но в более сложных случаях это может сильно помочь.
0 голосов
/ 22 октября 2019

После @SergeyRomanovsky хороших подсказок я решил проблему ... Вот фрагмент, который я использовал сначала с Spark-Shell (sshell на моем терминале),

  1. Добавить память с помощьюсамые популярные директивы, sshell --driver-memory 12G --executor-memory 24G

  2. Удалите самый внутренний (и проблемный) цикл, уменьшив int до parts = fs.listStatus( new Path(t) ).length и заключив его в директиву try.

  3. Добавление еще одной директивы try для запуска самого внутреннего цикла после успешного выполнения try .length.

  4. Уменьшены ArrayBuffer[] переменные до минимума, удалены старые scanFolder().

Полный фрагмент:

// ... val allVals = ArrayBuffer[String]()
// for loop:
      var parts = -1
      var size = -1L
      try { // can lost partition-file during loop, producing "file not found"
          val pp = fs.listStatus( new Path(t) )
          parts = pp.length
          pp.foreach( p => {
              try { // timeOut and other filesystem bugs
                  size  = size  + fs.getContentSummary(p.getPath).getLength
              } catch { case _: Throwable => }
          }) // p loop, partitions
      } catch { case _: Throwable => }
      allVals.append(  s"('${dbCut}','${cutPath(t)}',$parts,$size,'$dbHoje')"  )

PS: для элемента 1 накомпилятор, используйте SparkSession.builder()... .config("spark.executor.memory", "24G").config("spark.driver.memory", "12G")

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...