Почему ZeroMQ PUSH / PULL работает, а не PUB / SUB? - PullRequest
1 голос
/ 05 февраля 2020

Окружающая среда: Ubuntu 18.01 со вкусом NVIDIA на их плате разработки Jetson с процессором TX2i. ZMQ 4.3.2, использующий оболочку cppzmq C ++ для ZMQ.

У меня есть код, работающий с использованием буферов протокола Google с ZeroMQ, и все это PUSH / PULL, и это работает хорошо, за исключением того, что у меня есть один случай, который не является двухточечным, но 1: 3. Правильное решение здесь - выполнить PUB / SUB, но я не могу передать сообщения своему подписчику.

Я сократил свой код до этого простого примера. Если я раскомментирую операторы #define, подписчик ничего не получит. Прокомментированный (который компилируется как PUSH / PULL вместо PUB / SUB), тогда подписчик получает сообщение, как и ожидалось. С чрезмерными sleep_for() разами я бы ожидал, что у подписчика будет достаточно времени для регистрации, прежде чем издатель выполнит отправку.

РЕДАКТИРОВАТЬ:

Почему попытка / перехват подписчика? Вначале я получал исключение и полагал, что это потому, что издатель не был готов. Похоже, это больше не так, поэтому я думал, что это не так.

// Publisher
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>


#define PUB_SUB

int main( void )
{
  zmq::context_t* m_pContext = new zmq::context_t( 1 );

#ifdef PUB_SUB
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_PUB );
#else
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_PUSH );
#endif

  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  //m_pSocket->bind( "tcp://*:53001" );       // using '*' or specific IP doesn't change result
  m_pSocket->bind( "tcp://127.0.0.1:53001" );
  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

  // Send the parameters
  protobuf_namespace::Params params;
  params.set_calibrationdata( protobuf_namespace::CalDataType::CAL_REQUESTED ); // init one value to non-zero
  std::string        params_str = params.SerializeAsString();
  zmq::message_t     zmsg( params_str.size() );

  memcpy( zmsg.data(), params_str.c_str(), params_str.size() );
  m_pSocket->send( zmsg, zmq::send_flags::none );

  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  m_pSocket->close();
  zmq_ctx_destroy( m_pContext );
}
// Subscriber - start me first!
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>
#include <stdio.h>

#define PUB_SUB


int main( void )
{
  zmq::context_t* m_pContext = new zmq::context_t( 1 );

#ifdef PUB_SUB
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_SUB );
  m_pSocket->connect( "tcp://127.0.0.1:53001" );

  int linger = 0;
  zmq_setsockopt( m_pSocket, ZMQ_LINGER, &linger, sizeof( linger ) );
  zmq_setsockopt( m_pSocket, ZMQ_SUBSCRIBE, "", 0 );
#else
  zmq::socket_t*  m_pSocket  = new zmq::socket_t( *m_pContext, ZMQ_PULL );
  m_pSocket->connect( "tcp://127.0.0.1:53001" );
#endif

  protobuf_namespace::Params params;
  zmq::message_t zmsg;
  bool retry = true;

  do {
    try {
      m_pSocket->recv( zmsg, zmq::recv_flags::none );
      retry = false;
      std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
    } catch( ... ) { 
      printf("caught\n");
    }
    std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  } while( retry );

  std::string param_str( static_cast<char*>( zmsg.data() ), zmsg.size() );
  params.ParseFromString( param_str );

  if( params.calibrationdata() == protobuf_namespace::CalDataType::CAL_REQUESTED )
    printf( "CAL_REQUESTED\n" );
  else
    printf( "bad data\n" );


  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  m_pSocket->close();
  zmq_ctx_destroy( m_pContext );
}

1 Ответ

0 голосов
/ 05 февраля 2020

В случае, если вы никогда не работали с ZeroMQ,
здесь можно сначала посмотреть на " ZeroMQ Принципы менее чем за Пять секунд "
, прежде чем углубляться в подробности

Q : Почему попытка / отлов подписчика?

Потому что:
a)
// Subscriber - start me first! и в то же время он спит на стороне PUB почти "навсегда" перед тем, как делать tcp:// настройка пути транспортного класса для принятия любых первых .connect() в .bind(), здесь предшествует массивный сон ...
std::this_thread::sleep_for( std::chrono::seconds( 1 ) ); и

b) try 'd m_pSocket->recv( zmsg, zmq::recv_flags::none ); должен по определению генерировать исключение, так как пока нет настройки пути транспортного класса tcp:// (поскольку сторона PUB еще не вернулась из спящего режима)

Q : Почему ZeroMQ PUSH / PULL работает, но не PUB / SUB?

Ну, оба будут , если разработано правильно, соблюдая опубликованный API.

Просто удалите все блоки sleep(), не позволяя присоединиться SUB, чтобы .connect() смогли успешно завершить работу. Кроме того, возможно, перейдите в неблокирующую форму .recv() -ops (refactor try/catch), как это принято в , чтобы лучше отражать природу профилактически спроектированных .poll() -основанных или реактивная .recv(..., ZMQ_NOBLOCK ) -обработка событий.


Последнее, но не менее важное:

ZeroMQ v4 + (в отличие от v2 + и pre-v3.? API) был переключен на использование PUB сторонняя фильтрация сообщений, поэтому необходимо должным образом учитывать управление подпиской (синхронизация / обработка ошибок / устойчивость).

В любом случае сомнение может интегрировать использование встроенных в ZeroMQ инструментов socket_monitor, расширяющих экземпляр Context(), и отслеживать / проверять каждое событие, внутреннее по отношению к экземпляру Context(), в соответствии с опубликованным API -события, вплоть до самого низкого уровня детализации.

Не стесняйтесь прочитайте и спросите больше

...