Я пытаюсь написать служебную функцию Python, которая принимает объект локально определенного класса и использует один из методов этого класса в качестве пользовательской функции (UDF) в вызове PySpark DataFrame withColumn
.Сигнатура функции утилиты:
def spark_analyze(lp: LogProcessor):
В классе LogProcessor
у меня есть метод, который я хотел бы использовать в качестве UDF.Определение метода:
schema = StructType([
StructField("total", IntegerType(), False),
StructField("other", IntegerType(), False)
])
def ProcessLog(self, log_file):
self.PrepareForLog()
for event in pyspark_utils.spark_events_from_file(log_file):
self.ProcessEvent(event)
return [total, other]
В spark_analyze
я делаю следующее, где lp
- переданный объект типа LogProcessor
:
@udf(lp.schema)
def lpf(lcm_file):
lp.ProcessLog(lcm_file)
return (df.withColumn('results', lpf(col('logfile_dir')))
...
Создает длинную трассировку стека Python, которая начинается следующим образом:
/ home / david / libs.zip / pyspark_utils.py в spark_analyze (lp) 132 def lpf (lcm_file): 133 lp.ProcessLog (lcm_file)-> 134 return (df.withColumn ('results', lpf (col ('logfile_dir')))) 135 .withColumn ('log name', spark_get_dataset_name (col ('logfile_dir'))) 136 .select ('log name',' results. * ')
/ usr / hdp / current / spark2-client / python / lib / pyspark.zip / pyspark / sql / functions.py в оболочке (* args) 1955 @functools.wraps (f) 1956 def wrapper (* args): -> 1957 return udf_obj (* args) 1958 1959 wrapper.func = udf_obj.func
и заканчивается на:
/ home / david/libs.zip/pyspark_utils.py в spark_analyze (lp) 132 def lpf (lcm_file): 133 lp.ProcessLog (lcm_file) -> 134 return (df.withColumn ('results', lpf (col ('logfile_dir'))) 135 .withColumn ('имя журнала', spark_get_dataset_name (col ('logfile_dir'))) 136 .select ('log name', 'results. *')
/ usr / hdp / current / spark2-client / python / lib / pyspark.zip /pyspark / sql / functions.py в оболочке (* args) 1955 @ functools.wraps (f) 1956 def wrapper (* args): -> 1957 return udf_obj (* args) 1958 1959 wrapper.func = udf_obj.func
Я провел некоторое тестирование и обнаружил, что все работает нормально, если я определяю свой UDF прямо над местом, где я передаю его col
.Я также попытался переопределить ProcessLog
до return [0,0]
и обнаружил, что проблема не исчезла.Таким образом, проблема, кажется, в том, что я использую переданный в методе класса объект как UDF.Есть ли другой способ, чтобы UDF был методом в классе?Спасибо за любую помощь здесь!