У меня есть один tar-файл mytar.tar
размером 40 ГБ. Внутри этого tar
файла находятся 500 tar.gz
файлов, а внутри каждого из этих tar.gz
файлов - куча JSON
файлов. Я написал код для обработки этого единственного файла tar
и попытался получить список содержимого строки JSON
. Мой код выглядит следующим образом.
val isRdd = sc.binaryFiles("/mnt/mytar.tar")
.flatMap(t => {
val buf = scala.collection.mutable.ListBuffer.empty[TarArchiveInputStream]
val stream = t._2
val is = new TarArchiveInputStream(stream.open())
var entry = is.getNextTarEntry()
while (entry != null) {
val name = entry.getName()
val size = entry.getSize.toInt
if (entry.isFile() && size > -1) {
val content = new Array[Byte](size)
is.read(content, 0, content.length)
val tgIs = new TarArchiveInputStream(new GzipCompressorInputStream(new ByteArrayInputStream(content)))
buf += tgIs
}
entry = is.getNextTarEntry()
}
buf.toList
})
.cache
val byteRdd = isRdd.flatMap(is => {
val buf = scala.collection.mutable.ListBuffer.empty[Array[Byte]]
var entry = is.getNextTarEntry()
while (entry != null) {
val name = entry.getName()
val size = entry.getSize.toInt
if (entry.isFile() && name.endsWith(".json") && size > -1) {
val data = new Array[Byte](size)
is.read(data, 0, data.length)
buf += data
}
entry = is.getNextTarEntry()
}
buf.toList
})
.cache
val jsonRdd = byteRdd
.map(arr => getJson(arr))
.filter(_.length > 0)
.cache
jsonRdd.count //action just to execute the code
Когда я выполняю этот код, я получаю OutOfMemoryError (OOME).
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 24.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 24.0 (TID 137, 10.162.224.171, executor 13):
java.lang.OutOfMemoryError: Java heap space
Мой кластер EC2 имеет 1 драйвер и 2 рабочих узла типа i3.xlarge
(30,5 ГБ памяти, 4 ядра). Глядя на журналы и думая об этом, я считаю, что OOME
происходит во время создания isRDD
(входной поток RDD).
Есть ли что-то еще в коде или создании моего кластера Spark, что я могу сделать, чтобы смягчить эту проблему? Должен ли я выбрать экземпляр EC2 с большим объемом памяти (например, оптимизированный для памяти экземпляр, например R5.2xlarge)? Впрочем, я обновил до кластерной настройки R5.2x и все еще видел OOME.
Одна вещь, о которой я думал, это распаковать mytar.tar
и вместо этого начать с файлов .tar.gz
внутри. Я думаю, что каждый .tar.gz
в файле tar
должен быть меньше 30 ГБ, чтобы избежать OOME
(на i3.xlarge).
Любые советы или рекомендации приветствуются.