Мы можем взять 2 windows и найти значение следующей строки один раз в 1-м окне, назначив monotonically_increasing_id
и значение last
в другом окне, как показано ниже:
import pyspark.sql.functions as F
w = Window.orderBy('idx')
w1 = Window.partitionBy('value')
(df.withColumn('idx',F.monotonically_increasing_id())
.withColumn("result",F.last(F.lead("value").over(w)).over(w1)).orderBy('idx')
.drop('idx')).show()
+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
| 3| 3| 4|
| 4| 3+4| 5|
| 5| 3+4+5| 2|
| 5| 3+4+5| 2|
| 5| 3+4+5| 2|
| 2| 3+4+5+2| null|
+-----+----------+------+
Если числа в значении могут повторяться позже, в следующем примере:
+-----+----------+
|value|val_joined|
+-----+----------+
|3 |3 |
|4 |3+4 |
|5 |3+4+5 |
|5 |3+4+5 |
|5 |3+4+5 |
|2 |3+4+5+2 |
|5 |3+4+5+2+5 | <- this value is repeated later
+-----+----------+
Тогда нам нужно будет создать отдельную группу и взять группу в качестве окна:
w = Window.orderBy('idx')
w1 = Window.partitionBy('group')
(df.withColumn('idx',F.monotonically_increasing_id())
.withColumn("lag", F.when(F.lag("value").over(w)!=F.col("value"), F.lit(1))
.otherwise(F.lit(0)))
.withColumn("group", F.sum("lag").over(w) + 1).drop("lag")
.withColumn("result",F.last(F.lead("value").over(w)).over(w1)).orderBy('idx')
.drop('idx',"group")).show()
+-----+----------+------+
|value|val_joined|result|
+-----+----------+------+
| 3| 3| 4|
| 4| 3+4| 5|
| 5| 3+4+5| 2|
| 5| 3+4+5| 2|
| 5| 3+4+5| 2|
| 2| 3+4+5+2| 5|
| 5| 3+4+5+2+5| null|
+-----+----------+------+