ранний выход из мультипроцессинга. Pool.map (повышение в дочернем процессе не работает) - PullRequest
1 голос
/ 30 января 2020

Мое воспроизведение неверно, как отмечено в Ответ Ругнара . Я оставляю код в основном как есть, так как я не уверен, где это находится между , уточняя и изменив значение .

У меня есть несколько тысяч заданий, которые мне нужно выполнить, и я хотел бы, чтобы любые ошибки немедленно останавливали выполнение. Я обертываю задачу в try / exceptraise, чтобы можно было регистрировать ошибку (без всего шума многопроцессорной обработки / многопоточности), а затем повторно вызывать. Это не убивает основной процесс.

Что происходит, и как я могу получить ранний выход, который я ищу? sys.exit(1) в дочерних взаимоблокировках, перенос функции try / exceptraise в еще одну функцию также не работает.

$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_reraise.py", line 5, in f_reraise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_reraise.py", line 14, in <module>
    test_reraise()
  File "mp_reraise.py", line 12, in test_reraise
    p.map(f_reraise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)

mp_reraise.py

import multiprocessing

def f_reraise(*args):
    try:
        raise Exception(args)
    except Exception as e:
        print(e)
        raise

def test_reraise():
    with multiprocessing.Pool() as p:
        p.map(f_reraise, range(10))

test_reraise()

Если я не поймаю и не ререйзу, выполнение останавливается рано, как и ожидалось: [это фактически не останавливается, согласно ответу Ругнара]

$ python3 mp_raise.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_raise.py", line 4, in f_raise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_raise.py", line 10, in <module>
    test_raise()
  File "mp_raise.py", line 8, in test_raise
    p.map(f_raise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)  

mp_raise.py

import multiprocessing

def f_raise(*args):
    # missing print, which would demonstrate that
    # this actually does not stop early
    raise Exception(args)

def test_raise():
    with multiprocessing.Pool() as p:
        p.map(f_raise, range(10))

test_raise()

1 Ответ

1 голос
/ 30 января 2020

В вашем mp_raise.py вы ничего не печатаете, поэтому не видите, сколько работ было выполнено. Я добавил print и обнаружил, что пул видит исключение дочернего элемента только тогда, когда итератор заданий исчерпан. Так что это никогда не останавливается рано.

Если вам нужно остановиться рано после исключения, попробуйте это

import time
import multiprocessing as mp


def f_reraise(i):
    if abort.is_set():  # cancel job if abort happened
        return
    time.sleep(i / 1000)  # add sleep so jobs are not instant, like in real life
    if abort.is_set():  # probably we need stop job in the middle of execution if abort happened
        return
    print(i)
    try:
        raise Exception(i)
    except Exception as e:
        abort.set()
        print('error:', e)
        raise


def init(a):
    global abort
    abort = a


def test_reraise():
    _abort = mp.Event()

    # jobs should stop being fed to the pool when abort happened
    # so we wrap jobs iterator this way
    def pool_args():
        for i in range(100):
            if not _abort.is_set():
                yield i

    # initializer and init is a way to share event between processes
    # thanks to https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes
    with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
        p.map(f_reraise, pool_args())


if __name__ == '__main__':
    test_reraise()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...