Как опросить ActiveMQ, используя stomp.py - PullRequest
0 голосов
/ 03 апреля 2019

Можем ли мы слушать назначение ActiveMQ и непрерывно читать с камеры, используя OpenCV в одном процессе?

Я могу добиться этого, создав новый процесс, который отвечает за чтение с камеры, а основной процесс отвечает за прослушивание ActiveMQ. Это добавляет накладные расходы на дополнительный процесс, который я хотел бы устранить.

Моя попытка ниже и мой вопрос: как мне объединить оба действия в одном процессе?

def Camera(num, msg_queue):
    print('=========== Reading start ==============', num, msg_queue.empty())
    while True:
        msg = msg_queue.get()
        if msg == 'STOP':
            print('>>> Stop reading from CAM <<<')
            break
        # read frame using openCv
    print('=========== Reading stop ==============')
    return


class CamReader(object):

    def __init__(self):
        self.cctv_camera = None
        self.msg_queue = None

    def start(self):
        if self.cctv_camera and self.cctv_camera.is_alive():
            print('Already reading from Camera. NoOp')
            return

        # otherwise, spawn a new thread, make it run.
        self.msg_queue = multiprocessing.Queue()
        self.cctv_camera = multiprocessing.Process(target=Camera, args=(10, self.msg_queue))
        # self.cctv_camera.setName(str(int(time.time())))
        self.cctv_camera.start()
        return

    def stop(self):
        self.msg_queue.put('STOP')
        # wait for closure
        self.cctv_camera.join()
        return


class MyListener(stomp.ConnectionListener):

    def __init__(self, cam_config):
        self.cam_config = cam_config
        self.cam_reader = CamReader(cam_config)

    def on_message(self, headers, message):
        # print('received a message "%s"' % message)
        if message == "START":
            self.cam_reader.start()
        elif message == "STOP":
            self.cam_reader.stop()
        else:
            print('Undefined message in Queue')


conn = None

try:

    def make_amqp_connection():
        global conn
        conn = stomp.Connection()
        conn.set_listener(name='some_name_here',
                          lstnr=MyListener('camera ip here'))
        conn.start()
        conn.connect('admin', 'admin',
                     wait=True,
                     headers={
                         'client-id': 'clientname'
                     })
        conn.subscribe(destination='/queue/foo',
                       id=str(1),
                       ack='auto',
                       headers={'subscription-type': 'MULTICAST',
                                'durable-subscription-name': 'someValue'
                                }
                       )
        time.sleep(10000)
        conn.disconnect()
    make_amqp_connection()
except KeyboardInterrupt as kbi:
    print('Disconnecting...')
    conn.disconnect()

Любое потенциальное решение / предложение приветствуется и не стесняйтесь. Кроме того, я также открыт для многопоточного подхода, поскольку потоки - это действительно отдельный поток выполнения (совершенно новый поток Linux, как это происходит в Java)

...