Преобразование столбцов данных Spark с массивом объектов JSON в несколько строк - PullRequest
0 голосов
/ 31 октября 2018

У меня есть потоковые данные JSON, структуру которых можно описать с помощью класса case ниже

case class Hello(A: String, B: Array[Map[String, String]])

Пример данных для того же, что и ниже

|  A    | B                                        |
|-------|------------------------------------------|
|  ABC  |  [{C:1, D:1}, {C:2, D:4}]                | 
|  XYZ  |  [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |

Я хочу преобразовать его в

|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 

Любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 02 ноября 2018

Поскольку вопрос прошел эволюцию, я оставляю первоначальный ответ там, и это касается последнего вопроса.

Важный момент, вход, упомянутый ниже, теперь обслуживается:

val df0 = Seq (
            ("ABC", List(Map("C" -> "1", "D" -> "2"), Map("C" -> "3", "D" -> "4"))),
            ("XYZ", List(Map("C" -> "44", "D" -> "55"), Map("C" -> "188", "D" -> "199"), Map("C" -> "88", "D" -> "99")))
              )
             .toDF("A", "B")

Можно также сделать так, но тогда для этого нужно изменить скрипт, хотя и тривиально:

val df0 = Seq (
           ("ABC", List(Map("C" -> "1",  "D" -> "2"))), 
           ("ABC", List(Map("C" -> "44", "D" -> "55"))),
           ("XYZ", List(Map("C" -> "11", "D" -> "22")))
              )
            .toDF("A", "B")

Затем из запрошенного формата:

val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")

val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum") 

val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")

val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))

val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C")) 

val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")

df6.show(false)

возвращается:

+---+---+---+
|C  |D  |A  |
+---+---+---+
|3  |4  |ABC|
|1  |2  |ABC|
|88 |99 |XYZ|
|188|199|XYZ|
|44 |55 |XYZ|
+---+---+---+

Вы можете изменить последовательность столбцов.

0 голосов
/ 01 ноября 2018

Не уверен, что лучший подход, но в двухэтапном процессе это может быть сделано. Оставляя в стороне ваш класс дела, следующее:

import org.apache.spark.sql.functions._
//case class ComponentPlacement(A: String, B: Array[Map[String, String]])
val df = Seq (
              ("ABC", List(Map("C" -> "1",  "D" -> "2"))),
              ("XYZ", List(Map("C" -> "11", "D" -> "22")))
             ).toDF("A", "B")

val df2 = df.select($"A", explode($"B")).toDF("A", "Bn")

val df3 = df2.select($"A", explode($"Bn")).toDF("A", "B", "C")

val df4 = df3.select($"A", $"B", $"C").groupBy("A").pivot("B").agg(first($"C"))

возвращается:

+---+---+---+
|  A|  C|  D|
+---+---+---+
|XYZ| 11| 22|
|ABC|  1|  2|
+---+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...