Как сделать так, чтобы сообщения буфера сокета клиента паба ZMQ были отключены при отключенном сокете субсервера - PullRequest
1 голос
/ 18 октября 2019

Учитывая 2 приложения, в которых приложение A использует клиент-издатель для непрерывной потоковой передачи данных в приложение B, у которого есть сокет вспомогательного сервера для приема этих данных, как мы можем сконфигурировать сокет клиента паба в приложении A таким образом, чтобы, когда B был недоступен (как его повторное развертывание, перезапуск) A буферизует все ожидающие сообщения, и когда B становится доступным, буферизированные сообщения проходят через и сокет перехватывает поток в реальном времени?

В двух словах, как нам сделать PUB CLIENTсообщения буфера сокета с некоторым лимитом, когда SUB SERVER недоступен?

Поведение по умолчанию для клиента PUB - отключение приглушения, но было бы здорово, если бы мы могли изменить его на буфер с ограниченным размером,это возможно с ZMQ? или мне нужно сделать это на уровне приложения ...

Я пытался установить HWM и LINGER в моих сокетах, но если я не ошибаюсь, они отвечают только за случай медленного потребителя, где мой издательподключен к подписчику, но подписчик настолько медленный, что издатель начинает буферизировать сообщения (hwm ограничит количество этих сообщений) ...

Я использую jeromq, так как я нацеливаюсь на платформу jvm.

Ответы [ 3 ]

2 голосов
/ 18 октября 2019

Прежде всего, добро пожаловать в мир Zen-of-Zero, где латентность имеет наибольшее значение

ПРОЛОГ:

ZeroMQ был разработан Питером ХИНТЖЕНСОМ 'команда опытных мастеров - Martin SUSTRIK, которая будет названа первой. Дизайн был профессионально разработан, чтобы избежать ненужных задержек. Итак, спросить о наличии (ограниченной) настойчивости? Нет, сэр, не подтверждено - PUB/SUB Масштабируемый шаблон формального шаблона обмена Архетип не будет иметь его встроенным, верно из-за дополнительных проблем и снижения производительности и масштабируемости (задержка надстройки, обработка надстройки, дополнительное управление памятью).

Если нужно (ограниченное) постоянство (при отсутствии соединений с агентами на удаленной стороне SUB), не стесняйтесь внедрять его на стороне приложения,или можно спроектировать и реализовать новый ZMTP-совместимый такой образец поведения Archetype, расширяющий структуру ZeroMQ, если такая работа переходит в стабильное и общедоступное состояние, но не запрашивает высокопроизводительный стандарт с латентной задержкой PUB/SUB, имеющийотполировал почти линейную масштабируемость ad astra, чтобы измениться в этом направлении. Это определенно не тот путь.

Решение?

Сторона приложения может легко реализовать добавленную логику, используя круговые буферы с двумя указателями, работая в некотором роде (* на стороне приложения) - Постоянство-ПРОКСИ , но перед отправителем PUB.

Ваш дизайн может быть успешнымв выдавливании дополнительного соуса из внутренних деталей ZeroMQ в случае, если ваш дизайн также использует недавно предоставленный встроенный компонент ZeroMQ- socket_monitor для настройки дополнительного контрольного слоя и получения там поток событий , если смотреть "изнутри" на стороне PUB Context - экземпляр, где некоторые дополнительные события, связанные с сетью и управлением соединением, могут пролить больше света на (* на стороне приложения) - Постоянство-ПРОКСИ

Тем не менее, имейте в виду, что

_zmq_socket_monitor()_*Метод 1046 * поддерживает только транспорты с установлением соединения, то есть TCP, IPC и TIPC.

так что об этом можно забыть в случае, если планировалось использовать любой из чрезвычайно интересных транспортных классов { inproc:// | norm:// | pgm:// | epgm:// | vmci:// }


Heads up!

Есть неточные, если не ошибочные, сведения от почетного члена нашего Сообщества smac89 , который изо всех сил старался ответить на ваш дополнительный интерес, выраженный в комментарии:

"... zmq оптимизирует публикацию по темам? например, если вы продолжаете публиковать на скорости около 100 символов topic быстро, действительно ли он отправляет topic каждый раз или он сопоставляется с некоторым int и впоследствии отправляет int ...? "

говорю вам:

"Он всегда будет публиковать topic. Когда я использую pub-sub pattern, я обычно публикую сначала topic, а затем реальное сообщение, поэтому в подписчике я просто читаю первый кадр , игнорирую его и затем читаю реальное сообщение "

ZeroMQ не работает таким образом. Нет ничего как «отдельный» <topic>, за которым следует <message-body>, а скорее как

TOPIC и механизация тематическая фильтрация работает совсем по-другому.

1) Вы никогда не знаете, кто .connect() -s:
, т. Е. Можно быть почти уверенным, что версия 2.x до версии 4.2+ будет по-разному обрабатывать фильтрацию тем (ZMTP: RFC определяет начальное квитирование версии с возможностью, чтобы позволить Context -экземпляру решить, какую версию тематической фильтрации нужно будет использовать:
ver 2.x , используемая для перемещения всех сообщенийвсем одноранговым узлам, и пусть все SUB-стороны (из версии 2.x +) доставят сообщение (и пусть * SUB -side * Context -instance обрабатывает локальный * 1115Обработка фильтра * -list )

, тогда как
ver 4.2 + обязательно выполнит обработку фильтра topic -list на ** стороне PUB Context-instance (увеличивается загрузка ЦП, наоборот - сетевой транспорт), поэтому вашей стороне SUB никогда не будет доставлен байт «бесполезного» * ​​1124 * чтения «неподписанных» для сообщений.

2) (можете, но) нет необходимости разделять «тему» ​​на первый кадр подразумеваемого таким образом многокадрового сообщения. Возможно, как раз наоборот (это довольно противоречивый шаблон для высокопроизводительной, распределенной системы с низкой задержкой.

Процесс фильтрации тем определен и работает побайтно , отСлева направо, сопоставление с шаблоном для каждого значения элемента списка тем повторяет полезную нагрузку доставленного сообщения.

Добавление дополнительных данных, дополнительная обработка управления кадрами только и только увеличивает сквозную задержкуи непроизводительные затраты. Никогда не стоит делать это вместо правильной проектных работ.


ЭПИЛОГ:

Нет ни легких побед, ни каких-либонизко висящие фрукты в профессиональном дизайне, тем меньше, если или со сверхнизкой задержкой являются проектными целями.

С другой стороныУбедитесь, что ZeroMQ Framework был создан с учетом этого, и эти усилия увенчались стабильным, в конечном счете, хорошо сбалансированным набором инструментов для интеллектуального (по замыслу), быстрого (в рабочем состоянии)и) и масштабируемые (как может завидовать ад) службы сигнализации / обмена сообщениями, которые люди любят правильно использовать из-за этой мудрости дизайна.

Желаю вам жить счастливо с ZeroMQ, как есть, и не стесняйтесь добавлять любой дополнительный набор функций"перед" слоя ZeroMQ, внутри выбранного набора приложений.

1 голос
/ 03 ноября 2019

Я публикую быстрое обновление, так как два других ответа (хотя очень информативные были на самом деле неправильными), и я не хочу, чтобы другие были дезинформированы из моего принятого ответа. Мало того, что вы можете сделать это с zmq, на самом деле это поведение по умолчанию .

Хитрость заключается в том, что если ваш клиент издателя никогда не подключался к серверу подписчика до того, как он продолжает отбрасывать сообщения (и именно поэтому я думал, что он не буферизует сообщения), но если ваш издатель подключается к подписчику, и вы перезапускаетеподписчик, издатель будет буферизировать сообщения до тех пор, пока не будет достигнут HWM, и это именно то, о чем я просил ... поэтому вкратце издатель хочет знать, что кто-то на другом конце принимает сообщения, только после этого он будет буферизировать сообщения ...

Вот пример кода, который демонстрирует это (вам может потребоваться внести некоторые базовые изменения для его компиляции).

Я использовал эту зависимость только org.zeromq:jeromq:0.5.1.

zmq-publisher.kt

fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.PUB)

   socket.hwm = 10000
   socket.linger = 0
   "connecting to $uri".log()
   socket.connect(uri)

   fun publish(path: String, msg: Msg) {
      ">> $path | ${msg.json()}".log()
      socket.sendMore(path)
      socket.send(msg.toByteArray())
   }

   var count = 0

   while (notInterrupted()) {
      val msg = telegramMessage("message : ${++count}")
      publish("/some/feed", msg)
      println()

      sleepInterruptible(1.second)
   }
}

и, конечно, zmq-subscriber.kt


fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.SUB)

   socket.hwm = 10000
   socket.receiveTimeOut = 250

   "connecting to $uri".log()
   socket.bind(uri)

   socket.subscribe("/some/feed")

   while (true) {
      val path = socket.recvStr() ?: continue
      val bytes = socket.recv()
      val msg = Msg.parseFrom(bytes)
      "<< $path | ${msg.json()}".log()
   }
}

Попробуйте сначала запустить издатель без подписчика, затем при запуске подписчика вы пропустили все сообщения до сих пор ... теперь без перезапуска издателя остановитеподписчик подождет некоторое время и запустит его снова.

Вот пример одного из моих сервисов на самом делеНе смотря на это ... Это структура [current service]sub:server <= pub:client[service being restarted]sub:server <=* pub:client[multiple publishers]

Поскольку я перезапускаю службу посередине, все издатели начинают буферизовывать свои сообщения, а последняя служба, которая наблюдала ~ 200 сообщений в секунду, наблюдает падение на0 (эти 1 или 2 - биения), затем приходит внезапный пакет из 1000+ сообщений, потому что все издатели сбросили свои буферы (перезапуск занял около 5 секунд) ... Я на самом деле не теряю здесь ни одного сообщения ...

enter image description here

Обратите внимание, что у вас должна быть пара subscriber:server <= publisher:client (таким образом, издатель знает, что «есть только одно место, куда мне нужно доставить эти сообщения» (вы можете попробоватьпривязка к издателю и подключение к подписчику, но вы больше не будете видеть сообщения буферизации издателя просто потому, что сомнительно, если только что отключившийся подписчик сделал это, потому что он больше не нуждается в данных или из-за сбоя)

1 голос
/ 18 октября 2019

Как мы уже обсуждали в комментариях, у издателя нет возможности буферизовать сообщения, не имея к нему никакого отношения, он просто отбросит все новые сообщения:

Из документов:

Если у издателя нет подключенных подписчиков, он просто отбросит все сообщения.

Это означает ваш буфер необходимбыть вне заботы zeromq . Тогда ваш буфер может быть списком, базой данных или любым другим способом хранения, который вы выберете, но вы не можете использовать своего издателя для этого.

Теперь следующая проблема связана с тем, как обнаружить этого подписчика. подключен / отключен. Это нужно, чтобы сообщить нам, когда нам нужно начать чтение из буфера / заполнение буфера.

Я предлагаю использовать Socket.monitor и прослушивать ZMQ_EVENT_CONNECTED и ZMQ_EVENT_DISCONNECTED, так как они сообщат вам, когда клиент подключился / отключился, и, таким образом, позволят вам переключиться на заполнение буфера по вашему выбору. Конечно, могут быть и другие способы сделать это, которые напрямую не связаны с zeromq, но решать вам.

...