nanomsg nn_poll возвращает ложное срабатывание - PullRequest
0 голосов
/ 21 апреля 2019

Кто-нибудь сталкивался с проблемами, когда nn_poll () возвращал ложные срабатывания? Мы создали очень простой паб-саб с промежуточным брокером в некоторой нетрадиционной манере, поскольку хотели уменьшить потери: издатели -> PUSH -> PULL -> брокер -> PUB -> подписчики Это выглядит многообещающе, но имеет небольшой недостаток: мы всегда получаем, что nn_poll ложно сообщает, что данные существуют в тех случаях, когда их не должно быть. Мы создаем небольшое тестовое приложение, которое воспроизводит его на 100% - кто-нибудь может увидеть, что мы сделали не так? Сначала тест запускает посредник в фоновом режиме, затем подписчик, который ожидает ровно 2 сообщения, затем в любой момент мы можем запустить издателя, который отправит 2 сообщения, а затем выйти.

код брокера Посредник прост - с помощью гнезда PULL он получает публикации от всех издателей (и поскольку существует только один посредник, каждое сообщение будет приниматься одним и тем же посредником, поэтому нет проблем с использованием PUSH / PULL). По какой-то причине происходит сбой nn_device при попытке подключить PULL <-> PUB (или, возможно, это подсказка для проблемы, с которой мы сталкиваемся?)

static constexpr const char* kSubAddr = "ipc://tmp/subscribers";
static constexpr const char* kPubAddr = "ipc://tmp/publishers";
const char *data[] = {"AAAAA:1234567890", "BBBBB:0987654321"};
const char *topic[] = {"AAAAA", "BBBBB"};

void broker()
{
    int pub, pull, rc;
    pull = nn_socket (AF_SP, NN_PULL);
    mapf_assert(pull >= 0);
    rc = nn_bind(pull, kPubAddr);
    mapf_assert(rc);
    pub = nn_socket (AF_SP, NN_PUB);
    mapf_assert(pub);
    rc = nn_bind (pub, kSubAddr);

    nn_pollfd item = {pull, NN_POLLIN, 0};
    while(1)
    {
        rc = nn_poll(&item, 1, -1);
        mapf_assert(rc == 1);
        char buf[256] = {0};
        rc = nn_recv(pull, buf, 256, 0);    
        std::cout << "broker: received " << rc << " bytes: " << buf << std::endl;
        int size = rc;
        rc = nn_send(pub, buf, rc, 0);
        std::cout << "broker: sent " << rc << " bytes: " << buf << std::endl;
        mapf_assert(rc == size);
    }
}

** Абонентский код ** У подписчика есть 2 SUB-сокета, каждый из которых подписан на отдельную тему. Он использует nn_poll () без тайм-аута, чтобы проверить, готовы ли данные в обоих сокетах, а затем читает их. Затем он запускает другой nn_poll () с коротким тайм-аутом и утверждает, что один из сокетов помечен как имеющий данные, которых там быть не должно.

void subscriber()
{
    int rc, sub[2];

    for (int i = 0; i < 2; i++) {
        std::cout << "sub: " << i << ": Connect " << kSubAddr << " subscribe " << topic[i] << std::endl;
        sub[i] = nn_socket(AF_SP, NN_SUB);
        mapf_assert(sub[i] >= 0);
        rc = nn_connect(sub[i], kSubAddr);
        mapf_assert(rc > 0);
        rc = nn_setsockopt(sub[i], NN_SUB, NN_SUB_SUBSCRIBE, topic[i], 5);
        mapf_assert(rc == 0);
    }

    nn_pollfd items[2] = {
        {sub[0], NN_POLLIN, 0},
        {sub[1], NN_POLLIN, 0},
    };

    int num_events = 0;
    std::cout << "sub: polling" << std::endl;
    do {
        num_events = nn_poll(items, 2, -1);
    } while (num_events != 2);

    std::cout << "sub: try receive from both sockets" << std::endl;
    for (int i = 0; i < 2; i++) {
        char buf[256] = {0};
        rc = nn_recv(sub[i], buf, 256, 0);
        std::cout << "sub: received " << buf << " from socket " << i << std::endl;
    }
    num_events = nn_poll(items, 2, 100);
    mapf_assert(num_events == 0);
}

Код издателя Издатель подключается к конечной точке издателя с помощью сокета PUSH, а затем отправляет 2 сообщения, каждое из которых имеет свою тему.

void publisher()
{
    int pub = nn_socket(AF_SP, NN_PUSH);
    mapf_assert(pub >= 0);
    std::cout << "pub: connect " << kPubAddr << std::endl;
    int rc = nn_connect(pub, kPubAddr);
    for (int i = 0; i < 2; i++) {
        std::cout << "pub: send " << data[i] << std::endl;
        rc = nn_send(pub, data[i], strlen(data[i]), 0);
        mapf_assert(rc == (int)strlen(data[i]));
        sleep(1);
    }
}

А вот и вывод:

[1]$ ./test b &

[2]$ ./test s &
sub: 0: Connect ipc:///tmp/tbeliyah/subscribers subscribe AAAAA
sub: 1: Connect ipc:///tmp/tbeliyah/subscribers subscribe BBBBB
sub: polling

[]$ ./test p
pub: connect ipc:///tmp/tbeliyah/publishers
pub: send AAAAA:1234567890
broker: received 16 bytes: AAAAA:1234567890
broker: sent 16 bytes: AAAAA:1234567890
sub: try receive from both sockets
sub: received AAAAA:1234567890 from socket 0
pub: send BBBBB:0987654321
broker: received 16 bytes: BBBBB:0987654321
broker: sent 16 bytes: BBBBB:0987654321
sub: received BBBBB:0987654321 from socket 1
Assertion failed: num_events == 0 (/********/test.cpp:72)
[2]+  Aborted                 (core dumped) ./test s
...