explode accepts a column of map or array type, but not a string. In your example JSON, Detail.TaxDetails is of type string, not array. To extract the Detail.TaxDetails values from those strings you have to use
def from_json(e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.Column
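For reference, a rough sketch of what you would need outside spark-shell to run the snippets below (the SparkSession setup here is illustrative; in spark-shell the functions and implicits are already in scope):
// Illustrative setup for running the code below as an application, not in spark-shell.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types.{DataType, StructType}

val spark = SparkSession.builder().appName("tax-details").master("local[*]").getOrCreate()
import spark.implicits._ // enables toDS, map on DataFrames, and the $"col" syntax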
Note
Your JSON is malformed; I have corrected it as shown below.
scala> val json = """{
| "Name": "json",
| "Version": 1,
| "Details": [
| "{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"}]}",
| "{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"}]}"
| ]
| }"""
json: String =
{
"Name": "json",
"Version": 1,
"Details": [
"{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"}]}",
"{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"}]}"
]
}
Please check the following code for how to extract the Detail.TaxDetails values.
scala> val df = spark.read.json(Seq(json).toDS)
df: org.apache.spark.sql.DataFrame = [Details: array<string>, Name: string ... 1 more field]
scala> df.printSchema
root
|-- Details: array (nullable = true)
| |-- element: string (containsNull = true)
|-- Name: string (nullable = true)
|-- Version: long (nullable = true)
scala> df.withColumn("details",explode($"details").as("details")).show(false) // each element of the Details array is still a JSON string
+----------------------------------------------------------------------+----+-------+
|details |Name|Version|
+----------------------------------------------------------------------+----+-------+
|{"Id":"123","TaxDetails":[{"TaxDetail1":"val1", "TaxDetail2":"val2"}]}|json|1 |
|{"Id":"234","TaxDetails":[{"TaxDetail3":"val3", "TaxDetail4":"val4"}]}|json|1 |
+----------------------------------------------------------------------+----+-------+
scala> val json = spark.read.json(Seq("""[{"Id": "123","TaxDetails": [{"TaxDetail1": "val1","TaxDetail2": "val2"}]},{"Id": "234","TaxDetails": [{"TaxDetail3": "val3","TaxDetail4": "val4"}]}]""").toDS).schema.json
json: String = {"type":"struct","fields":[{"name":"Id","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetails","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"TaxDetail1","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail2","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail3","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail4","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}}]}
scala> val schema = DataType.fromJson(json).asInstanceOf[StructType] // build the schema for the inner JSON strings
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))
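As a side note, if the TaxDetail field names are known up front, the same schema can be built directly instead of round-tripping through schema JSON; a minimal sketch:
// Equivalent schema built by hand (assumes the four TaxDetail field names are fixed).
import org.apache.spark.sql.types._

val manualSchema = StructType(Seq(
  StructField("Id", StringType, nullable = true),
  StructField("TaxDetails", ArrayType(StructType(Seq(
    StructField("TaxDetail1", StringType, nullable = true),
    StructField("TaxDetail2", StringType, nullable = true),
    StructField("TaxDetail3", StringType, nullable = true),
    StructField("TaxDetail4", StringType, nullable = true)
  )), containsNull = true), nullable = true)
))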
scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1 |123|val1 |val2 |null |null |
|json|1 |234|null |null |val3 |val4 |
+----+-------+---+----------+----------+----------+----------+
scala>
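The same chain, written out as a single pipeline you could drop into an application (schema is the StructType built above; column names follow the sample JSON):
// Compact version of the transformation shown in the REPL session above.
val result = df
  .withColumn("details", explode($"Details"))               // one row per inner JSON string
  .withColumn("details", from_json($"details", schema))     // parse each string into a struct
  .withColumn("taxdetails", explode($"details.TaxDetails")) // one row per tax-detail struct
  .select($"Name", $"Version", $"details.Id".as("Id"), $"taxdetails.*")

result.show(false)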
Updated
Above I wrote out a sample JSON by hand to build the schema. Please check the code below to derive the schema from the available data instead.
scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).printSchema
root
|-- Id: string (nullable = true)
|-- TaxDetails: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- TaxDetail1: string (nullable = true)
| | |-- TaxDetail2: string (nullable = true)
| | |-- TaxDetail3: string (nullable = true)
| | |-- TaxDetail4: string (nullable = true)
scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))
scala> val schema = spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))
scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1 |123|val1 |val2 |null |null |
|json|1 |234|null |null |val3 |val4 |
+----+-------+---+----------+----------+----------+----------+
Time taken: 212 ms
scala>
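One caveat on this second approach: spark.read.json over all of the exploded strings does a full pass over the data just to infer the schema. On large data you might infer it from a sample instead; a hedged sketch (the sample size of 1000 is arbitrary, and a sample can miss fields that only appear in unsampled records):
// Infer the schema from a limited sample of the inner JSON strings.
val sampledSchema = spark.read
  .json(df.select(explode($"Details").as("details"))
          .limit(1000)
          .map(_.getAs[String]("details")))
  .schema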