Как отобразить вывод ZMQ в Qt GUI во время выполнения? - PullRequest
1 голос
/ 14 июля 2020

Я пытаюсь взять вывод клиента ZeroMQ и отобразить его через скрипт python конструктора Qt. Код уже получает данные правильно, но я не могу отобразить его в окне GUI и получить его. Он отображает только первую строку. Я пытаюсь использовать потоки и подпроцесс, но это не работает. Я хорошо получаю данные здесь, в функции (потребитель) в классе (Ui_Form)

class Ui_Form(object):
def setupUi(self, Form):

    def consumer(self):
    consumer_id = random.randrange(1, 10005)
    # print("I am consumer #%s" % (consumer_id))
    context = zmq.Context()
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5557")
    while True:
        buff = consumer_receiver.recv()
        # print(time.time())
        data = np.frombuffer(buff, dtype="float32")
        l = np.where(data == 0)
        data = data[0:l[0][0]:1]
        print(data)

        bandwidth = []
        signals = []
        for i in range(0, len(data), 2):
            signals.append(data[i])
            bandwidth.append(data[i + 1])

        for i in range(0, len(signals)):
            self.textEdit.append(str(signals[i]) + "\n")
            self.textEdit_2.append(str(bandwidth[i]) + "\n")

, и когда я запускаю основной:

if __name__ == "__main__":
   app = QtWidgets.QApplication(sys.argv)
   Form = QtWidgets.QWidget()
   ui = Ui_Form()
   ui.setupUi(Form)
   Form.show()
   ui.consumer()
   sys.exit(app.exec_())

дата получает его правильно, но может ' t показать это.

1 Ответ

0 голосов
/ 15 июля 2020

Объяснение:

Причина проблемы в том, что бесконечный l oop блокирует Qt eventl oop, не позволяя ему обрабатывать такие задачи, как обновление gui, прослушивание событий ОС и т. Д. c.

Решение:

Вы должны реализовать некоторый logi c, который позволяет общаться с ZMQ и не блокирует событие oop, и для этого есть несколько вариантов ( в следующем коде я показываю пример сервера моего теста):

server.py

import random
import time

import zmq

import numpy as np

import logging

logging.basicConfig(level=logging.DEBUG)

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")


while True:
    d = []
    start = random.randint(0, 10)
    for i in range(10):
        x = start + i
        d.append(x)
        d.append(x ** 2)
    arr = np.array(d, dtype="float32")
    buf = arr.tobytes()
    logging.debug(buf)
    socket.send(buf)
    time.sleep(1)

1. Резьба

import sys
import threading

from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np


class ZMQReceiver(QtCore.QObject):
    dataChanged = QtCore.pyqtSignal(bytes)

    def start(self):
        threading.Thread(target=self._execute, daemon=True).start()

    def _execute(self):
        context = zmq.Context()
        consumer_receiver = context.socket(zmq.PULL)
        consumer_receiver.connect("tcp://127.0.0.1:5557")
        while True:
            buff = consumer_receiver.recv()
            self.dataChanged.emit(buff)


class Widget(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super().__init__(parent)

        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        lay = QtWidgets.QHBoxLayout(self)
        lay.addWidget(self.logedit_1)
        lay.addWidget(self.logedit_2)

        zmq_receiver = ZMQReceiver(self)
        zmq_receiver.dataChanged.connect(self.on_data_changed)
        zmq_receiver.start()

    @QtCore.pyqtSlot(bytes)
    def on_data_changed(self, buff):
        data = np.frombuffer(buff, dtype="float32")
        bandwidth = []
        signals = []
        for i in range(0, len(data), 2):
            signals.append(data[i])
            bandwidth.append(data[i + 1])

        for sg, bw in zip(signals, bandwidth):
            self.logedit_1.append(str(sg))
            self.logedit_2.append(str(bw))


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    w = Widget()
    w.show()
    sys.exit(app.exec_())

2. QSocketNotifier

import sys

from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np


class Widget(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super().__init__(parent)

        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        lay = QtWidgets.QHBoxLayout(self)
        lay.addWidget(self.logedit_1)
        lay.addWidget(self.logedit_2)

        context = zmq.Context()
        self.consumer_receiver = context.socket(zmq.PULL)
        self.consumer_receiver.connect("tcp://127.0.0.1:5556")

        self.read_notifier = QtCore.QSocketNotifier(
            self.consumer_receiver.getsockopt(zmq.FD), QtCore.QSocketNotifier.Read, self
        )
        self.read_notifier.activated.connect(self.on_read_msg)

    @QtCore.pyqtSlot()
    def on_read_msg(self):
        self.read_notifier.setEnabled(False)
        if self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
            while self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
                buff = self.consumer_receiver.recv()
                data = np.frombuffer(buff, dtype="float32")
                bandwidth = []
                signals = []
                for i in range(0, len(data), 2):
                    signals.append(data[i])
                    bandwidth.append(data[i + 1])

                for sg, bw in zip(signals, bandwidth):
                    self.logedit_1.append(str(sg))
                    self.logedit_2.append(str(bw))

        self.read_notifier.setEnabled(True)


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    w = Widget()
    w.show()
    sys.exit(app.exec_())

3. асинхронный

import sys
import asyncio

from PyQt5 import QtCore, QtWidgets

from asyncqt import QEventLoop, asyncClose
# from qasync import QEventLoop, asyncClose

import zmq
from zmq.asyncio import Context, Poller

import numpy as np


class Widget(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super().__init__(parent)

        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        lay = QtWidgets.QHBoxLayout(self)
        lay.addWidget(self.logedit_1)
        lay.addWidget(self.logedit_2)

        context = Context()
        self.consumer_receiver = context.socket(zmq.PULL)
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        self.poller = Poller()
        self.poller.register(self.consumer_receiver, zmq.POLLIN)

    async def start_consumer(self):
        while True:
            events = await self.poller.poll()
            if self.consumer_receiver in dict(events):
                buff = await self.consumer_receiver.recv()
                data = np.frombuffer(buff, dtype="float32")
                bandwidth = []
                signals = []
                for i in range(0, len(data), 2):
                    signals.append(data[i])
                    bandwidth.append(data[i + 1])

                for sg, bw in zip(signals, bandwidth):
                    self.logedit_1.append(str(sg))
                    self.logedit_2.append(str(bw))

    @asyncClose
    async def closeEvent(self, event):
        self.consumer_receiver.close()


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)
    w = Widget()
    w.show()
    try:
        loop.run_until_complete(w.start_consumer())
    except asyncio.CancelledError:
        print("start_consumer is cancelled now")
    finally:
        loop.close()
...