Как установить подтверждение в случае исчерпания повторных попыток в потребителе Kafka - PullRequest
0 голосов
/ 03 апреля 2019

У меня есть потребитель Kafka, который повторяет 5 раз, и я использую Spring Kafka с шаблоном повторных попыток.Теперь, если все повторные попытки не пройдены, то как подтвердить работу в этом случае.Также, если я установил режим подтверждения вручную, то как подтвердить это сообщение

Потребитель

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
        Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
        ack.acknowledge();
        return null;
    });
    factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
    return factory;
}

Слушатель Кафки

@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void recieveSegmentService(String KafkaPayload, Acknowledgment acknowledgment) throws Exception {
    KafkaSegmentTrigger kafkaSegmentTrigger;
    kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class);
    log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger);
    processMessage(kafkaSegmentTrigger);
    acknowledgment.acknowledge();
}

1 Ответ

1 голос
/ 03 апреля 2019

Если у вас есть RecoveryCallback, который обрабатывает запись, когда повторные попытки исчерпаны, и вы НЕ используете ручные подтверждения, контейнер выполнит смещение.

Если вы используете ручные подтверждения, вы можете подтвердитьсмещение в обратном вызове восстановления.

Объект контекста, переданный обратному вызову, имеет атрибут RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT («подтверждение»).

Он также имеет CONTEXT_CONSUMER и CONTEXT_RECORD.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...