Многопроцессорная обработка Python: прервать отображение при первой дочерней ошибке - PullRequest
0 голосов
/ 11 сентября 2018

Как правильно прервать многопроцессорную обработку, когда один из детей прерывает работу и / или выдает исключение?

Я нашел несколько вопросов по этому поводу ( универсальная обработка многопроцессорных ошибок , как закрыть многопроцессорный пул при исключении, но без ответа , ...), но нет четкого ответа о том, как остановите многопроцессорную обработку для дочернего исключения.

Например, я ожидаю следующий код:

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)


def main():
    with Pool(4) as p:
        try:
            r = p.map(f, range(7))
        except Exception as e:
            print(f"oops: {e}")
            p.close()
            p.terminate()
    print("end")


if __name__ == '__main__':
    main()

Для вывода:

f(0)
f(1)
f(2)
oops: float division by zero
end

Вместо этого он применяет функцию f ко всем элементам перед обнаружением / обработкой исключения:

f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end

Нет ли способа поймать исключение напрямую?

1 Ответ

0 голосов
/ 12 сентября 2018

Я думаю, вам понадобится apply_async для этого, так что вы можете воздействовать на каждый отдельный результат, а не на совокупный результат. pool.apply_async предлагает параметр error_callback, который можно использовать для регистрации вашего обработчика ошибок. apply_async не блокирует, поэтому вам нужно join() пул. Я также использую флаг terminated, чтобы знать, когда результаты могут быть обработаны в обычном случае, если исключение не произошло.

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)

def on_error(e):
    global terminated
    terminated = True
    pool.terminate()
    print(f"oops:{e}")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(7)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())

    print("end")


if __name__ == '__main__':
    main()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...