В моем приемнике после использования сообщения, если произойдет какое-либо исключение, я выдаю исключение. Если это успешно, то я признаю. Но даже если сгенерировано исключение, смещение не будет восстановлено. то есть повторение не произошло, как ожидалось. Событие ошибки не приходит снова.
Также я вижу, что я не использую все ожидаемые сообщения. Есть ли что-то, что я делаю неправильно?
@Bean
public ConcurrentKafkaListenerContainerFactory<String,Result<FormatException,String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Result<FormatException, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(this.consumerFactory());
factory.setErrorHandler(new SeekToCurrentErrorHandler(3));
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<String, Result<FormatException, String>> consumerFactory() {
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
Класс слушателя
@KafkaListener(topics = "${gpc.consumer.topic}")
public void listenWithHeadersGPC(ConsumerRecord<String, Result<FormatException, String>> consumerRecord,
Acknowledgment acknowledgment) {
try {
String body = consumerRecord.value().get();
log.logInfo("Received message for " + GPCSourceName + ": " + body);
log.logInfo("consumption successful from kafka. partition="+consumerRecord.partition()+", "
+ "offset="+consumerRecord.offset());
if(body.contains("berlin+braze@gmail.com")) {
throw new RuntimeException("custom exception");
}
log.logInfo("Filtered message for " + GPCSourceName + " : " + body);
acknowledgment.acknowledge();
}catch(Exception e) {
log.logError("Exception occurred in listenWithHeadersGPC: ", e);
throw e;
}
}
Я делаю acknowledgment.acknowledge();
в моем слушателе, если нет исключений
Редактировать: Версия Spring Kafka: 2.2.8