потерянные сообщения на сабе паба zeromq - PullRequest
14 голосов
/ 19 сентября 2011

Я пытаюсь реализовать шаблон проектирования sub pub с использованием фреймворка zeromq.Идея состоит в том, чтобы запустить подписчика, а затем запустить издателя.Подписчик прослушает 100 сообщений, а издатель опубликует 100 сообщений.Пока все хорошо ... Однако на самом деле происходит то, что даже после того, как подписчик уже запущен и работает, когда запускается издатель, не все сообщения получаются подписчиком (100 сообщений будут получены подписчиком, еслииздатель отправит не менее 500 сообщений).Похоже, что первые сообщения, отправленные издателем, не отправляются подписчику.

Есть идеи?

Заранее спасибо, Омер.

Код подписчика (запускается перед издателем)

int i=0;
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    zmq::message_t update;
    subscriber.recv(&update);
    i++;
    std::cout<<"receiving  :"<<i<<std::endl;
}

Код подписчика (запускается после подписчика)

zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");

int i = 0;
for (int update_nbr = 0; update_nbr < 100; update_nbr++) 
{        
    //  Send message to all subscribers
    zmq::message_t request (20);

    time_t seconds;
    seconds = time (NULL);

    char update [20]="";
    sprintf (update, "%ld", seconds);

    memcpy ((void *) request.data (), update,strlen(update));
    publisher.send(request);
    i++;
    std::cout << "sending :" << i << std::endl;

} * * тысячу двадцать-один

Ответы [ 4 ]

18 голосов
/ 22 сентября 2011

См. http://zguide.zeromq.org/page:all#Missing-Message-Problem-Solver и выполните поиск "медленного соединения" на этой веб-странице.

В принципе, для установки соединения требуется немного времени (несколько миллисекунд), и в этом случаевремя много сообщений может быть потеряно.Издателю нужно немного поспать, прежде чем начать публикацию, или (что лучше) ему необходимо явно синхронизироваться с подписчиком.

2 голосов
/ 30 августа 2014

Пожалуйста, посмотрите руководство .

  1. издатель отправляет "привет"
  2. каждому подписчику, получившему "привет", отправьте сообщение издателю через REQ /REP сокет
  3. когда издатель получает достаточно сообщения REQ / REP, он начинает публиковать данные
2 голосов
/ 19 сентября 2011

В 0MQ успешная отправка () не означает, что данные отправляются немедленно по сети.http://api.zeromq.org/2-1:zmq-send. Ваши сообщения довольно малы, и AFAIR 0MQ выполняет некоторую буферизацию для небольших сообщений для более эффективного использования сети.

Если я правильно помню, out_batch_size в config.hpp из 0MQ контролирует такое поведение.

1 голос
/ 31 октября 2011

Одна вещь, на которую нужно обратить внимание (помимо того, что отмечали предыдущие комментаторы), это ваша процедура выключения.

Фрагменты кода могут быть просто неполными, но я не понимаю, как вы справляетесь с завершением работы. В частности, вы можете потерять последние отправленные сообщения. Посмотрите документацию для zmq_close , zmq_term и ZMQ_LINGER . Если вы не на самом деле вызываете эти функции и вместо этого просто завершаете приложение, то есть вероятность, что сообщения, которые были отправлены с помощью zmq_send () , но не были переданы к сети теряются при отключении.

Чтобы проверить, какие сообщения теряются, вы можете попытаться отправить порядковый номер в дополнение к отметке времени.

...