Объяснение:
Причина проблемы в том, что бесконечный цикл блокирует цикл событий Qt (event loop), не позволяя ему выполнять такие задачи, как обновление GUI, обработка событий ОС и т. д.
Решение:
Нужно реализовать логику, которая позволяет обмениваться данными через ZMQ и при этом не блокирует цикл событий; для этого есть несколько вариантов (в приведённом ниже коде сначала показан пример сервера, который я использовал для тестирования):
server.py
import random
import time
import zmq
import numpy as np
import logging
# Log at DEBUG level so every payload sent below is reported.
logging.basicConfig(level=logging.DEBUG)

# PUSH socket that the client examples connect to on port 5557.
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

# Once per second, send ten interleaved (x, x ** 2) pairs as raw float32 bytes,
# where x runs over ten consecutive integers from a random starting point.
while True:
    start = random.randint(0, 10)
    samples = []
    for value in range(start, start + 10):
        samples.extend((value, value ** 2))
    payload = np.array(samples, dtype="float32").tobytes()
    logging.debug(payload)
    socket.send(payload)
    time.sleep(1)
1. Потоки (threading)
import sys
import threading
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class ZMQReceiver(QtCore.QObject):
    """Receive ZMQ messages on a worker thread and re-emit them as a Qt signal."""

    # Emitted with the raw bytes of each message received.
    dataChanged = QtCore.pyqtSignal(bytes)

    def start(self):
        """Launch the receive loop on a daemon thread."""
        worker = threading.Thread(target=self._execute, daemon=True)
        worker.start()

    def _execute(self):
        """Pull messages forever, emitting ``dataChanged`` for each one."""
        ctx = zmq.Context()
        pull_socket = ctx.socket(zmq.PULL)
        pull_socket.connect("tcp://127.0.0.1:5557")
        while True:
            # recv() blocks here, on the worker thread started by start().
            message = pull_socket.recv()
            self.dataChanged.emit(message)
class Widget(QtWidgets.QWidget):
    """Two side-by-side read-only logs fed by a ``ZMQReceiver``."""

    def __init__(self, parent=None):
        """Build the two log views and start receiving data."""
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        layout = QtWidgets.QHBoxLayout(self)
        for editor in (self.logedit_1, self.logedit_2):
            layout.addWidget(editor)

        # The receiver gets this widget as its Qt parent.
        receiver = ZMQReceiver(self)
        receiver.dataChanged.connect(self.on_data_changed)
        receiver.start()

    @QtCore.pyqtSlot(bytes)
    def on_data_changed(self, buff):
        """Decode a float32 payload and log its interleaved value pairs.

        Values at even positions are appended to ``logedit_1`` and values at
        odd positions to ``logedit_2``.
        """
        values = np.frombuffer(buff, dtype="float32")
        even_positions = range(0, len(values), 2)
        # Both lists are built before anything is appended to the logs.
        signals = [values[i] for i in even_positions]
        bandwidth = [values[i + 1] for i in even_positions]
        for sg, bw in zip(signals, bandwidth):
            self.logedit_1.append(str(sg))
            self.logedit_2.append(str(bw))
if __name__ == "__main__":
    # Create the application, show the widget and run the Qt event loop;
    # the value returned by exec_() becomes the process exit status.
    app = QtWidgets.QApplication(sys.argv)
    window = Widget()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
2. QSocketNotifier
import sys
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class Widget(QtWidgets.QWidget):
    """Two read-only logs filled from a ZMQ PULL socket via ``QSocketNotifier``."""

    def __init__(self, parent=None):
        """Build the log views, connect the socket and watch its descriptor."""
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)
        lay = QtWidgets.QHBoxLayout(self)
        lay.addWidget(self.logedit_1)
        lay.addWidget(self.logedit_2)

        context = zmq.Context()
        self.consumer_receiver = context.socket(zmq.PULL)
        # Must be the port the server example binds (tcp://*:5557); the
        # previous value, 5556, never received any data from it.
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        # Let Qt report read activity on the socket's file descriptor.
        self.read_notifier = QtCore.QSocketNotifier(
            self.consumer_receiver.getsockopt(zmq.FD),
            QtCore.QSocketNotifier.Read,
            self,
        )
        self.read_notifier.activated.connect(self.on_read_msg)

    @QtCore.pyqtSlot()
    def on_read_msg(self):
        """Receive every pending message and log its interleaved value pairs.

        Values at even positions are appended to ``logedit_1`` and values at
        odd positions to ``logedit_2``.
        """
        # Pause notifications while the socket is being read.
        self.read_notifier.setEnabled(False)
        # Keep receiving for as long as zmq.EVENTS reports a readable message.
        while self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
            buff = self.consumer_receiver.recv()
            data = np.frombuffer(buff, dtype="float32")
            bandwidth = []
            signals = []
            for i in range(0, len(data), 2):
                signals.append(data[i])
                bandwidth.append(data[i + 1])
            for sg, bw in zip(signals, bandwidth):
                self.logedit_1.append(str(sg))
                self.logedit_2.append(str(bw))
        self.read_notifier.setEnabled(True)
if __name__ == "__main__":
    # Start the Qt application with a single Widget and exit with the
    # status returned by the event loop.
    app = QtWidgets.QApplication(sys.argv)
    main_widget = Widget()
    main_widget.show()
    status = app.exec_()
    sys.exit(status)
3. asyncio
import sys
import asyncio
from PyQt5 import QtCore, QtWidgets
from asyncqt import QEventLoop, asyncClose
# from qasync import QEventLoop, asyncClose
import zmq
from zmq.asyncio import Context, Poller
import numpy as np
class Widget(QtWidgets.QWidget):
    """Two read-only logs filled by an asyncio coroutine polling a ZMQ socket."""

    def __init__(self, parent=None):
        """Build the log views, connect the socket and register it for polling."""
        super().__init__(parent)
        self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
        self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)

        layout = QtWidgets.QHBoxLayout(self)
        for editor in (self.logedit_1, self.logedit_2):
            layout.addWidget(editor)

        ctx = Context()
        self.consumer_receiver = ctx.socket(zmq.PULL)
        self.consumer_receiver.connect("tcp://127.0.0.1:5557")

        self.poller = Poller()
        self.poller.register(self.consumer_receiver, zmq.POLLIN)

    async def start_consumer(self):
        """Poll the socket forever and log each message's interleaved pairs.

        Values at even positions are appended to ``logedit_1`` and values at
        odd positions to ``logedit_2``.
        """
        while True:
            ready = dict(await self.poller.poll())
            if self.consumer_receiver not in ready:
                continue
            payload = await self.consumer_receiver.recv()
            values = np.frombuffer(payload, dtype="float32")
            # All pairs are built before anything is appended to the logs.
            pairs = [(values[i], values[i + 1]) for i in range(0, len(values), 2)]
            for sg, bw in pairs:
                self.logedit_1.append(str(sg))
                self.logedit_2.append(str(bw))

    @asyncClose
    async def closeEvent(self, event):
        """Close the ZMQ socket when the widget is closed."""
        self.consumer_receiver.close()
if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)

    # Install the Qt-based loop as the asyncio event loop.
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    window = Widget()
    window.show()

    consumer = window.start_consumer()
    try:
        loop.run_until_complete(consumer)
    except asyncio.CancelledError:
        print("start_consumer is cancelled now")
    finally:
        # Always release the loop, whether the consumer ended or was cancelled.
        loop.close()