Я думаю, что передача аргументов в udf неверна.
Правильный путь указан ниже:
>>> ls
[[1, 2, 3, 4], [5, 6, 7, 8]]
>>> from pyspark.sql import Row
>>> R = Row("A1", "A2")
>>> df = sc.parallelize([R(*r) for r in zip(*ls)]).toDF()
>>> df.show
<bound method DataFrame.show of DataFrame[A1: bigint, A2: bigint]>
>>> df.show()
+---+---+
| A1| A2|
+---+---+
| 1| 5|
| 2| 6|
| 3| 7|
| 4| 8|
+---+---+
>>> def foo(x,y):
... if x%2 == 0:
... return x
... else:
... return y
...
>>>
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>>
>>> custom_udf = udf(foo, IntegerType())
>>> df1 = df.withColumn("res", custom_udf(col("A1"), col("A2")))
>>> df1.show()
+---+---+---+
| A1| A2|res|
+---+---+---+
| 1| 5| 5|
| 2| 6| 2|
| 3| 7| 7|
| 4| 8| 4|
+---+---+---+
Дайте мне знать, если это поможет.