Как я могу использовать функцию Window PySpark для моделирования экспоненциального затухания? - PullRequest
1 голос
/ 27 мая 2020

Я пытаюсь применить функцию PySpark Window для «экспоненциального затухания». Формула:

todays_score = yesterdays_score * (weight) + todays_raw_score

Так, например, предположим, что у нас есть фрейм данных, который упорядочен в днях и имеет оценку 1 каждый день:

+---+----+---------+
|day|user|raw_score|
+---+----+---------+
|  0|   a|        1|
|  1|   a|        1|
|  2|   a|        1|
|  3|   a|        1|
+---+----+---------+

Если бы я вычислить todays_score, это будет выглядеть так:

+---+----+---------+------------+
|day|user|raw_score|todays_score| # Here's the math:
+---+----+---------+------------+
|  0|   a|        1|         1.0| (0 * .90) + 1
|  1|   a|        1|         1.9| (1.0 * .90) + 1
|  2|   a|        1|        2.71| (1.9 * .90) + 1
|  3|   a|        1|       3.439| (2.71 * .90) + 1
+---+----+---------+------------+

Я пробовал использовать оконные функции; однако, исходя из того, что я видел, они могут использовать только «stati c values» из исходного фрейма данных, а не только что вычисленные нами значения. Я даже пробовал создать «фиктивный столбец», чтобы начать процесс; однако это тоже не сработало.

Мой код попытки:

df = sqlContext.createDataFrame([
                                 (0, 'a', 1),
                                 (1, 'a', 1),
                                 (2, 'a', 1),
                                 (3, 'a', 1)],
    ['day', 'user', 'raw_score']
)
df.show()

# Create a "dummy column" (weighted score) so we can use it.
df2 = df.select('*', col('raw_score').alias('todays_score'))
df2.show()

w = Window.partitionBy('user') 

df2.withColumn('todays_score', 
              F.lag(F.col('todays_score'), count=1, default=0).over(w.orderBy('day'))* 0.9 + F.col('raw_score')) \
  .show()

(нежелательный) результат этого:

+---+----+---------+------------+
|day|user|raw_score|todays_score|
+---+----+---------+------------+
|  0|   a|        1|         1.0|
|  1|   a|        1|         1.9|
|  2|   a|        1|         1.9|
|  3|   a|        1|         1.9|
+---+----+---------+------------+

, который принимает только предыдущий значение * (.90), а не то, что было только что рассчитано.

Как мне получить доступ к значениям, которые были только что рассчитаны оконной функцией?

1 Ответ

2 голосов
/ 27 мая 2020

Для Spark2.4+ вы можете использовать функции более высокого порядка transform, aggregate, filter и arrays_zip вот так. Он будет работать для любой комбинации raw_score и будет быстрее, чем pandas_udaf. (при условии, что данные были упорядочены по дням для каждого пользователя, как показано в примере)

df.show() #sample dataframe
#+---+----+---------+
#|day|user|raw_score|
#+---+----+---------+
#|  0|   a|        1|
#|  1|   a|        1|
#|  2|   a|        1|
#|  3|   a|        1|
#+---+----+---------+


from pyspark.sql import functions as F

df\
  .groupBy("user").agg(F.collect_list("raw_score").alias("raw_score"),F.collect_list("day").alias("day"))\
   .withColumn("raw_score1", F.expr("""transform(raw_score,(x,i)-> struct(x as raw,i as index))"""))\
   .withColumn("todays_score", F.expr("""transform(raw_score1, x-> aggregate(filter(raw_score1,z-> z.index<=x.index)\
                                             ,cast(0 as double),(acc,y)->(acc*0.9)+y.raw))"""))\
   .withColumn("zip", F.explode(F.arrays_zip("day","raw_score","todays_score")))\
   .select("user", "zip.*")\
   .show(truncate=False)


#+----+---+---------+------------+
#|user|day|raw_score|todays_score|
#+----+---+---------+------------+
#|a   |0  |1        |1.0         |
#|a   |1  |1        |1.9         |
#|a   |2  |1        |2.71        |
#|a   |3  |1        |3.439       |
#+----+---+---------+------------+

UPDATE:

Предполагая, что данные были упорядочены по дням, как показано в примере , вы можете использовать Pandas Grouped Map UDAF следующим образом:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType


@pandas_udf(df.withColumn("raw_score", F.lit(1.2456)).schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df):
     for i in range(1,len(df)):
          df.loc[i,'raw_score']=(df.loc[i-1,'raw_score'] * 0.9)+1   

     return df
df\
  .groupby("user").apply(grouped_map).show()

#+---+----+---------+
#|day|user|raw_score|
#+---+----+---------+
#|  0|   a|      1.0|
#|  1|   a|      1.9|
#|  2|   a|     2.71|
#|  3|   a|    3.439|
#+---+----+---------+
...