Весенний ботинок Kafka для потребителя - PullRequest
1 голос
/ 01 июля 2019

Используя пружинную интеграцию Kafka dsl, мне интересно, почему слушатель не получает сообщения?Но то же самое приложение. Если я заменю DSL весенней интеграции на метод, аннотированный с помощью KafkaListener, он сможет нормально использовать сообщения.Что мне не хватает в DSL?

Код DSL, который не использует:

@Configuration
@EnableKafka
class KafkaConfig {
    //consumer factory provided by Spring boot
    @Bean
    IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
        IntegrationFlows
                .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
                .configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
                .id("kafkaTopicListener").autoStartup(true)
        )

                .channel("logChannel")
                .get()
    }
}

logChannel (или любой другой канал, который я использую), не отражает входящие сообщения.

Вместо приведенного выше кода, если я использую обычный слушатель, он отлично работает для приема сообщений.

@Component
class KafkaConsumer {
    @KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
    void inboundKafkaEvent(String message) {
        log.debug("message is {}", message)
    }
}

Оба подхода используют одинаковые application.properties для потребителя Kafka.

1 Ответ

1 голос
/ 01 июля 2019

Вам не хватает того факта, что вы используете Spring Integration, но вы не включили его в своем приложении. Вам не нужно делать это для Кафки, так как вы не собираетесь использовать его с @KafkaListener. Итак, чтобы включить инфраструктуру Spring Integration, вам нужно добавить @EnableIntegration в ваш @Configuration класс: https://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#configuration-enable-integration

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...