Очередь и сигнал
Одна из возможностей - зарегистрировать обработчик сигнала и использовать его для передачи значения часового.
В Unix вы можете обрабатывать SIGCHLD
в родительском, но это не вариант в вашем случае. По сигнальному модулю документы :
В Windows signal () может вызываться только с SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM или SIGBREAK.
Не уверен, что если убить его с помощью диспетчера задач, это приведет к SIGTERM
, но вы можете попробовать.
Для обработки SIGTERM
вам необходимо зарегистрировать обработчик сигнала у ребенка.
import os
import sys
import time
import signal
from functools import partial
from multiprocessing import Process, Queue
SENTINEL = None
def _sigterm_handler(signum, frame, queue):
print("received SIGTERM")
queue.put(SENTINEL)
sys.exit()
def register_sigterm(queue):
global _sigterm_handler
_sigterm_handler = partial(_sigterm_handler, queue=queue)
signal.signal(signal.SIGTERM, _sigterm_handler)
def some_func(q):
register_sigterm(q)
print(os.getpid())
for i in range(30):
time.sleep(1)
q.put(f'msg_{i}')
if __name__ == '__main__':
q = Queue()
p = Process(target=some_func, args=(q,))
p.start()
for msg in iter(q.get, SENTINEL):
print(msg)
p.join()
Пример вывода:
12273
msg_0
msg_1
msg_2
msg_3
received SIGTERM
Process finished with exit code 0
Queue & Process.is_alive ()
Даже если это работает с диспетчером задач, ваш вариант использования звучит так, как будто вы не можете исключить принудительные убийства, поэтому я думаю, что вам лучше использовать подход, который не зависит от сигналов.
Вы можете проверить в цикле, если ваш процесс p.is_alive()
, позвонить queue.get()
с указанным timeout
и обработать Empty
исключения:
import os
import time
from queue import Empty
from multiprocessing import Process, Queue
def some_func(q):
print(os.getpid())
for i in range(30):
time.sleep(1)
q.put(f'msg_{i}')
if __name__ == '__main__':
q = Queue()
p = Process(target=some_func, args=(q,))
p.start()
while p.is_alive():
try:
msg = q.get(timeout=0.1)
except Empty:
pass
else:
print(msg)
p.join()
Было бы также возможно избежать исключения, но я бы не рекомендовал это, потому что вы не тратите свое время ожидания "в очереди", следовательно, уменьшая отзывчивость:
while p.is_alive():
if not q.empty():
msg = q.get_nowait()
print(msg)
time.sleep(0.1)
Pipe & Process.is_alive ()
Если вы намереваетесь использовать одно соединение на каждого ребенка, однако можно было бы использовать канал вместо очереди. Это более производительно, чем очередь
(который монтируется на трубе), и вы можете использовать multiprocessing.connection.wait
для ожидания готовности сразу нескольких объектов.
multiprocessing.connection.wait (object_list, timeout = None)
Подождите, пока объект в object_list будет готов. Возвращает список тех объектов в object_list, которые готовы. Если тайм-аут - это число с плавающей точкой, то вызов блокируется не более, чем на несколько секунд. Если время ожидания равно None, оно блокируется на неограниченный период Отрицательный тайм-аут эквивалентен нулевому тайм-ауту.
Как для Unix, так и для Windows объект может появиться в object_list, если он является читаемым объектом Connection;
подключенный и читаемый объект socket.socket; или же
атрибут sentinel объекта Process.
Объект соединения или сокета готов, когда имеются данные, доступные для чтения из него, или другой конец был закрыт.
Unix : ожидание (object_list, timeout) почти эквивалентно select.select (object_list, [], [], timeout). Разница в том, что если select.select () прерван сигналом, он может вызвать OSError с номером ошибки EINTR, тогда как wait () не будет.
Windows : элемент в object_list должен быть либо целочисленным дескриптором, который является ожидаемым (согласно определению, используемому в документации функции Win32 WaitForMultipleObjects ()), либо это может быть объект с файловым именем () метод, который возвращает дескриптор сокета или дескриптор канала. (Обратите внимание, что дескрипторы труб и дескрипторы не являются ручками ожидания).
Новое в версии 3.3. документы
Это можно использовать для одновременного ожидания атрибута sentinel процесса и родительского конца канала.
import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
def some_func(conn_write):
print(os.getpid())
for i in range(30):
time.sleep(1)
conn_write.send(f'msg_{i}')
if __name__ == '__main__':
conn_read, conn_write = Pipe(duplex=False)
p = Process(target=some_func, args=(conn_write,))
p.start()
while p.is_alive():
wait([p.sentinel, conn_read]) # block-wait until something gets ready
if conn_read.poll(): # check if something can be received
print(conn_read.recv())
p.join()