Ваша функция должна быть статической, чтобы определить ее как udf. Я искал какую-то документацию, чтобы дать хорошее объяснение, но не смог ее найти.
В основном (возможно, не на 100% точные; исправления приветствуются), когда вы определяете udf, он получает травление и копирует в каждыйвыполняется автоматически, но вы не можете выделить один метод класса , который не определен на верхнем уровне (класс является частью верхнего уровня, но не его методами). Посмотрите на этот пост для обходных путей, отличных от статических методов.
import pyspark.sql.functions as F
import pyspark.sql.types as T
class Phases():
def __init__(self, df1):
print("Inside the constructor of Class phases ")
self.df1 = df1
self.phases_udf = F.udf(Phases.phases_commence,T.IntegerType())
#This is the UDF.
@staticmethod
def phases_commence(age):
age = age +3
return age
#This is the function that registers the UDF,
def doSomething(self):
print("Inside the doSomething")
self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))
l =[(1, 10 , 'F')
,(2 , 2 , 'M')
,(2 , 10 , 'F')
,(2 , 3 , 'F')
,(3 , 10, 'M')]
columns = ['id', 'Age', 'Gender']
df=spark.createDataFrame(l, columns)
bla = Phases(df)
bla.doSomething()
bla.df1.show()
Вывод:
Inside the constructor of Class phases
Inside the 'initiate_the_phases_on_the_major_track_segment()'
+---+---+------+-----+
| id|Age|Gender|AgeP2|
+---+---+------+-----+
| 1| 10| F| 13|
| 2| 2| M| 5|
| 2| 10| F| 13|
| 2| 3| F| 6|
| 3| 10| M| 13|
+---+---+------+-----+