Используя пружинную интеграцию Kafka dsl, мне интересно, почему слушатель не получает сообщения?Но то же самое приложение. Если я заменю DSL весенней интеграции на метод, аннотированный с помощью KafkaListener, он сможет нормально использовать сообщения.Что мне не хватает в DSL?
Код DSL, который не использует:
@Configuration
@EnableKafka
class KafkaConfig {
//consumer factory provided by Spring boot
@Bean
IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
.configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
.id("kafkaTopicListener").autoStartup(true)
)
.channel("logChannel")
.get()
}
}
logChannel (или любой другой канал, который я использую), не отражает входящие сообщения.
Вместо приведенного выше кода, если я использую обычный слушатель, он отлично работает для приема сообщений.
@Component
class KafkaConsumer {
@KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
void inboundKafkaEvent(String message) {
log.debug("message is {}", message)
}
}
Оба подхода используют одинаковые application.properties для потребителя Kafka.