Для чтения данных в KStream я использую связку kafka из весеннего облака. При чтении данных из одной темы мне нужно читать с самого начала.
Я попытался установить свойства сброса и запуска смещения Кафки. Но не смог найти никаких ссылок.
Не могли бы вы помочь мне предоставить любой пример application.yaml для сброса смещения, чтобы я мог использовать сообщения из темы с начала
Добавление application.yaml, которое я использовал:
spring.cloud.stream.bindings.input:
destination: input-topic1
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.output:
destination: output-topic
producer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.bindings.beginningInput:
destination: beginning-topic
consumer:
useNativeDecoding: true
headerMode: raw
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.output:
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.beginningInput:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
resetOffsets: true
startOffset: earliest
spring.cloud.stream.kafka.streams.binder:
brokers: 127.0.0.1
zkNodes: 127.0.0.1
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
commit.interval.ms: 1000