Это проблема с версиями Spring Kafka <2.0.1, когда потерянные записи теряются, а слушатель продолжает чтение со следующего смещения. </p>
Если вы используете Spring kafka 2.0.1+, вы можете использовать SeekToCurrentErrorHandler
, что приведет к отправке неудачных и необработанных записей в следующем опросе.
Чтобы настроить контейнер слушателя с указанным выше обработчиком, вам нужно добавить его в ContainerProperties
Вот пример контейнерной фабрики KafkaListener
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(this.consumerFactory);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
return factory;
}
Вот подробная документация:
https://docs.spring.io/spring-kafka/reference/html/_reference.html#_seek_to_current_container_error_handlers