Буферизация производителя кафки - PullRequest
0 голосов
/ 27 ноября 2018

Предположим, что есть продюсер, и я запускаю потребителя через несколько минут.Я заметил, что потребитель будет использовать старые сообщения, созданные производителем, но я не хочу, чтобы это произошло.Как я могу это сделать?Есть ли какие-либо параметры конфигурации в брокере, которые нужно установить и решить эту проблему?

1 Ответ

0 голосов
/ 27 ноября 2018

Это действительно зависит от варианта использования, вы не предоставили много информации об архитектуре.Например, когда потребитель встал, он долго работает, или он просто просыпается на короткое время и потребляет новые поступающие сообщения?

Вы можете выбрать любой из следующих подходов:

  • Фильтруйте ConsumerRecord по отметке времени, чтобы вы автоматически отбрасывали сообщения, которые были созданы в течение настраиваемого времени.
  • В моей команде мы используем эфемерные группы.То есть - каждый раз, когда сервис запускается, мы генерируем новый идентификатор группы для группы потребителей, устанавливая auto.offset.reset в latest
  • Seek to timestamp - начиная с kafka 0.10 вы можете искать определенную позицию.Используйте consumer.offsetsForTimes, чтобы получить смещение каждого тематического раздела в течение требуемого времени, а затем используйте consumer.seek, чтобы получить указанное смещение.
  • Если вы используете группу потребителей, но никогда не фиксируете kafka, тогдакаждый раз, когда потребитель назначается тематическому разделу, он начинает потреблять в соответствии с auto.offset.reset policy ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...