У меня есть построчная операция, которую я хочу выполнить на моем фрейме данных, которая принимает в качестве параметров некоторые фиксированные переменные. Единственный способ, которым я знаю, как это сделать, это использовать вложенные функции. Я пытаюсь использовать Cython для компиляции части моего кода, затем вызываю функцию Cython из mapPartitions, но возникает ошибка PicklingError: Can't pickle <cyfunction outer_function.<locals>._nested_function at 0xfffffff>
.
При использовании чистого Python я делаю
def outer_function(fixed_var_1, fixed_var_2):
def _nested_function(partition):
for row in partition:
yield dosomething(row, fixed_var_1, fixed_var_2)
return _nested_function
output_df = input_df.repartition(some_col).rdd \
.mapPartitions(outer_function(a, b))
Прямо сейчас у меня есть outer_function
, определенный в отдельном файле, как это
# outer_func.pyx
def outer_function(fixed_var_1, fixed_var_2):
def _nested_function(partition):
for row in partition:
yield dosomething(row, fixed_var_1, fixed_var_2)
return _nested_function
и это
# runner.py
from outer_func import outer_function
output_df = input_df.repartition(some_col).rdd \
.mapPartitions(outer_function(a, b))
И это выдает ошибку травления выше.
Я посмотрел на https://docs.databricks.com/user-guide/faq/cython.html и попытался получить outer_function
. Тем не менее, возникает та же ошибка. Проблема в том, что вложенная функция не появляется в глобальном пространстве модуля, поэтому ее нельзя найти и сериализовать.
Я тоже пытался это сделать
def outer_function(fixed_var_1, fixed_var_2):
global _nested_function
def _nested_function(partition):
for row in partition:
yield dosomething(row, fixed_var_1, fixed_var_2)
return _nested_function
Это выдает другую ошибку AttributeError: 'module' object has no attribute '_nested_function'
.
Есть ли способ не использовать вложенную функцию в этом случае? Или есть другой способ сделать вложенную функцию "сериализуемой"?
Спасибо!
РЕДАКТИРОВАТЬ: Я также пытался сделать
# outer_func.pyx
class PartitionFuncs:
def __init__(self, fixed_var_1, fixed_var_2):
self.fixed_var_1 = fixed_var_1
self.fixed_var_2 = fixed_var_2
def nested_func(self, partition):
for row in partition:
yield dosomething(row, self.fixed_var_1, self.fixed_var_2)
# main.py
from outer_func import PartitionFuncs
p_funcs = PartitionFuncs(a, b)
output_df = input_df.repartition(some_col).rdd \
.mapPartitions(p_funcs.nested_func)
И все же я получаю PicklingError: Can't pickle <cyfunction PartitionFuncs.nested_func at 0xfffffff>
. Ну что ж, идея не сработала.