Spring Kafka создает сообщение после завершения повторной попытки потребителя - PullRequest
0 голосов
/ 13 июля 2020

У меня есть потребитель kafka, который принимает сообщения от топи c и обрабатывает их. Я применил 5 повторных попыток, чтобы обработать сообщение и записать это сообщение в другой topi c для дальнейшего использования в случае сбоя обработки даже после 5 попыток. Мой код выглядит следующим образом:

KafkaConsumerConfig:

   @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
        factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
        return factory;
    }

KafkaConsumer:

    @Value("${kafka.consumer.failed.topic}")
    private String failedTopic;
    
    @KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.groupId}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void processMessage(String kafkaMessage) throws Exception {
        log.info("parsing new kafka message {}", kafkaMessage);
        TransactionDTO transactionDTO = TransformUtil.fromJson(kafkaMessage, TransactionDTO.class);
        try {
            service.parseTransactionDTO(transactionDTO);
        } catch (Exception e) {
            log.error(e.getMessage());
            kafkaTemplate.send(failedTopic, kafkaMessage);
            Thread.sleep(10000);
            throw e;
        }
    }

Потребитель правильно пытается обработать через 5 попыток с задержкой 10 секунд, но каждый раз, когда это не удается, новое сообщение записывается в неудачный topi c. Есть ли способ, чтобы сообщение записывалось в сбойный topi c только один раз, когда все попытки повторных попыток были исчерпаны, вместо того, чтобы писать его каждый раз при сбое?

1 Ответ

0 голосов
/ 13 июля 2020

factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));

Используйте DeadLetterPublishingRecoverer вместо null в обработчике ошибок.

Это было введено в 2.2; Я не уверен, какую версию вы используете. Если более ранняя, вы можете либо обновить (текущая версия 2.5.3), либо вы можете переместить свой код публикации в настраиваемое средство восстановления.

Более новые версии (начиная с 2.3) позволяют добавлять задержку отката между попытками повторения .

...