Объединить несколько отдельных записей в одну запись в Spark Dataframe - PullRequest
3 голосов
/ 12 марта 2019

Предположим, у меня есть раздел, который выглядит следующим образом

part1:

{"customerId":"1","name":"a"}
{"customerId":"2","name":"b"}

Предположим, я хотел бы изменить схему этого на что-то вроде

{"data":"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}

, что я пытался сделать, было

case class Customer(customerId:Option[String],name:Option[String])
case class Customers(customers:Option[Seq[Customer]])
case class Datum(data:Option[Customers])

Я попытался прочитать раздел как Json и преобразовать в Dataframe.

val inputJson = spark.read.format("json").load("part1")
inputJson.as[Datum]

Каким-то образом Dataframe, похоже, неявно выводит схему.

Ответы [ 2 ]

2 голосов
/ 13 марта 2019

Имея эту структуру, я полагаю, что вы скрываете / оборачиваете действительно полезную информацию ваших данных.Единственная полезная информация здесь: {"customerId":"1","name":"a"},{"customerId":"2","name":"b"} клиенты вместе с датумом просто скрывают данные, которые вам действительно нужны.Чтобы получить доступ к данным прямо сейчас, вы должны сначала слегка изменить свои данные на:

{"customers":[{"customerId":"1","name":"a"},{"customerId":"2","name":"b"}]}

, а затем получить доступ к этому JSON со следующим кодом:

case class Customer(customerId:String, name:String)
case class Data(customers: Array[Customer])

val df = spark.read.json(path).as[Data]

Если попытаться распечатать этоФрейм данных, который вы получаете:

+----------------+
|       customers|
+----------------+
|[[1, a], [2, b]]|
+----------------+

, что, конечно, является вашими данными, упакованными в массивы.Теперь перейдем к интересной части, чтобы получить к ней доступ, вы должны сделать следующее:

df.foreach{ data => data.customers.foreach(println _) }

Это выведет:

Customer(1,a)
Customer(2,b)

, то есть реальные данные, которые вам нужны, ноне легко получить доступ вообще.

РЕДАКТИРОВАТЬ:

Вместо того, чтобы использовать 2 класса, я бы использовал только один, класс Customer.Затем используйте встроенные фильтры Spark для выбора внутренних объектов JSON.Наконец, вы можете разбить каждый массив клиентов и сгенерировать из разбитого столбца набор данных строго типа класса Customer.

Вот окончательный код:

case class Customer(customerId:String, name:String)

val path = "C:\\temp\\json_data.json"
val df = spark.read.json(path)

df.select(explode($"data.customers"))
  .map{ r => Customer(r.getStruct(0).getString(0), r.getStruct(0).getString(1))}
  .show(false)

И вывод:

+----------+----+
|customerId|name|
+----------+----+
|1         |a   |
|2         |b   |
+----------+----+
0 голосов
/ 13 марта 2019

Я закончил тем, что манипулировал самим кадром данных

val inputJson = spark.read.format("json").load("part1")

val formatted = inputJson.withColumn("dummy",lit(1)).groupBy("dummy")
.agg(collect_list(struct(dataFrame.col("*"))).alias("customers"))

val finalFormatted=formatted.withColumn("data",struct(col("customers")))
.select("data")

Теперь, когда я делаю

finalFormatted.printSchema

Я получаю схему, которая мне нужна

  |-- data: struct (nullable = false)
  |    |-- customers: array (nullable = true)
  |    |    |-- element: struct (containsNull = true)
  |    |    |    |-- customerId: string (nullable = true)
  |    |    |    |-- name: string (nullable = true)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...