Как восстановить пропущенные сообщения от издателя, когда подписчик отключен? - PullRequest
1 голос
/ 03 мая 2019

Существует:

  1. издатель, который связывается с localhost:5556 и отправляет сообщение (topic1, topic2) каждые 2 секунды.
  2. два подписчика, которые подключаются к localhost:5556 исоответственно получает "topic1" и "topic2"

Все отлично работает, когда PUB отправляет сообщения и подписчики получают их.

Моя проблема в том, что перестает работать один из двух подписчиков.Мне бы хотелось, чтобы все сообщения, сброшенные PUB, были помещены в очередь и отправлены «мертвому» подписчику, когда он оживет.Этого не происходитКогда подписчик повторно подключается к издателю, все сообщения, отправленные в этот период времени (от мертвого к живому), отбрасываются.

Я пытаюсь кодировать на python, используя модуль pyzmq.

Более того, я заметил, что если вместо того, чтобы закрыть процесс подписчика Python, попробуйте добавить time.sleep(10), PUB ставит сообщения в очередь, и когда подписчик просыпается, все сообщения отправляются.Этого не произойдет, если процесс будет закрыт (CTRL + C) и перезапущен.

Но если я пытаюсь инвертировать bind() с connect() между издателем и подписчиком, приложение работает так, как я хочу,Но в этом случае есть большая проблема.Я не мог иметь больше подписчиков, связанных с одним издателем, потому что подписчики связываются на другом порту, и издатель может соединиться только на одном порту.

Как я могу решить это?

Ответы [ 2 ]

0 голосов
/ 08 июля 2019

ZeroMQ не будет поддерживать это без большого количества дополнительной работы / кода.

Если вам действительно нужна система пабов / подсистем сообщений, куда подписчики могут приходить и уходить с без потерь , было бы намного проще использовать систему, созданную для обеспечения этой функциональности.

Одной из таких систем является kafka: https://kafka.apache.org/intro

Если вы действительно хотите использовать zeromq, то недавно были запущены проекты в том же духе.https://github.com/zeromq/dafka

0 голосов
/ 05 июля 2019

"Моя проблема ..." решаема

ZeroMQ - это интеллектуальная и высокопроизводительная платформа сигнализации / обмена сообщениями, созданная для предоставления уровня обслуживания, состоящего изот масштабируемых формальных коммуникационных архетипов.В течение многих лет он был оптимизирован для малой задержки, высокой производительности, почти линейного масштабирования и простоты использования распределенных поведений агентов, которые напоминают поведение людей (например, один REQ - квестыдругой будет REP -ly).

Все это, по замыслу, БРОКЕРСКОЕ - т. е. не существует такого понятия, как "человек в человеке".промежуточное хранилище, предназначенное для повторной отправки сообщений, которые были отправлены через инфраструктуру Framework в такие моменты, когда агент не работал.

Это не значит, что такую ​​функцию нельзя реализовать на уровне приложений, еслидействительно нужен такой набор свойств.В роли дизайнера будет входить определение всех добавляемых функций (отметка времени, хранилище сообщений, флаги счетчиков индексов при доставке сообщений в формате NACK / POSACK, частные двунаправленные протоколы для запроса пропущенных сообщений и т. Д. И т. Д.).

Таким образом, эта часть разрешима, но она не потребляет ни ресурсов, ни увеличивает задержку для базовой платформы, которая была разработана с наивысшей достижимой производительностью и наименьшей возможной задержкой, используя Zen-of-Zero (нулевое копирование, где это возможно), Нулевая гарантия - либо предоставить точную копию исходного сообщения, либо ничего, если присутствуют поврежденные детали и т. Д.).

В более новых версиях ZeroMQ есть даже прямо противоположная функция, метод .setsockopt( ZMQ_CONFLATE, 1 )Это позволяет диспетчеру данных Context экземпляра отбрасывать все, кроме самого «свежего» сообщения, из очереди сообщений отдельного контрагента, доставляя, таким образом, при следующем запросе, но самое последнее сообщение и ничего другого.Это очень удобно для многих прикладных сценариев, где «старые» сообщения просто теряют свое значение, если не доставляются «сейчас», а режим CONFLATE позволяет не перемещать их все через медленное или нестабильное соединение, таким образом (косвенно) расставляя приоритеты в распределениитолько в «самом новом» сообщении о состоянии мира (и удаление устаревших сообщений также уменьшает ресурсы и рабочие нагрузки управления очередями, не так ли?).

В случае, если концепции ZeroMQ являются новыми для ваших усилий по проектированию,Обязательно прочитайте книгу Питера ХИНТЖЕНСА, библию не только самого фреймворка ZeroMQ, либо можете прочитать хотя бы об основных концептуальных различиях ZeroMQ, с которых нужно начинать в иерархии [ ZeroMQ, менее чем за пять секунд ] Section.

Более того, я заметил ...

Да, это будет работать нормально, потому что оба распределенных экземпляра Context используют отдельныепотоки, которые остаются в контакте, даже несмотря на то, что интерпретатор Python получил команду .sleep() (что он делает, но не подключенподдерживающие сигнализацию усилия обеих сторон протокольных насосов данных)

, если я пытаюсь инвертировать .bind() с .connect()

О, конечно, он должен вести себя так.Представьте себе случай с радиостанцией BBC - все люди во всем мире знают, какую радиостанцию ​​слушать, в то время как ни одна станция BBC emplyee не могла знать всех слушателей радиостанции, поэтому никогда не могла «наладить» связь между BBC и всеми.их, вокруг света (верно по причине, почему у каждого из них есть уникальный адрес, который априори неизвестен вещателю, который, таким образом, не может настроить ту же инфраструктуру со своей стороны).ZeroMQ PUB -lisher - такая же история - вы рекламируете «центральный» адрес для подключения, и все, кто хочет и может, захотят.Не наоборот.

В любом случае, наслаждайтесь мирами ZeroMQ в своих будущих дизайнерских работах.Это стоит освоить.

...