спарк scala большой XML-файл, разделенный по атрибуту строки - PullRequest
0 голосов
/ 04 января 2019

Я пытаюсь разбить большой xml-файл (1 Терабайт) на более мелкие файлы в Scala-Spark. Вот пример XML выглядит следующим образом (test1.xml). «LOG» является корневым тегом, а «LgRec» является тегом строки, и требуется разделить файлы на более мелкие файлы на основе значений «RecId» (который является атрибутом LgRec). Итак, в этом случае он должен выдать 3 файла test1_AA.xml, test1_BB.xml, test1_CC.xml.

Я написал код, используя библиотеку DATABRICKS XML, и протестировал файл меньшего размера, он работает. Мой вопрос, есть ли лучший способ сделать это, когда я запускаю эту программу для файла 1 ТБ. Сохраняет ли Spark весь 1 ТБ DF и меньшие DF в памяти, что означает, что мне требуется около 2 ТБ + памяти на моем Spark Cluster?

Ценю ваши мысли

test1.xml (input file)
<?xml version="1.0" encoding="UTF-8"?>
<LOG>
  <LgRec RecId="AA">
    <UpdRec>
      <field num="001">aaaa</field>
      <field num="002">100</field>
      <field num="003">100.10</field>
      <field num="004">20181125</field>
      <field num="005">101010.123</field>
    </UpdRec>
  </LgRec>
  <LgRec RecId="BB">
    <UpdRec>
      <field num="001">xaaa</field>
      <field num="002">1100</field>
      <field num="003">1100.10</field>
      <field num="004">20181125</field>
      <field num="005">111111.123</field>
    </UpdRec>
  </LgRec>
  <LgRec RecId="CC">
    <UpdRec>
      <field num="001">zaaaa</field>
      <field num="002">3100</field>
      <field num="003">3100.10</field>
      <field num="004">20181225</field>
      <field num="005">131313.123</field>
    </UpdRec>
  </LgRec>
</LOG>

test1_AA.xml  (output file)
<LOG>
  <LgRec RecId="AA">
    <UpdRec>
      <field num="001">aaaa</field>
      <field num="002">100</field>
      <field num="003">100.10</field>
      <field num="004">20181125</field>
      <field num="005">101010.123</field>
    </UpdRec>
  </LgRec>

test1_BB.xml  (output file)
  <LgRec RecId="BB">
    <UpdRec>
      <field num="001">xaaa</field>
      <field num="002">1100</field>
      <field num="003">1100.10</field>
      <field num="004">20181125</field>
      <field num="005">111111.123</field>
    </UpdRec>
  </LgRec>

test1_CC.xml  (output file)
  <LgRec RecId="CC">
    <UpdRec>
      <field num="001">zaaaa</field>
      <field num="002">3100</field>
      <field num="003">3100.10</field>
      <field num="004">20181225</field>
      <field num="005">131313.123</field>
    </UpdRec>
  </LgRec>
</LOG>

Вот код

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import com.databricks.spark.xml._


object splitXML {

  def main(args: Array[String]) = {

    val spark = SparkSession
      .builder()
      .appName("splitXML")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    val inputFile = "test1.xml"

    val outputFilePrefix = "test1_"
    val outputFileSuffix = ".xml"

    val inputXMLDF = spark.read
                     .option("roottag", "LOG")
                     .option("rowTag" , "LgRec")
                     .xml(inputFile)

    //get distinct recId's
    val recTypeDF = inputXMLDF.select("_RecId").distinct() 

    //for each recid filter the DF by recId and output the file
    recTypeDF.collect().map(row => {
                                     println("row = " + row(0) )
                                     inputXMLDF.filter(inputXMLDF("_RecId")===row(0))
                                               .write
                                               .option("roottag", "LOG")
                                               .option("rowTag" , "LgRec")
                                               .xml(outputFilePrefix + row(0) + outputFileSuffix)

                                   })

    spark.close()
  }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...