Вот пример KTable
, который я построил и который представляет собой простую агрегацию:
String name = stream
.groupByKey()
.aggregate(
() -> new Aggregate(config),
(key, value, aggregate) -> aggregate.addAndReturn(value),
Materialized
.<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(CustomSerdes.ObjectSerde()))
.filter(((key, value) -> value.isStateChanged()))
.filter((key, value) -> !value.getRecentlyViewed().isEmpty())
.queryableStoreName();
Что мне нужно сделать, это сохранить окончательный KTable
(после применения фильтрации) вГосударственный магазин, а не начальный KTable
.В настоящее время KTable.queryableStoreName()
возвращает null
.
Мое текущее решение - применить filter()
, затем преобразовать в поток, используя KTable.toStream()
, и, наконец, снова сохранить как KTable
, что я считаю неэффективным.Есть ли другое решение