Kafka Streams - Создание Windowed State Store - PullRequest
0 голосов
/ 13 ноября 2018

Приведенный ниже код «работает», но я запутался в значении значений, передаваемых в Stores.persistentWindowStore ().Я нашел документацию (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore-java.lang.String-long-int-long-boolean-), но определение аргументов мне не понятно.

Должно ли значение windowBy () всегда соответствовать windowSize в persistentWindowStore ()?

Чтодолжен ли быть установлен период хранения? Политика хранения исходного раздела?

Что делает число сегментов?

Для чего сохраняются дубликаты? В документе, похоже, указано значение trueдля объединений?

long windowSize = TimeUnit.MINUTES.toMillis(15);
long retentionPeriod = windowSize*4*6 //6 hours
int numSegments = 2;
boolean retainDuplicates = false;

bdrStream.groupByKey().windowedBy(TimeWindows.of(windowSize))
    .aggregate(() -> Lists.newArrayList(),
        (aggKey, newValue, aggValue) -> {
            BdrData d = new BdrData();
            d.setCharge(newValue.getBdr().getCost());
            aggValue.add(d);
            return aggValue;
        },
        Materialized.<String, ArrayList<BdrData>>as(
            Stores.persistentWindowStore("store5", 
                retentionPeriod, 
                numSegments, 
                windowSize,
                retainDuplicates))
                .withKeySerde(Serdes.String())
                .withValueSerde(listBdrDataSerde))
    .toStream()
    .process(() -> new WindowAggregatorProcessor());

1 Ответ

0 голосов
/ 13 ноября 2018

Должно ли значение windowBy () всегда соответствовать windowSize в persistentWindowStore ()?

Да.

Какой период хранения должен быть установлен?Политика хранения исходной темы?

Она должна соответствовать сроку хранения окон, который можно указать с помощью Windows#until() (по умолчанию 1 день)

Что делает число сегментов?

Количество сегментов определяет, как истек срок годности грубых / мелкозернистых данных (т. Е. Старых окон).Размер сегмента будет «период хранения / (#segments + 1)».Обратите внимание, что большее количество сегментов обеспечивает более точное истечение срока действия данных, но увеличивает накладные расходы (каждый сегмент использует свой собственный экземпляр RocksDB)

Для чего сохраняются дубликаты?Кажется, в документе указано значение true для объединений?

По умолчанию ключи должны быть уникальными.Если вы включите сохранение дубликатов, вы можете хранить один и тот же ключ несколько раз.Включение дубликатов сопровождается падением производительности.

Примечание:

Эта часть API была переработана и упрощена в следующем выпуске 2.1.Сравните KIP-319 и KIP-328 для деталей.

...