Я использую Spring Kafka 2.2.7, я настроил @EnableKafka
с kafkaListenerContainerFactory
и использую @KafkaListener
для приема сообщений, все работает как положено.
Я хочу добавить RecordInterceptor
для регистрации всех потребляемых сообщений, но это затрудняет его настройку. Документация гласит, что RecordInterceptor может быть установлен для контейнера, однако я не уверен, как получить экземпляр контейнера.
Начиная сверсия 2.2.7, вы можете добавить RecordInterceptor в контейнер слушателя;он будет вызван перед вызовом слушателя, разрешающего проверку или изменение записи.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
factory.setConcurrency(consumerCount);
return factory;
}
Я просматривал документацию Spring, но не нашел решения,это кажется простой вещью, но, возможно, я что-то упустил.
Любая помощь в этом отношении будет признательна.
Заранее спасибо.