Фильтровать сообщения перед десериализацией на основе заголовков - PullRequest
0 голосов
/ 19 декабря 2018

Иногда сообщения могут быть отфильтрованы перед десериализацией на основе значений заголовка.Существуют ли какие-либо шаблоны для этого сценария, использующие пружинную кафку.Я имею в виду реализацию, аналогичную ErrorHandlingDeserializer, в дополнение к предикату фильтра приема делегата также в качестве свойства.Какие-либо предложения?спасибо.

1 Ответ

0 голосов
/ 19 декабря 2018

Да, вы можете использовать ту же технику, что и ErrorHandlingDeserializer2 (которая заменяет ErrorHandlingDeserializer), чтобы вернуть объект «маркер» вместо десериализации, а затем добавить RecordFilterStrategy, который фильтрует записи с такимиобъекты, к слушателю (фабрика контейнеров при использовании @KafkaListener или использование адаптера фильтрации для явного слушателя).

EDIT

Spring Boot и добавление фильтра...

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    kafkaConsumerFactory.setsetRecordFilterStrategy(myFilter());
    return factory;
}
...