Обработка нескольких ответов с AggregatingReplyingKafkaTemplate - PullRequest
0 голосов
/ 29 марта 2020

У меня есть сценарий решения проблемы, аналогичный сценарию Обработка нескольких ответов с помощью ReplyingKafkaTemplate

Я понимаю, что AggregatingReplyingKafkaTemplate (с использованием spring-kafka-2.3.7) может помочь решить эту проблему, однако Я изо всех сил пытаюсь найти правильную конфигурацию бина и получить сценарий, в котором я могу агрегировать результаты от нескольких потребителей. Ниже приведен мой конфиг bean, однако я получаю сообщение об ошибке компиляции, в котором говорится, что не удается разрешить конструктор для GenericMessageListenerContainer

    @Bean
    public AggregatingReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf,
                                                                                     KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>> replyContainer,
                                                                                     BiPredicate<List<ConsumerRecord<String, Model>>, Boolean> releaseStrategy) {
        return new AggregatingReplyingKafkaTemplate<>(pf, replyContainer, releaseStrategy);
    }
@Bean
    public GenericMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>>  replyContainer(ConsumerFactory<String, Model> cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
        return new KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>>(cf,
                containerProperties);
    }

Любой простой пример использования AggregatingReplyingKafkaTemplate будет полезен.

Большое спасибо,

1 Ответ

0 голосов
/ 29 марта 2020

получая ошибку компиляции, чтобы сказать, что не может разрешить конструктор для GenericMessageListenerContainer

Где? Я не вижу, как вы пытаетесь его создать.

Ваш бин replyContainer должен возвращать KafkaMessageListenerContainer, учитывая, что это объявленный тип в фабричном методе replyKafkaTemplate.

Смотрите мой ответ на этот вопрос для примера.

Проще всего использовать ConcurrentKafkaListenerContainerFactory для создания контейнера. Если вы используете Spring Boot, он автоматически настроит его для вас; если нет, вам нужно будет настроить его как @Bean

...