Кумулятивная функция при искровой скале - PullRequest
0 голосов
/ 26 сентября 2019

Я пытался вычислить кумулятивное значение, но если поле даты совпадает, эти значения добавляются в кумулятивное поле, может ли кто-нибудь предложить решение, аналогичное этому вопросу

val windowval = (Window.partitionBy($"userID").orderBy($"lastModified")
             .rangeBetween(Window.unboundedPreceding, 0))
val df_w_cumsum = ms1_userlogRewards.withColumn("totalRewards", sum($"noOfJumps").over(windowval)).orderBy($"lastModified".asc)
df_w_cumsum.filter($"batchType".isNull).filter($"userID"==="355163").select($"userID", $"noOfJumps", $"totalRewards",$"lastModified").show()

enter image description here enter image description here

Ответы [ 2 ]

1 голос
/ 26 сентября 2019

Обратите внимание, что ваше самое первое totalRewards=147 является суммой предыдущего значения 49 + все значения с отметкой времени "2019-08-07 18:25:06": 49 + (36 + 0 + 60 + 2) = 147.

Первый вариант - агрегировать все значения с одним и тем же кулаком отметки времени, например, groupBy($"userId", $"lastModified").agg(sum($"noOfJumps").as("noOfJumps")) (или что-то в этом роде), а затем запустить сумму суммирования.Это полностью удалит дубликаты временных отметок.

Второй вариант заключается в использовании row_number для определения порядка между строками с одним и тем же полем lastModified, а затем для запуска вашей совокупной суммы с .orderBy($"lastModified, $"row_number")(или что-то типа того).Это должно вести все записи и давать вам частичную сумму по пути: totalRewards = 49 -> 85 -> 85 -> 145 -> 147 (или что-то подобное в зависимости от порядка, определенного в row_number)

0 голосов
/ 26 сентября 2019

Я думаю, что вы хотите суммировать по ID пользователя и метке времени.Таким образом, вам нужно разделить по идентификатору пользователя и дате и использовать функцию окна для sym, как показано ниже:

import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy("userID", "lastModified")
df.withColumn("cumulativeSum", sum(col("noOfJumps").over(window))
...