Строка преобразования, содержащая список объектов в фрейм данных - PullRequest
0 голосов
/ 28 мая 2020

у меня есть строка как

str=[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},
{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]

я пытаюсь преобразовать ее во фрейм данных ...

+-------+--------------+
|packQty|gtin          |
+-------+--------------+
|120.0  |0005236       |
|10.0   |0005200       | 
|12.0   |00042276      |
|20.0   |00052000      |
+-------+--------------+

я создал схему как

 val schema=new StructType()
  .add("packQty",FloatType)
  .add("gtin", StringType)

val df =Seq(str).toDF("testQTY")
val df2=df.withColumn("jsonData",from_json($"testQTY",schema)).select("jsonData.*")

это возвращает мне фрейм данных только с одной записью ...

+-------+--------------+
|packQty|gtin          |
+-------+--------------+
|120.0  |0005236|
+-------+--------------+

как изменить схему, чтобы я мог получить все записи

, если это был массив, тогда я мог бы использовать функцию explode () для получения значений, но я получаю эту ошибку.

cannot resolve 'explode(`gtins`)' due to data type mismatch: input to 
function explode should be array or map type, not string;;

так заполняется столбец

+----------------------------------------------------------------------+
| gtins                                                                                               
|
+----------------------------------------------------------------------
|[{"packQty":120.0,"gtin":"000520"},{"packQty":10.0,”gtin":"0005200"}]
+----------------------------------------------------------------------+

Ответы [ 2 ]

0 голосов
/ 28 мая 2020

Указание еще одного варианта -

 val data = """[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]"""

 val df2 = Seq(data).toDF("gtins")
    df2.show(false)
    df2.printSchema()

    /**
      * +--------------------------------------------------------------------------------------------------------+
      * |gtins                                                                                                   |
      * +--------------------------------------------------------------------------------------------------------+
      * |[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]|
      * +--------------------------------------------------------------------------------------------------------+
      *
      * root
      * |-- gtins: string (nullable = true)
      */

    df2.selectExpr("inline_outer(from_json(gtins, 'array<struct<A:double, B:string>>')) as (packQty, gtin)")
      .show(false)

    /**
      * +-------+--------+
      * |packQty|gtin    |
      * +-------+--------+
      * |120.0  |0005236 |
      * |10.0   |0005200 |
      * |12.0   |00042276|
      * |20.0   |00052000|
      * +-------+--------+
      */
0 голосов
/ 28 мая 2020

Сохраните вашу схему внутри ArrayType т.е. ArrayType(new StructType().add("packQty",FloatType).add("gtin", StringType)), это даст вам нулевые значения, поскольку имена столбцов схемы не совпадают с данными json.

Измените схему ArrayType(new StructType().add("packQty",FloatType).add("gtin", StringType)) на ArrayType(new StructType().add("A",FloatType).add("B", StringType)) & После анализа данных переименуйте необходимые столбцы.

Пожалуйста, проверьте код ниже.

Если имена столбцов совпадают в обоих schema & JSON данные.

scala> val json = Seq("""[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]""").toDF("testQTY")
json: org.apache.spark.sql.DataFrame = [testQTY: string]

scala> val schema = ArrayType(StructType(StructField("A",DoubleType,true):: StructField("B",StringType,true) :: Nil))
schema: org.apache.spark.sql.types.ArrayType = ArrayType(StructType(StructField(A,DoubleType,true), StructField(B,StringType,true)),true)

scala> json.withColumn("jsonData",from_json($"testQTY",schema)).select(explode($"jsonData").as("jsonData")).select($"jsonData.A".as("packQty"),$"jsonData.B".as("gtin")).show(false)
+-------+--------+
|packQty|gtin    |
+-------+--------+
|120.0  |0005236 |
|10.0   |0005200 |
|12.0   |00042276|
|20.0   |00052000|
+-------+--------+

Если имена столбцов не совпадают в обоих schema и JSON данных.

scala> val json = Seq("""[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]""").toDF("testQTY")
json: org.apache.spark.sql.DataFrame = [testQTY: string]

scala> val schema = ArrayType(StructType(StructField("packQty",DoubleType,true):: StructField("gtin",StringType,true) :: Nil)) // Column names are not matched with json & schema.
schema: org.apache.spark.sql.types.ArrayType = ArrayType(StructType(StructField(packQty,DoubleType,true), StructField(gtin,StringType,true)),true)

scala> json.withColumn("jsonData",from_json($"testQTY",schema)).select(explode($"jsonData").as("jsonData")).select($"jsonData.*").show(false)
+-------+----+
|packQty|gtin|
+-------+----+
|null   |null|
|null   |null|
|null   |null|
|null   |null|
+-------+----+

Альтернативный способ синтаксического анализа json string в DataFrame с использованием DataSet

scala> val json = Seq("""[{"A":120.0,"B":"0005236"},{"A":10.0,"B":"0005200"},{"A":12.0,"B":"00042276"},{"A":20.0,"B":"00052000"}]""").toDS // Creating DataSet from json string.
json: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val schema = StructType(StructField("A",DoubleType,true):: StructField("B",StringType,true) :: Nil) // Creating schema.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(A,DoubleType,true), StructField(B,StringType,true))

scala> spark.read.schema(schema).json(json).select($"A".as("packQty"),$"B".as("gtin")).show(false)
+-------+--------+
|packQty|gtin    |
+-------+--------+
|120.0  |0005236 |
|10.0   |0005200 |
|12.0   |00042276|
|20.0   |00052000|
+-------+--------+

...