Альтернативы использованию вложенных функций в PySpark mapPartitions при использовании Cython? - PullRequest
1 голос
/ 13 мая 2019

У меня есть построчная операция, которую я хочу выполнить на моем фрейме данных, которая принимает в качестве параметров некоторые фиксированные переменные. Единственный способ, которым я знаю, как это сделать, это использовать вложенные функции. Я пытаюсь использовать Cython для компиляции части моего кода, затем вызываю функцию Cython из mapPartitions, но возникает ошибка PicklingError: Can't pickle <cyfunction outer_function.<locals>._nested_function at 0xfffffff>.

При использовании чистого Python я делаю

def outer_function(fixed_var_1, fixed_var_2):
    def _nested_function(partition):
        for row in partition:
            yield dosomething(row, fixed_var_1, fixed_var_2)
    return _nested_function

output_df = input_df.repartition(some_col).rdd \
    .mapPartitions(outer_function(a, b))

Прямо сейчас у меня есть outer_function, определенный в отдельном файле, как это

# outer_func.pyx

def outer_function(fixed_var_1, fixed_var_2):
    def _nested_function(partition):
        for row in partition:
            yield dosomething(row, fixed_var_1, fixed_var_2)
    return _nested_function

и это

# runner.py

from outer_func import outer_function

output_df = input_df.repartition(some_col).rdd \
    .mapPartitions(outer_function(a, b))

И это выдает ошибку травления выше.

Я посмотрел на https://docs.databricks.com/user-guide/faq/cython.html и попытался получить outer_function. Тем не менее, возникает та же ошибка. Проблема в том, что вложенная функция не появляется в глобальном пространстве модуля, поэтому ее нельзя найти и сериализовать.

Я тоже пытался это сделать

def outer_function(fixed_var_1, fixed_var_2):
    global _nested_function
    def _nested_function(partition):
        for row in partition:
            yield dosomething(row, fixed_var_1, fixed_var_2)
    return _nested_function

Это выдает другую ошибку AttributeError: 'module' object has no attribute '_nested_function'.

Есть ли способ не использовать вложенную функцию в этом случае? Или есть другой способ сделать вложенную функцию "сериализуемой"?

Спасибо!

РЕДАКТИРОВАТЬ: Я также пытался сделать

# outer_func.pyx

class PartitionFuncs:

    def __init__(self, fixed_var_1, fixed_var_2):
        self.fixed_var_1 = fixed_var_1
        self.fixed_var_2 = fixed_var_2

    def nested_func(self, partition):
        for row in partition:
            yield dosomething(row, self.fixed_var_1, self.fixed_var_2)
# main.py

from outer_func import PartitionFuncs

p_funcs = PartitionFuncs(a, b)
output_df = input_df.repartition(some_col).rdd \
    .mapPartitions(p_funcs.nested_func)

И все же я получаю PicklingError: Can't pickle <cyfunction PartitionFuncs.nested_func at 0xfffffff>. Ну что ж, идея не сработала.

1 Ответ

1 голос
/ 17 мая 2019

Это ответ наполовину, потому что, когда я попробовал ваш class PartitionFuncs метод p_funcs.nested_func отлично для маринованных / не засоленных (хотя я не пытался объединить его с PySpark), поэтому необходимо ли решение ниже может зависеть от вашей версии / платформы Python и т. д. Pickle должен поддерживать связанные методы из Python 3.4 , однако похоже, что PySpark заставляет протокол pickle равняться 3 , что остановит эту работу. Могут быть способы изменить это, но я их не знаю.

Известно, что вложенные функции не могут быть перехвачены, поэтому такой подход определенно работает. Классовый подход правильный.

Мое предложение в комментариях состояло в том, чтобы просто попробовать выбрать класс, а не связанную функцию. Чтобы это работало, экземпляр класса должен вызываться, поэтому вы переименовываете свою функцию в __call__

class PartitionFuncs:
    def __init__(self, fixed_var_1, fixed_var_2):
        self.fixed_var_1 = fixed_var_1
        self.fixed_var_2 = fixed_var_2

    def __call__(self, partition):
        for row in partition:
            yield dosomething(row, self.fixed_var_1, self.fixed_var_2)

Это зависит от того, могут ли переменные fixed_var быть выбраны по умолчанию. Если это не так, вы можете написать пользовательские методы сохранения и загрузки, как описано в документации к серверу .

Как вы указали в своем комментарии, это означает, что вам нужен отдельный класс для каждой определенной вами функции. Опции здесь включают наследование, поэтому, имея отдельный класс PickleableData, каждый из классов Func может содержать ссылку.

...