У меня есть потребитель kafka, который принимает сообщения от топи c и обрабатывает их. Я применил 5 повторных попыток, чтобы обработать сообщение и записать это сообщение в другой topi c для дальнейшего использования в случае сбоя обработки даже после 5 попыток. Мой код выглядит следующим образом:
KafkaConsumerConfig:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
return factory;
}
KafkaConsumer:
@Value("${kafka.consumer.failed.topic}")
private String failedTopic;
@KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.groupId}", containerFactory = "kafkaManualAckListenerContainerFactory")
public void processMessage(String kafkaMessage) throws Exception {
log.info("parsing new kafka message {}", kafkaMessage);
TransactionDTO transactionDTO = TransformUtil.fromJson(kafkaMessage, TransactionDTO.class);
try {
service.parseTransactionDTO(transactionDTO);
} catch (Exception e) {
log.error(e.getMessage());
kafkaTemplate.send(failedTopic, kafkaMessage);
Thread.sleep(10000);
throw e;
}
}
Потребитель правильно пытается обработать через 5 попыток с задержкой 10 секунд, но каждый раз, когда это не удается, новое сообщение записывается в неудачный topi c. Есть ли способ, чтобы сообщение записывалось в сбойный topi c только один раз, когда все попытки повторных попыток были исчерпаны, вместо того, чтобы писать его каждый раз при сбое?