Избегайте дублирования сообщения производителя Kafka - PullRequest
1 голос
/ 27 апреля 2020

Я использую KafkaTemplate из весенней загрузки. Java 8

Моя главная цель - чтобы потребитель не использовал сообщение дважды.

1) Вызов таблицы для получения 100 строк и отправить его в kafka

2) Предположим, что я обработал 70 строк (я получил успешное подтверждение), а затем Кафка вышел из строя (Кафка не восстанавливается в течение времени механизма RETRY)

Поэтому, когда я перезагружаюсь тогда как я могу убедиться, что эти 70 сообщений не отправляются снова.

Один из вариантов - у меня может быть флаг в сообщении таблицы БД is_sent = Y or N.

Есть ли другой эффективный способ?

Ответы [ 2 ]

0 голосов
/ 27 апреля 2020

Для Кафки я видел реализацию хранения указателя на идентификатор для отслеживания того, где вы находитесь в топи c, и использования какого-то распределенного хранилища для отслеживания этого на уровне кластера. Я не проделал много работы там, поэтому я постараюсь предоставить решение, которое мы использовали с SQS для обнаружения дублирования. Вполне вероятно, что у Кафки есть лучшее решение, чем это, которое нужно решить для дублирования, просто хочу добавить туда, чтобы вы могли также посмотреть альтернативные решения.

У меня была такая же проблема при работе с AWS SQS для случаев использования обмена сообщениями «точка-точка», поскольку он обеспечивает гарантированную доставку как минимум один раз, а не один раз и только один раз.

Мы решили использовать Redis с его стратегией распределенной блокировки для решения этой проблемы. У меня есть запись здесь https://angularthinking.blogspot.com/.

Высокоуровневый подход заключается в создании распределенной блокировки для помещения записи в кэш с соответствующим TTL для вашего варианта использования. Мы используем LUA скрипт, чтобы сделать метод putIfNotExists (), как показано в блоге выше. Масштабирование было одной из наших задач, и благодаря вышеописанной реализации мы смогли обработать 10 тысяч сообщений в секунду без каких-либо проблем в SQS, а redis очень хорошо масштабировался. Нам пришлось настроить TTL на оптимальное значение, основанное на пропускной способности и увеличении кеша. У нас было преимущество в том, что окно дублирования было 24 часа или меньше, поэтому в зависимости от redis для этого решения было все в порядке. Если у вас больше windows, где дубликаты могут возникать в течение нескольких дней или месяцев, опция redis может не подойти.

Мы также рассмотрели DynamoDB для реализации putIfNotExists (), но redis показался более производительным для этого использования. случай, особенно с его собственной реализацией putIfNotExists, использующей скрипт LUA.

Удачи в поиске.

0 голосов
/ 27 апреля 2020

Я бы использовал JDB C разъем источника (в зависимости от того, какую базу данных вы сейчас используете) с Kafka Connect , который правильно обрабатывает этот сценарий.


Если вы все еще хотите написать своего собственного продюсера, этот раздел часто задаваемых вопросов Kafka должен быть полезен:

Как мне получать точно один раз сообщения от Kafka?

Точно одна семантика состоит из двух частей: избегать дублирования при получении данных и избегать дублирования при использовании данных.

Существует два подхода к получению ровно одного раза семантика во время производства данных:

  1. Используйте по одному записывающему устройству на раздел, и каждый раз, когда вы получаете сетевую ошибку, проверяйте последнее сообщение в этом разделе, чтобы увидеть, была ли ваша последняя запись успешной
  2. Включить первичный ключ (UUID или что-то) в сообщении и дедупликация на получателя.

Если вы выполните одно из этих действий, журнал, в котором размещаются узлы Kafka, не будет содержать дубликатов. Однако чтение без дубликатов также зависит от некоторого сотрудничества со стороны потребителя. Если потребитель периодически проверяет свою позицию, тогда, если он терпит неудачу и перезапускается, он перезапускается с позиции контрольной точки. Таким образом, если вывод данных и контрольная точка не записаны атомарно, здесь также можно будет получить дубликаты. Эта проблема характерна для вашей системы хранения. Например, если вы используете базу данных, вы можете зафиксировать их вместе в транзакции. Загрузчик HDFS Camus, который написал LinkedIn, делает что-то подобное для загрузки oop. Другой альтернативой, которая не требует транзакции, является сохранение смещения с загруженными данными и дедупликация с использованием комбинации тема / раздел / смещение.

Я думаю, что есть два улучшения, которые облегчили бы это:

  1. Идемпотентность производителя может быть достигнута автоматически и намного дешевле, если дополнительно интегрировать поддержку для этого на сервере.
  2. Существующий потребитель высокого уровня не предоставляет возможности более тонкого контроля смещений (например, для сброса вашей позиции). Мы будем работать над этим в ближайшее время
...