Я пытаюсь синхронизировать мои разъемы PUB / SUB с помощью разъемов REQ / REP. Это работает большую часть времени, но иногда сокет REP блокируется во время приема. Я использую тот же контекст ZMQ для PUB / REP и SUB / REQ. Поэтому для синхронизации мои подписчики отправляют запросы издателю. Затем издатель отправляет ответ. Синхронизация заканчивается, если издатель получил запрос от всех ожидаемых подписчиков.
Издатель
Publisher::Publisher(zmq::context_t & ctx) :
mZMQcontext(ctx), mZMQpublisher(mZMQcontext, ZMQ_PUB), mZMQSyncService(
mZMQcontext, ZMQ_REP)
{
mZMQpublisher.setsockopt(ZMQ_LINGER, 100);
}
void Publisher::bindSocket(std::string port)
{
mZMQpublisher.bind("tcp://*:" + port);
}
void Publisher::preparePubSynchronization(std::string port)
{
mZMQSyncService.bind("tcp://*:" + port);
}
void Publisher::synchronizePub(uint64_t expectedSubscribers)
{
// Synchronization with subscribers
uint64_t subscribers = 0;
while (subscribers < expectedSubscribers)
{
// >>>>> Is randomely blocked here <<<<<
std::string request = s_recv(mZMQSyncService);
s_send(mZMQSyncService, request);
subscribers++;
}
}
Абонентская
Subscriber::Subscriber(zmq::context_t & ctx) :
mZMQcontext(ctx), mZMQsubscriber(mZMQcontext, ZMQ_SUB)
{
mZMQsubscriber.setsockopt(ZMQ_LINGER, 100);
}
bool Subscriber::connectToPub(std::string ip, std::string port)
{
mZMQsubscriber.connect("tcp://" + ip + ":" + port);
}
bool Subscriber::prepareSubSynchronization(std::string ip, std::string port)
{
mSyncIP = ip;
mSyncPort = port;
}
// Helper function to get a new socket
zmq::socket_t* Subscriber::newSyncSocket()
{
zmq::socket_t* client = new zmq::socket_t(mZMQcontext, ZMQ_REQ);
client->connect("tcp://" + mSyncIP + ":" + mSyncPort);
client->setsockopt(ZMQ_LINGER, 0);
return client;
}
bool Subscriber::synchronizeSub()
{
zmq::socket_t* client = newSyncSocket();
int retries_left = 3;
std::string request = mOwner;
s_send(*client, request );
bool expect_reply = true;
while (expect_reply)
{
// Poll socket for a reply, with timeout
zmq::pollitem_t items[] = { { *client, 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, REQUEST_TIMEOUT);
// If we got a reply, process it
if (items[0].revents & ZMQ_POLLIN)
{
// We got a reply from the publisher -> must match request
std::string reply = s_recv(*client);
if (reply == request)
{
// Valid reply
expect_reply = false;
}
} else if (--retries_left == 0)
{
// publisher seems to be offline, abandoning
delete client;
return false;
} else
{
// No response from publisher, retrying ...
// Old socket will be confused; close it and open a new one
delete client;
client = newSyncSocket();
// Send request again, on new socket
s_send(*client, request);
}
}
delete client;
return true;
}