Spring kafka - смещение не устанавливается при ошибке - PullRequest
1 голос
/ 11 марта 2020

В моем приемнике после использования сообщения, если произойдет какое-либо исключение, я выдаю исключение. Если это успешно, то я признаю. Но даже если сгенерировано исключение, смещение не будет восстановлено. то есть повторение не произошло, как ожидалось. Событие ошибки не приходит снова.

Также я вижу, что я не использую все ожидаемые сообщения. Есть ли что-то, что я делаю неправильно?

@Bean
public ConcurrentKafkaListenerContainerFactory<String,Result<FormatException,String>> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, Result<FormatException, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(this.consumerFactory());
    factory.setErrorHandler(new SeekToCurrentErrorHandler(3));
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    return factory;
}


public ConsumerFactory<String, Result<FormatException, String>> consumerFactory() {
   kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

Класс слушателя

@KafkaListener(topics = "${gpc.consumer.topic}")
    public void listenWithHeadersGPC(ConsumerRecord<String, Result<FormatException, String>> consumerRecord,
            Acknowledgment acknowledgment) {
        try {
            String body = consumerRecord.value().get();
            log.logInfo("Received message for " + GPCSourceName + ": " + body);
            log.logInfo("consumption successful from kafka. partition="+consumerRecord.partition()+", "
                    + "offset="+consumerRecord.offset());
                            if(body.contains("berlin+braze@gmail.com")) {
                                throw new RuntimeException("custom exception");
                            }
                            log.logInfo("Filtered message for " + GPCSourceName + " : " + body);
            acknowledgment.acknowledge();
        }catch(Exception e) {
            log.logError("Exception occurred in listenWithHeadersGPC: ", e);
            throw e;
        }
    }

Я делаю acknowledgment.acknowledge(); в моем слушателе, если нет исключений

Редактировать: Версия Spring Kafka: 2.2.8

1 Ответ

0 голосов
/ 11 марта 2020

factory.setErrorHandler(new SeekToCurrentErrorHandler(3));

При такой конфигурации вы должны увидеть 3 попытки с нулевой задержкой между ними; наконец запись будет зарегистрирована. По умолчанию смещение «восстановленной» записи не будет зафиксировано.

При публикации подобных вопросов; укажите, какую версию вы используете.

Начиная с версии 2.3, вы можете использовать acknowledgment.nack(sleepTime); Вы также можете вызвать setCommitRecovered(true) для SeekToCurrentErrorHandler, и смещение восстановленной записи будет зафиксировано (с MANUAL_IMMEDIATE).

Все это произойдет, только если исключение выдается в контейнер; если вы поймаете исключение в своем слушателе, вы не увидите повторов.

Если вы вообще не видите повторов, и ваш слушатель выдает исключение, значит, что-то не так с вашим слушателем или конфигурацией. Показать всю свою конфигурацию и код @KafkaListener.

...