От почтового индекса до следующего с SPARK - PullRequest
0 голосов
/ 10 сентября 2018

Я получаю zip-архив «2018-06-26.zip» каждый день размером примерно ок. Сжатый 250 МБ, содержащий 165-170 000 небольших XML-файлов (КБ). Я загружаю zip-архив в HDFS (избегая проблем с небольшими файлами) и использую SPARK для извлечения их из zip (zip-файлы не разделяются), создавая парный RDD, с именем файла в качестве ключа и содержимым в качестве значения и сохранения их как файл последовательности через парный RDD. Все работает гладко с небольшим zip-архивом, содержащим только 3 XML-файла для тестирования, но когда я загружаю его в большой архив, я получаю

   java.lang.OutOfMemoryError: GC overhead limit exceeded
   at java.util.Arrays.copyOf(Arrays.java:2367)
   at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
   ...
   ...

Я работаю на Cloudera Quickstart VM: CDH 5.13.3 (HDFS: 2.60, JDK: 1.7.0.67, SPARK: 1.6.0, Scala 2.10)

Я еще не запускал его на полном кластере, так как я хотел быть уверенным, что мой код верен перед его развертыванием ...

Сборщик мусора продолжает работать с OOM с превышенным пределом служебных данных. Я знаю об увеличении объема памяти для драйвера и пространства кучи Java, но я подозреваю, что мой подход высасывает слишком много памяти .... Мониторинг использования памяти, однако, не обнаруживает утечки памяти ....

Вот код:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect
   .foreach { zip_file : (String, PortableDataStream) =>
    val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
    var zip_entry : ZipEntry = null
    while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
      if (!zip_entry.isDirectory) {
        val key_file_name = zip_entry.getName
        val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
        xml_map += ( key_file_name -> value_file_content )
      }
      zip_stream.closeEntry()
    }
    zip_stream.close()
  }
val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")

Любая помощь или идеи высоко ценятся.

1 Ответ

0 голосов
/ 11 сентября 2018

Мое окончательное решение:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip").collect
   .foreach { zip_file : (String, PortableDataStream) =>
   val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
   var zip_entry : ZipEntry = null
   while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
      if (!zip_entry.isDirectory) {
      val key_file_name = zip_entry.getName
      val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
      xml_map += ( key_file_name -> value_file_content )
   }
   zip_stream.closeEntry()
  }
  zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq, 75).saveAsSequenceFile("/user/cloudera/2018_06_26")

Оригинальный zip-файл 325 Мб, содержащий 170 000 XML-файлов, в результате чего получается 75 разделов, каждый ок.35 Мб.Всего ~ 2,5 ГБ Время выполнения локально на моем ПК с Windows: 1,2 минуты :-)

...