Кто-нибудь сталкивался с проблемами, когда nn_poll () возвращал ложные срабатывания?
Мы создали очень простой паб-саб с промежуточным брокером в некоторой нетрадиционной манере, поскольку хотели уменьшить потери:
издатели -> PUSH -> PULL -> брокер -> PUB -> подписчики
Это выглядит многообещающе, но имеет небольшой недостаток: мы всегда получаем, что nn_poll ложно сообщает, что данные существуют в тех случаях, когда их не должно быть.
Мы создаем небольшое тестовое приложение, которое воспроизводит его на 100% - кто-нибудь может увидеть, что мы сделали не так?
Сначала тест запускает посредник в фоновом режиме, затем подписчик, который ожидает ровно 2 сообщения, затем в любой момент мы можем запустить издателя, который отправит 2 сообщения, а затем выйти.
код брокера
Посредник прост - с помощью гнезда PULL он получает публикации от всех издателей (и поскольку существует только один посредник, каждое сообщение будет приниматься одним и тем же посредником, поэтому нет проблем с использованием PUSH / PULL).
По какой-то причине происходит сбой nn_device при попытке подключить PULL <-> PUB (или, возможно, это подсказка для проблемы, с которой мы сталкиваемся?)
static constexpr const char* kSubAddr = "ipc://tmp/subscribers";
static constexpr const char* kPubAddr = "ipc://tmp/publishers";
const char *data[] = {"AAAAA:1234567890", "BBBBB:0987654321"};
const char *topic[] = {"AAAAA", "BBBBB"};
void broker()
{
int pub, pull, rc;
pull = nn_socket (AF_SP, NN_PULL);
mapf_assert(pull >= 0);
rc = nn_bind(pull, kPubAddr);
mapf_assert(rc);
pub = nn_socket (AF_SP, NN_PUB);
mapf_assert(pub);
rc = nn_bind (pub, kSubAddr);
nn_pollfd item = {pull, NN_POLLIN, 0};
while(1)
{
rc = nn_poll(&item, 1, -1);
mapf_assert(rc == 1);
char buf[256] = {0};
rc = nn_recv(pull, buf, 256, 0);
std::cout << "broker: received " << rc << " bytes: " << buf << std::endl;
int size = rc;
rc = nn_send(pub, buf, rc, 0);
std::cout << "broker: sent " << rc << " bytes: " << buf << std::endl;
mapf_assert(rc == size);
}
}
** Абонентский код **
У подписчика есть 2 SUB-сокета, каждый из которых подписан на отдельную тему.
Он использует nn_poll () без тайм-аута, чтобы проверить, готовы ли данные в обоих сокетах, а затем читает их.
Затем он запускает другой nn_poll () с коротким тайм-аутом и утверждает, что один из сокетов помечен как имеющий данные, которых там быть не должно.
void subscriber()
{
int rc, sub[2];
for (int i = 0; i < 2; i++) {
std::cout << "sub: " << i << ": Connect " << kSubAddr << " subscribe " << topic[i] << std::endl;
sub[i] = nn_socket(AF_SP, NN_SUB);
mapf_assert(sub[i] >= 0);
rc = nn_connect(sub[i], kSubAddr);
mapf_assert(rc > 0);
rc = nn_setsockopt(sub[i], NN_SUB, NN_SUB_SUBSCRIBE, topic[i], 5);
mapf_assert(rc == 0);
}
nn_pollfd items[2] = {
{sub[0], NN_POLLIN, 0},
{sub[1], NN_POLLIN, 0},
};
int num_events = 0;
std::cout << "sub: polling" << std::endl;
do {
num_events = nn_poll(items, 2, -1);
} while (num_events != 2);
std::cout << "sub: try receive from both sockets" << std::endl;
for (int i = 0; i < 2; i++) {
char buf[256] = {0};
rc = nn_recv(sub[i], buf, 256, 0);
std::cout << "sub: received " << buf << " from socket " << i << std::endl;
}
num_events = nn_poll(items, 2, 100);
mapf_assert(num_events == 0);
}
Код издателя
Издатель подключается к конечной точке издателя с помощью сокета PUSH, а затем отправляет 2 сообщения, каждое из которых имеет свою тему.
void publisher()
{
int pub = nn_socket(AF_SP, NN_PUSH);
mapf_assert(pub >= 0);
std::cout << "pub: connect " << kPubAddr << std::endl;
int rc = nn_connect(pub, kPubAddr);
for (int i = 0; i < 2; i++) {
std::cout << "pub: send " << data[i] << std::endl;
rc = nn_send(pub, data[i], strlen(data[i]), 0);
mapf_assert(rc == (int)strlen(data[i]));
sleep(1);
}
}
А вот и вывод:
[1]$ ./test b &
[2]$ ./test s &
sub: 0: Connect ipc:///tmp/tbeliyah/subscribers subscribe AAAAA
sub: 1: Connect ipc:///tmp/tbeliyah/subscribers subscribe BBBBB
sub: polling
[]$ ./test p
pub: connect ipc:///tmp/tbeliyah/publishers
pub: send AAAAA:1234567890
broker: received 16 bytes: AAAAA:1234567890
broker: sent 16 bytes: AAAAA:1234567890
sub: try receive from both sockets
sub: received AAAAA:1234567890 from socket 0
pub: send BBBBB:0987654321
broker: received 16 bytes: BBBBB:0987654321
broker: sent 16 bytes: BBBBB:0987654321
sub: received BBBBB:0987654321 from socket 1
Assertion failed: num_events == 0 (/********/test.cpp:72)
[2]+ Aborted (core dumped) ./test s