Лучшая практика Pyspark для нескольких простых агрегаций - PullRequest
0 голосов
/ 13 мая 2019

Я относительно новичок в Pyspark и ищу совет о том, как сделать несколько простых агрегаций на длинном фрейме данных.

У меня есть блок данных транзакций, где клиенты совершают несколько транзакций в день, и я бы хотел сгруппировать клиента и сохранить некоторые переменные, такие как общая сумма, и некоторые переменные, такие как countdistinct дат, когда выполняется условие.

Итак, я хотел бы знать для каждого клиента:

  • Сколько дней они приобрели в категории A
  • На сколько дней выходных они совершили покупку
  • Общая сумма расходов по всем транзакциям
  • Плюс в идеале также некоторые другие вещи, такие как транзакции в прошлом месяце, максимальные расходы, максимальные расходы в выходные и т. Д.

Таким образом, в терминологии Excel, по сути, довольно много "знаков" или "суффиксов".

Я чувствую, что это не лучшая вещь, чтобы вычислить все это по отдельности, как показано ниже, а затем соединить их вместе (согласно ответу на запрос pyspark sql: подсчитать различные значения с условиями ), потому что у меня довольно много клиентов, так что объединение будет дорогим, и, поскольку некоторые клиенты не совершают транзакции в выходные дни, я думаю, что это должно быть объединение, а не просто конкатенация:

total_variables = transactions.groupby('cust_id').agg(sum("spend").alias("total_spend")) 
weekend_variables = transactions.where(transactions.weekend_flag == "Y").groupby('cust_id').agg(countDistinct("date").alias("days_txn_on_weekend"))  
catA_variables = transactions.where(transactions.category == "CatA").groupby('cust_id').agg(countDistinct("date").alias("days_txn_cat_a")) 
final_df = total_variables.join(weekend_variables, col('total_variables.id') == col('weekend_variables.id'), 'left') \
                          .join(catA_variables, col('df1.id') == col('catA_variables.id'), 'left')

Один из подходов состоит в том, чтобы создать частично пустые столбцы и затем подсчитать количество или вычислить сумму по ним, например:

transactions_additional = transactions.withColumn('date_if_weekend',
                                                psf.when(psf.col("weekend_flag") == "Y",
                                                psf.col('date')).otherwise(psf.lit(None)))
                                      .withColumn('date_if_CatA',
                                                psf.when(psf.col("category") == "CatA",
                                                psf.col('date')).otherwise(psf.lit(None)))
final_df = total_variables .groupby('cust_id').agg(psf.countDistinct("date_if_weekend").alias("days_txn_on_weekend"),
                                                   psf.countDistinct("date_if_CatA").alias("days_txn_cat_a"),
                                                   psf.sum("spend").alias("total_spend"))

Но это кажется расточительным с точки зрения генерации столбцов и может выйти из-под контроля с тем, что я в итоге хочу вычислить.

Я думаю, что я мог бы сделать это в pyspark-sql с countdistinct и case, но я надеюсь, что есть лучший способ использования синтаксиса pyspark - возможно, с использованием пользовательских UDF агрегирования в формате:

aggregated_df = transactions.groupby('cust_id').agg(<something that returns total spend>,
                                                    <something that returns days purchased cat A>,
                                                    <something that returns days purchased on the weekend>,)

это возможно?

1 Ответ

1 голос
/ 13 мая 2019
Функции

spark pandas_udf весьма полезны и читаемы для агрегированных результатов.Вот пример кода, который вы можете расширить для добавления любых других агрегированных результатов.

import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType,IntegerType,LongType,StructType,StructField,StringType
import pandas as pd

#you can add last month maximum spend, maximum spend on the weekend etc and 
#update agg_data function
agg_schema = StructType(
    [StructField("cust_id", StringType(), True),
     StructField("days_txn_on_weekend", IntegerType(), True),
     StructField("days_txn_cat_a", IntegerType(), True),
     StructField("total_spend", IntegerType(), True)
     ]
)

@F.pandas_udf(agg_schema, F.PandasUDFType.GROUPED_MAP)
def agg_data(pdf):
    days_txn_on_weekend =  pdf.query("weekend_flag == 'Y'")['date'].nunique()
    days_txn_cat_a = pdf.query("category == 'CatA'")['date'].nunique()
    total_spend = pdf['spend'].sum()
    return pd.DataFrame([(pdf.cust_id[0],days_txn_on_weekend,days_txn_cat_a,total_spend)])

transactions = spark.createDataFrame(
    [
    ('cust_1', 'CatA', 20190101, 'N', 10),
    ('cust_1', 'CatA', 20190101, 'N', 20),
    ('cust_1', 'CatA', 20190105, 'Y',40),
    ('cust_1', 'CatA', 20190105, 'Y',10),
    ('cust_1', 'CatA', 20190112, 'Y', 20),
    ('cust_1', 'CatA', 20190113, 'Y', 10),
    ('cust_1', 'CatA', 20190101, 'N',20),
    ('cust_1', 'CatB', 20190105, 'Y', 50),
    ('cust_1', 'CatB', 20190105, 'Y', 50),
    ('cust_2', 'CatA', 20190115, 'N', 10),
    ('cust_2', 'CatA', 20190116, 'N', 20),
    ('cust_2', 'CatA', 20190117, 'N', 40),
    ('cust_2', 'CatA', 20190119, 'Y', 10),
    ('cust_2', 'CatA', 20190119, 'Y', 20),
    ('cust_2', 'CatA', 20190120, 'Y', 10),
    ('cust_3', 'CatB', 20190108, 'N', 10),
    ],
    ['cust_id','category','date','weekend_flag','spend']
)
transactions.groupBy('cust_id').apply(agg_data).show()

, что приводит к

+-------+-------------------+--------------+-----------+
|cust_id|days_txn_on_weekend|days_txn_cat_a|total_spend|
+-------+-------------------+--------------+-----------+
| cust_2|                  2|             5|        110|
| cust_3|                  0|             0|         10|
| cust_1|                  3|             4|        230|
+-------+-------------------+--------------+-----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...