Да, вы можете использовать ту же технику, что и ErrorHandlingDeserializer2
(которая заменяет ErrorHandlingDeserializer
), чтобы вернуть объект «маркер» вместо десериализации, а затем добавить RecordFilterStrategy
, который фильтрует записи с такимиобъекты, к слушателю (фабрика контейнеров при использовании @KafkaListener
или использование адаптера фильтрации для явного слушателя).
EDIT
Spring Boot и добавление фильтра...
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
kafkaConsumerFactory.setsetRecordFilterStrategy(myFilter());
return factory;
}