Потребитель потребляет одно и то же сообщение несколько раз - PullRequest
0 голосов
/ 02 апреля 2019

У меня есть потребительское сообщение от Кафки.Я установил шаблон повтора в KafkaListenerContainerFactory.Но не знаю, почему он потребляет одно и то же сообщение дважды.Когда приложение с помощью шаблона повторных попыток исключения, вызванного с указанным числом, Kafka также использует то же сообщение с тем же числом.

@Bean
RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();

    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(1000L);
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(retryPolicy);

    return retryTemplate;
}


@Bean("KafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> KafkaListenerContainerFactory(RetryTemplate retryTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        log.error("Maximum retry policy has been reached");
        return null;
    });
    factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
    return factory;
}

Потребитель

@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void recieveSegmentService(String KafkaPayload) throws Exception {
    KafkaSegmentTrigger kafkaSegmentTrigger;
    kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class);
    log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger);
    try {
        processMessage(kafkaSegmentTrigger);
    } catch (Exception e) {
        retryTemplate.execute(arg0 -> {
            processMessage(kafkaSegmentTrigger);
            return null;
        });
    }finally {
    }
}

ProcessMessage вызывает исключение

1 Ответ

0 голосов
/ 02 апреля 2019

Вы вложили RetryTemplate с - один снаружи слушателя и один внутри.Если вы используете один и тот же шаблон в обоих местах, вы получите 12 попыток (3, управляемые адаптером слушателя x4 внутри слушателя).

Используйте одно или другое.

...