Pool.apply_async (): вложенная функция не выполняется - PullRequest
2 голосов
/ 11 июня 2019

Я знакомлюсь с модулем Python multiprocessing.Следующий код работает, как и ожидалось:

#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
    print x
    return

pool = Pool(processes=12)
for i in range(4):
    pool.apply_async(run_one, (i,))
pool.close()
pool.join() 

Теперь, однако, если я обертываю функцию вокруг приведенного выше кода, операторы print не выполняются (или вывод перенаправляется по крайней мере):

#outputs nothing
def run():
    def run_one(x):
        print x
        return    

    pool = Pool(processes=12)
    for i in range(4):    
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join()

Если я переместу определение run_one за пределы run, выход снова будет ожидаемым, когда я звоню run():

#outputs 0 1 2 3
def run_one(x):
    print x
    return

def run():    
    pool = Pool(processes=12)
    for i in range(4):       
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join() 

Что яздесь не хватает?Почему второй фрагмент ничего не печатает?Если я просто вызываю функцию run_one(i) вместо использования apply_async, все три кода выдают одинаковые.

1 Ответ

2 голосов
/ 11 июня 2019

Пул должен выбрать (сериализовать) все, что он посылает своим рабочим процессам.Активирование на самом деле сохраняет только имя функции, а для восстановления требуется повторный импорт функции по имени.Чтобы это работало, функция должна быть определена на верхнем уровне, вложенные функции не будут импортироваться дочерним элементом, и уже при попытке их выбора возникает исключение:

from multiprocessing.connection import _ForkingPickler

def run():
    def foo(x):
        pass
    _ForkingPickler.dumps(foo)  # multiprocessing custom pickler;
                                # same effect with pickle.dumps(foo)

run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'

Причина, по которой выне вижу исключений, потому что Pool уже начинает перехватывать исключения во время задач выбора в родительском объекте и только повторно вызывает их, когда вы вызываете .get() для объекта AsyncResult, который вы сразу получаете, когда вызываете pool.apply_async().

Вот почему (с Python 2) вам лучше всегда использовать его так, даже если ваша целевая функция ничего не возвращает (по-прежнему возвращает неявное None):

    results = [pool.apply_async(foo, (i,)) for i in range(4)]
    # `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
    for res in results:
        res.get()

В Python 3 есть error_callback -параметр для асинхронных методов пула, который вы можете использовать вместо этого для обработки исключений.

...