Spring-Cloud-Stream обрабатывает сообщения kafka только после того, как хранилища состояний, созданные при запуске приложения, будут полностью заполнены и готовы - PullRequest
0 голосов
/ 09 мая 2019

Ссылаясь на это решение , файл My spring-cloud-stream application.yml имеет следующую конфигурацию:

#application.yml

spring.cloud.stream.bindings.input:
  destination: my-topic-name
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    materializedAs: my-store

Внутри моего основного приложения с классом, аннотированным @EnableBinding, и insideметод, аннотированный @StreamListener, я использую интеграцию DSL и Processor API kafka stream для доступа к хранилищам состояний, которые должны быть материализованы при запуске приложения из-за присутствия materializedAs в файле application.yml.

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");

проблема заключается в том, что при первом запуске приложения хранилища состояний не были полностью заполнены и еще не готовы (например, пустое хранилище состояний), но сообщения все еще поступают раньше и обрабатываются топологией потока Kafka, дающей неожиданные результаты.

Как мы можем быть уверены, что при первом запуске приложения все или конкретные (пользователь может определить, если это возможно) хранилища состояний, которым было поручено материализоваться с использованием materializedAs в файле application.yml, полностью заполнены и готовыиспользовать до того, как любая топология обработки потока, определенная внутри @StreamListener, начнет обрабатывать входящие сообщения.Можем ли мы заставить потоковую обработку сообщений ждать, пока хранилища состояний не будут полностью заполнены при первом запуске приложения?

Я попытался повторить проблему, изменив один из spring-cloud-stream-sample и толкнул модифицированную версию сюда .Более подробное обсуждение этого также можно найти здесь

1 Ответ

0 голосов
/ 14 мая 2019

Похоже, вам нужно присоединиться к потоку с помощью GlobalKTable (хранилище состояний) вместо использования интерфейса процесса.Вот соединение Kstream GlobalKTable .Пожалуйста, попробуйте.

...