A SCALAR udf ожидает, что ряд панд будет вводиться вместо фрейма данных.Для вашего случая нет необходимости использовать udf.Прямой расчет по столбцам a
, b
, c
после отсечения должен работать:
import pyspark.sql.functions as f
df = spark.createDataFrame([[1,2,4],[-1,2,2]], ['a', 'b', 'c'])
clip = lambda x: f.when(df.a < 0, 0).otherwise(x)
df.withColumn('d', (clip(df.a) - clip(df.b)) / clip(df.c)).show()
#+---+---+---+-----+
#| a| b| c| d|
#+---+---+---+-----+
#| 1| 2| 4|-0.25|
#| -1| 2| 2| null|
#+---+---+---+-----+
И если вам нужно использовать pandas_udf
, ваш тип возврата должен быть double
,не df.schema
, потому что вы возвращаете только серию панд, а не фрейм данных панд ;А также вам нужно передать столбцы как Series в функцию, а не весь фрейм данных:
@pandas_udf('double', PandasUDFType.SCALAR)
def fun_function(a, b, c):
clip = lambda x: x.where(a >= 0, 0)
return (clip(a) - clip(b)) / clip(c)
df.withColumn('d', fun_function(df.a, df.b, df.c)).show()
#+---+---+---+-----+
#| a| b| c| d|
#+---+---+---+-----+
#| 1| 2| 4|-0.25|
#| -1| 2| 2| null|
#+---+---+---+-----+