Spring Kafka @KafkaListener - Повторите попытку отправки ошибочных сообщений и вручную передайте смещение - PullRequest
0 голосов
/ 12 февраля 2019

Я использую @KafkaListener для потребления из темы kafka, у меня есть логика приложения для обработки каждой записи несколькими потребителями в разных группах потребителей.

Теперь моя проблема в том, что -Я должен отправитьизрасходованное сообщение для сторонней точки отдыха.Если отправка сообщения завершается неудачно, мне не следует фиксировать смещение, и мне нужно повторить попытку отправки сообщения на основе настраиваемого числа раз.После настраиваемых нет повторных попыток, если не удается отправить сообщение.Мне нужно записать причину исключения и зафиксировать смещение.

Я использую пружинную кафку 2.2.0.Ниже моя конфигурация для подтверждения вручную

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.listener.ack-mode=manual-immediate

logging.level.org.springframework.kafka=debug

До сих пор я могу использовать сообщение, опубликованное в теме другим потребителем.а также в состоянии вручную подтвердить смещение от другого потребителя.например,

@KafkaListener(topics = "testTopic",groupId="group-1")
    public void consumerOne(ConsumerRecord<String, String> record,Acknowledgment ack) {
        logger.info("OF PayLoad:{}", record);
        logger.info("OF value:"+record.value());
        logger.info("OF Offset:{}",record.offset());
        ack.acknowledge();

    }
    @KafkaListener(topics = "testTopic",groupId="group-2")
    public void consumerTwo(ConsumerRecord<String, String> record,Acknowledgment ack) {
        logger.info("MF PayLoad_2:{}", record);
        logger.info("MF value_2:"+record.value());
        logger.info("MF Offset_2:{}",record.offset());
        //ack.acknowledge();
    }

здесь, consumerTwo не подтверждает сообщение.Я хочу повторить попытку отправить все сообщение потребителю снова в зависимости от настраиваемого промежутка времени.

Любой пример или предложение будут очень полезны.

1 Ответ

0 голосов
/ 12 февраля 2019

См. этот ответ .

В версии 2.2 мы добавили возможность добавить восстановитель к SeekToCurrentErrorHandler, который может выполнять некоторые действия.

Запрос на извлечениев моем последнем комментарии показано, как зафиксировать смещения, если программа восстановления успешно восстановит сообщение.Это будет в выпуске 2.2.4 в четверг.

В вашем случае программа восстановления может вызвать службу REST и вызвать исключение в случае сбоя.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...