Использование метода из класса Python в качестве пользовательской функции PySpark - PullRequest
0 голосов
/ 01 октября 2018

Я пытаюсь написать служебную функцию Python, которая принимает объект локально определенного класса и использует один из методов этого класса в качестве пользовательской функции (UDF) в вызове PySpark DataFrame withColumn.Сигнатура функции утилиты:

 def spark_analyze(lp: LogProcessor):

В классе LogProcessor у меня есть метод, который я хотел бы использовать в качестве UDF.Определение метода:

schema = StructType([
  StructField("total", IntegerType(), False),
  StructField("other", IntegerType(), False)
])

def ProcessLog(self, log_file):
    self.PrepareForLog()
    for event in pyspark_utils.spark_events_from_file(log_file):
      self.ProcessEvent(event)
      return [total, other]

В spark_analyze я делаю следующее, где lp - переданный объект типа LogProcessor:

@udf(lp.schema)
def lpf(lcm_file):
    lp.ProcessLog(lcm_file)
return (df.withColumn('results', lpf(col('logfile_dir')))
...

Создает длинную трассировку стека Python, которая начинается следующим образом:

/ home / david / libs.zip / pyspark_utils.py в spark_analyze (lp) 132 def lpf (lcm_file): 133 lp.ProcessLog (lcm_file)-> 134 return (df.withColumn ('results', lpf (col ('logfile_dir')))) 135 .withColumn ('log name', spark_get_dataset_name (col ('logfile_dir'))) 136 .select ('log name',' results. * ')

/ usr / hdp / current / spark2-client / python / lib / pyspark.zip / pyspark / sql / functions.py в оболочке (* args) 1955 @functools.wraps (f) 1956 def wrapper (* args): -> 1957 return udf_obj (* args) 1958 1959 wrapper.func = udf_obj.func

и заканчивается на:

/ home / david/libs.zip/pyspark_utils.py в spark_analyze (lp) 132 def lpf (lcm_file): 133 lp.ProcessLog (lcm_file) -> 134 return (df.withColumn ('results', lpf (col ('logfile_dir'))) 135 .withColumn ('имя журнала', spark_get_dataset_name (col ('logfile_dir'))) 136 .select ('log name', 'results. *')

/ usr / hdp / current / spark2-client / python / lib / pyspark.zip /pyspark / sql / functions.py в оболочке (* args) 1955 @ functools.wraps (f) 1956 def wrapper (* args): -> 1957 return udf_obj (* args) 1958 1959 wrapper.func = udf_obj.func

Я провел некоторое тестирование и обнаружил, что все работает нормально, если я определяю свой UDF прямо над местом, где я передаю его col.Я также попытался переопределить ProcessLog до return [0,0] и обнаружил, что проблема не исчезла.Таким образом, проблема, кажется, в том, что я использую переданный в методе класса объект как UDF.Есть ли другой способ, чтобы UDF был методом в классе?Спасибо за любую помощь здесь!

1 Ответ

0 голосов
/ 02 октября 2018

Подход, предложенный Усманом Азаром, может сработать.Я решил эту проблему, просто передав определение UDF в качестве аргумента моей библиотечной функции.

...