Предположим, у вас есть файл, назовем его udfs.py
и в нем:
def nested_f(x):
return x + 1
def main_f(x):
return nested_f(x) + 1
Затем вы хотите создать UDF из функции main_f
и запустить его на фрейме данных:
import pyspark.sql.functions as fn
import pandas as pd
pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)
_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
Это работает нормально, если мы делаем это из того же файла, где определены две функции (udfs.py
). Однако попытка сделать это из другого файла (скажем, main.py
) приводит к ошибке ModuleNotFoundError: No module named ...
:
...
import udfs
_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
Я заметил, что если я на самом деле вкладываю nested_f
в main_f
, то вот так:
def main_f(x):
def nested_f(x):
return x + 1
return nested_f(x) + 1
все работает нормально. Однако моя цель здесь состоит в том, чтобы логика была хорошо разделена на несколько функций, которые я также могу проверить по отдельности.
Я думаю, это можно решить, отправив файл udfs.py
(или всю заархивированную папку) исполнителям, используя spark.sparkContext.addPyFile('...udfs.py')
. Тем не менее:
- Я нахожу это немного скучным (особенно, если вам нужно архивировать папки и т.д. ...)
- Это не всегда легко / возможно (например,
udfs.py
может использовать множество других модулей, которые затем также должны быть представлены, что приведет к цепной реакции ...)
- Есть некоторые другие неудобства, связанные с
addPyFile
(например, автозагрузка может перестать работать и т. Д.)
Итак, вопрос : есть ли способ сделать все это одновременно:
- логика UDF хорошо разделена на несколько функций Python
- использовать UDF из файла, отличного от того, где определена логика
- не нужно отправлять какие-либо зависимости с помощью
addPyFile
Бонусные баллы за разъяснение, как это работает / почему это не работает!