Я смог добиться этого с помощью RDD API:
val jsonRDD = df.rdd.map{row =>
def unnest(r: Row): Map[String, Any] = {
r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
(f.name, f.dataType) match {
case ("props", _:StructType) =>
val propsObject = r.getAs[Row](f.name)
Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
val subObject = propsObject.getAs[Row](propsAttr.name)
subObject.schema.fields.map{subField =>
s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
}
}.toMap)
case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
case (fname, ArrayType(_: StructType,_)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
case _ => Map(f.name -> r.get(i))
}
}
}.toMap
val asMap = unnest(row)
new ObjectMapper().registerModule(DefaultScalaModule).writeValueAsString(asMap)
}
val finalDF = spark.read.json(jsonRDD.toDS).cache
Решение должно принимать глубоко вложенные входные данные благодаря рекурсии.
С вашими данными вот что мы get:
finalDF.printSchema()
finalDF.show(false)
finalDF.select("props.*").show()
Выходы:
root
|-- id: string (nullable = true)
|-- props: struct (nullable = true)
| |-- type.id: string (nullable = true)
| |-- type.isMale: boolean (nullable = true)
| |-- type.mcc: long (nullable = true)
| |-- type.name: string (nullable = true)
|-- test_id: string (nullable = true)
+-------+----------------------+-------+
|id |props |test_id|
+-------+----------------------+-------+
|abchchd|[dd, true, 1234, Adam]|ndsbsb |
+-------+----------------------+-------+
+-------+-----------+--------+---------+
|type.id|type.isMale|type.mcc|type.name|
+-------+-----------+--------+---------+
| dd| true| 1234| Adam|
+-------+-----------+--------+---------+
Но мы также можем передавать больше вложенных / сложных структур, например:
val str2 = """{"newroot":[{"mystruct":{"id":"abchchd","test_id":"ndsbsb","props":{"type":{"isMale":true,"id":"dd","mcc":1234,"name":"Adam"}}}}]}"""
...
finalDF.printSchema()
finalDF.show(false)
Дает следующий вывод:
root
|-- newroot: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- mystruct: struct (nullable = true)
| | | |-- id: string (nullable = true)
| | | |-- props: struct (nullable = true)
| | | | |-- type.id: string (nullable = true)
| | | | |-- type.isMale: boolean (nullable = true)
| | | | |-- type.mcc: long (nullable = true)
| | | | |-- type.name: string (nullable = true)
| | | |-- test_id: string (nullable = true)
+---------------------------------------------+
|root |
+---------------------------------------------+
|[[[abchchd, [dd, true, 1234, Adam], ndsbsb]]]|
+---------------------------------------------+
РЕДАКТИРОВАТЬ : Как вы упомянули, если у вас есть записи с другой структурой, вам нужно обернуть вышеуказанное значение subObject
в опцию.
Вот фиксированное значение unnest
Функция:
def unnest(r: Row): Map[String, Any] = {
r.schema.fields.zipWithIndex.flatMap{case (f, i) =>
(f.name, f.dataType) match {
case ("props", _:StructType) =>
val propsObject = r.getAs[Row](f.name)
Map(f.name -> propsObject.schema.fields.flatMap{propsAttr =>
val subObjectOpt = Option(propsObject.getAs[Row](propsAttr.name))
subObjectOpt.toSeq.flatMap{subObject => subObject.schema.fields.map{subField =>
s"${propsAttr.name}.${subField.name}" -> subObject.get(subObject.fieldIndex(subField.name))
}}
}.toMap)
case (fname, _: StructType) => Map(fname -> unnest(r.getAs[Row](fname)))
case (fname, ArrayType(_: StructType,_)) => Map(fname -> r.getAs[Seq[Row]](fname).map(unnest))
case _ => Map(f.name -> r.get(i))
}
}
}.toMap
Новый printSchema
дает:
root
|-- id: string (nullable = true)
|-- props: struct (nullable = true)
| |-- type.id: string (nullable = true)
| |-- type.isMale: boolean (nullable = true)
| |-- type.mcc: long (nullable = true)
| |-- type.name: string (nullable = true)
| |-- type2.id: string (nullable = true)
| |-- type2.isMale: boolean (nullable = true)
| |-- type2.mcc: long (nullable = true)
| |-- type2.name: string (nullable = true)
|-- test_id: string (nullable = true)