Используйте Spark Scala для преобразования плоских данных во вложенный объект - PullRequest
0 голосов
/ 16 сентября 2018

Мне нужна помощь в преобразовании плоского набора данных во вложенный формат с помощью Apache Spark / Scala.

Можно ли автоматически создать вложенную структуру, полученную из пространств имен входного столбца

[уровень 1] . [уровень 2] ?В моем примере уровень вложенности определяется символом периода '.' в заголовках столбцов.

Я предполагаю, что этого можно достичь с помощью функции карты.Я открыт для альтернативных решений, особенно если есть более элегантный способ достижения того же результата.

package org.acme.au

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import scala.collection.Seq

object testNestedObject extends App {

  // Configure spark
  val spark = SparkSession.builder()
    .appName("Spark batch demo")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .getOrCreate()

  // Start spark
  val sc = spark.sparkContext
  sc.setLogLevel("ERROR")
  val sqlContext = new SQLContext(sc)

  // Define schema for input data
  val flatSchema = new StructType()
    .add(StructField("id", StringType, false))
    .add(StructField("name", StringType, false))
    .add(StructField("custom_fields.fav_colour", StringType, true))
    .add(StructField("custom_fields.star_sign", StringType, true))

  // Create a row with dummy data
  val row1 = Row("123456", "John Citizen", "Blue", "Scorpio")
  val row2 = Row("990087", "Jane Simth", "Green", "Taurus")
  val flatData = Seq(row1, row2)

  // Convert into dataframe
  val dfIn = spark.createDataFrame(spark.sparkContext.parallelize(flatData), flatSchema)

  // Print to console
  dfIn.printSchema()
  dfIn.show()

  // Convert flat data into nested structure as either Parquet or JSON format
  val dfOut = dfIn.rdd
    .map(
      row => ( /* TODO: Need help with mapping flat data to nested structure derived from input column namespaces
           * 
           * For example:
           * 
           * <id>12345<id>
           * <name>John Citizen</name>
           * <custom_fields>
           *   <fav_colour>Blue</fav_colour>
           *   <star_sign>Scorpio</star_sign>
           * </custom_fields>
           * 
           */ ))

  // Stop spark
  sc.stop()

}

Ответы [ 3 ]

0 голосов
/ 16 сентября 2018

Вот обобщенное решение, которое сначала собирает карту имен столбцов, содержащих ., пересекает карту, чтобы добавить преобразованные столбцы struct в DataFrame, и, наконец, отбрасывает исходные столбцы с ..Несколько более обобщенный dfIn используется в качестве данных выборки.

import org.apache.spark.sql.functions._

val dfIn = Seq(
  (123456, "John Citizen", "Blue", "Scorpio", "a", 1),
  (990087, "Jane Simth", "Green", "Taurus", "b", 2)
).
toDF("id", "name", "custom_fields.fav_colour", "custom_fields.star_sign", "s.c1", "s.c2")

val structCols = dfIn.columns.filter(_.contains("."))
// structCols: Array[String] =
//   Array(custom_fields.fav_colour, custom_fields.star_sign, s.c1, s.c2)

val structColsMap = structCols.map(_.split("\\.")).
  groupBy(_(0)).mapValues(_.map(_(1)))
// structColsMap: scala.collection.immutable.Map[String,Array[String]] =
//   Map(s -> Array(c1, c2), custom_fields -> Array(fav_colour, star_sign))

val dfExpanded = structColsMap.foldLeft(dfIn){ (accDF, kv) =>
  val cols = kv._2.map(v => col("`" + kv._1 + "." + v + "`").as(v))
  accDF.withColumn(kv._1, struct(cols: _*))
}

val dfResult = structCols.foldLeft(dfExpanded)(_ drop _)

dfResult.show
// +------+------------+-----+--------------+
// |id    |name        |s    |custom_fields |
// +------+------------+-----+--------------+
// |123456|John Citizen|[a,1]|[Blue,Scorpio]|
// |990087|Jane Simth  |[b,2]|[Green,Taurus]|
// +------+------------+-----+--------------+

dfResult.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- s: struct (nullable = false)
//  |    |-- c1: string (nullable = true)
//  |    |-- c2: integer (nullable = false)
//  |-- custom_fields: struct (nullable = false)
//  |    |-- fav_colour: string (nullable = true)
//  |    |-- star_sign: string (nullable = true)

Обратите внимание, что это решение обрабатывает только один вложенный уровень.

Чтобы преобразовать каждую строку в формат JSON, рассмотрите возможность использования toJSON следующим образом:

dfResult.toJSON.show(false)
// +---------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                |
// +---------------------------------------------------------------------------------------------------------------------+
// |{"id":123456,"name":"John Citizen","s":{"c1":"a","c2":1},"custom_fields":{"fav_colour":"Blue","star_sign":"Scorpio"}}|
// |{"id":990087,"name":"Jane Simth","s":{"c1":"b","c2":2},"custom_fields":{"fav_colour":"Green","star_sign":"Taurus"}}  |
// +---------------------------------------------------------------------------------------------------------------------+
0 голосов
/ 17 сентября 2018

Это решение для пересмотренного требования, чтобы вывод JSON состоял из array of {K:valueK, V:valueV}, а не {valueK1: valueV1, valueK2: valueV2, ...}. Например:

// FROM:
"custom_fields":{"fav_colour":"Blue", "star_sign":"Scorpio"}

// TO:
"custom_fields":[{"key":"fav_colour", "value":"Blue"}, {"key":"star_sign", "value":"Scorpio"}]

Пример кода ниже:

import org.apache.spark.sql.functions._

val dfIn = Seq(
  (123456, "John Citizen", "Blue", "Scorpio"),
  (990087, "Jane Simth", "Green", "Taurus")
).toDF("id", "name", "custom_fields.fav_colour", "custom_fields.star_sign")

val structCols = dfIn.columns.filter(_.contains("."))
// structCols: Array[String] =
//   Array(custom_fields.fav_colour, custom_fields.star_sign)

val structColsMap = structCols.map(_.split("\\.")).
  groupBy(_(0)).mapValues(_.map(_(1)))
// structColsMap: scala.collection.immutable.Map[String,Array[String]] =
//   Map(custom_fields -> Array(fav_colour, star_sign))

val dfExpanded = structColsMap.foldLeft(dfIn){ (accDF, kv) =>
  val cols = kv._2.map( v =>
    struct(lit(v).as("key"), col("`" + kv._1 + "." + v + "`").as("value"))
  )
  accDF.withColumn(kv._1, array(cols: _*))
}

val dfResult = structCols.foldLeft(dfExpanded)(_ drop _)

dfResult.show(false)
// +------+------------+----------------------------------------+
// |id    |name        |custom_fields                           |
// +------+------------+----------------------------------------+
// |123456|John Citizen|[[fav_colour,Blue], [star_sign,Scorpio]]|
// |990087|Jane Simth  |[[fav_colour,Green], [star_sign,Taurus]]|
// +------+------------+----------------------------------------+

dfResult.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- custom_fields: array (nullable = false)
//  |    |-- element: struct (containsNull = false)
//  |    |    |-- key: string (nullable = false)
//  |    |    |-- value: string (nullable = true)

dfResult.toJSON.show(false)
// +-------------------------------------------------------------------------------------------------------------------------------+
// |value                                                                                                                          |
// +-------------------------------------------------------------------------------------------------------------------------------+
// |{"id":123456,"name":"John Citizen","custom_fields":[{"key":"fav_colour","value":"Blue"},{"key":"star_sign","value":"Scorpio"}]}|
// |{"id":990087,"name":"Jane Simth","custom_fields":[{"key":"fav_colour","value":"Green"},{"key":"star_sign","value":"Taurus"}]}  |
// +-------------------------------------------------------------------------------------------------------------------------------+

Обратите внимание, что мы не можем сделать value type Any для размещения разных типов, поскольку Spark DataFrame API не поддерживает тип Any. Как следствие, value в массиве должен быть заданного типа (например, String). Как и в предыдущем решении, он также обрабатывает только один вложенный уровень.

0 голосов
/ 16 сентября 2018

Эту проблему можно решить с помощью выделенных case class и UDF, которые преобразуют входные данные в экземпляры класса дел. Например:

Определить класс дела

case class NestedFields(fav_colour: String, star_sign: String)

Определите UDF, который принимает исходные значения столбца в качестве входных данных и возвращает экземпляр NestedFields:

private val asNestedFields = udf((fc: String, ss: String) => NestedFields(fc, ss))

Преобразование исходного DataFrame и удаление плоских столбцов:

val res = dfIn.withColumn("custom_fields", asNestedFields($"`custom_fields.fav_colour`", $"`custom_fields.star_sign`"))
              .drop($"`custom_fields.fav_colour`")
              .drop($"`custom_fields.star_sign`")

производит

root
|-- id: string (nullable = false)
|-- name: string (nullable = false)
|-- custom_fields: struct (nullable = true)
|    |-- fav_colour: string (nullable = true)
|    |-- star_sign: string (nullable = true)

+------+------------+---------------+
|    id|        name|  custom_fields|
+------+------------+---------------+
|123456|John Citizen|[Blue, Scorpio]|
|990087|  Jane Simth|[Green, Taurus]|
+------+------------+---------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...