Написание пользовательских UDAF в pyspark - PullRequest
1 голос
/ 04 апреля 2019

У меня есть требование написать пользовательский UDAF для PySpark, я наткнулся на этот пример Применение UDF к GroupedData в PySpark (с примером работающего Python) .В аналогичных строках, как показано в последней части потока, я придумал следующую функцию

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType()),
    StructField("bf_signature", Binary())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    bloomfilter = BloomFilter(8, 1)
    bloomfilter.set(df.value1)
    p=bloomfilter
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]+[p]])

df3.groupby("key").apply(g).show()

. Как показано в коде, я хочу создать пользовательский BloomFilter, который будет создавать bloomfilter длявесь столбец, аналогично обработке функции mean (), агрегирующей весь столбец и производящей по одному агрегированному результату для каждой группы.

Как мне написать этот пользовательский UDAF на python?

...