У меня есть данные о покупках с течением времени. Каждая покупка принадлежит пользователю и подгруппе. Кроме того, каждая покупка имеет дату. Чего я хотел бы добиться, так это применить скользящую функцию (используя даты) к столбцу прибыли для каждой комбинации пользователя и подгруппы.
Ниже я создал автономный пример с решением pandas моей проблемы и pyspark решением моей проблемы. У меня сейчас вопрос: Есть ли более эффективный способ сделать это в pyspark?
Автономный пример:
# Create some dummy date
date_today = datetime.now()
days = pd.date_range(date_today, date_today + timedelta(250), freq='D')
np.random.seed(seed=1111)
data = np.random.randint(1, high=100, size=len(days))
df1 = pd.DataFrame({'dates': days, 'profit': data,'user':1,'subgroup':np.random.randint(0,5,len(days))})
days = pd.date_range(date_today+ timedelta(hours = 5), date_today + timedelta(130), freq='D')
data = np.random.randint(1, high=100, size=len(days))
df2 = pd.DataFrame({'dates': days, 'profit': data,'user':2,'subgroup':np.random.randint(0,5,len(days))})
df_all = pd.concat([df1,df2])
Раствор панд:
df_agg = df_all.groupby(by = ["user","subgroup"]).apply(lambda df: df.set_index('dates').rolling("20d").agg({"profit":[np.mean,np.max]}))
Медленное решение pySpark:
sdf_all = spark.createDataFrame(df_all)
days = lambda i: i * 86400
sdf_output = (sdf_all.withColumn("profit_mean",F.avg(F.col("profit"))
.over(Window.partitionBy("user","subgroup")
.orderBy(F.col("dates").cast("long"))
.rangeBetween(-days(20),0)))
.withColumn("profit_max",F.max(F.col("profit"))
.over(Window.partitionBy("user","subgroup")
.orderBy(F.col("dates").cast("long"))
.rangeBetween(-days(20),0)))
)