Поведение ZMQ_CONFLATE в паттерне PUB / SUB - PullRequest
0 голосов
/ 12 декабря 2018

Кросспостинг от ZeroMQ GitHub выпускает страницу для большей наглядности.

Описание проблемы

Примечание: это может быть предполагаемое поведение.

Я реализую шаблон PUB / SUB, в котором есть один издатель (довольно быстрый) и несколько подписчиков (у них разные тарифы).

Меня интересует только большинство недавнее сообщение.Поэтому я решил включить опции ZMQ_CONFLATE.

Насколько я понимаю (только из документа, я не проверял исходный код библиотеки), что сохраняется только самое новое сообщение - очередь размером1 и с самым последним сообщением.Что-то, что мы не можем сделать с верхней отметкой, которая отбрасывает новые сообщения.Я предполагаю, что «пользовательский» код отправляет сообщение (zmq_send), пул потоков в zmq_context будет обрабатывать сообщение, которое будет отправлено в сокет.Если более новое сообщение приходит до того, как отправлено старое, то (с ZMQ_CONFLATE = true) старое сообщение отбрасывается, а новое отправляется.То же самое по размеру приемника.Если код пользователя медленно вызывает zmq_recv, читается только последнее сообщение.

Теперь, это работает идеально, если у меня 1 издатель и 1 подписчик.Если я добавлю больше подписчиков, сообщения получат только первый (подключенный tcp) или последний (ipc).Обратите внимание, что это сторона издателя, которая контролирует поведение.Добавление ZMQ_CONFLATE в подписчике не имеет никакого эффекта (в том смысле, что подписчики могут подключиться)

Итак, мой вопрос: это предполагаемое поведение?ZMQ_CONFLATE, установленный на стороне издателя, отбрасывает сообщение после его отправки первому подписчику (а не потому, что был вызван новый zmq_send)?

Среда

Я проверял этов macOS 13.6, lib zmq (устанавливается homebrew) 4.2.5.Я также проверил между двумя Debian (4.2.3-1 и 4.2.1-4)

Код, который я запускаю, следующий:

Код

Издатель:

#include <zmq.h>

#include <cstdlib>
#include <chrono>
#include <iostream>
#include <thread>
#include <atomic>
#include <csignal>

static std::atomic<bool> s_shutdown(false);

void signal_handler(int) {
    s_shutdown = true;
}

int main(int argc, char** argv)
{
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    void* context = zmq_ctx_new();
    void* output = zmq_socket(context, ZMQ_PUB);


    // We set the internal queue to have only one (the most recent) message and throw away
    // everything else.
    int trueValue = 1;
    zmq_setsockopt(output, ZMQ_CONFLATE, &trueValue, sizeof(int));

    // We are ready. Create the endpoints.
//    zmq_bind(output, "tcp://*:10000");
    zmq_bind(output, "ipc:///tmp/zmq_test");


    unsigned i = 0;
    while(true) {

        if (s_shutdown) {
            break;
        }

        if (zmq_send(output, &i, sizeof(unsigned), 0) ==  -1) {
            std::cerr << "Failed to send message." << std::endl;
        } else {
            std::cerr << "Sending " << i << std::endl;
        }
        i++;

        // fake a sleep (1ms)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    zmq_close(output);
    zmq_ctx_term(context);
    return 0;
}

Подписчик

#include <zmq.h>

#include <cstdlib>
#include <atomic>
#include <csignal>
#include <iostream>
#include <thread>

static std::atomic<bool> s_shutdown(false);

void signal_handler(int) {
    s_shutdown = true;
}

int main(int argc, char** argv)
{
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    void* context = zmq_ctx_new();

    void* input = zmq_socket(context, ZMQ_SUB);

    // subscribe to everything
    zmq_setsockopt(input, ZMQ_SUBSCRIBE, "", 0);
    // Setting the buffer to accept a single message and to drop old messages.
    int trueValue = 1;
     zmq_setsockopt(input, ZMQ_CONFLATE, &trueValue, sizeof(int));

//    zmq_connect(input, "tcp://localhost:10000");
    zmq_connect(input, "ipc:///tmp/zmq_test");

    while(true) {
        // reading the state
        if (s_shutdown) {
            break;
        }

        unsigned readValue = 0;
        int readBytes = zmq_recv(input, &readValue, sizeof(unsigned), ZMQ_DONTWAIT);
        if (readBytes > 0) {
            std::cerr << "Read " << readValue << std::endl;
        }


        // fake a sleep (100ms)
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    zmq_close(input);
    zmq_ctx_term(context);
    return 0;
}

Комментирование zmq_setsockopt(output, ZMQ_CONFLATE, &trueValue, sizeof(int)); в издателе позволяет нескольким подписчикам получать сообщение.

...