Kafka RecordFilterStrategy не фильтрует записи при использовании spring-kafaka ReplyingKafkaTemplate - PullRequest
0 голосов
/ 08 апреля 2020

Привет, у меня есть следующая конфигурация для ReplyingKafkaTemplate, и я хочу отфильтровать сообщение перед потребителем на основе correlationID, но по какой-то причине его фильтр не может подсказать, что с этим не так.

@Bean
public ConcurrentMessageListenerContainer<String, FireflyResponse> replyContainer() {
    ConcurrentKafkaListenerContainerFactory<String, FireflyResponse> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retry));
    factory.setRetryTemplate(retryTemplate);
    factory.setConcurrency(3);
    factory.setBatchListener(true);
    factory.setAckDiscarded(true);
     factory.setRecordFilterStrategy(new RecordFilterStrategy<String, FireflyResponse>() {
        @Override
        public boolean filter(ConsumerRecord<String, FireflyResponse> consumerRecord) {
            return consumerRecord.headers().lastHeader(KafkaHeaders.CORRELATION_ID) == null;
        }
    });
    return factory.createContainer(responseTopic);
}

@Bean
public ReplyingKafkaTemplate<String, FireflyRequest, FireflyResponse> kafkaTemplate(
    ConcurrentMessageListenerContainer<String, FireflyResponse> replyContainer) {
    ReplyingKafkaTemplate<String, FireflyRequest, FireflyResponse> template = new ReplyingKafkaTemplate<>(
        producerFactory(), replyContainer);
    template.setDefaultReplyTimeout(Duration.ofSeconds(connectionTimeout));
    template.setSharedReplyTopic(true);
    return template;
}

1 Ответ

0 голосов
/ 08 апреля 2020

Отвечающий шаблон ВСЕГДА устанавливает заголовок идентификатора корреляции ...

@Override
public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
    Assert.state(this.running, "Template has not been start()ed"); // NOSONAR (sync)
    CorrelationKey correlationId = this.correlationStrategy.apply(record);
    Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
    ...

Требуется сопоставить ответ с запросом.

РЕДАКТИРОВАТЬ

Похоже, вы пытаетесь отфильтровать ответ; это не поддерживается; фильтруются только запросы.

Просто верните null из прослушивателя, если вы не хотите отвечать.

...