Многопроцессорный пул зависает, если дочерний процесс убит - PullRequest
1 голос
/ 29 апреля 2020

Я запустил пул рабочих процессов и отправил кучу задач. Системе не хватило памяти, и oomkiller убил один из рабочих процессов. Родительский процесс просто завис там, ожидая, пока задачи закончатся sh, и не вернется.

Вот работающий пример, который воспроизводит проблему. Вместо того чтобы ждать, пока oomkiller завершит работу одного из рабочих процессов, я нахожу идентификаторы всех рабочих процессов и указываю первой задаче уничтожить этот процесс. (Вызов ps не будет работать во всех операционных системах.)

import os
import signal
from multiprocessing import Pool
from random import choice
from subprocess import run, PIPE
from time import sleep


def run_task(task):
    target_process_id, n = task
    print(f'Processing item {n} in process {os.getpid()}.')
    delay = n + 1
    sleep(delay)
    if n == 0:
        print(f'Item {n} killing process {target_process_id}.')
        os.kill(target_process_id, signal.SIGKILL)
    else:
        print(f'Item {n} finished.')
    return n, delay


def main():
    print('Starting.')
    pool = Pool()

    ps_output = run(['ps', '-opid', '--no-headers', '--ppid', str(os.getpid())],
                    stdout=PIPE, encoding='utf8')
    child_process_ids = [int(line) for line in ps_output.stdout.splitlines()]
    target_process_id = choice(child_process_ids[1:-1])

    tasks = ((target_process_id, i) for i in range(10))
    for n, delay in pool.imap_unordered(run_task, tasks):
        print(f'Received {delay} from item {n}.')

    print('Closing.')
    pool.close()
    pool.join()
    print('Done.')


if __name__ == '__main__':
    main()

Когда я запускаю его в системе с восемью процессорами, я вижу такой вывод:

Starting.
Processing item 0 in process 303.
Processing item 1 in process 304.
Processing item 2 in process 305.
Processing item 3 in process 306.
Processing item 4 in process 307.
Processing item 5 in process 308.
Processing item 6 in process 309.
Processing item 7 in process 310.
Item 0 killing process 308.
Processing item 8 in process 303.
Received 1 from item 0.
Processing item 9 in process 315.
Item 1 finished.
Received 2 from item 1.
Item 2 finished.
Received 3 from item 2.
Item 3 finished.
Received 4 from item 3.
Item 4 finished.
Received 5 from item 4.
Item 6 finished.
Received 7 from item 6.
Item 7 finished.
Received 8 from item 7.
Item 8 finished.
Received 9 from item 8.
Item 9 finished.
Received 10 from item 9.

Вы видите, что элемент 5 никогда не возвращается, и пул просто ждет вечно.

Как я могу заставить родительский процесс замечать, когда дочерний процесс убит?

1 Ответ

1 голос
/ 29 апреля 2020

Эта проблема описана в Python ошибка 9205 , но они решили исправить ее в concurrent.futures модуле вместо модуля multiprocessing. Чтобы воспользоваться преимуществами исправления, переключитесь на более новый пул процессов.

import os
import signal
from concurrent.futures.process import ProcessPoolExecutor
from random import choice
from subprocess import run, PIPE
from time import sleep


def run_task(task):
    target_process_id, n = task
    print(f'Processing item {n} in process {os.getpid()}.')
    delay = n + 1
    sleep(delay)
    if n == 0:
        print(f'Item {n} killing process {target_process_id}.')
        os.kill(target_process_id, signal.SIGKILL)
    else:
        print(f'Item {n} finished.')
    return n, delay


def main():
    print('Starting.')
    pool = ProcessPoolExecutor()

    pool.submit(lambda: None)  # Force the pool to launch some child processes.
    ps_output = run(['ps', '-opid', '--no-headers', '--ppid', str(os.getpid())],
                    stdout=PIPE, encoding='utf8')
    child_process_ids = [int(line) for line in ps_output.stdout.splitlines()]
    target_process_id = choice(child_process_ids[1:-1])

    tasks = ((target_process_id, i) for i in range(10))
    for n, delay in pool.map(run_task, tasks):
        print(f'Received {delay} from item {n}.')

    print('Closing.')
    pool.shutdown()
    print('Done.')


if __name__ == '__main__':
    main()

Теперь при его запуске выдается четкое сообщение об ошибке.

Starting.
Processing item 0 in process 549.
Processing item 1 in process 550.
Processing item 2 in process 552.
Processing item 3 in process 551.
Processing item 4 in process 553.
Processing item 5 in process 554.
Processing item 6 in process 555.
Processing item 7 in process 556.
Item 0 killing process 556.
Processing item 8 in process 549.
Received 1 from item 0.
Traceback (most recent call last):
  File "/home/don/.config/JetBrains/PyCharm2020.1/scratches/scratch2.py", line 42, in <module>
    main()
  File "/home/don/.config/JetBrains/PyCharm2020.1/scratches/scratch2.py", line 33, in main
    for n, delay in pool.map(run_task, tasks):
  File "/usr/lib/python3.7/concurrent/futures/process.py", line 483, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...