Я пытаюсь применить функцию PySpark Window для «экспоненциального затухания». Формула:
todays_score = yesterdays_score * (weight) + todays_raw_score
Так, например, предположим, что у нас есть фрейм данных, который упорядочен в днях и имеет оценку 1 каждый день:
+---+----+---------+
|day|user|raw_score|
+---+----+---------+
| 0| a| 1|
| 1| a| 1|
| 2| a| 1|
| 3| a| 1|
+---+----+---------+
Если бы я вычислить todays_score, это будет выглядеть так:
+---+----+---------+------------+
|day|user|raw_score|todays_score| # Here's the math:
+---+----+---------+------------+
| 0| a| 1| 1.0| (0 * .90) + 1
| 1| a| 1| 1.9| (1.0 * .90) + 1
| 2| a| 1| 2.71| (1.9 * .90) + 1
| 3| a| 1| 3.439| (2.71 * .90) + 1
+---+----+---------+------------+
Я пробовал использовать оконные функции; однако, исходя из того, что я видел, они могут использовать только «stati c values» из исходного фрейма данных, а не только что вычисленные нами значения. Я даже пробовал создать «фиктивный столбец», чтобы начать процесс; однако это тоже не сработало.
Мой код попытки:
df = sqlContext.createDataFrame([
(0, 'a', 1),
(1, 'a', 1),
(2, 'a', 1),
(3, 'a', 1)],
['day', 'user', 'raw_score']
)
df.show()
# Create a "dummy column" (weighted score) so we can use it.
df2 = df.select('*', col('raw_score').alias('todays_score'))
df2.show()
w = Window.partitionBy('user')
df2.withColumn('todays_score',
F.lag(F.col('todays_score'), count=1, default=0).over(w.orderBy('day'))* 0.9 + F.col('raw_score')) \
.show()
(нежелательный) результат этого:
+---+----+---------+------------+
|day|user|raw_score|todays_score|
+---+----+---------+------------+
| 0| a| 1| 1.0|
| 1| a| 1| 1.9|
| 2| a| 1| 1.9|
| 3| a| 1| 1.9|
+---+----+---------+------------+
, который принимает только предыдущий значение * (.90), а не то, что было только что рассчитано.
Как мне получить доступ к значениям, которые были только что рассчитаны оконной функцией?