Сохраните вашу схему внутри ArrayType
т.е. ArrayType(new StructType().add("packQty",FloatType).add("gtin", StringType))
, это даст вам нулевые значения, поскольку имена столбцов схемы не совпадают с данными json.
Измените схему ArrayType(new StructType().add("packQty",FloatType).add("gtin", StringType))
на ArrayType(new StructType().add("A",FloatType).add("B", StringType))
& После анализа данных переименуйте необходимые столбцы.
Пожалуйста, проверьте код ниже.
Если имена столбцов совпадают в обоих schema
& JSON
данные.
scala> val json = Seq("""[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]""").toDF("testQTY")
json: org.apache.spark.sql.DataFrame = [testQTY: string]
scala> val schema = ArrayType(StructType(StructField("A",DoubleType,true):: StructField("B",StringType,true) :: Nil))
schema: org.apache.spark.sql.types.ArrayType = ArrayType(StructType(StructField(A,DoubleType,true), StructField(B,StringType,true)),true)
scala> json.withColumn("jsonData",from_json($"testQTY",schema)).select(explode($"jsonData").as("jsonData")).select($"jsonData.A".as("packQty"),$"jsonData.B".as("gtin")).show(false)
+-------+--------+
|packQty|gtin |
+-------+--------+
|120.0 |0005236 |
|10.0 |0005200 |
|12.0 |00042276|
|20.0 |00052000|
+-------+--------+
Если имена столбцов не совпадают в обоих schema
и JSON
данных.
scala> val json = Seq("""[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]""").toDF("testQTY")
json: org.apache.spark.sql.DataFrame = [testQTY: string]
scala> val schema = ArrayType(StructType(StructField("packQty",DoubleType,true):: StructField("gtin",StringType,true) :: Nil)) // Column names are not matched with json & schema.
schema: org.apache.spark.sql.types.ArrayType = ArrayType(StructType(StructField(packQty,DoubleType,true), StructField(gtin,StringType,true)),true)
scala> json.withColumn("jsonData",from_json($"testQTY",schema)).select(explode($"jsonData").as("jsonData")).select($"jsonData.*").show(false)
+-------+----+
|packQty|gtin|
+-------+----+
|null |null|
|null |null|
|null |null|
|null |null|
+-------+----+
Альтернативный способ синтаксического анализа json string
в DataFrame с использованием DataSet
scala> val json = Seq("""[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]""").toDS // Creating DataSet from json string.
json: org.apache.spark.sql.Dataset[String] = [value: string]
scala> val schema = StructType(StructField("A",DoubleType,true):: StructField("B",StringType,true) :: Nil) // Creating schema.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(A,DoubleType,true), StructField(B,StringType,true))
scala> spark.read.schema(schema).json(json).select($"A".as("packQty"),$"B".as("gtin")).show(false)
+-------+--------+
|packQty|gtin |
+-------+--------+
|120.0 |0005236 |
|10.0 |0005200 |
|12.0 |00042276|
|20.0 |00052000|
+-------+--------+