Мне нужно определить собственную стратегию фильтрации для каждого слушателя, созданного фабрикой слушателей. В настоящее время я использую RecordFilterStrategy
для этого:
@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactoryProject() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(new RecordFilterStrategy<String, GenericRecord>() {
@Override
public boolean filter(ConsumerRecord<String, GenericRecord> consumerRecord) {
return true;
}
});
return factory;
}
Но такая фильтрация применяется ко всем слушателям, созданным этой фабрикой. Мне нужно что-то вроде определения различных логик c для каждого слушателя:
@Component
@SendTo("out")
@KafkaListener(topics = "incoming")
public class TestListener {
@Filter
public boolean filter(){
return true;
}
@KafkaHandler
public TestObject listener(TestObject testObject) {
log.debug("Received Message: " + testObject);
return testObject;
}
}
Есть ли у spring-kafka некоторые инструменты для этого? Или мне нужно написать такую логику c самостоятельно?
Заранее спасибо!