Current DF (фильтр по одному userId, флаг 1, если убыток> 0, -1, если <= 0): </p>
display(df):
+------+----------+---------+----+
| user|Date |RealLoss |flag|
+------+----------+---------+----+
|100364|2019-02-01| -16.5| 1|
|100364|2019-02-02| 73.5| -1|
|100364|2019-02-03| 31| -1|
|100364|2019-02-09| -5.2| 1|
|100364|2019-02-10| -34.5| 1|
|100364|2019-02-13| -8.1| 1|
|100364|2019-02-18| 5.68| -1|
|100364|2019-02-19| 5.76| -1|
|100364|2019-02-20| 9.12| -1|
|100364|2019-02-26| 9.4| -1|
|100364|2019-02-27| -30.6| 1|
+----------+------+---------+----+
желаемый результат df должен показывать количество дней с момента lastwin ('RecencyLastWin') и с момента lastloss ('RecencyLastLoss')
display(df):
+------+----------+---------+----+--------------+---------------+
| user|Date |RealLoss |flag|RecencyLastWin|RecencyLastLoss|
+------+----------+---------+----+--------------+---------------+
|100364|2019-02-01| -16.5| 1| null| null|
|100364|2019-02-02| 73.5| -1| 1| null|
|100364|2019-02-03| 31| -1| 2| 1|
|100364|2019-02-09| -5.2| 1| 8| 6|
|100364|2019-02-10| -34.5| 1| 1| 7|
|100364|2019-02-13| -8.1| 1| 1| 10|
|100364|2019-02-18| 5.68| -1| 5| 15|
|100364|2019-02-19| 5.76| -1| 6| 1|
|100364|2019-02-20| 9.12| -1| 7| 1|
|100364|2019-02-26| 9.4| -1| 13| 6|
|100364|2019-02-27| -30.6| 1| 14| 1|
+----------+------+---------+----+--------------+---------------+
Мой подход был следующим:
from pyspark.sql.window import Window
w = Window.partitionBy("userId", 'PlayerSiteCode').orderBy("EventDate")
last_positive = check.filter('flag = "1"').withColumn('last_positive_day' , F.lag('EventDate').over(w))
last_negative = check.filter('flag = "-1"').withColumn('last_negative_day' , F.lag('EventDate').over(w))
finalcheck = check.join(last_positive.select('userId', 'PlayerSiteCode', 'EventDate', 'last_positive_day'), ['userId', 'PlayerSiteCode', 'EventDate'], how = 'left')\
.join(last_negative.select('userId', 'PlayerSiteCode', 'EventDate', 'last_negative_day'), ['userId', 'PlayerSiteCode', 'EventDate'], how = 'left')\
.withColumn('previous_date_played' , F.lag('EventDate').over(w))\
.withColumn('last_positive_day_count', F.datediff(F.col('EventDate'), F.col('last_positive_day')))\
.withColumn('last_negative_day_count', F.datediff(F.col('EventDate'), F.col('last_negative_day')))
затем я попытался добавить (несколько попыток ..) но не смог «идеально» вернуть то, что я хочу.
finalcheck = finalcheck.withColumn('previous_last_pos' , F.last('last_positive_day_count', True).over(w2))\
.withColumn('previous_last_neg' , F.last('last_negative_day_count', True).over(w2))\
.withColumn('previous_last_pos_date' , F.last('last_positive_day', True).over(w2))\
.withColumn('previous_last_neg_date' , F.last('last_negative_day', True).over(w2))\
.withColumn('recency_last_positive' , F.datediff(F.col('EventDate'), F.col('previous_last_pos_date')))\
.withColumn('day_since_last_negative_v1' , F.datediff(F.col('EventDate'), F.col('previous_last_neg_date')))\
.withColumn('days_off' , F.datediff(F.col('EventDate'), F.col('previous_date_played')))\
.withColumn('recency_last_negative' , F.when((F.col('day_since_last_negative_v1').isNull()), F.col('days_off')).otherwise(F.col('day_since_last_negative_v1')))\
.withColumn('recency_last_negative_v2' , F.when((F.col('last_negative_day').isNull()), F.col('days_off')).otherwise(F.col('day_since_last_negative_v1')))\
.withColumn('recency_last_positive_v2' , F.when((F.col('last_positive_day').isNull()), F.col('days_off')).otherwise(F.col('recency_last_positive')))
Есть предложения / советы? (Я нашел аналогичный вопрос, но не понял, как подать заявку в моем случае c): Как рассчитать количество дней между выполнением последнего условия?