scala> val df = Seq(
| ("ABC",
| Seq(
| ("a", 2, 4, 6),
| ("b", 3, 6, 9),
| ("c", 1, 2, 3)
| ),
| Seq(
| ("a", 4, 8),
| ("d", 3, 4)
| ))
| ).toDF("A", "B_2020", "B_2019").select(
| $"A",
| $"B_2020" cast "array<struct<key:string,x:double,y:double,z:double>>",
| $"B_2019" cast "array<struct<key:string,x:double,y:double>>"
| )
df: org.apache.spark.sql.DataFrame = [A: string, B_2020: array<struct<key:string,x:double,y:double,z:double>> ... 1 more field]
scala> df.printSchema
root
|-- A: string (nullable = true)
|-- B_2020: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- x: double (nullable = true)
| | |-- y: double (nullable = true)
| | |-- z: double (nullable = true)
|-- B_2019: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- x: double (nullable = true)
| | |-- y: double (nullable = true)
scala> df.show(false)
+---+------------------------------------------------------------+------------------------------+
|A |B_2020 |B_2019 |
+---+------------------------------------------------------------+------------------------------+
|ABC|[[a, 2.0, 4.0, 6.0], [b, 3.0, 6.0, 9.0], [c, 1.0, 2.0, 3.0]]|[[a, 4.0, 8.0], [d, 3.0, 4.0]]|
+---+------------------------------------------------------------+------------------------------+
scala> val df2020 = df.select($"A", explode($"B_2020") as "this_year").select($"A",
| $"this_year.key" as "key", $"this_year.x" as "x_this_year",
| $"this_year.y" as "y_this_year", $"this_year.z" as "z_this_year")
df2020: org.apache.spark.sql.DataFrame = [A: string, key: string ... 3 more fields]
scala> val df2019 = df.select($"A", explode($"B_2019") as "last_year").select($"A",
| $"last_year.key" as "key", $"last_year.x" as "x_last_year",
| $"last_year.y" as "y_last_year")
df2019: org.apache.spark.sql.DataFrame = [A: string, key: string ... 2 more fields]
scala> df2020.show(false)
+---+---+-----------+-----------+-----------+
|A |key|x_this_year|y_this_year|z_this_year|
+---+---+-----------+-----------+-----------+
|ABC|a |2.0 |4.0 |6.0 |
|ABC|b |3.0 |6.0 |9.0 |
|ABC|c |1.0 |2.0 |3.0 |
+---+---+-----------+-----------+-----------+
scala> df2019.show(false)
+---+---+-----------+-----------+
|A |key|x_last_year|y_last_year|
+---+---+-----------+-----------+
|ABC|a |4.0 |8.0 |
|ABC|d |3.0 |4.0 |
+---+---+-----------+-----------+
scala> val outputDF = df2020.join(df2019, Seq("A", "key"), "outer").select(
| $"A" as "market_name",
| struct($"key", $"x_this_year", $"y_this_year", $"x_last_year",
| $"y_last_year", $"z_this_year") as "cancellation_policy_booking")
outputDF: org.apache.spark.sql.DataFrame = [market_name: string, cancellation_policy_booking: struct<key: string, x_this_year: double ... 4 more fields>]
scala> outputDF.printSchema
root
|-- market_name: string (nullable = true)
|-- cancellation_policy_booking: struct (nullable = false)
| |-- key: string (nullable = true)
| |-- x_this_year: double (nullable = true)
| |-- y_this_year: double (nullable = true)
| |-- x_last_year: double (nullable = true)
| |-- y_last_year: double (nullable = true)
| |-- z_this_year: double (nullable = true)
scala> outputDF.show(false)
+-----------+----------------------------+
|market_name|cancellation_policy_booking |
+-----------+----------------------------+
|ABC |[b, 3.0, 6.0,,, 9.0] |
|ABC |[a, 2.0, 4.0, 4.0, 8.0, 6.0]|
|ABC |[d,,, 3.0, 4.0,] |
|ABC |[c, 1.0, 2.0,,, 3.0] |
+-----------+----------------------------+