Как установить RecordInterceptor в ConcurrentKafkaListenerContainerFactory - PullRequest
2 голосов
/ 28 октября 2019

Я использую Spring Kafka 2.2.7, я настроил @EnableKafka с kafkaListenerContainerFactory и использую @KafkaListener для приема сообщений, все работает как положено.

Я хочу добавить RecordInterceptor для регистрации всех потребляемых сообщений, но это затрудняет его настройку. Документация гласит, что RecordInterceptor может быть установлен для контейнера, однако я не уверен, как получить экземпляр контейнера.

Начиная сверсия 2.2.7, вы можете добавить RecordInterceptor в контейнер слушателя;он будет вызван перед вызовом слушателя, разрешающего проверку или изменение записи.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.setConcurrency(consumerCount);
        return factory;
    }

Я просматривал документацию Spring, но не нашел решения,это кажется простой вещью, но, возможно, я что-то упустил.

Любая помощь в этом отношении будет признательна.

Заранее спасибо.

1 Ответ

1 голос
/ 28 октября 2019

Существует метод setRecordInterceptor , начиная с 2.2.7

factory.setRecordInterceptor(new RecordInterceptor);

И другая информация RecordInterceptor не будет работать для пакетного прослушивателя

Начиная с версии 2.2.7, вы можете добавить RecordInterceptor в контейнер слушателя;он будет вызван перед вызовом слушателя, позволяющего проверить или изменить запись. Если перехватчик возвращает ноль, слушатель не вызывается. Перехватчик не вызывается, когда слушатель является пакетным слушателем.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...