Может быть сделано с созданием сложной структуры из трех последних столбцов, а затем применить UDF:
val data = List(
("A_123", 12, "A", 1),
("A_123", 12, "B", 2),
("A_123", 23, "A", 1),
("B_456", 56, "DB", 4),
("B_456", 56, "BD", 5),
("B_456", 60, "BD", 3))
val df = data.toDF("column1_ID", "column2", "column3", "column4")
val twoLastCompacted = df.withColumn("lastTwo", struct($"column3", $"column4"))
twoLastCompacted.show(false)
val grouppedByTwoFirst = twoLastCompacted.groupBy("column1_ID", "column2").agg(collect_list("lastTwo").alias("lastTwoArray"))
grouppedByTwoFirst.show(false)
val treeLastCompacted = grouppedByTwoFirst.withColumn("lastThree", struct($"column2", $"lastTwoArray"))
treeLastCompacted.show(false)
val gruppedByFirst = treeLastCompacted.groupBy("column1_ID").agg(collect_list("lastThree").alias("lastThreeArray"))
gruppedByFirst.printSchema()
gruppedByFirst.show(false)
val structToMap = (value: Seq[Row]) =>
value.map(v => v.getInt(0) ->
v.getSeq(1).asInstanceOf[Seq[Row]].map(r => r.getString(0) -> r.getInt(1)).toMap)
.toMap
val structToMapUDF = udf(structToMap)
gruppedByFirst.select($"column1_ID", structToMapUDF($"lastThreeArray")).show(false)
Вывод:
+----------+-------+-------+-------+-------+
|column1_ID|column2|column3|column4|lastTwo|
+----------+-------+-------+-------+-------+
|A_123 |12 |A |1 |[A,1] |
|A_123 |12 |B |2 |[B,2] |
|A_123 |23 |A |1 |[A,1] |
|B_456 |56 |DB |4 |[DB,4] |
|B_456 |56 |BD |5 |[BD,5] |
|B_456 |60 |BD |3 |[BD,3] |
+----------+-------+-------+-------+-------+
+----------+-------+----------------+
|column1_ID|column2|lastTwoArray |
+----------+-------+----------------+
|B_456 |60 |[[BD,3]] |
|A_123 |12 |[[A,1], [B,2]] |
|B_456 |56 |[[DB,4], [BD,5]]|
|A_123 |23 |[[A,1]] |
+----------+-------+----------------+
+----------+-------+----------------+---------------------------------+
|column1_ID|column2|lastTwoArray |lastThree |
+----------+-------+----------------+---------------------------------+
|B_456 |60 |[[BD,3]] |[60,WrappedArray([BD,3])] |
|A_123 |12 |[[A,1], [B,2]] |[12,WrappedArray([A,1], [B,2])] |
|B_456 |56 |[[DB,4], [BD,5]]|[56,WrappedArray([DB,4], [BD,5])]|
|A_123 |23 |[[A,1]] |[23,WrappedArray([A,1])] |
+----------+-------+----------------+---------------------------------+
root
|-- column1_ID: string (nullable = true)
|-- lastThreeArray: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- column2: integer (nullable = false)
| | |-- lastTwoArray: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- column3: string (nullable = true)
| | | | |-- column4: integer (nullable = false)
+----------+--------------------------------------------------------------+
|column1_ID|lastThreeArray |
+----------+--------------------------------------------------------------+
|B_456 |[[60,WrappedArray([BD,3])], [56,WrappedArray([DB,4], [BD,5])]]|
|A_123 |[[12,WrappedArray([A,1], [B,2])], [23,WrappedArray([A,1])]] |
+----------+--------------------------------------------------------------+
+----------+----------------------------------------------------+
|column1_ID|UDF(lastThreeArray) |
+----------+----------------------------------------------------+
|B_456 |Map(60 -> Map(BD -> 3), 56 -> Map(DB -> 4, BD -> 5))|
|A_123 |Map(12 -> Map(A -> 1, B -> 2), 23 -> Map(A -> 1)) |
+----------+----------------------------------------------------+