Извлечение нескольких значений среднего и скользящего среднего по разным столбцам в кадре данных с помощью Pyspark - PullRequest
0 голосов
/ 20 января 2020

У меня есть кадр данных, как показано ниже:

enter image description here

Я хотел бы рассчитать и создать новые столбцы со следующими функциями:

  • Avg / MovingAvg за последние 1 день для качества / силы
  • Avg / MovingAvg за последние 2 дня для качества / силы
  • Avg / MovingAvg за последние 5 дней для качество / сила
  • Ср. / MovingAvg за последние 1 неделю для качества / прочности
  • Сред. / MovingAvg за последние 2 недели для качества / прочности
  • Ср. / MovingAvg над последние 1 месяц по качеству / прочности

Не могли бы вы дать мне знать, как мне этого добиться в pyspark.

Ответы [ 2 ]

1 голос
/ 20 января 2020

Вы можете использовать window в выражении group by, чтобы указать продолжительность дней для агрегации.

from pyspark.sql.window import Window
from pyspark.sql import functions as f

Для 1 дня агрегации с 1 днем ​​скольжения:

df.groupBy(f.window(col('date'), '1 day', '1 day')).agg(f.avg('strength'))

За 3 дня агрегации с 1 днем ​​скольжения:

df.groupBy(f.window(col('date'), '3 days', '1 day')).agg(f.avg('strength'))
0 голосов
/ 21 января 2020

Так я решил свою проблему. Ответ, предоставленный @WaqarAhmed, также является другим способом решения той же проблемы

from pyspark.sql.functions import *
from pyspark.sql.window import *


w_1day = Window().partitionBy('device').orderBy('date').rowsBetween(-1, 0)
w_2day = Window().partitionBy('device').orderBy('date').rowsBetween(-2, 0)
w_5day = Window().partitionBy('device').orderBy('date').rowsBetween(-5, 0)
w_7day = Window().partitionBy('device').orderBy('date').rowsBetween(-7, 0)
w_14day = Window().partitionBy('device').orderBy('date').rowsBetween(-14, 0)
w_21day = Window().partitionBy('device').orderBy('date').rowsBetween(-21, 0)
w_30day = Window().partitionBy('device').orderBy('date').rowsBetween(-30, 0)

df_avg = df.withColumn('rolling_1_day_average_q', avg("quality").over(w_1day))
df_avg = df_avg.withColumn('rolling_2_day_average_q', avg("quality").over(w_2day))
df_avg = df_avg.withColumn('rolling_5_day_average_q', avg("quality").over(w_5day))
df_avg = df_avg.withColumn('rolling_7_day_average_q', avg("quality").over(w_7day))
df_avg = df_avg.withColumn('rolling_14_day_average_q', avg("quality").over(w_14day))
df_avg = df_avg.withColumn('rolling_21_day_average_q', avg("quality").over(w_21day))
df_avg = df_avg.withColumn('rolling_30_day_average_q', avg("quality").over(w_30day))
...