Вызов другой пользовательской функции Python из UDF Pyspark - PullRequest
2 голосов
/ 15 апреля 2019

Предположим, у вас есть файл, назовем его udfs.py и в нем:

def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1

Затем вы хотите создать UDF из функции main_f и запустить его на фрейме данных:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

Это работает нормально, если мы делаем это из того же файла, где определены две функции (udfs.py). Однако попытка сделать это из другого файла (скажем, main.py) приводит к ошибке ModuleNotFoundError: No module named ...:

...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

Я заметил, что если я на самом деле вкладываю nested_f в main_f, то вот так:

def main_f(x):
    def nested_f(x):
        return x + 1

    return nested_f(x) + 1

все работает нормально. Однако моя цель здесь состоит в том, чтобы логика была хорошо разделена на несколько функций, которые я также могу проверить по отдельности.

Я думаю, это можно решить, отправив файл udfs.py (или всю заархивированную папку) исполнителям, используя spark.sparkContext.addPyFile('...udfs.py'). Тем не менее:

  1. Я нахожу это немного скучным (особенно, если вам нужно архивировать папки и т.д. ...)
  2. Это не всегда легко / возможно (например, udfs.py может использовать множество других модулей, которые затем также должны быть представлены, что приведет к цепной реакции ...)
  3. Есть некоторые другие неудобства, связанные с addPyFile (например, автозагрузка может перестать работать и т. Д.)

Итак, вопрос : есть ли способ сделать все это одновременно:

  • логика UDF хорошо разделена на несколько функций Python
  • использовать UDF из файла, отличного от того, где определена логика
  • не нужно отправлять какие-либо зависимости с помощью addPyFile

Бонусные баллы за разъяснение, как это работает / почему это не работает!

1 Ответ

1 голос
/ 15 апреля 2019

Для небольших (один или два локальных файла) зависимостей вы можете использовать --py-файлы и перечислять их с чем-то большим или большим количеством зависимостей - лучше их упаковать в zip-файл или файл egg.

Файл udfs.py:

def my_function(*args, **kwargs):
    # code

Файл main.py:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)

df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))

Для запуска:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

Если вы написали свой собственный модуль Python или дажесторонние модули (которые не нуждаются в компиляции C), мне лично это нужно было с geoip2, лучше создать файл zip или egg.

# pip with -t install all modules and dependencies in directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src

# Best is 
pip install -r requirements.txt -t ./src

# If you need add some additionals files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip

Будьте осторожны при использовании pyspark --master yarn (возможно, с другими нелокальными опциями мастера), в оболочке pyspark с --py-files:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip')  # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule  # libs.zip/MyModule

EDIT - Ответ на вопрос о том, как получить функции для исполнителей безaddPyFile () и --py-files:

Необходимо иметь данный файл с функциями для отдельных исполнителей.И достижимый через ПУТЬ env.Поэтому, я бы, вероятно, написал модуль Python, который я затем установил на исполнителей и был доступен в среде.

...