Я пытаюсь разбить большой xml-файл (1 Терабайт) на более мелкие файлы в Scala-Spark. Вот пример XML выглядит следующим образом (test1.xml).
«LOG» является корневым тегом, а «LgRec» является тегом строки, и требуется разделить файлы на более мелкие файлы на основе значений «RecId» (который является атрибутом LgRec). Итак, в этом случае он должен выдать 3 файла test1_AA.xml, test1_BB.xml, test1_CC.xml.
Я написал код, используя библиотеку DATABRICKS XML, и протестировал файл меньшего размера, он работает. Мой вопрос, есть ли лучший способ сделать это, когда я запускаю эту программу для файла 1 ТБ. Сохраняет ли Spark весь 1 ТБ DF и меньшие DF в памяти, что означает, что мне требуется около 2 ТБ + памяти на моем Spark Cluster?
Ценю ваши мысли
test1.xml (input file)
<?xml version="1.0" encoding="UTF-8"?>
<LOG>
<LgRec RecId="AA">
<UpdRec>
<field num="001">aaaa</field>
<field num="002">100</field>
<field num="003">100.10</field>
<field num="004">20181125</field>
<field num="005">101010.123</field>
</UpdRec>
</LgRec>
<LgRec RecId="BB">
<UpdRec>
<field num="001">xaaa</field>
<field num="002">1100</field>
<field num="003">1100.10</field>
<field num="004">20181125</field>
<field num="005">111111.123</field>
</UpdRec>
</LgRec>
<LgRec RecId="CC">
<UpdRec>
<field num="001">zaaaa</field>
<field num="002">3100</field>
<field num="003">3100.10</field>
<field num="004">20181225</field>
<field num="005">131313.123</field>
</UpdRec>
</LgRec>
</LOG>
test1_AA.xml (output file)
<LOG>
<LgRec RecId="AA">
<UpdRec>
<field num="001">aaaa</field>
<field num="002">100</field>
<field num="003">100.10</field>
<field num="004">20181125</field>
<field num="005">101010.123</field>
</UpdRec>
</LgRec>
test1_BB.xml (output file)
<LgRec RecId="BB">
<UpdRec>
<field num="001">xaaa</field>
<field num="002">1100</field>
<field num="003">1100.10</field>
<field num="004">20181125</field>
<field num="005">111111.123</field>
</UpdRec>
</LgRec>
test1_CC.xml (output file)
<LgRec RecId="CC">
<UpdRec>
<field num="001">zaaaa</field>
<field num="002">3100</field>
<field num="003">3100.10</field>
<field num="004">20181225</field>
<field num="005">131313.123</field>
</UpdRec>
</LgRec>
</LOG>
Вот код
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import com.databricks.spark.xml._
object splitXML {
def main(args: Array[String]) = {
val spark = SparkSession
.builder()
.appName("splitXML")
.master("local")
.getOrCreate()
import spark.implicits._
val inputFile = "test1.xml"
val outputFilePrefix = "test1_"
val outputFileSuffix = ".xml"
val inputXMLDF = spark.read
.option("roottag", "LOG")
.option("rowTag" , "LgRec")
.xml(inputFile)
//get distinct recId's
val recTypeDF = inputXMLDF.select("_RecId").distinct()
//for each recid filter the DF by recId and output the file
recTypeDF.collect().map(row => {
println("row = " + row(0) )
inputXMLDF.filter(inputXMLDF("_RecId")===row(0))
.write
.option("roottag", "LOG")
.option("rowTag" , "LgRec")
.xml(outputFilePrefix + row(0) + outputFileSuffix)
})
spark.close()
}
}