У меня есть дата-фрейм с баллами за каждый день, и я хочу рассчитать суммарный балл для каждого пользователя. Мне нужно суммировать совокупный балл за предыдущий день с сегодняшним баллом в новом столбце. Я попытался использовать функцию lag
для расчета, но по некоторым причинам она выдает ошибку.
Вот код, который я пробовал:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val genre = sc.parallelize(List(("Alice", "2016-05-01", "action",0),
("Alice", "2016-05-02", "0",1),
("Alice", "2016-05-03", "comedy",0),
("Alice", "2016-05-04", "action",1),
("Alice", "2016-05-05", "action",0),
("Alice", "2016-05-06", "horror",1),
("Bob", "2016-05-01", "art",0),
("Bob", "2016-05-02", "0",1),
("Bob", "2016-05-03", "0",0),
("Bob", "2016-05-04", "art",0),
("Bob", "2016-05-05", "comedy",1),
("Bob", "2016-05-06", "action",0))).
toDF("name", "date", "genre","score")
val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn( "CumScore",genre("score")*2+ lag(("CumScore"),1).over(wSpec2)*2 ).show()
dataframe:
-----+----------+------+-----+
| name| date| genre|score|
+-----+----------+------+-----+
|Alice|2016-05-01|action| 0|
|Alice|2016-05-02| 0| 1|
|Alice|2016-05-03|comedy| 0|
|Alice|2016-05-04|action| 1|
|Alice|2016-05-05|action| 0|
|Alice|2016-05-06|horror| 1|
| Bob|2016-05-01| art| 0|
| Bob|2016-05-02| 0| 1|
| Bob|2016-05-03| 0| 0|
| Bob|2016-05-04| art| 0|
| Bob|2016-05-05|comedy| 1|
| Bob|2016-05-06|action| 0|
+-----+----------+------+-----+
Ошибка, которую я получаю
org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
at org.apa