Есть ли лучший способ загрузить огромный файл tar в Spark, избегая при этом OutOfMemoryError? - PullRequest
0 голосов
/ 10 ноября 2018

У меня есть один tar-файл mytar.tar размером 40 ГБ. Внутри этого tar файла находятся 500 tar.gz файлов, а внутри каждого из этих tar.gz файлов - куча JSON файлов. Я написал код для обработки этого единственного файла tar и попытался получить список содержимого строки JSON. Мой код выглядит следующим образом.

val isRdd = sc.binaryFiles("/mnt/mytar.tar")
  .flatMap(t => { 
    val buf = scala.collection.mutable.ListBuffer.empty[TarArchiveInputStream]
    val stream = t._2
    val is = new TarArchiveInputStream(stream.open())
    var entry = is.getNextTarEntry()
    while (entry != null) {
      val name = entry.getName()
      val size = entry.getSize.toInt

      if (entry.isFile() && size > -1) {
        val content = new Array[Byte](size)
        is.read(content, 0, content.length)

        val tgIs = new TarArchiveInputStream(new GzipCompressorInputStream(new ByteArrayInputStream(content)))
        buf += tgIs
      }
      entry = is.getNextTarEntry()
    }
    buf.toList
  })
  .cache

val byteRdd = isRdd.flatMap(is => {
    val buf = scala.collection.mutable.ListBuffer.empty[Array[Byte]]
    var entry = is.getNextTarEntry()
    while (entry != null) {
      val name = entry.getName()
      val size = entry.getSize.toInt

      if (entry.isFile() && name.endsWith(".json") && size > -1) {
        val data = new Array[Byte](size)
        is.read(data, 0, data.length)
        buf += data
      }
      entry = is.getNextTarEntry()
    }
    buf.toList
  })
  .cache

val jsonRdd = byteRdd
  .map(arr => getJson(arr))
  .filter(_.length > 0)
  .cache

jsonRdd.count //action just to execute the code

Когда я выполняю этот код, я получаю OutOfMemoryError (OOME).

org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 24.0 failed 4 times, most recent failure: 
Lost task 0.3 in stage 24.0 (TID 137, 10.162.224.171, executor 13): 
java.lang.OutOfMemoryError: Java heap space

Мой кластер EC2 имеет 1 драйвер и 2 рабочих узла типа i3.xlarge (30,5 ГБ памяти, 4 ядра). Глядя на журналы и думая об этом, я считаю, что OOME происходит во время создания isRDD (входной поток RDD).

Есть ли что-то еще в коде или создании моего кластера Spark, что я могу сделать, чтобы смягчить эту проблему? Должен ли я выбрать экземпляр EC2 с большим объемом памяти (например, оптимизированный для памяти экземпляр, например R5.2xlarge)? Впрочем, я обновил до кластерной настройки R5.2x и все еще видел OOME.

Одна вещь, о которой я думал, это распаковать mytar.tar и вместо этого начать с файлов .tar.gz внутри. Я думаю, что каждый .tar.gz в файле tar должен быть меньше 30 ГБ, чтобы избежать OOME (на i3.xlarge).

Любые советы или рекомендации приветствуются.

...