У меня есть требование написать пользовательский UDAF для PySpark, я наткнулся на этот пример Применение UDF к GroupedData в PySpark (с примером работающего Python) .В аналогичных строках, как показано в последней части потока, я придумал следующую функцию
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_value1", DoubleType()),
StructField("avg_value2", DoubleType()),
StructField("sum_avg", DoubleType()),
StructField("sub_avg", DoubleType()),
StructField("bf_signature", Binary())
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
gr = df['key'].iloc[0]
x = df.value1.mean()
y = df.value2.mean()
w = df.value1.mean() + df.value2.mean()
z = df.value1.mean() - df.value2.mean()
bloomfilter = BloomFilter(8, 1)
bloomfilter.set(df.value1)
p=bloomfilter
return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]+[p]])
df3.groupby("key").apply(g).show()
. Как показано в коде, я хочу создать пользовательский BloomFilter, который будет создавать bloomfilter длявесь столбец, аналогично обработке функции mean (), агрегирующей весь столбец и производящей по одному агрегированному результату для каждой группы.
Как мне написать этот пользовательский UDAF на python?