Queryble отфильтрованный KTable - PullRequest
1 голос
/ 31 мая 2019

Вот пример KTable, который я построил и который представляет собой простую агрегацию:

String name = stream
    .groupByKey()
    .aggregate(
        () -> new Aggregate(config),
        (key, value, aggregate) -> aggregate.addAndReturn(value),
        Materialized
            .<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
            .withCachingEnabled()
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomSerdes.ObjectSerde()))
    .filter(((key, value) -> value.isStateChanged()))
    .filter((key, value) -> !value.getRecentlyViewed().isEmpty())
    .queryableStoreName();

Что мне нужно сделать, это сохранить окончательный KTable (после применения фильтрации) вГосударственный магазин, а не начальный KTable.В настоящее время KTable.queryableStoreName() возвращает null.

Мое текущее решение - применить filter(), затем преобразовать в поток, используя KTable.toStream(), и, наконец, снова сохранить как KTable, что я считаю неэффективным.Есть ли другое решение

1 Ответ

0 голосов
/ 03 июня 2019

Вы можете применить материализацию KTable, указав запрашиваемое имя магазина:

.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));

В зависимости от версии, которую вы используете, вам может понадобиться указать несколько обобщений для компиляции:

Materialized<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>.as("your-custom-store-name"))
...