за подтверждение сообщения в Kafka / RabbitMQ - PullRequest
0 голосов
/ 03 марта 2019

У нас есть рабочий кролик. Реализация, из-за объема, мы планируем перейти на кафку.

У меня есть сомнения в одном месте.

В rabbitMQ, когда потребитель потребляетсообщение от Q сообщение переходит на другую стадию, неупакованная стадия.клиент / потребитель занимает некоторое время, чтобы обработать сообщение, после успешного процесса он отправляет подтверждение Q и сообщение удаляется из Q. в случае неудачи, по истечении определенного периода, если Q не получает подтверждение, сообщениедобавлен в конце Q.Таким образом, мы не теряем ни одного сообщения.

С моим небольшим знанием Кафки я понимаю, что если, например, сообщение 100 не было успешно обработано, смещение не будет увеличено, но оно будет увеличено, если сообщение 101 будет обработано.успешноПоэтому я потерял сообщение 100.

Есть ли способ гарантировать, что ни одно из сообщений не будет потеряно.

Ответы [ 4 ]

0 голосов
/ 07 марта 2019

Я тоже сталкивался с тем же вопросом.Если я хочу выразиться проще, RabbitMQ ведет подсчет каждого

  1. опубликованных и не использованных
  2. опубликованных, использованных и неподтвержденных сообщений.

Кафка не может, поэтому вы не можете сделать его готовым, вы должны реализовать его самостоятельно.

Хотя есть варианты, используйте kmq, производительность станет меньше 50%,посмотрите

https://softwaremill.com/kafka-with-selective-acknowledgments-performance/

0 голосов
/ 03 марта 2019

Кафка не удаляет сообщения из тем, пока не достигнет одного из log.retention.bytes log.retention.hours log.retention.minutes log.retention.ms конфигов.так что если смещение увеличивается, вы не потеряете предыдущие сообщения и можете просто изменить смещение на желаемую позицию.

0 голосов
/ 03 марта 2019

Вы должны прочитать немного о том, как работает потребление сообщений в Kafka.Вот ссылка на потребительский раздел официальных документов Kafka: https://kafka.apache.org/documentation/#theconsumer

По сути, в Kafka сообщения удаляются только по прошествии достаточного времени, и это настраивается с помощью log.retention.hours, log.retention.minutesи log.retention.ms как сказал @Amin.

В Kafka любое количество потребителей может начать получать сообщения из любой темы в любой момент, независимо от того, потребляют ли другие потребители эту же тему.Кафка отслеживает, где каждый потребитель в каждой теме / разделе использует смещения, которые хранятся в самой Кафке.Таким образом, если вашему потребителю необходимо принять сообщение 100, как вы описали в своем вопросе, вы можете просто «перемотать» назад на желаемое сообщение и снова начать нормально потреблять.Неважно, если вы ранее потребляли его, или если другие потребители читают из этой темы или нет.

Из официальных документов Kafka:

Потребитель может намеренно перемотатьвернуться к старому смещению и повторно использовать данные.Это нарушает общий контракт очереди, но оказывается важной особенностью для многих потребителей.Например, если код потребителя содержит ошибку и обнаруживается после использования некоторых сообщений, потребитель может повторно использовать эти сообщения после исправления ошибки.

0 голосов
/ 03 марта 2019

Ваше смещение сообщения не будет увеличено, если вы не запросите новые сообщения.Таким образом, вы должны быть обеспокоены повторной обработкой вашего сообщения.

Если вы хотите сохранить результат обработки ваших данных в кластере Kafka, вы можете использовать функцию транзакции Kafka .Таким образом, вы можете поддерживать только один раз доставки.Все ваши изменения будут сохранены или ни одно из них не будет сохранено.

Другой подход - сделать ваш сценарий обработки идемпотентным.Вы будете назначать уникальный идентификатор для каждого сообщения в Kafka.При обработке сообщения вы сохраняете идентификатор в базе данных.После сбоя вы проверяете, что ваш идентификатор сообщения уже обработан, просмотрев базу данных.

...