У меня есть такой фрейм данных:
case class CC(id: String, p2: Double, p3: Double, time: Int)
val df = List(
CC("a", 1.1d, 2.2d, 1),
CC("b", 3.3d, 4.4d, 2),
CC("c", 5.5d, 6.6d, 3)).toDF
+---+---+---+----+
| id| p2| p3|time|
+---+---+---+----+
| a|1.1|2.2| 1|
| b|3.3|4.4| 2|
| c|5.5|6.6| 3|
+---+---+---+----+
Я хочу объединить p2
и p3
предыдущей строки и поместить в столбец p5
и объединить p2
и p3
из текущую строку и поместить в столбец p6
. Чтобы получить:
+---+---+---+----+---------+---------+
| id| p2| p3|time| p5 | p6 |
+---+---+---+----+---------+---------+
| a|1.1|2.2| 1| |1.1: 2.2 |
| b|3.3|4.4| 2|1.1: 2.2 |3.3: 4.4 |
| c|5.5|6.6| 3|3.3: 4.4 |5.5: 6.6 |
+---+---+---+----+---------+---------+
Для текущей строки, т.е. p6
я могу легко использовать
.withColumn("p6", concat(col("p2"), col("p3")))
, а для предыдущей строки я подумал об использовании оконной функции и lag
как показано ниже, но это не работает.
val wf = Window.partitionBy("id").orderBy("time")
df.withColumn("p5", concat(lag(col("p2"), 1) + lag("p3", 1)).over(w))
Но я получаю сообщение об ошибке, что выражение concat...
не поддерживается в оконной функции. Некоторые StackOverflow ответы говорят об использовании определяемой пользователем агрегатной функции, но я не смог найти простого примера, которому я мог бы следовать.
Любое объяснение этой проблемы приветствуется. Если вы знаете, предложите альтернативные методы решения этой проблемы. Спасибо!