Я пытаюсь использовать RecordFilterStrategy без использования аннотации слушателя Kafka.
ConcurrentKafkaListenerContainerFactory<String, Task> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory);
factory.setRecordFilterStrategy((consumerRecord) -> <My filter logic>);
container = factory.createContainer(topicName);
container.setupMessageListener((MessageListener<String, Task>) data -> {
messageListener.onMessage(data);
count++;
});
container.start();
Но при отладке кажется, что стратегия фильтра записей никогда не используется.
Есть какие-нибудь рекомендации?