Развернуть вложенную структуру в Spark Dataframe - PullRequest
0 голосов
/ 16 января 2020

Я работаю над фреймом данных, который выглядит следующим образом:

root
 |-- _id: string (nullable = true)
 |-- positions: struct (nullable = true)
 |    |-- precise: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lng: double (nullable = true)
 |    |-- unprecise: struct (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- lng: double (nullable = true)

Объекты Struct в позициях Struct могут содержать «точный» или «неточный», или оба, или несколько других объектов Struct. Таким образом, строка с точным и неточным местоположением должна быть разбита на две строки.

Каков наилучший способ разбить такой фрейм данных? В идеале мне бы хотелось, чтобы:

root
 |-- _id: string (nullable = true)
 |-- positions_type: string (nullable = true) // "precise" or "unprecise"
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)

Я следовал Взрыв вложенной структуры в кадре данных Spark речь идет о взрыве столбца Struct, а не вложенной структуры.

Другая идея состояла бы в том, чтобы сгладить все и иметь столько столбцов, сколько имеется вложенных объектов struct, но это не совсем идеально, поскольку схема будет меняться при добавлении новых объектов struct.

Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 16 января 2020

Просто используйте функции create_map и explode, например:

df = df.select("_id", explode(create_map(lit("precise"), col("positions.precise"),
                                         lit("unprecise"), col("positions.unprecise")
                                        )
                             ).alias("positions_type", "pos")
              ) \
      .select("_id", "positions_type", "pos.*") \

Схема результата:

root
 |-- _id: string (nullable = true)
 |-- positions_type: string (nullable = false)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
1 голос
/ 16 января 2020

Вы можете попробовать преобразовать его в карту или список, а затем использовать разнесение для создания необходимого вам фрейма данных. Я сделал это в scala. Я надеюсь, что это полезно.

/* Creating sample data */
case class Position(lat : Double, lng : Double)
case class Positions(precise : Position, unprecise : Position)

import spark.implicits._

val list = List(("0",Positions(Position(0.1, 0.2), Position(1.1, 1.2))),
  ("1",Positions(Position(0.1, 0.2), Position(1.1, 1.2))))
val df = list.toDF("_id", "positions")
df.printSchema()

val resDF = df.withColumn("positions_arr",
  array(
    struct(lit("precise").as("positions_type"), $"positions.precise.lat", $"positions.precise.lng"),
    struct(lit("unprecise").as("positions_type"), $"positions.unprecise.lat", $"positions.unprecise.lng")
  )
).withColumn("position", explode($"positions_arr"))
  .withColumn("positions_type", $"position.positions_type")
  .withColumn("lat", $"position.lat")
  .withColumn("lng", $"position.lng")
  .drop("positions","positions_arr","position")

resDF.show(false)
resDF.printSchema()

+---+--------------+---+---+ |_id|positions_type|lat|lng| +---+--------------+---+---+ |0 |precise |0.1|0.2| |0 |unprecise |1.1|1.2| |1 |precise |0.1|0.2| |1 |unprecise |1.1|1.2| +---+--------------+---+---+

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...