Пытался решить эту проблему, используя следующий подход -
1. Прочитать вход
val df1 = Seq(
("a", 2, "c"),
("a", 2, "c"),
("a", 2, "c"),
("b", 2, "d"),
("b", 2, "d")
).toDF("col1", "col2", "col3").groupBy("col2").agg(
collect_list("col1").as("col1"),
collect_list("col3").as("col3")
)
df1.show(false)
df1.printSchema()
Выход-
+----+---------------+---------------+
|col2|col1 |col3 |
+----+---------------+---------------+
|2 |[a, a, a, b, b]|[c, c, c, d, d]|
+----+---------------+---------------+
root
|-- col2: integer (nullable = false)
|-- col1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- col3: array (nullable = true)
| |-- element: string (containsNull = true)
2. Используйте функцию Transform для обработки значений массива
val transform = (str: String) => expr(s"TRANSFORM($str, x -> concat('$str-', x)) as $str")
val cols = df1.schema.map(f => if (f.dataType.isInstanceOf[ArrayType]) {
transform(f.name)
} else expr(f.name))
df1.select(cols: _*).show(false)
Output-
+----+----------------------------------------+----------------------------------------+
|col2|col1 |col3 |
+----+----------------------------------------+----------------------------------------+
|2 |[col1-a, col1-a, col1-a, col1-b, col1-b]|[col3-c, col3-c, col3-c, col3-d, col3-d]|
+----+----------------------------------------+----------------------------------------+