Spring Integration и Kafka: как фильтровать сообщения на основе заголовка сообщения - PullRequest
0 голосов
/ 28 февраля 2020

У меня есть вопрос, основанный на этом вопросе: Фильтрация сообщений перед десериализацией на основе заголовков

Я бы хотел отфильтровать по заголовку записи потребителя kafka с помощью Spring Integration DSL.

В настоящее время у меня есть этот поток:

@Bean
IntegrationFlow readTicketsFlow(KafkaProperties kafkaProperties,
                                ObjectMapper jacksonObjectMapper,
                                EventService<Ticket> service) {
    Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

    return IntegrationFlows.from(
            Kafka.messageDrivenChannelAdapter(
                    consumerFactory, TICKET_TOPIC))
            .transform(fromJson(Ticket.class, new Jackson2JsonObjectMapper(jacksonObjectMapper)))
            .handle(service)
            .get();
}

Как я могу зарегистрировать org.springframework.kafka.listener.adapter.RecordFilterStrategy в этом потоке?

1 Ответ

3 голосов
/ 28 февраля 2020

Вы можете просто добавить элемент .filter() в поток.

.filter("!'bar'.equals(headers['foo'])")

Будет отфильтровывать (игнорировать) любые сообщения с заголовком с именем foo, равным bar.

Примечание. Spring Kafka RecordFilterStrategy имеет обратный смысл фильтров Spring Integration

public interface RecordFilterStrategy<K, V> {

    /**
     * Return true if the record should be discarded.
     * @param consumerRecord the record.
     * @return true to discard.
     */
    boolean filter(ConsumerRecord<K, V> consumerRecord);

}

Фильтры Spring Integration отбрасывают сообщения, если фильтр возвращает false.

EDIT

Или вы можете добавить RecordFilterStrategy к адаптеру канала.

return IntegrationFlows
        .from(Kafka.messageDrivenChannelAdapter(consumerFactory(), TEST_TOPIC1)
                .recordFilterStrategy(record -> {
                    Header header = record.headers().lastHeader("foo");
                    return header != null ? new String(header.value()).equals("bar") : false;
                })
                ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...