Чистый выход с python многопроцессорностью, иногда зависает - PullRequest
0 голосов
/ 17 февраля 2020

Я пытаюсь реализовать чистый выход в многопроцессорной среде. У меня есть процесс, который читает кадры с камеры, помещает фиксированный номер этого в очередь, только когда он получает сигнал. Сигнал отправляется из другого процесса, который, когда будет готов, отправляет сигнал и начинает обрабатывать кадры, помещая результат в другую очередь. Затем поток читает из очереди результатов и публикует sh сообщение mqtt. Чтобы очистить выход, я пытаюсь обработать сигналы SIGINT и SIGTERM и очищаю очереди, но иногда программное обеспечение иногда зависает, оставляя открытыми некоторые процессы, и мне приходится выполнять SIGKILL. Я попытался упростить код для воспроизведения mwe, надеюсь, не слишком долго. Любая помощь?

import paho.mqtt.client as mqtt
import threading
import time
import numpy as np
import signal as signal
from multiprocessing import Process, Event, Queue


local_connected_flag = 1
local_host = "127.0.0.1"
local_port = 13118
local_topic = "mytopic"

queue = Queue(11)
queue_result = Queue()
exit_event = Event()
ready_event = Event()


def PublicResult():

    if local_connected_flag == 1:
        if not queue_result.empty():
            try:
                aux = queue_result.get(block=False)
                local_client.publish(local_topic, aux, qos=0, retain=False)
                print(aux)
            except Exception as ex:
                print(ex, "publish fault!")
    if not exit_event.is_set():
        t1 = threading.Timer(1, PublicResult)
        t1.start()
    else:
        print("Publish stopped, cleaning up")
        while not queue_result.empty():
            queue_result.get()


t = threading.Timer(1, PublicResult)
t.start()


def on_local_connect(local_client, userdata, flags, rc):
    print("LOCAL CONNECTED!")
    local_client.subscribe(local_topic)
    global local_connected_flag
    local_connected_flag = 1


def on_local_disconnect(client, userdata, rc):
    print("local disconnected...")
    global local_connected_flag
    local_connected_flag = 0

def acquire(queue, exit_event, ready_event):

    count = 0
    while not exit_event.is_set():
        if ready_event.is_set():
            if count < 11:
                frame_depth = np.zeros([100, 100], dtype=np.uint8)
                frame_depth.fill(255)
                if not exit_event.is_set():
                    queue.put(frame_depth.copy(), block=False)
                count += 1
            else:
                ready_event.clear()
                count = 0


def elab(queue, queue_result, exit_event, ready_event):

    tempo = 0
    i = 0
    triplet = np.empty([1, 100, 100, 3])

    ready_event.set()
    while not exit_event.is_set():
        if not queue.empty():
            frame_depth_data = queue.get(block=False)
            if tempo % 5 == 0:
                depth_array = frame_depth_data

                triplet[0, :, :, i] = depth_array
                i += 1
                if i == 3:
                    i = 0
                    c = triplet[0, :, :, 1] + triplet[0, :, :, 2]
                    if not exit_event.is_set():
                        queue_result.put(c, block=False)
                    if not exit_event.is_set():
                        ready_event.set()
                if tempo == 10:
                    tempo = -1
            tempo += 1
    print("Elab stopped")
    while not queue.empty():
        queue.get()
    while not queue_result.empty():
        queue_result.get()


def sig_handler(code, frame):
    print("*****   EXIT, WAIT FOR CLEANING  !!  ****")
    exit_event.set()


if __name__ == '__main__':

    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)

    acq = Process(target=acquire, args=(queue, exit_event, ready_event))
    acq.daemon = False
    acq.start()
    pos = Process(target=elab, args=(queue, queue_result, exit_event, ready_event))
    pos.daemon = False
    pos.start()

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    local_client = mqtt.Client(client_id="pose_task")
    local_client.on_connect = on_local_connect
    local_client.on_disconnect = on_local_disconnect
    while not exit_event.is_set():
        if local_connected_flag == 0:
            try:
                time.sleep(2)
                local_client.connect_async(local_host, port=local_port, keepalive=60)
                local_client.loop_start()
            except Exception as ex:
                print(ex, " local_retry..")
                time.sleep(2)
    print("Stopping loop")
    local_client.loop_stop(True)
    local_client.disconnect()
    print("Loop Stopped")
    acq.join()
    pos.join()
    print("BYE BYE")
...