Можно ли использовать Pandas UDF для трансформации сумок? - PullRequest
1 голос
/ 05 марта 2020

Можно ли реализовать операцию агрегирования (например, sum, avg, variance, et c.) На сумке (т. Е. Массиве структур) с Pandas UDF как альтернативой функциям более высокого порядка?

Например, используя функции более высокого порядка, я могу реализовать среднее значение для сумок следующим образом.

df = df.withColumn("average_field", expr("aggregate(bag['bag_field'], (cast( '0' as double) as sum, cast( '0' as double) as n), (acc, x) -> (acc.sum + x, acc.n + 1), acc -> acc.sum / acc.n * 1.0)"))

Проблема с этой реализацией состоит в том, что мне нужно реализовать среднее значение, а не возможность использовать существующую реализацию библиотеки. Использование pandas UDF, напротив, привлекательно, поскольку я могу использовать библиотечные функции pandas и numpy, минимизируя (де) издержки сериализации с помощью Apache Arrow.

Я пробовал Scalar Pandas UDF, но это не сработало для меня, потому что я получал ошибку несоответствия размера при передаче мешка в udf и ошибку ArrowNotImplemented при передаче bag ['bag_field'], потому что он не поддерживает StructTypes.

Моя последняя попытка состояла в использовании GROUPED_AGG Pandas UDF следующим образом.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

def pavg(v):
    return v.mean() 

udf_avg = pandas_udf(pavg, functionType=PandasUDFType.GROUPED_AGG, returnType=DoubleType())

my_original_fields = df.columns
df = df.groupBy(my_original_fields).agg(udf_avg(df.bag['bag_field'])).alias("average_field")
df.show()

Эта реализация получает следующую ошибку: ArrowInvalid: Не удалось преобразовать [1.99 1.99 1.99 1.99 1.99] с типом numpy .ndarray: попытался преобразовать в удвоение.

...