Тупик в коде concurrent.futures - PullRequest
       20

Тупик в коде concurrent.futures

6 голосов
/ 09 февраля 2012

Я пытался распараллелить некоторый код, используя concurrent.futures.ProcessPoolExecutor, но продолжал иметь странные тупики, которые не возникают с ThreadPoolExecutor. Минимальный пример:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        executor.submit(test)

В python 3.2.2 (в 64-битной Ubuntu), похоже, он постоянно зависает после отправки всех заданий - и это происходит, когда число отправляемых заданий больше, чем количество работников. Если я заменю ProcessPoolExecutor на ThreadPoolExecutor, это будет работать безупречно.

В качестве попытки расследования я дал каждому будущему обратный вызов для вывода значения i:

from concurrent import futures

def test():
    pass

with futures.ProcessPoolExecutor(4) as executor:
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test)

        def callback(f):
            print('callback {}'.format(i))
        future.add_done_callback(callback)

Это только смутило меня еще больше - значение i, напечатанное callback, является значением во время его вызова, а не во время его определения (поэтому я никогда не вижу callback 0, но я получить много callback 99 с). Опять же, ThreadPoolExecutor выводит ожидаемое значение.

Задаваясь вопросом, может ли это быть ошибкой, я попробовал последнюю версию Python для разработки. Теперь, по крайней мере, кажется, что код заканчивается, но я все равно получаю неправильное значение i.

Так может кто-нибудь объяснить:

  • что случилось с ProcessPoolExecutor между python 3.2 и текущей версией разработчика, которая, по-видимому, исправила этот тупик

  • почему печатается «неправильное» значение i

РЕДАКТИРОВАТЬ: как jukiewicz указал ниже, конечно, печать i напечатает значение во время вызова обратного вызова, я не знаю, о чем я думал ... если я передам вызываемый объект со значением из i как один из его атрибутов, который работает как ожидалось.

EDIT: немного больше информации: все обратные вызовы выполняются, поэтому похоже, что executor.shutdown (вызывается executor.__exit__) не может сказать, что процессы завершены. Это кажется полностью исправленным в текущем питоне 3.3, но в multiprocessing и concurrent.futures, похоже, было много изменений, поэтому я не знаю, что это исправило. Поскольку я не могу использовать 3.3 (кажется, что он не совместим ни с релизной, ни с dev-версиями numpy), я попытался просто скопировать его многопроцессорные и параллельные пакеты в мою инсталляцию 3.2, которая, кажется, работает нормально. Тем не менее, кажется немного странным, что - насколько я вижу - ProcessPoolExecutor полностью сломан в последней версии релиза, но никто больше не затронут.

1 Ответ

2 голосов
/ 04 марта 2012

Я изменил код следующим образом, что решило обе проблемы. Функция callback была определена как замыкание, поэтому каждый раз будет использовать обновленное значение i. Что касается взаимоблокировки, то это может быть причиной выключения Исполнителя до завершения всей задачи. Ожидание завершения фьючерсов решает и это тоже.

from concurrent import futures

def test(i):
    return i

def callback(f):
    print('callback {}'.format(f.result()))


with futures.ProcessPoolExecutor(4) as executor:
    fs = []
    for i in range(100):
        print('submitting {}'.format(i))
        future = executor.submit(test, i)
        future.add_done_callback(callback)
        fs.append(future)

    for _ in futures.as_completed(fs): pass

ОБНОВЛЕНИЕ: о, извините, я не читал ваши обновления, кажется, это уже решено.

...