Pyspark - Ошибка вызова pandas_udf, возвращающего Series.interpolate () в качестве результата - PullRequest
0 голосов
/ 07 февраля 2019

Я пытаюсь создать UDF, возвращающую функцию с интерполяцией , но функция возвращает серию с индексом и выбрасывает исключение.

from pyspark.sql.types import FloatType

@F.pandas_udf(FloatType(), F.PandasUDFType.GROUPED_AGG)
def udf_interpolate(v):
  return v.interpolate('linear')

## Test data
df = spark.createDataFrame([
    ("charles", 1),
    ("charles", None),
    ("charles", 3),
], ["name", "value"])

window = Window.partitionBy('name').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('test_interp', udf_interpolate(df.value).over(window)).show()

Сообщение об ошибке:

pyarrow.lib.ArrowInvalid: Could not convert 0    3.0
1    2.0
2    1.0
Name: _0, dtype: float64 with type Series: tried to convert to float32

Я попытался принудительно преобразовать в float32, но ошибка не исчезла.Моя первоначальная идея заключается в том, что я возвращаю Series с несколькими значениями в «одно ожидаемое значение», но я не знаю точно, как решить эту проблему.

Если я, например, изменю свою функцию навернуть v.mean(), хорошо работает.

Ценю любую помощь.

Спасибо.

1 Ответ

0 голосов
/ 07 февраля 2019

GROUPED_AGG требует, чтобы UDF вернул скаляр ;В вашем случае лучше использовать GROUPED_MAP, так как вы возвращаете серию и вам необходимо выполнить расчет по группе;По сути, вы передаете фрейм данных для каждого имени в pandas_udf, преобразуете его с помощью API pandas и возвращаете преобразованный фрейм данных обратно:

@F.pandas_udf(df.schema, F.PandasUDFType.GROUPED_MAP)
def udf_interpolate(g):
    return g.assign(value=g.value.interpolate('linear'))

df.groupby('name').apply(udf_interpolate).show()
+-------+-----+                                                                 
|   name|value|
+-------+-----+
|charles|    1|
|charles|    2|
|charles|    3|
+-------+-----+
...