Как эффективно и динамически зарегистрировать Spark UDF от Python - PullRequest
1 голос
/ 30 января 2020

Мне нужно реализовать динамическую c функцию «принеси свой код» для регистрации пользовательских функций, созданных вне моего собственного кода. Это контейнер, и точка входа - стандартный интерпретатор python (не pypsark). Основываясь на настройках конфигурации при запуске, спарк-контейнер инициализирует себя примерно так: Мы не знаем заранее определения функции, но мы можем предварительно установить зависимости, если необходимо, на контейнере.

def register_udf_module(udf_name, zip_or_py_path, file_name, function_name, return_type="int"):
    # Psueduocode:
    global sc, spark

    sc.addPyFile(zip_or_py_path)
    module_ref = some_inspect_function_1(zip_or_py_path)
    file_ref = module_ref[file_name]
    function_ref = module_ref[function_name]
    spark.udf.register(udf_name, function_ref, return_type)

Кажется, я не могу найти никаких ссылок на то, как это сделать sh это. И, в частности, случай использования заключается в том, что после инициализации искрового кластера путем его запуска пользователям потребуется этот UDF, доступный для функций SQL (через соединение Thrift JDB C). Я не знаю никакого интерфейса между соединением JDBC / SQL и возможностью регистрации UDF, поэтому он должен быть запущен для запросов SQL, и я не могу ожидать, что позже пользователь вызовет 'spark .udf.register 'на их стороне.

1 Ответ

0 голосов
/ 03 февраля 2020

Решение, которое я нашел, заключается в том, чтобы при запуске запустить переменную среды, которая указывает на каталог UDF, а затем загрузить и проверить каждый файл .py по этому пути, загружая все функции, найденные как функции UDF в spark.

Пример рабочего кода ниже:

def init_spark():
    global sc

    # Init spark (nothing special here)
    conf = SparkConf()
    spark = (
        SparkSession.builder.config(conf=conf)
        .master("local")
        .appName("Python Spark")
        .enableHiveSupport()
        .getOrCreate()
    )

    if "SPARK_UDFS_PATH" in os.environ:
        add_udf_module(os.environ.get("SPARK_UDFS_PATH"))


def add_udf_module(module_dir=None):
    global sc

    from inspect import getmembers, isfunction

    module_dir = os.path.realpath(module_dir)
    if not os.path.isdir(module_dir):
        raise ValueError(f"Folder '{module_dir}' does not exist.")
    for file in io.list_files(module_dir):
        if file.endswith(".py"):
            module = path_import(file)
            for member in getmembers(module):
                if isfunction(member[1]):
                    logging.info(f"Found module function: {member}")
                    func_name, func = member[0], member[1]
                    if func_name[:1] != "_" and func_name != "udf":
                        logging.info(f"Registering UDF '{func_name}':\n{func.__dict__}")
                        spark.udf.register(func_name, func)

def path_import(absolute_file_path):
    module_name = os.path.basename(absolute_file_path)
    module_name = ".".join(module_name.split(".")[:-1]) # removes '.py'
    spec = importlib.util.spec_from_file_location(module_name, absolute_file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

Связанный:

Образец UDF python файл:

from pyspark.sql.functions import udf
from pyspark.sql import types


@udf(types.Long())
def times_five(value):
    return value * 5

@udf("long")
def times_six(value):
    return value * 6

Образец SQL:

SELECT times_six(7) AS the_answer
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...