Запуск функции на сгруппированном фрейме данных в pyspark с помощью apply - PullRequest
0 голосов
/ 03 марта 2020

У меня есть скрипт Python, который я пытаюсь перевести в pyspark, у меня есть функция func, которая в Python работает на pd.DataFrame и возвращает список списков.

например,

[[14],[2,3]]

В python я в конечном итоге запускаю это для сгруппированного pd.Dataframe и запускается так:

data.groupby('CUSTOMER_ID').apply(func)

Выходной pd.Series с индексом CUSTOMER_ID, и каждый элемент представляет собой список списков, относящихся к каждому CUSTOMER_ID

например

CUSTOMER_ID
123                   [[14], [2, 3]]
124    [[1, 31, 5, 41, 12...

Я пытаюсь использовать pandas_udf для запуска это в pyspark как так:

schema = StructType([
  StructField("GROUP", ArrayType(IntegerType()))
])

@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def func_udf(data):
    return func(data=data)

и запустите команду:

data.groupby('CUSTOMER_ID').apply(func_udf)

Однако я получаю сообщение об ошибке:

TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'list'

Я почти уверен это связано с тем, как я определяю схему вывода, однако я не совсем уверен, как с этим справиться, учитывая, что вывод представляет собой список списков целых чисел.

Если это имеет значение, у меня есть две версии func, исходная версия python, которая обрабатывает все операции, как если бы она выполнялась в DataFrames, и версия Spark, которая работает в DataFrame Spark с одним customer_id в нем (то есть, не используя group by) - я называю оригинальную python версию.

1 Ответ

0 голосов
/ 04 марта 2020

Приведенный ниже код дает желаемый результат.

schema = StructType([
    StructField("CUSTOMER_ID", IntegerType()),
    StructField("GROUP", ArrayType(ArrayType(IntegerType())))
])

@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def func_udf(df):
    id = df.CUSTOMER_ID.iloc[0]
    gr = func(df)
    return pd.DataFrame({'CUSTOMER_ID':id,'GROUP':[gr]})

data.groupby("CUSTOMER_ID").apply(func_udf).show()
...