Можно ли реализовать операцию агрегирования (например, sum, avg, variance, et c.) На сумке (т. Е. Массиве структур) с Pandas UDF как альтернативой функциям более высокого порядка?
Например, используя функции более высокого порядка, я могу реализовать среднее значение для сумок следующим образом.
df = df.withColumn("average_field", expr("aggregate(bag['bag_field'], (cast( '0' as double) as sum, cast( '0' as double) as n), (acc, x) -> (acc.sum + x, acc.n + 1), acc -> acc.sum / acc.n * 1.0)"))
Проблема с этой реализацией состоит в том, что мне нужно реализовать среднее значение, а не возможность использовать существующую реализацию библиотеки. Использование pandas UDF, напротив, привлекательно, поскольку я могу использовать библиотечные функции pandas и numpy, минимизируя (де) издержки сериализации с помощью Apache Arrow.
Я пробовал Scalar Pandas UDF, но это не сработало для меня, потому что я получал ошибку несоответствия размера при передаче мешка в udf и ошибку ArrowNotImplemented при передаче bag ['bag_field'], потому что он не поддерживает StructTypes.
Моя последняя попытка состояла в использовании GROUPED_AGG Pandas UDF следующим образом.
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
def pavg(v):
return v.mean()
udf_avg = pandas_udf(pavg, functionType=PandasUDFType.GROUPED_AGG, returnType=DoubleType())
my_original_fields = df.columns
df = df.groupBy(my_original_fields).agg(udf_avg(df.bag['bag_field'])).alias("average_field")
df.show()
Эта реализация получает следующую ошибку: ArrowInvalid: Не удалось преобразовать [1.99 1.99 1.99 1.99 1.99] с типом numpy .ndarray: попытался преобразовать в удвоение.