Приведенный ниже код «работает», но я запутался в значении значений, передаваемых в Stores.persistentWindowStore ().Я нашел документацию (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/state/Stores.html#persistentWindowStore-java.lang.String-long-int-long-boolean-), но определение аргументов мне не понятно.
Должно ли значение windowBy () всегда соответствовать windowSize в persistentWindowStore ()?
Чтодолжен ли быть установлен период хранения? Политика хранения исходного раздела?
Что делает число сегментов?
Для чего сохраняются дубликаты? В документе, похоже, указано значение trueдля объединений?
long windowSize = TimeUnit.MINUTES.toMillis(15);
long retentionPeriod = windowSize*4*6 //6 hours
int numSegments = 2;
boolean retainDuplicates = false;
bdrStream.groupByKey().windowedBy(TimeWindows.of(windowSize))
.aggregate(() -> Lists.newArrayList(),
(aggKey, newValue, aggValue) -> {
BdrData d = new BdrData();
d.setCharge(newValue.getBdr().getCost());
aggValue.add(d);
return aggValue;
},
Materialized.<String, ArrayList<BdrData>>as(
Stores.persistentWindowStore("store5",
retentionPeriod,
numSegments,
windowSize,
retainDuplicates))
.withKeySerde(Serdes.String())
.withValueSerde(listBdrDataSerde))
.toStream()
.process(() -> new WindowAggregatorProcessor());