Кросспостинг от ZeroMQ GitHub выпускает страницу для большей наглядности.
Описание проблемы
Примечание: это может быть предполагаемое поведение.
Я реализую шаблон PUB / SUB, в котором есть один издатель (довольно быстрый) и несколько подписчиков (у них разные тарифы).
Меня интересует только большинство недавнее сообщение.Поэтому я решил включить опции ZMQ_CONFLATE
.
Насколько я понимаю (только из документа, я не проверял исходный код библиотеки), что сохраняется только самое новое сообщение - очередь размером1 и с самым последним сообщением.Что-то, что мы не можем сделать с верхней отметкой, которая отбрасывает новые сообщения.Я предполагаю, что «пользовательский» код отправляет сообщение (zmq_send
), пул потоков в zmq_context
будет обрабатывать сообщение, которое будет отправлено в сокет.Если более новое сообщение приходит до того, как отправлено старое, то (с ZMQ_CONFLATE
= true) старое сообщение отбрасывается, а новое отправляется.То же самое по размеру приемника.Если код пользователя медленно вызывает zmq_recv
, читается только последнее сообщение.
Теперь, это работает идеально, если у меня 1 издатель и 1 подписчик.Если я добавлю больше подписчиков, сообщения получат только первый (подключенный tcp
) или последний (ipc
).Обратите внимание, что это сторона издателя, которая контролирует поведение.Добавление ZMQ_CONFLATE
в подписчике не имеет никакого эффекта (в том смысле, что подписчики могут подключиться)
Итак, мой вопрос: это предполагаемое поведение?ZMQ_CONFLATE
, установленный на стороне издателя, отбрасывает сообщение после его отправки первому подписчику (а не потому, что был вызван новый zmq_send
)?
Среда
Я проверял этов macOS 13.6, lib zmq (устанавливается homebrew) 4.2.5.Я также проверил между двумя Debian (4.2.3-1 и 4.2.1-4)
Код, который я запускаю, следующий:
Код
Издатель:
#include <zmq.h>
#include <cstdlib>
#include <chrono>
#include <iostream>
#include <thread>
#include <atomic>
#include <csignal>
static std::atomic<bool> s_shutdown(false);
void signal_handler(int) {
s_shutdown = true;
}
int main(int argc, char** argv)
{
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
void* context = zmq_ctx_new();
void* output = zmq_socket(context, ZMQ_PUB);
// We set the internal queue to have only one (the most recent) message and throw away
// everything else.
int trueValue = 1;
zmq_setsockopt(output, ZMQ_CONFLATE, &trueValue, sizeof(int));
// We are ready. Create the endpoints.
// zmq_bind(output, "tcp://*:10000");
zmq_bind(output, "ipc:///tmp/zmq_test");
unsigned i = 0;
while(true) {
if (s_shutdown) {
break;
}
if (zmq_send(output, &i, sizeof(unsigned), 0) == -1) {
std::cerr << "Failed to send message." << std::endl;
} else {
std::cerr << "Sending " << i << std::endl;
}
i++;
// fake a sleep (1ms)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
zmq_close(output);
zmq_ctx_term(context);
return 0;
}
Подписчик
#include <zmq.h>
#include <cstdlib>
#include <atomic>
#include <csignal>
#include <iostream>
#include <thread>
static std::atomic<bool> s_shutdown(false);
void signal_handler(int) {
s_shutdown = true;
}
int main(int argc, char** argv)
{
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
void* context = zmq_ctx_new();
void* input = zmq_socket(context, ZMQ_SUB);
// subscribe to everything
zmq_setsockopt(input, ZMQ_SUBSCRIBE, "", 0);
// Setting the buffer to accept a single message and to drop old messages.
int trueValue = 1;
zmq_setsockopt(input, ZMQ_CONFLATE, &trueValue, sizeof(int));
// zmq_connect(input, "tcp://localhost:10000");
zmq_connect(input, "ipc:///tmp/zmq_test");
while(true) {
// reading the state
if (s_shutdown) {
break;
}
unsigned readValue = 0;
int readBytes = zmq_recv(input, &readValue, sizeof(unsigned), ZMQ_DONTWAIT);
if (readBytes > 0) {
std::cerr << "Read " << readValue << std::endl;
}
// fake a sleep (100ms)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
zmq_close(input);
zmq_ctx_term(context);
return 0;
}
Комментирование zmq_setsockopt(output, ZMQ_CONFLATE, &trueValue, sizeof(int));
в издателе позволяет нескольким подписчикам получать сообщение.