Как взорвать вложенный массив json во фрейме данных - PullRequest
0 голосов
/ 02 мая 2020

Это моя структура фрейма входных данных

root
|--Name (String)
|--Version (int)
|--Details (array)

Примерно так:

"Name":"json",
"Version":1,
"Details":[
"{
    \"Id\":\"123\",
    \"TaxDetails\":[\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"]
}",
"{
    \"Id\":\"234\",
    \"TaxDetails\":[\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"]
}"
]

Я хочу разобрать это на уровне TaxDetails примерно так:

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail1\":\"val1\"}   

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail2\":\"val2\"}   

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail3\":\"val3\"}   

"Name":"json",
"Version":1,
"TaxDetail":{\"TaxDetail4\":\"va4\"}

Я детализировал детали с помощью функции разнесения, как это

val explodedDetailDf = inputDf.withColumn("Detail", explode($"Details"))

Теперь тип данных столбца «Подробности» - строка, и когда я пытаюсь это сделать:

val explodedTaxDetail = explodedDetailDf.withColumn("TaxDetail", explode($"Detail.TaxDetails"))

Вышеприведенная операция завершается с ошибкой «AnalysisException из-за несоответствия типов данных: вход для функции разнесения должен быть массивом или типом карты, а не строкой»

Как я могу взорвать вложенный массив json на основе его имени

Ответы [ 2 ]

3 голосов
/ 03 мая 2020

explode будет принимать значения типа map или array. но не строка

Из вашего примера json Detail.TaxDetails имеет тип string, а не array.

Для извлечения Detail.TaxDetails значений строкового типа вы должны использовать

def from_json(e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.Column

Note

Ваш json поврежден, я изменили ваш json, как показано ниже.

scala> val json = """{
     |   "Name": "json",
     |   "Version": 1,
     |   "Details": [
     |     "{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"}]}",
     |     "{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"}]}"
     |   ]
     | }"""

json: String =
{
  "Name": "json",
  "Version": 1,
  "Details": [
    "{\"Id\":\"123\",\"TaxDetails\":[{\"TaxDetail1\":\"val1\", \"TaxDetail2\":\"val2\"}]}",
    "{\"Id\":\"234\",\"TaxDetails\":[{\"TaxDetail3\":\"val3\", \"TaxDetail4\":\"val4\"}]}"
  ]
}

Пожалуйста, проверьте следующий код, как извлечь значение для Detail.TaxDetails


scala> val df = spark.read.json(Seq(json).toDS)
df: org.apache.spark.sql.DataFrame = [Details: array<string>, Name: string ... 1 more field]

scala> df.printSchema
root
 |-- Details: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Name: string (nullable = true)
 |-- Version: long (nullable = true)

scala> df.withColumn("details",explode($"details").as("details")).show(false) // inside details array has string values.
+----------------------------------------------------------------------+----+-------+
|details                                                               |Name|Version|
+----------------------------------------------------------------------+----+-------+
|{"Id":"123","TaxDetails":[{"TaxDetail1":"val1", "TaxDetail2":"val2"}]}|json|1      |
|{"Id":"234","TaxDetails":[{"TaxDetail3":"val3", "TaxDetail4":"val4"}]}|json|1      |
+----------------------------------------------------------------------+----+-------+

scala> val json = spark.read.json(Seq("""[{"Id": "123","TaxDetails": [{"TaxDetail1": "val1","TaxDetail2": "val2"}]},{"Id": "234","TaxDetails": [{"TaxDetail3": "val3","TaxDetail4": "val4"}]}]""").toDS).schema.json
json: String = {"type":"struct","fields":[{"name":"Id","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetails","type":{"type":"array","elementType":{"type":"struct","fields":[{"name":"TaxDetail1","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail2","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail3","type":"string","nullable":true,"metadata":{}},{"name":"TaxDetail4","type":"string","nullable":true,"metadata":{}}]},"containsNull":true},"nullable":true,"metadata":{}}]}

scala> val schema = DataType.fromJson(json).asInstanceOf[StructType] // Creating schema for inner string
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1      |123|val1      |val2      |null      |null      |
|json|1      |234|null      |null      |val3      |val4      |
+----+-------+---+----------+----------+----------+----------+


scala>

Updated

выше Я взял json вручную и создал схему. Пожалуйста, проверьте код ниже, чтобы получить схему из доступных данных.

scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).printSchema
root
 |-- Id: string (nullable = true)
 |-- TaxDetails: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- TaxDetail1: string (nullable = true)
 |    |    |-- TaxDetail2: string (nullable = true)
 |    |    |-- TaxDetail3: string (nullable = true)
 |    |    |-- TaxDetail4: string (nullable = true)


scala> spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
res12: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> val schema = spark.read.json(df.withColumn("details",explode($"details").as("details")).select("details").map(_.getAs[String](0))).schema
schema: org.apache.spark.sql.types.StructType = StructType(StructField(Id,StringType,true), StructField(TaxDetails,ArrayType(StructType(StructField(TaxDetail1,StringType,true), StructField(TaxDetail2,StringType,true), StructField(TaxDetail3,StringType,true), StructField(TaxDetail4,StringType,true)),true),true))

scala> spark.time(df.withColumn("details",explode($"details")).withColumn("details",from_json($"details",schema)).withColumn("id",$"details.id").withColumn("taxdetails",explode($"details.taxdetails")).select($"name",$"version",$"id",$"taxdetails.*").show(false))
+----+-------+---+----------+----------+----------+----------+
|name|version|id |TaxDetail1|TaxDetail2|TaxDetail3|TaxDetail4|
+----+-------+---+----------+----------+----------+----------+
|json|1      |123|val1      |val2      |null      |null      |
|json|1      |234|null      |null      |val3      |val4      |
+----+-------+---+----------+----------+----------+----------+

Time taken: 212 ms

scala>

1 голос
/ 03 мая 2020

Поскольку ранее предоставленный вами json был поврежден, я отформатировал json таким образом, чтобы вы могли работать с explode 2 раза и сгладить кадр данных.

Реализовано, как показано ниже ...

 package examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object JsonTest extends App {
  Logger.getLogger("org").setLevel(Level.OFF)

  private[this] implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()

  import spark.implicits._

  val jsonString =
    """
      |{
      |  "Name": "json",
      |  "Version": "1",
      |  "Details": [
      |    {
      |      "Id": "123",
      |      "TaxDetails": [
      |        {
      |          "TaxDetail1": "val1",
      |          "TaxDetail2": "val2"
      |        }
      |      ]
      |    },
      |    {
      |    "Id":"234",
      |    "TaxDetails":[
      |    {
      |    "TaxDetail3":"val3"
      |    , "TaxDetail4":"val4"
      |    }
      |    ]
      |}
      |  ]
      |}
    """.stripMargin
  val df3 = spark.read.json(Seq(jsonString).toDS)
  df3.printSchema()
  df3.show(false)
  val explodedDetailDf = df3.withColumn("Detail", explode($"Details"))
  // explodedDetailDf.show(false)
  val explodedTaxDetail = explodedDetailDf.withColumn("TaxDetail", explode($"Detail.TaxDetails"))
  explodedTaxDetail.show(false)

  val finaldf = explodedTaxDetail.select($"Name", $"Version"
    , to_json(struct
    (col("TaxDetail.TaxDetail1").as("TaxDetail1"))
    ).as("TaxDetails"))
    .union(
      explodedTaxDetail.select($"Name", $"Version"
        , to_json(struct
        (col("TaxDetail.TaxDetail2").as("TaxDetail2"))
        ).as("TaxDetails"))
    )
    .union(
      explodedTaxDetail.select($"Name", $"Version"
        , to_json(struct
        (col("TaxDetail.TaxDetail3").as("TaxDetail3"))
        ).as("TaxDetails"))
    )
    .union(
      explodedTaxDetail.select($"Name", $"Version"
        , to_json(struct
        (col("TaxDetail.TaxDetail4").as("TaxDetail4"))
        ).as("TaxDetails"))
    ).filter(!($"TaxDetails" === "{}"))

  finaldf.show(false)
  finaldf.toJSON.show(false)
}

Результат:

root
 |-- Details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: string (nullable = true)
 |    |    |-- TaxDetails: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- TaxDetail1: string (nullable = true)
 |    |    |    |    |-- TaxDetail2: string (nullable = true)
 |    |    |    |    |-- TaxDetail3: string (nullable = true)
 |    |    |    |    |-- TaxDetail4: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Version: string (nullable = true)

+---------------------------------------------------+----+-------+
|Details                                            |Name|Version|
+---------------------------------------------------+----+-------+
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |
+---------------------------------------------------+----+-------+

+---------------------------------------------------+----+-------+------------------------+---------------+
|Details                                            |Name|Version|Detail                  |TaxDetail      |
+---------------------------------------------------+----+-------+------------------------+---------------+
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |[123, [[val1, val2,,]]] |[val1, val2,,] |
|[[123, [[val1, val2,,]]], [234, [[,, val3, val4]]]]|json|1      |[234, [[,, val3, val4]]]|[,, val3, val4]|
+---------------------------------------------------+----+-------+------------------------+---------------+

+----+-------+---------------------+
|Name|Version|TaxDetails           |
+----+-------+---------------------+
|json|1      |{"TaxDetail1":"val1"}|
|json|1      |{"TaxDetail2":"val2"}|
|json|1      |{"TaxDetail3":"val3"}|
|json|1      |{"TaxDetail4":"val4"}|
+----+-------+---------------------+

Окончательный результат, как вы ожидали:

+----------------------------------------------------------------------+
|value                                                                 |
+----------------------------------------------------------------------+
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail1\":\"val1\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail2\":\"val2\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail3\":\"val3\"}"}|
|{"Name":"json","Version":"1","TaxDetails":"{\"TaxDetail4\":\"val4\"}"}|
+----------------------------------------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...