Сокет ZeroMQ REP блокируется при получении запроса - PullRequest
0 голосов
/ 24 января 2019

Я пытаюсь синхронизировать мои разъемы PUB / SUB с помощью разъемов REQ / REP. Это работает большую часть времени, но иногда сокет REP блокируется во время приема. Я использую тот же контекст ZMQ для PUB / REP и SUB / REQ. Поэтому для синхронизации мои подписчики отправляют запросы издателю. Затем издатель отправляет ответ. Синхронизация заканчивается, если издатель получил запрос от всех ожидаемых подписчиков.

Издатель

Publisher::Publisher(zmq::context_t & ctx) :
    mZMQcontext(ctx), mZMQpublisher(mZMQcontext, ZMQ_PUB), mZMQSyncService(
            mZMQcontext, ZMQ_REP) 
{
    mZMQpublisher.setsockopt(ZMQ_LINGER, 100);
}

void Publisher::bindSocket(std::string port) 
{
    mZMQpublisher.bind("tcp://*:" + port);
}

void Publisher::preparePubSynchronization(std::string port) 
{
    mZMQSyncService.bind("tcp://*:" + port);
}

void Publisher::synchronizePub(uint64_t expectedSubscribers) 
{
    //  Synchronization with subscribers
    uint64_t subscribers = 0;

    while (subscribers < expectedSubscribers) 
    {
         // >>>>> Is randomely blocked here <<<<<
         std::string request = s_recv(mZMQSyncService);
         s_send(mZMQSyncService, request);
         subscribers++;
    }
}

Абонентская

Subscriber::Subscriber(zmq::context_t & ctx) :
    mZMQcontext(ctx), mZMQsubscriber(mZMQcontext, ZMQ_SUB)
{
     mZMQsubscriber.setsockopt(ZMQ_LINGER, 100);
}

bool Subscriber::connectToPub(std::string ip, std::string port)
{
     mZMQsubscriber.connect("tcp://" + ip + ":" + port);
}

bool Subscriber::prepareSubSynchronization(std::string ip, std::string port) 
{
    mSyncIP = ip;
    mSyncPort = port;
}

// Helper function to get a new socket
zmq::socket_t* Subscriber::newSyncSocket() 
{
    zmq::socket_t* client = new zmq::socket_t(mZMQcontext, ZMQ_REQ);
    client->connect("tcp://" + mSyncIP + ":" + mSyncPort);
    client->setsockopt(ZMQ_LINGER, 0);
    return client;
}

bool Subscriber::synchronizeSub() 
{
    zmq::socket_t* client = newSyncSocket();

    int retries_left = 3;
    std::string request = mOwner;
    s_send(*client, request );

    bool expect_reply = true;
    while (expect_reply) 
    {
         //  Poll socket for a reply, with timeout
         zmq::pollitem_t items[] = { { *client, 0, ZMQ_POLLIN, 0 } };
         zmq::poll(&items[0], 1, REQUEST_TIMEOUT);

         //  If we got a reply, process it
         if (items[0].revents & ZMQ_POLLIN) 
         {
              //  We got a reply from the publisher -> must match request
              std::string reply = s_recv(*client);

              if (reply == request) 
              {
                   // Valid reply
                   expect_reply = false;
              } 

         } else if (--retries_left == 0) 
         {
              // publisher seems to be offline, abandoning
              delete client;
              return false;
         } else 
         {
              // No response from publisher, retrying ...
              // Old socket will be confused; close it and open a new one
              delete client;
              client = newSyncSocket();
              // Send request again, on new socket
              s_send(*client, request);
         }
}
    delete client;
    return true;
}
...