Я хочу отправить динамически загружаемые функции в concurrent.futures.ProcessPoolExecutor
. Вот пример. Существует module.py
, который содержит функцию.
# Content of module.py
def func():
return 1
А остальное находится в file.py
# Content of file.py
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import importlib
from pathlib import Path
import inspect
def load_function_from_module(path):
spec = importlib.util.spec_from_file_location(path.stem, str(path))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def func_top_level():
return 2
if __name__ == '__main__':
# Dynamically load function from other module.
path = Path(__file__).parent / "module.py"
func = dict(inspect.getmembers(load_function_from_module(path)))["func"]
with ProcessPoolExecutor(2) as executor:
future = executor.submit(func)
future_ = executor.submit(func_top_level)
# Here comes the exception.
print(future.result())
Отслеживание:
Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func
Решение 1. Обертывание func
с функцией верхнего уровня
Поместите def myfunc(): return func()
после загрузки функции и отправьте myfunc
.
Это работает для этого пример, но как только вы переместите весь блок if __name__ ...
в его собственную функцию main()
, myfunc()
снова станет локальным, и взлом не сработает. Поскольку проблема возникает глубоко в моем приложении, это невозможно.
Попытка 2: Замена pickle
на cloudpickle
Мой личный фаворит для решения было бы изменить способ ProcessPoolExecutor
сериализации объектов. Например, cloudpickle
может сериализовать func
.
Хотя этот ответ предполагает, что можно зарегистрировать настраиваемый редуктор, следующие PR и проблемы предполагают, что функция не работает или я просто не могу заменить pickle
на cloudpickle
.
Большое спасибо за вашу помощь.