Как не потерять сообщения от Кафки при автономной базе данных - PullRequest
0 голосов
/ 12 января 2019

Я занимаюсь разработкой микросервиса, который использует сообщения от Kaffka, затем обрабатывает эти сообщения и сохраняет выходные данные в MongoDB

Я новичок в kafka и столкнулся с некоторой проблемой с потерей сообщений.

Сценарий довольно прост:

В случае, если mongoDB находится в автономном режиме, микросервис получает сообщение, затем пытается сохранить вывод в Mongo, а затем я получаю сообщение об ошибке, в котором говорится, что mongo находится в автономном режиме и сообщение потеряно.

Мой вопрос: в Кафке есть какой-либо механизм, который в этом случае прекращает отправку сообщений. Следует ли вручную фиксировать смещение в Кафке? Каковы наилучшие методы для устранения ошибок у потребителей Kafka?

Ответы [ 3 ]

0 голосов
/ 12 января 2019

Я думаю, что вместо того, чтобы делать коммит вручную, вы должны использовать Kafka Streams и Kafka Connect. Управлять транзакциями между двумя системами: Apache Kafka и MongoDB может быть не так просто, поэтому лучше использовать уже разработанные и протестированные инструменты (Вы можете узнать больше о Kafka Connect: https://kafka.apache.org/documentation/#connect, https://docs.confluent.io/current/connect/index.html)

Ваш сценарий может выглядеть примерно так:

  • Обработайте ваше сообщение, используя Kafka Streams и отправьте результат новому тема (семантика Kafka Streams поддерживает единовременную семантику)
  • Используйте Kafka Connect (Sink коннектор) для сохранения данных в MongoDB https://www.confluent.io/connector/kafka-connect-mongodb-sink/
0 голосов
/ 12 января 2019

Один способ сделать это можно с помощью методов pause и resume на MessageListenerContainer (но вы должны использовать spring kafka> 2.1.x) spring-kafka-docs

@ KafkaListener Lifecycle Management

Контейнеры слушателя, созданные для аннотаций @KafkaListener, не являются компонентами в контексте приложения. Вместо этого они регистрируются с помощью компонента инфраструктуры типа KafkaListenerEndpointRegistry. Этот компонент автоматически объявляется платформой и управляет жизненными циклами контейнеров; он автоматически запустит все контейнеры, для которых autoStartup установлено на true.

Итак, Autowire KafkaListenerEndpointRegistry Конечная точка реестра в приложении

@Autowired
private KafkaListenerEndpointRegistry registry;

Получить MessageListenerContainer из реестра spring-kafka-docs

public MessageListenerContainer getListenerContainer(java.lang.String id)

Вернуть MessageListenerContainer с указанным идентификатором или нулем, если такой контейнер не существует.

Параметры:

id - идентификатор контейнера

На MessageListenerContainer вы можете использовать pause или resume методы spring-kafka-docs

по умолчанию void pause ()

Приостановить этот контейнер перед следующим опросом ().

резюме по умолчанию void ()

Возобновить этот контейнер, если он был приостановлен, после следующего опроса ().

0 голосов
/ 12 января 2019

Для такого сценария вы должны вручную зафиксировать смещение. Подтвердите смещение, только если ваша обработка сообщения прошла успешно. Вы делаете это, как показано ниже. Однако следует помнить, что сообщения имеют ttl, поэтому сообщения автоматически удаляются из брокера kafka после истечения времени ttl.

consumer.commitSync(); 
...