Почему concurrent.futures.ProcessPoolExecutor и multiprocessing.pool.Pool не работают с super в Python? - PullRequest
4 голосов
/ 15 июня 2019

Почему следующий код Python, использующий модуль concurrent.futures, зависает навсегда?

import concurrent.futures


class A:

    def f(self):
        print("called")


class B(A):

    def f(self):
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
        executor.submit(super().f)


if __name__ == "__main__":
    B().f()

Вызов вызывает невидимое исключение [Errno 24] Too many open files (чтобы увидеть его, замените строкуexecutor.submit(super().f) с print(executor.submit(super().f).exception())).

Однако, при замене ProcessPoolExecutor на ThreadPoolExecutor печатается «вызвано», как и ожидалось.

Почему следующий код Python, использующий *Модуль 1017 * вызывает исключение AssertionError: daemonic processes are not allowed to have children?

import multiprocessing.pool


class A:

    def f(self):
        print("called")


class B(A):

    def f(self):
        pool = multiprocessing.pool.Pool(2)
        pool.apply(super().f)


if __name__ == "__main__":
    B().f()

Тем не менее, при замене Pool на ThreadPool печатается «call», как и ожидалось.

Environment: CPython 3.7,MacOS 10.14.

1 Ответ

5 голосов
/ 16 июня 2019

concurrent.futures.ProcessPoolExecutor и multiprocessing.pool.Pool использует multiprocessing.queues.Queue для передачи объекта рабочей функции от вызывающего к рабочему процессу, Queue использует модуль pickle для сериализации / десериализации, но не удалось правильно обработать связанный объект метода с дочерним элементом экземпляр класса:

f = super().f
print(f)
pf = pickle.loads(pickle.dumps(f))
print(pf)

выходы:

<bound method A.f of <__main__.B object at 0x104b24da0>>
<bound method B.f of <__main__.B object at 0x104cfab38>>

A.f становится B.f, это фактически создает бесконечный рекурсивный вызов от B.f до B.f в рабочем процессе.

pickle.dumps использует __reduce__ метод связанного метода объекта, IMO, его реализация , не учитывает этот сценарий, который не заботится о реальном func объекте, а только пытается получить обратно из экземпляра self obj (B()) с простым именем (f), в результате чего B.f, скорее всего, ошибка.

Хорошая новость заключается в том, что, поскольку мы знаем, в чем проблема, мы могли бы ее исправить, реализовав нашу собственную функцию сокращения, которая пытается воссоздать связанный объект метода из исходной функции (A.f) и экземпляра obj (B()). :

import types
import copyreg
import multiprocessing

def my_reduce(obj):
    return (obj.__func__.__get__, (obj.__self__,))

copyreg.pickle(types.MethodType, my_reduce)
multiprocessing.reduction.register(types.MethodType, my_reduce)

мы могли бы сделать это, потому что связанный метод является дескриптором.

ps: я подал отчет об ошибке .

...