Колонка с avg за предыдущий день pyspark - PullRequest
0 голосов
/ 11 февраля 2019

Я хочу создать новый столбец, который является средним значением продаж за предыдущий день, используя pyspark.

учтите, что эти значения находятся на другой отметке времени.

, например, для преобразования этого значения:

| Date       | value |
|------------|-------|
| 2019/02/11 | 30    |
| 2019/02/11 | 40    |
| 2019/02/11 | 20    |
| 2019/02/12 | 10    |
| 2019/02/12 | 15    |

к этому

| Date       | value | avg  |
|------------|-------|------|
| 2019/02/11 | 30    | null |
| 2019/02/11 | 40    | null |
| 2019/02/11 | 20    | null |
| 2019/02/12 | 10    | 30   |
| 2019/02/12 | 15    | 30   | 

Мое мышление:

Используйте функцию фильтрации и агрегирования, чтобы получить среднее значение, но с ошибкой выбрасывания.Не уверен, где я делаю не так.

df = df.withColumn("avg",lit((df.filter(df["date"] == date_sub("date",1)).agg({"value": "avg"}))))

Ответы [ 2 ]

0 голосов
/ 11 февраля 2019

Шаг 0: Создание фрейма данных

from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, lag
df = sqlContext.createDataFrame([('2019/02/11',30),('2019/02/11',40),('2019/02/11',20),
                                 ('2019/02/12',10),('2019/02/12',15),
                                 ('2019/02/13',10),('2019/02/13',20)],['Date','value']) 

Шаг 1: Сначала вычисляется среднее значение, а затем используется функция windows для полученияотставание на 1 день.

my_window = Window.partitionBy().orderBy('Date')
df_avg_previous = df.groupBy('Date').agg(avg(col('value')).alias('avg'))
df_avg_previous = df_avg_previous.withColumn('avg', lag(col('avg'),1).over(my_window))
df_avg_previous.show()
+----------+----+
|      Date| avg|
+----------+----+
|2019/02/11|null|
|2019/02/12|30.0|
|2019/02/13|12.5|
+----------+----+

Шаг 2: Окончательное объединение двух информационных фреймов с помощью объединения left.

df = df.join(df_avg_previous, ['Date'],how='left').orderBy('Date')
df.show()
+----------+-----+----+
|      Date|value| avg|
+----------+-----+----+
|2019/02/11|   40|null|
|2019/02/11|   20|null|
|2019/02/11|   30|null|
|2019/02/12|   10|30.0|
|2019/02/12|   15|30.0|
|2019/02/13|   10|12.5|
|2019/02/13|   20|12.5|
+----------+-----+----+
0 голосов
/ 11 февраля 2019

Вы можете сделать это, используя функции Windows, но вам нужно создать новый столбец для обработки дат.Я добавил несколько строк к вашему примеру:

df.withColumn(
  "rnk",
  F.dense_rank().over(Window.partitionBy().orderBy("date"))
).withColumn(
  "avg",
  F.avg("value").over(Window.partitionBy().orderBy("rnk").rangeBetween(-1,-1))
).show()

+----------+-----+---+----+
|      date|value|rnk| avg|
+----------+-----+---+----+
|2018-01-01|   20|  1|null|
|2018-01-01|   30|  1|null|
|2018-01-01|   40|  1|null|
|2018-01-02|   40|  2|30.0|
|2018-01-02|   30|  2|30.0|
|2018-01-03|   40|  3|35.0|
|2018-01-03|   40|  3|35.0|
+----------+-----+---+----+

Вы также можете сделать это с помощью агрегации:

agg_df = df.withColumn("date", F.date_add("date", 1)).groupBy('date').avg("value")
df.join(agg_df, how="full_outer", on="date").orderBy("date").show()

+----------+-----+----------+
|      date|value|avg(value)|
+----------+-----+----------+
|2018-01-01|   20|      null|
|2018-01-01|   30|      null|
|2018-01-01|   40|      null|
|2018-01-02|   30|      30.0|
|2018-01-02|   40|      30.0|
|2018-01-03|   40|      35.0|
|2018-01-03|   40|      35.0|
|2018-01-04| null|      40.0|
+----------+-----+----------+
...