Объединить два столбца массива структур на основе ключа - PullRequest
1 голос
/ 28 апреля 2020

У меня есть кадр данных со схемой, как показано ниже:

входной кадр данных

 |-- A: string (nullable = true)
 |-- B_2020: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)
 |    |    |-- z: double (nullable = true)
 |-- B_2019: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)

Я хочу объединить столбцы 2020 и 2019 года в один столбец — массив структур, сопоставляя элементы по значению поля key.

Желаемая схема:

ожидаемый выходной кадр данных

 |-- A: string (nullable = true)
 |-- B: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x_this_year: double (nullable = true)
 |    |    |-- y_this_year: double (nullable = true)
 |    |    |-- x_last_year: double (nullable = true)
 |    |    |-- y_last_year: double (nullable = true)
 |    |    |-- z_this_year: double (nullable = true)

Я хотел бы объединять структуры по совпадающему значению key. Также обратите внимание: если ключ присутствует только в данных 2019 или только 2020 года, то в объединённом столбце вместо значений отсутствующего года должен стоять null.

1 Ответ

2 голосов
/ 28 апреля 2020
scala> val df = Seq( // sample data matching the question: one row (A, B_2020, B_2019)
     |   ("ABC", 
     |   Seq(
     |     ("a", 2, 4, 6), // B_2020 elements: (key, x, y, z)
     |     ("b", 3, 6, 9),
     |     ("c", 1, 2, 3)
     |   ),
     |   Seq(
     |     ("a", 4, 8), // B_2019 elements: (key, x, y); key "d" has no 2020 counterpart
     |     ("d", 3, 4)
     |   ))
     | ).toDF("A", "B_2020", "B_2019").select(
     |   $"A",
     |   $"B_2020" cast "array<struct<key:string,x:double,y:double,z:double>>", // cast Int tuple fields to named double struct fields
     |   $"B_2019" cast "array<struct<key:string,x:double,y:double>>"
     | )
df: org.apache.spark.sql.DataFrame = [A: string, B_2020: array<struct<key:string,x:double,y:double,z:double>> ... 1 more field]

scala> df.printSchema
root
 |-- A: string (nullable = true)
 |-- B_2020: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)
 |    |    |-- z: double (nullable = true)
 |-- B_2019: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)



scala> df.show(false)
+---+------------------------------------------------------------+------------------------------+
|A  |B_2020                                                      |B_2019                        |
+---+------------------------------------------------------------+------------------------------+
|ABC|[[a, 2.0, 4.0, 6.0], [b, 3.0, 6.0, 9.0], [c, 1.0, 2.0, 3.0]]|[[a, 4.0, 8.0], [d, 3.0, 4.0]]|
+---+------------------------------------------------------------+------------------------------+



scala> val df2020 = df.select($"A", explode($"B_2020") as "this_year").select($"A", // one output row per B_2020 element
     | $"this_year.key" as "key", $"this_year.x" as "x_this_year", 
     | $"this_year.y" as "y_this_year", $"this_year.z" as "z_this_year") // flatten struct fields, suffixed "_this_year"
df2020: org.apache.spark.sql.DataFrame = [A: string, key: string ... 3 more fields]



scala> val df2019 = df.select($"A", explode($"B_2019") as "last_year").select($"A", // one output row per B_2019 element
     | $"last_year.key" as "key", $"last_year.x" as "x_last_year", 
     | $"last_year.y" as "y_last_year") // flatten struct fields, suffixed "_last_year" (no z in 2019)
df2019: org.apache.spark.sql.DataFrame = [A: string, key: string ... 2 more fields]



scala> df2020.show(false)
+---+---+-----------+-----------+-----------+
|A  |key|x_this_year|y_this_year|z_this_year|
+---+---+-----------+-----------+-----------+
|ABC|a  |2.0        |4.0        |6.0        |
|ABC|b  |3.0        |6.0        |9.0        |
|ABC|c  |1.0        |2.0        |3.0        |
+---+---+-----------+-----------+-----------+



scala> df2019.show(false)
+---+---+-----------+-----------+
|A  |key|x_last_year|y_last_year|
+---+---+-----------+-----------+
|ABC|a  |4.0        |8.0        |
|ABC|d  |3.0        |4.0        |
+---+---+-----------+-----------+



scala> val outputDF = df2020.join(df2019, Seq("A", "key"),  "outer").select( // full outer join on (A, key): keys present in only one year get nulls for the other year's columns
     |   $"A" as "market_name", 
     |   struct($"key", $"x_this_year", $"y_this_year", $"x_last_year", 
     |     $"y_last_year", $"z_this_year") as "cancellation_policy_booking") // repack the flattened columns into a single struct; NOTE(review): output names differ from the requested A/B schema, and the structs are not yet re-collected into an array per A — presumably done in the continuation ("...") below
outputDF: org.apache.spark.sql.DataFrame = [market_name: string, cancellation_policy_booking: struct<key: string, x_this_year: double ... 4 more fields>]

scala> outputDF.printSchema
root
 |-- market_name: string (nullable = true)
 |-- cancellation_policy_booking: struct (nullable = false)
 |    |-- key: string (nullable = true)
 |    |-- x_this_year: double (nullable = true)
 |    |-- y_this_year: double (nullable = true)
 |    |-- x_last_year: double (nullable = true)
 |    |-- y_last_year: double (nullable = true)
 |    |-- z_this_year: double (nullable = true)


scala> outputDF.show(false)
+-----------+----------------------------+                                      
|market_name|cancellation_policy_booking |
+-----------+----------------------------+
|ABC        |[b, 3.0, 6.0,,, 9.0]        |
|ABC        |[a, 2.0, 4.0, 4.0, 8.0, 6.0]|
|ABC        |[d,,, 3.0, 4.0,]            |
|ABC        |[c, 1.0, 2.0,,, 3.0]        |
+-----------+----------------------------+



...