Как определить, что хранилище ключей обновлено в Spring Cloud Stream API - PullRequest
0 голосов
/ 23 ноября 2018

Я создаю материализованное представление из темы с помощью функции агрегирования из API весеннего облачного потока.Это выглядит следующим образом:

public void process(KStream<Object, Object> input){
input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized)

Затем я запрашиваю мое созданное хранилище состояний с помощью:

 ReadOnlyWindowStore<Object, Object> windowStore =
  queryService.getQueryableStoreType("test", QueryableStoreTypes.windowStore());

Теперь мой вопрос: как я могу определить, что это хранилище состояний обновилось после того, как новое событие былообрабатывается методом процесса?Это какое-то событие, которое я могу слушать или могу создать?

1 Ответ

0 голосов
/ 23 ноября 2018

Ваша программа:

input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized)

Фактически, последний aggregate() возвращает объект KTable.Если вы отключите кэширование через Materialized, вы сможете получать информацию о каждом обновлении до KTable через:

input
  .peek((key, value) ->{...}
  .map((key, value) -> {...}
  .groupByKey()
  .windowedBy(TimeWindows.of(5000))
  .aggregate(Initializer, Aggregator, Materialized) // disable caching via Materialized
  .toStream()
  .foreach(...) // react to every update to the KTable
...