Окружающая среда: Ubuntu 18.01 со вкусом NVIDIA на их плате разработки Jetson с процессором TX2i. ZMQ 4.3.2, использующий оболочку cppzmq C ++ для ZMQ.
У меня есть код, работающий с использованием буферов протокола Google с ZeroMQ, и все это PUSH / PULL, и это работает хорошо, за исключением того, что у меня есть один случай, который не является двухточечным, но 1: 3. Правильное решение здесь - выполнить PUB / SUB, но я не могу передать сообщения своему подписчику.
Я сократил свой код до этого простого примера. Если я раскомментирую операторы #define
, подписчик ничего не получит. Прокомментированный (который компилируется как PUSH / PULL вместо PUB / SUB), тогда подписчик получает сообщение, как и ожидалось. С чрезмерными sleep_for()
разами я бы ожидал, что у подписчика будет достаточно времени для регистрации, прежде чем издатель выполнит отправку.
РЕДАКТИРОВАТЬ:
Почему попытка / перехват подписчика? Вначале я получал исключение и полагал, что это потому, что издатель не был готов. Похоже, это больше не так, поэтому я думал, что это не так.
// Publisher
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>
#define PUB_SUB
int main( void )
{
zmq::context_t* m_pContext = new zmq::context_t( 1 );
#ifdef PUB_SUB
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PUB );
#else
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PUSH );
#endif
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
//m_pSocket->bind( "tcp://*:53001" ); // using '*' or specific IP doesn't change result
m_pSocket->bind( "tcp://127.0.0.1:53001" );
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
// Send the parameters
protobuf_namespace::Params params;
params.set_calibrationdata( protobuf_namespace::CalDataType::CAL_REQUESTED ); // init one value to non-zero
std::string params_str = params.SerializeAsString();
zmq::message_t zmsg( params_str.size() );
memcpy( zmsg.data(), params_str.c_str(), params_str.size() );
m_pSocket->send( zmsg, zmq::send_flags::none );
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
m_pSocket->close();
zmq_ctx_destroy( m_pContext );
}
// Subscriber - start me first!
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>
#include <stdio.h>
#define PUB_SUB
int main( void )
{
zmq::context_t* m_pContext = new zmq::context_t( 1 );
#ifdef PUB_SUB
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_SUB );
m_pSocket->connect( "tcp://127.0.0.1:53001" );
int linger = 0;
zmq_setsockopt( m_pSocket, ZMQ_LINGER, &linger, sizeof( linger ) );
zmq_setsockopt( m_pSocket, ZMQ_SUBSCRIBE, "", 0 );
#else
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PULL );
m_pSocket->connect( "tcp://127.0.0.1:53001" );
#endif
protobuf_namespace::Params params;
zmq::message_t zmsg;
bool retry = true;
do {
try {
m_pSocket->recv( zmsg, zmq::recv_flags::none );
retry = false;
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
} catch( ... ) {
printf("caught\n");
}
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
} while( retry );
std::string param_str( static_cast<char*>( zmsg.data() ), zmsg.size() );
params.ParseFromString( param_str );
if( params.calibrationdata() == protobuf_namespace::CalDataType::CAL_REQUESTED )
printf( "CAL_REQUESTED\n" );
else
printf( "bad data\n" );
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
m_pSocket->close();
zmq_ctx_destroy( m_pContext );
}