Как мы можем вручную сбросить смещение темы kafka, которая используется через Java-приложение с весенней загрузкой? - PullRequest
0 голосов
/ 09 января 2019

Мое требование - сбросить смещение темы kafka, когда приложению не удалось обработать сообщение, считанное из текущего смещения темы kafka, которое используется через Java-приложение с весенней загрузкой. После сброса смещения вручную или отправки отрицательного подтверждения сообщения должны быть снова опрошены из значения смещения, не занесенного в него, через потребитель kafka Java-приложения с весенней загрузкой. Можно ли этого достичь?

1 Ответ

0 голосов
/ 09 января 2019

Это проблема с версиями Spring Kafka <2.0.1, когда потерянные записи теряются, а слушатель продолжает чтение со следующего смещения. </p>

Если вы используете Spring kafka 2.0.1+, вы можете использовать SeekToCurrentErrorHandler, что приведет к отправке неудачных и необработанных записей в следующем опросе.

Чтобы настроить контейнер слушателя с указанным выше обработчиком, вам нужно добавить его в ContainerProperties

Вот пример контейнерной фабрики KafkaListener

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(this.consumerFactory);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}

Вот подробная документация:
https://docs.spring.io/spring-kafka/reference/html/_reference.html#_seek_to_current_container_error_handlers

...