Spark Windowspec лага функция расчета совокупных баллов - PullRequest
0 голосов
/ 17 мая 2018

У меня есть дата-фрейм с баллами за каждый день, и я хочу рассчитать суммарный балл для каждого пользователя. Мне нужно суммировать совокупный балл за предыдущий день с сегодняшним баллом в новом столбце. Я попытался использовать функцию lag для расчета, но по некоторым причинам она выдает ошибку.

Вот код, который я пробовал:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val genre = sc.parallelize(List(("Alice", "2016-05-01", "action",0),
                                    ("Alice", "2016-05-02", "0",1),
                                    ("Alice", "2016-05-03", "comedy",0),
                                    ("Alice", "2016-05-04", "action",1),
                                    ("Alice", "2016-05-05", "action",0),
                                    ("Alice", "2016-05-06", "horror",1),
                                    ("Bob", "2016-05-01", "art",0),
                                    ("Bob", "2016-05-02", "0",1),
                                    ("Bob", "2016-05-03", "0",0),
                                    ("Bob", "2016-05-04", "art",0),
                                    ("Bob", "2016-05-05", "comedy",1),
                                    ("Bob", "2016-05-06", "action",0))).
                               toDF("name", "date", "genre","score")

val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn( "CumScore",genre("score")*2+ lag(("CumScore"),1).over(wSpec2)*2  ).show()

dataframe:

-----+----------+------+-----+
| name|      date| genre|score|
+-----+----------+------+-----+
|Alice|2016-05-01|action|    0|
|Alice|2016-05-02|     0|    1|
|Alice|2016-05-03|comedy|    0|
|Alice|2016-05-04|action|    1|
|Alice|2016-05-05|action|    0|
|Alice|2016-05-06|horror|    1|
|  Bob|2016-05-01|   art|    0|
|  Bob|2016-05-02|     0|    1|
|  Bob|2016-05-03|     0|    0|
|  Bob|2016-05-04|   art|    0|
|  Bob|2016-05-05|comedy|    1|
|  Bob|2016-05-06|action|    0|
+-----+----------+------+-----+

Ошибка, которую я получаю

org.apache.spark.sql.AnalysisException: Window Frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()) must match the required frame specifiedwindowframe(RowFrame, -1, -1);
    at org.apa

Ответы [ 2 ]

0 голосов
/ 17 мая 2018

Нет необходимости использовать lag, просто используйте окно, разделенное на пользователя, а затем используйте sum:

val window = Window.partitionBy("name").orderBy("date").rowsBetween(Long.MinValue, 0)
genre.withColumn("CumScore", sum($"score").over(window))

Используя входные данные из вопроса, это даст:

+-----+----------+------+-----+--------+
| name|      date| genre|score|CumScore|
+-----+----------+------+-----+--------+
|  Bob|2016-05-01|   art|    0|       0|
|  Bob|2016-05-02|     0|    1|       1|
|  Bob|2016-05-03|     0|    0|       1|
|  Bob|2016-05-04|   art|    0|       1|
|  Bob|2016-05-05|comedy|    1|       2|
|  Bob|2016-05-06|action|    0|       2|
|Alice|2016-05-01|action|    0|       0|
|Alice|2016-05-02|     0|    1|       1|
|Alice|2016-05-03|comedy|    0|       1|
|Alice|2016-05-04|action|    1|       2|
|Alice|2016-05-05|action|    0|       2|
|Alice|2016-05-06|horror|    1|       3|
+-----+----------+------+-----+--------+

Проблема с использованием lag заключается в том, что столбец используется в том же выражении, в котором он создан (столбец используется в выражении withColumn. Даже если это предыдущее значениечто указано, это не разрешено.

0 голосов
/ 17 мая 2018

Я попробовал следующий подход:

val wSpec2 = Window.partitionBy("name","genre").orderBy("date").rowsBetween(Long.MinValue, 0)
val test = genre.withColumn( "CumScore",genre("score")*2)
test.show()
val wSpec3 = Window.partitionBy("name").orderBy("date")
test.withColumn("CumScore_1",test("CumScore")+lag(test("CumScore"),1).over(wSpec3)).show()

Нам нужно определить другую оконную функцию, так как нам не нужно указывать рамку строки при суммировании предыдущего дня. Совокупный счет с сегодняшним счетом в новом столбце.

Вы можете сослаться: http://xinhstechblog.blogspot.in/2016/04/spark-window-functions-for-dataframes.html

...