Ссылаясь на это решение , файл My spring-cloud-stream application.yml имеет следующую конфигурацию:
#application.yml
spring.cloud.stream.bindings.input:
destination: my-topic-name
contentType: application/json
consumer:
useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
materializedAs: my-store
Внутри моего основного приложения с классом, аннотированным @EnableBinding, и insideметод, аннотированный @StreamListener, я использую интеграцию DSL и Processor API kafka stream для доступа к хранилищам состояний, которые должны быть материализованы при запуске приложения из-за присутствия materializedAs в файле application.yml.
ReadOnlyKeyValueStore<Object, String> store;
input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");
}
@Override
public void process(Object key, Object value) {
//find the key
store.get(key);
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "my-store");
проблема заключается в том, что при первом запуске приложения хранилища состояний не были полностью заполнены и еще не готовы (например, пустое хранилище состояний), но сообщения все еще поступают раньше и обрабатываются топологией потока Kafka, дающей неожиданные результаты.
Как мы можем быть уверены, что при первом запуске приложения все или конкретные (пользователь может определить, если это возможно) хранилища состояний, которым было поручено материализоваться с использованием materializedAs в файле application.yml, полностью заполнены и готовыиспользовать до того, как любая топология обработки потока, определенная внутри @StreamListener, начнет обрабатывать входящие сообщения.Можем ли мы заставить потоковую обработку сообщений ждать, пока хранилища состояний не будут полностью заполнены при первом запуске приложения?
Я попытался повторить проблему, изменив один из spring-cloud-stream-sample и толкнул модифицированную версию сюда .Более подробное обсуждение этого также можно найти здесь