Я пытаюсь получить сообщения из темы в Kafka через простое загрузочное приложение Spring через библиотеку spring-cloud-stream-binder-kafka-streams.
Я слежу за документацией здесь - https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/master/docs/src/main/asciidoc/kafka-streams.adoc
Мой код выглядит следующим образом: package co.zip.poc.springbootkafkakotlin
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.apache.kafka.streams.kstream.KStream
import org.springframework.context.annotation.Bean
import java.util.function.Consumer
@SpringBootApplication
class SpringBootKafkaConsumer {
@Bean
fun process(): Consumer<KStream<Any, Any>> {
println("=======================> Creating a consumer for reading stuff<=====================")
return Consumer { input -> input.foreach { key, value ->
println("============key = $key")
println("===========value = $value")
}}
}
}
fun main(args: Array<String>) {
runApplication<SpringBootKafkaConsumer>(*args)
}
application.yml
spring:
application:
name: customer-balance
cloud:
stream:
kafka:
streams:
binder:
configuration:
application:
id: customer-balance
bindings:
process_in:
destination: "dbserver1.inventory.customers"
logging:
level:
org.springframework.kafka.config: trace
Моя локальная кафка работает на порту9092. Я могу получать сообщения через консольный приемник kafka -
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
Выше 4 выводит на консоль сообщения. Но когда я запускаю приложение весенней загрузки, я ничего не печатаю для сообщений key = .. и value = print.
Что я делаю не так?