Создание вложенных данных после объединения в Spark Scala - PullRequest
0 голосов
/ 23 мая 2018

Моя цель состоит в том, чтобы подготовить в spark / Hadoop информационный фрейм, который я буду индексировать в эластичном поиске.

У меня 2 стола орков: client и person.Отношение один-ко-многим

1 клиент может иметь несколько человек.

Итак, я буду использовать Spark / Spark SQL, поэтому давайте поговорим о фрейме данных:

Схема фрейма клиента:

root 
|-- client_id: string (nullable = true) 
|-- c1: string (nullable = true) 
|-- c2: string (nullable = true) 
|-- c3: string (nullable = true) 

Схема фрейма персоны:

root 
|-- person_id: string (nullable = true) 
|-- p1: string (nullable = true) 
|-- p2: string (nullable = true) 
|-- p3: string (nullable = true) 
|-- client_id: string (nullable = true) 

Моя цель - создать Dataframe, который будет иметь следующую схему:

root 
|-- client_id: string (nullable = true) 
|-- c1: string (nullable = true) 
|-- c2: string (nullable = true) 
|-- c3: string (nullable = true) 
|-- persons: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- person_id: string (nullable = true) 
| | |-- p1: string (nullable = true) 
| | |-- p2: string (nullable = true) 
| | |-- p3: string (nullable = true)

Как мне этого добиться?

Заранее спасибо за помощь.

1 Ответ

0 голосов
/ 23 мая 2018

Вы можете group person фрейм данных client_id и создать list из всех других columns и join с client фреймом данных, как показано ниже

//client data 
val client = Seq(
  ("1", "a", "b", "c"),
  ("2", "a", "b", "c"),
  ("3", "a", "b", "c")
).toDF("client_id", "c1", "c2", "c2")

//person data 
val person = Seq(
  ("p1", "a", "b", "c", "1"),
  ("p2", "a", "b", "c", "1"),
  ("p1", "a", "b", "c", "2")
).toDF("person_id", "p1", "p2", "p3", "client_id")

//Group the person data by client_id and create a list of remaining columns 
val groupedPerson = person.groupBy("client_id")
  .agg(collect_list(struct("person_id", "p1", "p2", "p3")).as("persons"))


//Join the client and groupedPerson Data 
val resultDF = client.join(groupedPerson, Seq("client_id"), "left")

resultDF.show(false)

Схема:

root
 |-- client_id: string (nullable = true)
 |-- c1: string (nullable = true)
 |-- c2: string (nullable = true)
 |-- c2: string (nullable = true)
 |-- persons: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- person_id: string (nullable = true)
 |    |    |-- p1: string (nullable = true)
 |    |    |-- p2: string (nullable = true)
 |    |    |-- p3: string (nullable = true)

Вывод:

+---------+---+---+---+------------------------+
|client_id|c1 |c2 |c2 |persons                 |
+---------+---+---+---+------------------------+
|1        |a  |b  |c  |[[p1,a,b,c], [p2,a,b,c]]|
|2        |a  |b  |c  |[[p1,a,b,c]]            |
|3        |a  |b  |c  |null                    |
+---------+---+---+---+------------------------+

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...