Массив структурного разбора в фрейме данных Spark - PullRequest
0 голосов
/ 04 августа 2020

У меня есть Dataframe с одним столбцом типа структуры. Пример схемы фрейма данных:

root
 |-- Data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)

Поле name содержит имя столбца, а поля value содержат значение столбца. Количество элементов в столбце Data не определено, поэтому может варьироваться. Мне нужно проанализировать эти данные и избавиться от вложенной структуры. (Массив Explode не будет работать в этом случае, потому что данные в одной строке принадлежат одному элементу). Реальная схема намного больше и имеет несколько полей массива, таких как «Данные», поэтому моя цель - создать общее решение, которое я буду применять к аналогичным массивам структур. Пример:

Пример данных:

val data = Seq(
    """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName",   "value": "Strong"  }]}""",
    """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName",   "value": "Nesta "  }]} { "name": "LName",   "value": "Marley"  }]}"""
)
val df = spark.read.json(spark.sparkContext.parallelize(data))

Ожидаемый результат:

+-------+------+
|  FName| LName|
+-------+------+
|   Alex|Strong|
|Robert |Marley|
+-------+------+
 

В качестве решения я создал UDF, который я выполняю для всего столбца Data. В качестве входных параметров я передаю имя столбца и имя поля, которое хочу извлечь.

 val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) => {
    var value = ""
    arr.foreach(el =>
        if(el.getAs[String]("name") == columnName){
            value = el.getAs[String]("value")
        }
    )
    value
}}

Проблема в том, что я использую переменную value для хранения промежуточного результата, и я не хочу создать новую переменную a для каждой строки, в которой будет выполняться мой UDF.

Способ, которым я выполняю свой UDF (этот запрос генерирует ожидаемый результат):

df.select(find_scheme_name_in_array(col("Data"), lit("FName")).as("FName"),find_scheme_name_in_array(col("Data"), lit("LName")).as("LName")).show()

Я бы Будем рады услышать любые комментарии о том, как я могу улучшить logi c UDF, и о некоторых других способах решения проблемы синтаксического анализа.

Ответы [ 2 ]

1 голос
/ 04 августа 2020

Возможно, это поможет -

  val data = Seq(
      """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName",   "value": "Strong"  }]}""",
      """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName",   "value": "Nesta "  }, {
        |"name": "LName",   "value": "Marley"  }]}""".stripMargin
    )
    val df = spark.read
      .json(data.toDS())
    df.show(false)
    df.printSchema()

    /**
      * +----------------------------------------------------+
      * |Data                                                |
      * +----------------------------------------------------+
      * |[[FName, Alex], [LName, Strong]]                    |
      * |[[FName, Robert ], [MName, Nesta ], [LName, Marley]]|
      * +----------------------------------------------------+
      *
      * root
      * |-- Data: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- name: string (nullable = true)
      * |    |    |-- value: string (nullable = true)
      */

    df.selectExpr("inline_outer(Data)")
      .groupBy()
      .pivot("name")
      .agg(collect_list("value"))
      .withColumn("x", arrays_zip($"FName", $"LName"))
      .selectExpr("inline_outer(x)")
      .show(false)

    /**
      * +-------+------+
      * |FName  |LName |
      * +-------+------+
      * |Alex   |Strong|
      * |Robert |Marley|
      * +-------+------+
      */
0 голосов
/ 11 августа 2020

Я решил проблему, заменив foreach l oop на find метод:

val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
    arr.find(_.getAs[String]("name") == columnName) match {
        case Some(i) => i.getAs[String]("value")
        case None => null
    }
}
...