Функции
spark pandas_udf
весьма полезны и читаемы для агрегированных результатов.Вот пример кода, который вы можете расширить для добавления любых других агрегированных результатов.
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType,IntegerType,LongType,StructType,StructField,StringType
import pandas as pd
#you can add last month maximum spend, maximum spend on the weekend etc and
#update agg_data function
agg_schema = StructType(
[StructField("cust_id", StringType(), True),
StructField("days_txn_on_weekend", IntegerType(), True),
StructField("days_txn_cat_a", IntegerType(), True),
StructField("total_spend", IntegerType(), True)
]
)
@F.pandas_udf(agg_schema, F.PandasUDFType.GROUPED_MAP)
def agg_data(pdf):
days_txn_on_weekend = pdf.query("weekend_flag == 'Y'")['date'].nunique()
days_txn_cat_a = pdf.query("category == 'CatA'")['date'].nunique()
total_spend = pdf['spend'].sum()
return pd.DataFrame([(pdf.cust_id[0],days_txn_on_weekend,days_txn_cat_a,total_spend)])
transactions = spark.createDataFrame(
[
('cust_1', 'CatA', 20190101, 'N', 10),
('cust_1', 'CatA', 20190101, 'N', 20),
('cust_1', 'CatA', 20190105, 'Y',40),
('cust_1', 'CatA', 20190105, 'Y',10),
('cust_1', 'CatA', 20190112, 'Y', 20),
('cust_1', 'CatA', 20190113, 'Y', 10),
('cust_1', 'CatA', 20190101, 'N',20),
('cust_1', 'CatB', 20190105, 'Y', 50),
('cust_1', 'CatB', 20190105, 'Y', 50),
('cust_2', 'CatA', 20190115, 'N', 10),
('cust_2', 'CatA', 20190116, 'N', 20),
('cust_2', 'CatA', 20190117, 'N', 40),
('cust_2', 'CatA', 20190119, 'Y', 10),
('cust_2', 'CatA', 20190119, 'Y', 20),
('cust_2', 'CatA', 20190120, 'Y', 10),
('cust_3', 'CatB', 20190108, 'N', 10),
],
['cust_id','category','date','weekend_flag','spend']
)
transactions.groupBy('cust_id').apply(agg_data).show()
, что приводит к
+-------+-------------------+--------------+-----------+
|cust_id|days_txn_on_weekend|days_txn_cat_a|total_spend|
+-------+-------------------+--------------+-----------+
| cust_2| 2| 5| 110|
| cust_3| 0| 0| 10|
| cust_1| 3| 4| 230|
+-------+-------------------+--------------+-----------+