На машине с Ubuntu 16.04 я использую Python3 с pyzmq для отправки JPEG с окончанием base64 через сокет ZeroMQ PUB / SUB в приложение Qt5.Приложение Qt5 использует QSocketNotifier для мониторинга сокета ZMQ и должно запускаться всякий раз, когда сокет «активируется» с большим количеством данных для чтения.В моем примере данные полностью получены на стороне Qt5, и JPEG выглядит нормально - однако, уведомитель сокета срабатывает только один раз, даже если Python Publisher продолжает отправлять данные.Если я выйду из приложения Python с помощью ctrl-C, еще один экземпляр активированного слота уведомлений о сокетах будет запущен на стороне приложения Qt5.Я хочу, чтобы слот запускался каждый раз при отправке сообщения ZMQ из Python.Я приложил целые приложения Python и Qt5 ниже.Как я могу сделать эту работу?Спасибо.
-------- Header File ------
#ifndef SOCKETRECEIVER_H
#define SOCKETRECEIVER_H
#include <QObject>
// Debug
#include <QDebug>
#include <QDataStream>
#include <QFile>
#include <unistd.h>
// User ZeroMQ Sockets
#include <QSocketNotifier>
// ZeroMQ Includes
#include <zmq.h>
// Debug prints
static void AppDebug(QString message) {
qDebug() << Q_FUNC_INFO << message;
}
class SocketReceiver : public QObject
{
Q_OBJECT
public:
explicit SocketReceiver(QObject *parent = nullptr);
~SocketReceiver();
private slots:
void readZMQData();
private:
QSocketNotifier *m_SocketNotifier;
void *m_Context;
void *m_Subscriber;
long long int m_RxFrameCounter;
bool m_DidRXFrame;
signals:
public slots:
};
#endif // SOCKETRECEIVER_H
-------- CCP File ------
#include "socketreceiver.h"
SocketReceiver::SocketReceiver(QObject *parent) : QObject(parent) , m_SocketNotifier(nullptr) , m_Context(nullptr) , m_Subscriber(nullptr) , m_RxFrameCounter(0) , m_DidRXFrame(false)
{
/***** ZMQ *****/
int major, minor, patch;
zmq_version (&major, &minor, &patch);
m_Context = zmq_ctx_new();
m_Subscriber = zmq_socket (m_Context, ZMQ_SUB);
int rc = -1;
unsigned int fd = 0;
do {
const char *filter = std::string("").c_str();
rc = zmq_connect (m_Subscriber, "tcp://localhost:5556");
rc = zmq_setsockopt (m_Subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));
size_t fd_size = sizeof(fd);
rc = zmq_getsockopt(m_Subscriber,ZMQ_FD,&fd,&fd_size);
m_SocketNotifier = new QSocketNotifier(fd, QSocketNotifier::Read, this);
connect(m_SocketNotifier, SIGNAL(activated(int)), this, SLOT(readZMQData()), Qt::DirectConnection);
}
while ( rc < 0 );
AppDebug("Done setting up");
}
SocketReceiver::~SocketReceiver()
{
zmq_close (this->m_Subscriber);
zmq_ctx_destroy (this->m_Context);
}
void SocketReceiver::readZMQData()
{
m_SocketNotifier->setEnabled(false);
AppDebug("Waiting for next frame...");
QByteArray newFrame;
int events = 0;
std::size_t eventsSize = sizeof(events);
zmq_getsockopt(m_Subscriber,ZMQ_EVENTS, &events, &eventsSize);
if(events & ZMQ_POLLIN){
AppDebug("Read Data...");
// Receive data from socket
zmq_msg_t message;
zmq_msg_init(&message);
zmq_recvmsg(m_Subscriber, &message, 0);
size_t size = zmq_msg_size (&message);
AppDebug(QString("Message Size: %1").arg(size));
char *string = static_cast<char*>(malloc(size + 1));
memcpy (string, zmq_msg_data(&message), size);
zmq_msg_close (&message);
string [size] = 0;
if ( string != nullptr ) {
QByteArray newDecodedData = QByteArray::fromBase64(QByteArray(string));
newFrame.append(newDecodedData);
free(string);
if ( !m_DidRXFrame ) {
m_DidRXFrame = true;
}
if ( m_RxFrameCounter == 0 && m_DidRXFrame ) {
AppDebug(QString("Debug RX Frame Size: %1").arg(newFrame.size()));
QFile output("/tmp/abcd.jpeg");
if ( output.open(QIODevice::WriteOnly) ) {
output.write(newFrame);
output.close();
//sleep(86400);
}
}
m_RxFrameCounter++;
}
}
AppDebug("Setting enabled true...");
m_SocketNotifier->setEnabled(true);
}
------- Python Script ---------
#!/usr/bin/python3
import zmq
import random
import sys
import time
import base64
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
time.sleep(1)
while True:
f = open("test.jpg",'rb')
bytes = bytearray(f.read())
print("Encoded Data Length: %s"%(len(bytes)))
strng = base64.b64encode(bytes)
print("Encoded Data Length: %s"%(len(strng)))
socket.send(strng)
f.close()
time.sleep(1)