Правильная обработка завершения дочернего процесса в python - PullRequest
0 голосов
/ 31 октября 2018

Я использую Python 3.7 и следую этой документации . Я хочу иметь процесс, который должен порождать дочерний процесс, ждать, пока он завершит задачу, и получить некоторую информацию обратно. Я использую следующий код:

if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    print q.get()
    p.join()

Когда дочерний процесс завершается правильно, проблем нет, и он прекрасно работает, но проблема начинается, когда мой дочерний процесс завершается до его завершения. В этом случае мое приложение висит на ожидании.

Задание тайм-аута для q.get() и p.join() не полностью решает проблему, потому что я хочу немедленно узнать, что дочерний процесс умер, и не ждать истечения времени ожидания.

Другая проблема заключается в том, что тайм-аут на q.get() приводит к исключению, которого я предпочитаю избегать.

Может кто-нибудь предложить мне более элегантный способ преодоления этих проблем?

1 Ответ

0 голосов
/ 01 ноября 2018

Очередь и сигнал

Одна из возможностей - зарегистрировать обработчик сигнала и использовать его для передачи значения часового. В Unix вы можете обрабатывать SIGCHLD в родительском, но это не вариант в вашем случае. По сигнальному модулю документы :

В Windows signal () может вызываться только с SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM или SIGBREAK.

Не уверен, что если убить его с помощью диспетчера задач, это приведет к SIGTERM, но вы можете попробовать.

Для обработки SIGTERM вам необходимо зарегистрировать обработчик сигнала у ребенка.

import os
import sys
import time
import signal
from functools import partial
from multiprocessing import Process, Queue

SENTINEL = None


def _sigterm_handler(signum, frame, queue):
    print("received SIGTERM")
    queue.put(SENTINEL)
    sys.exit()


def register_sigterm(queue):
    global _sigterm_handler
    _sigterm_handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, _sigterm_handler)


def some_func(q):
    register_sigterm(q)
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    for msg in iter(q.get, SENTINEL):
        print(msg)
    p.join()

Пример вывода:

12273
msg_0
msg_1
msg_2
msg_3
received SIGTERM

Process finished with exit code 0

Queue & Process.is_alive ()

Даже если это работает с диспетчером задач, ваш вариант использования звучит так, как будто вы не можете исключить принудительные убийства, поэтому я думаю, что вам лучше использовать подход, который не зависит от сигналов.

Вы можете проверить в цикле, если ваш процесс p.is_alive(), позвонить queue.get() с указанным timeout и обработать Empty исключения:

import os
import time
from queue import Empty
from multiprocessing import Process, Queue

def some_func(q):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()

    while p.is_alive():
        try:
            msg = q.get(timeout=0.1)
        except Empty:
            pass
        else:
            print(msg)

    p.join()

Было бы также возможно избежать исключения, но я бы не рекомендовал это, потому что вы не тратите свое время ожидания "в очереди", следовательно, уменьшая отзывчивость:

while p.is_alive():
    if not q.empty():
        msg = q.get_nowait()
        print(msg)
        time.sleep(0.1)


Pipe & Process.is_alive ()

Если вы намереваетесь использовать одно соединение на каждого ребенка, однако можно было бы использовать канал вместо очереди. Это более производительно, чем очередь (который монтируется на трубе), и вы можете использовать multiprocessing.connection.wait для ожидания готовности сразу нескольких объектов.

multiprocessing.connection.wait (object_list, timeout = None)

Подождите, пока объект в object_list будет готов. Возвращает список тех объектов в object_list, которые готовы. Если тайм-аут - это число с плавающей точкой, то вызов блокируется не более, чем на несколько секунд. Если время ожидания равно None, оно блокируется на неограниченный период Отрицательный тайм-аут эквивалентен нулевому тайм-ауту.

Как для Unix, так и для Windows объект может появиться в object_list, если он является читаемым объектом Connection; подключенный и читаемый объект socket.socket; или же атрибут sentinel объекта Process. Объект соединения или сокета готов, когда имеются данные, доступные для чтения из него, или другой конец был закрыт.

Unix : ожидание (object_list, timeout) почти эквивалентно select.select (object_list, [], [], timeout). Разница в том, что если select.select () прерван сигналом, он может вызвать OSError с номером ошибки EINTR, тогда как wait () не будет.

Windows : элемент в object_list должен быть либо целочисленным дескриптором, который является ожидаемым (согласно определению, используемому в документации функции Win32 WaitForMultipleObjects ()), либо это может быть объект с файловым именем () метод, который возвращает дескриптор сокета или дескриптор канала. (Обратите внимание, что дескрипторы труб и дескрипторы не являются ручками ожидания).

Новое в версии 3.3. документы

Это можно использовать для одновременного ожидания атрибута sentinel процесса и родительского конца канала.

import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def some_func(conn_write):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        conn_write.send(f'msg_{i}')


if __name__ == '__main__':

    conn_read, conn_write = Pipe(duplex=False)
    p = Process(target=some_func, args=(conn_write,))
    p.start()

    while p.is_alive():
        wait([p.sentinel, conn_read])  # block-wait until something gets ready
        if conn_read.poll():  # check if something can be received
            print(conn_read.recv())
    p.join()
...