Шаг 0: Создание фрейма данных
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, lag
df = sqlContext.createDataFrame([('2019/02/11',30),('2019/02/11',40),('2019/02/11',20),
('2019/02/12',10),('2019/02/12',15),
('2019/02/13',10),('2019/02/13',20)],['Date','value'])
Шаг 1: Сначала вычисляется среднее значение, а затем используется функция windows для полученияотставание на 1 день.
my_window = Window.partitionBy().orderBy('Date')
df_avg_previous = df.groupBy('Date').agg(avg(col('value')).alias('avg'))
df_avg_previous = df_avg_previous.withColumn('avg', lag(col('avg'),1).over(my_window))
df_avg_previous.show()
+----------+----+
| Date| avg|
+----------+----+
|2019/02/11|null|
|2019/02/12|30.0|
|2019/02/13|12.5|
+----------+----+
Шаг 2: Окончательное объединение двух информационных фреймов с помощью объединения left
.
df = df.join(df_avg_previous, ['Date'],how='left').orderBy('Date')
df.show()
+----------+-----+----+
| Date|value| avg|
+----------+-----+----+
|2019/02/11| 40|null|
|2019/02/11| 20|null|
|2019/02/11| 30|null|
|2019/02/12| 10|30.0|
|2019/02/12| 15|30.0|
|2019/02/13| 10|12.5|
|2019/02/13| 20|12.5|
+----------+-----+----+