Pyspark; перебирайте данные по кадрам, чтобы суммировать группы на основе фильтров даты - PullRequest
1 голос
/ 10 марта 2020

У меня есть фрейм данных, который выглядит следующим образом. Не у всех клиентов были зарегистрированы покупки в каждой комбинации года и месяца. Я хочу повторить и суммировать покупки, сделанные за последние 3 месяца, 6 месяцев и 12 месяцев.

Я не могу вставить новые строки за пропущенные месяцы, так как мой набор данных очень большой.

Вещи, которые я пробовал а) Конвертировать год и месяц в дату б) Использовать сумму и случай, когда это не сработало. c) Использовал итерацию по строкам для суммирования, но дата, которую я создал, была меткой времени и вычитала в день, давая ошибки.

Ввод

Customer_ID, Purchase_Year, Purchase_Month, Purchases
1 2019 1 4
1 2019 2 6
1 2019 3 4
1 2019 4 2
2 2019 1 2
2 2019 5 3
3 2019 1 9

Ожидаемый вывод

Customer_ID, Purchase_Year, Purchase_Month, Purchases, L3M
1 2019 1 4 4
1 2019 2 6 10
1 2019 3 4 14
1 2019 4 2 12
2 2019 1 2 2
2 2019 5 3 3
3 2019 1 9 9

Моя первоначальная попытка, которая работает (но не для пропущенных строк)

sqlContext.sql("""select *, sum(Purchases) over (partition by customer_id
                          order by Purchase_Year, Purchase_Month
                           rows between 3 preceding and current row) as total_s
 from customer""").show()

1 Ответ

1 голос
/ 10 марта 2020

Я думаю, что подход, который вы использовали ранее с использованием меток времени, был правильным, потому что, если ваши последние 6 месяцев переходят на 2018 год, то как бы вы go с 2019 по 2018 использовали год покупки и месяц покупки.

Вы могли бы преобразуйте временную метку в long, а затем используйте rangebetween в оконной функции, чтобы пройти столько дней, сколько вы хотите. За 3 месяца вы можете использовать 89 с текущего дня (всего 90).

from pyspark.sql import functions as F
from pyspark.sql.window import Window
days= lambda i: i * 86400
w=Window().partitionBy("Customer_ID").orderBy("sec").rangeBetween(-days(89),0)
df.withColumn("sec", F.to_timestamp(F.concat("Purchase_Year","Purchase_Month"),"yyyyM").cast("long"))\
  .withColumn("L3", F.sum("Purchases").over(w)).orderBy("Customer_ID","Purchase_Month").drop("sec").show()

+-----------+-------------+--------------+---------+---+
|Customer_ID|Purchase_Year|Purchase_Month|Purchases| L3|
+-----------+-------------+--------------+---------+---+
|          1|         2019|             1|        4|  4|
|          1|         2019|             2|        6| 10|
|          1|         2019|             3|        4| 14|
|          1|         2019|             4|        2| 12|
|          2|         2019|             1|        2|  2|
|          2|         2019|             5|        3|  3|
|          3|         2019|             1|        9|  9|
+-----------+-------------+--------------+---------+---+
...