Отправьте динамически загружаемые функции в ProcessPoolExecutor - PullRequest
1 голос
/ 10 июля 2020

Я хочу отправить динамически загружаемые функции в concurrent.futures.ProcessPoolExecutor. Вот пример. Существует module.py, который содержит функцию.

# Content of module.py

def func():
    return 1

А остальное находится в file.py

# Content of file.py

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import importlib
from pathlib import Path
import inspect


def load_function_from_module(path):
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    return mod


def func_top_level():
    return 2


if __name__ == '__main__':
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        future = executor.submit(func)
        future_ = executor.submit(func_top_level)

    # Here comes the exception.
    print(future.result())

Отслеживание:

Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func

Решение 1. Обертывание func с функцией верхнего уровня

Поместите def myfunc(): return func() после загрузки функции и отправьте myfunc.

Это работает для этого пример, но как только вы переместите весь блок if __name__ ... в его собственную функцию main(), myfunc() снова станет локальным, и взлом не сработает. Поскольку проблема возникает глубоко в моем приложении, это невозможно.

Попытка 2: Замена pickle на cloudpickle

Мой личный фаворит для решения было бы изменить способ ProcessPoolExecutor сериализации объектов. Например, cloudpickle может сериализовать func.

Хотя этот ответ предполагает, что можно зарегистрировать настраиваемый редуктор, следующие PR и проблемы предполагают, что функция не работает или я просто не могу заменить pickle на cloudpickle.

Большое спасибо за вашу помощь.

1 Ответ

0 голосов
/ 17 июля 2020

Нашел решение с cloudpickle. Начните с содержимого двух файлов. Затем переместите тело под if __name__ ... в новую функцию main(). Это усложняет проблему, потому что первое решение не сработает.

Поскольку pickle не может сериализовать динамически импортированную функцию func() из module.py, мы сериализуем функцию с помощью cloudpickle в байты в основном процессе. Затем мы отправим функцию, которая принимает байты и десериализует функцию. Затем функция может быть выполнена. Эти две функции изменены или добавлены:

import cloudpickle


def main():
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        bytes_ = cloudpickle.dumps(func)
        future = executor.submit(deserialize_and_execute, bytes_)
        future_ = executor.submit(func_top_level)

    # Here comes the exception.
    print(future.result())


def deserialize_and_execute(bytes_):
    func = cloudpickle.loads(bytes_)
    func()
...