У меня есть фрейм данных с данными json, я пытаюсь польстить им, используя функцию более плоской и функции explode_outer, но поздно обнаружил, что наша производственная система имеет поддержку только для spark 2.1, поэтому я не могу использовать explode_outer функция, так что я создал функцию udf для функции разнесения здесь код для функции и разнесения внешней функции.
private val parseJsonUDF = udf((dfe: DataFrame, columnsToExplode: List[String])=>
{
val arrayFields = dfe.schema.fields
.map(field => field.name -> field.dataType)
.collect { case (name: String, type_val: ArrayType) => (name, type_val.asInstanceOf[ArrayType]) }
.toMap
columnsToExplode.foldLeft(dfe) { (dataFrame, arrayCol) =>
dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
.otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
}
})
И код, который использует udf
def flattenDataframe(df: DataFrame): DataFrame= {
//getting all the fields from schema
val fields = df.schema.fields
val fieldNames = fields.map(x => x.name)
//length shows the number of fields inside dataframe
val length = fields.length
for (i <- 0 to fields.length - 1) {
val field = fields(i)
val fieldtype = field.dataType
val fieldName = field.name
fieldtype match {
case arrayType: ArrayType =>
val fieldName1 = fieldName
val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName1)
//val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName1) as $fieldName1")
val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"parseJsonUDF($df,$fieldName1) as $fieldName1")
val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
return flattenDataframe(explodedDf)
case structType: StructType =>
val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", ""))))
val explodedf = df.select(renamedcols: _*)
return flattenDataframe(explodedf)
case _ =>
}
}
df
}
При вызове функции я получаю эту ошибку.
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] is not supported
Данные
val json_string = """{
"Total_Value": 3,
"Topic": "Example",
"values": [
{
"value1": "#example1",
"points": [
[
"123",
"156"
]
],
"properties": {
"date": "12-04-19",
"model": "Model example 1"
}
},
{"value2": "#example2",
"points": [
[
"124",
"157"
]
],
"properties": {
"date": "12-05-19",
"model": "Model example 2"
}
}
]
}"""