@KafkaListener отдельная логика фильтрации c для каждого слушателя - PullRequest
2 голосов
/ 20 марта 2020

Мне нужно определить собственную стратегию фильтрации для каждого слушателя, созданного фабрикой слушателей. В настоящее время я использую RecordFilterStrategy для этого:

@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactoryProject() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(new RecordFilterStrategy<String, GenericRecord>() {
        @Override
        public boolean filter(ConsumerRecord<String, GenericRecord> consumerRecord) {
          return true;
        }
    });
    return factory;
}

Но такая фильтрация применяется ко всем слушателям, созданным этой фабрикой. Мне нужно что-то вроде определения различных логик c для каждого слушателя:

@Component
@SendTo("out")
@KafkaListener(topics = "incoming")
public class TestListener {

    @Filter
    public boolean filter(){
        return true;
    }

    @KafkaHandler
    public TestObject listener(TestObject testObject) {
        log.debug("Received Message: " + testObject);
        return testObject;
    }

}

Есть ли у spring-kafka некоторые инструменты для этого? Или мне нужно написать такую ​​логику c самостоятельно?

Заранее спасибо!

1 Ответ

2 голосов
/ 20 марта 2020

Нет, нет. Вам просто нужен набор ConcurrentKafkaListenerContainerFactory бобов с определенным RecordFilterStrategy. Тогда ваш @KafkaListener должен просто указать, на какой фабрике они основаны:

/**
 * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
 * to use to create the message listener container responsible to serve this endpoint.
 * <p>If not specified, the default container factory is used, if any.
 * @return the container factory bean name.
 */
String containerFactory() default "";
...