Вы можете просто добавить элемент .filter()
в поток.
.filter("!'bar'.equals(headers['foo'])")
Будет отфильтровывать (игнорировать) любые сообщения с заголовком с именем foo
, равным bar
.
Примечание. Spring Kafka RecordFilterStrategy
имеет обратный смысл фильтров Spring Integration
public interface RecordFilterStrategy<K, V> {
/**
* Return true if the record should be discarded.
* @param consumerRecord the record.
* @return true to discard.
*/
boolean filter(ConsumerRecord<K, V> consumerRecord);
}
Фильтры Spring Integration отбрасывают сообщения, если фильтр возвращает false.
EDIT
Или вы можете добавить RecordFilterStrategy
к адаптеру канала.
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
.recordFilterStrategy(record -> {
Header header = record.headers().lastHeader("foo");
return header != null ? new String(header.value()).equals("bar") : false;
})
...