Ваш документ содержит многозначный массив, поэтому вы не можете сгладить его полностью за один проход, поскольку вы не можете присвоить обоим элементам массива одно и то же имя столбца.Кроме того, обычно плохой идеей является использование точки в имени столбца, поскольку это может легко запутать анализатор Spark, и его необходимо будет всегда экранировать.
Обычный способ сгладить такой набор данных - создатьновые строки для каждого элемента массива.Вы можете использовать функцию explode
, чтобы сделать это, но вам нужно будет рекурсивно вызвать вашу операцию сглаживания, потому что explode
не может быть вложенной.
Следующий код работает, как и ожидалось, используя '_' вместо''как разделитель имен столбцов:
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.{Dataset, Row}
object RuntimeError {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate()
val rowTag = "idocData"
val dataFrameReader = spark.read.option("rowTag", rowTag)
val xmlUri = "bad_011_1.xml"
val df = dataFrameReader.format("xml").load(xmlUri)
val df2 = flatten(df)
}
def flatten(df: Dataset[Row], prefixSeparator: String = "_") : Dataset[Row] = {
import org.apache.spark.sql.functions.{col,explode}
def mustFlatten(sc: StructType): Boolean =
sc.fields.exists(f => f.dataType.isInstanceOf[ArrayType] || f.dataType.isInstanceOf[StructType])
def flattenAndExplodeOne(sc: StructType, parent: Column = null, prefix: String = null, cols: Array[(DataType,Column)] = Array[(DataType,Column)]()): Array[(DataType,Column)] = {
val res = sc.fields.foldLeft(cols)( (columns, f) => {
val my_col = if (parent == null) col(f.name) else parent.getItem(f.name)
val flat_name = if (prefix == null) f.name else s"${prefix}${prefixSeparator}${f.name}"
f.dataType match {
case st: StructType => flattenAndExplodeOne(st, my_col, flat_name, columns)
case dt: ArrayType => {
if (columns.exists(_._1.isInstanceOf[ArrayType])) {
columns :+ ((dt, my_col.as(flat_name)))
} else {
columns :+ ((dt, explode(my_col).as(flat_name)))
}
}
case dt => columns :+ ((dt, my_col.as(flat_name)))
}
})
res
}
var flatDf = df
while (mustFlatten(flatDf.schema)) {
val newColumns = flattenAndExplodeOne(flatDf.schema, null, null).map(_._2)
flatDf = flatDf.select(newColumns:_*)
}
flatDf
}
}
Полученный df2 имеет следующую схему и данные:
df2.printSchema
root
|-- E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM: long (nullable = true)
|-- E2EDP01008GRP__xmlns: string (nullable = true)
df2.show(true)
+--------------------------------------------------------------+--------------------+
|E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM|E2EDP01008GRP__xmlns|
+--------------------------------------------------------------+--------------------+
| 141036013|http://Microsoft....|
| 141036013|http://Microsoft....|
+--------------------------------------------------------------+--------------------+