Pyspark - получить совокупную сумму столбца с условием - PullRequest
0 голосов
/ 10 января 2019

У меня есть датафрейм с картами, временем и суммой, и мне нужно объединить сумму карты (сумму и количество) с окном за один месяц.

Вот как выглядят данные:

+--------------------+-------------------+------------+
|            card_uid|               date|amount_local|
+--------------------+-------------------+------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30|        8.99|
|card_0026uGZQwZQd...|2016-05-06 12:16:18|       16.19|
|card_0026uGZQwZQd...|2016-07-06 12:17:57|       16.19|
|card_003STfrgB8SZ...|2016-12-04 10:05:21|        58.8|
|card_005gBxyiDc6b...|2016-09-10 18:58:25|       27.95|
|card_005gBxyiDc6b...|2016-11-12 11:18:29|       12.99|

Это то, что я сделал до сих пор.

+--------------------+-------------------+------------+----------------+
|            card_uid|               date|amount_local|duration_cum_sum|
+--------------------+-------------------+------------+----------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30|        8.99|            8.99|
|card_0026uGZQwZQd...|2016-05-06 12:16:18|       16.19|           16.19|
|card_0026uGZQwZQd...|2016-07-06 12:17:57|       16.19|           32.38|
|card_003STfrgB8SZ...|2016-12-04 10:05:21|        58.8|            58.8|
|card_005gBxyiDc6b...|2016-09-10 18:58:25|       27.95|           27.95|
|card_005gBxyiDc6b...|2016-11-12 11:18:29|       12.99|           40.94|

С функциями окна ниже:

partition = Window.partitionBy("card_uid").orderBy("date")

previousTransactionDate = data.withColumn("previous_tr_time", lag(data.date).over(partition)).select("transaction_id", "card_uid", "date", "previous_tr_time") 

df_cum_sum = data.withColumn("duration_cum_sum", sum('amount_local').over(partition))

df_cum_sum.orderBy("card_uid","date").select("card_uid", "date", "amount_local", "duration_cum_sum").show()

Но я хочу добавить только две вещи:

  • Агрегировать аналогичным образом, только если дата меньше один месяц
  • Положите ноль вместо того же количества для cum_sum

Таким образом, требуемый вывод выглядит так:

+--------------------+-------------------+------------+----------------+
|            card_uid|               date|amount_local|duration_cum_sum|
+--------------------+-------------------+------------+----------------+
|card_001H4Mw1Ha0M...|2016-05-04 17:54:30|        8.99|               0|
|card_0026uGZQwZQd...|2016-05-06 12:16:18|       16.19|               0|
|card_0026uGZQwZQd...|2016-05-12 12:17:57|        4.00|           16.19|
|card_0026uGZQwZQd...|2016-06-06 12:23:51|       16.19|            4.00| => Only 4 because de 16.19 was more than one month ago
|card_003STfrgB8SZ...|2016-12-04 10:05:21|        58.8|               0|
|card_005gBxyiDc6b...|2016-09-10 18:58:25|       27.95|               0|
|card_005gBxyiDc6b...|2016-09-12 11:18:29|       12.99|           27.95| => Previous amount 
|card_005gBxyiDc6b...|2016-09-22 14:25:44|       23.99|           40.94| => 27.95 + 12.99

Я не могу groupBy card_uid, поскольку мне нужно столько же строк, сколько в оригинале для ссылки на другую таблицу

1 Ответ

0 голосов
/ 11 января 2019

Вам нужно скользящее окно на дату с окном в диапазоне от последних 30 дней до предыдущего дня. Поскольку интервальные функции недоступны для окна, вы можете преобразовать даты в длинные значения и использовать значение дней для создания диапазона окна.

from pyspark.sql.functions import *
days = lambda i: i * 86400 

partition = Window.partitionBy("card_uid").orderBy(col("date").cast("timestamp").cast("long")).rangeBetween(days(-30), days(-1))

df_cum_sum = data.withColumn("duration_cum_sum",sum(col('amount_local')).over(partition))\
                 .fillna(0,subset=['duration_cum_sum'])
df_cum_sum.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...