Как использовать интерактивный запрос в топологии процесса kafka в spring-cloud-stream? - PullRequest
0 голосов
/ 01 мая 2019

Можно ли использовать интерактивный запрос (InteractiveQueryService) в Spring Cloud Stream для класса с аннотацией @EnableBinding или в методе с @StreamListener? Я попытался создать экземпляр ReadOnlyKeyValueStore в предоставленном KStreamMusicSampleApplication классе и методе процесса, но его значение всегда равно нулю.

Мой метод @StreamListener прослушивает несколько KTables и KStreams, и во время топологии процесса, например, фильтрации, я должен проверить, существует ли ключ от KStream в конкретном KTable.

Я попытался выяснить, как сканировать входящий KTable, чтобы проверить, существует ли ключ, но не повезло. Затем я наткнулся на InteractiveQueryService, чей метод get () можно было бы использовать для проверки того, существует ли ключ в хранилище состояний materializedAs из KTable. Проблема в том, что я не могу получить к нему доступ с помощью топологии процесса (@EnableBinding или @StreamListener). Доступ к ним можно получить только снаружи этой аннотации, например, RestController.

Есть ли способ сканировать входящий KTable, чтобы проверить наличие ключа или значения? если нет, то можем ли мы получить доступ к InteractiveQueryService в рамках топологии процесса?

1 Ответ

2 голосов
/ 01 мая 2019

InteractiveQueryService в Spring Cloud Stream недоступно для использования в рамках фактической топологии в StreamListener. Как вы упомянули, он должен использоваться за пределами вашей основной топологии. Однако в описанном вами случае использования вы все еще можете использовать хранилище состояний из своего основного потока. Например, если у вас есть входящий KStream и KTable, который материализован как хранилище состояний, то вы можете позвонить process на KStream и получить доступ к хранилищу состояний таким образом. Вот приблизительный код для достижения этой цели. Вы должны преобразовать это, чтобы соответствовать вашему конкретному случаю использования, но вот идея.

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");
...