Kafka Streams configuration:
threads = 1;
replicationFactor = 1;
ktableCommitInterval = 10000;   // ms (10 seconds)
ktableMemory = 72000000;        // bytes (72 MB)
timeDuration = 10;              // seconds (window size)
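The question does not show how these variables are handed to Kafka Streams. Here is a minimal sketch, assuming they are passed under the standard Kafka Streams property keys (num.stream.threads, replication.factor, commit.interval.ms, cache.max.bytes.buffering). The class name StreamsConfigSketch is hypothetical, and a real application also needs application.id and bootstrap.servers:

```java
import java.util.Properties;

// Hypothetical helper: maps the question's variables onto Kafka Streams config keys.
class StreamsConfigSketch {
    static final int THREADS = 1;
    static final int REPLICATION_FACTOR = 1;
    static final long COMMIT_INTERVAL_MS = 10_000L; // ktableCommitInterval: 10 seconds
    static final long CACHE_BYTES = 72_000_000L;    // ktableMemory: 72 MB, in bytes

    // Assumption: the variables are applied under these standard property names.
    static Properties build() {
        Properties props = new Properties();
        props.put("num.stream.threads", Integer.toString(THREADS));
        props.put("replication.factor", Integer.toString(REPLICATION_FACTOR));
        props.put("commit.interval.ms", Long.toString(COMMIT_INTERVAL_MS));
        // Total record-cache size shared by all stream threads.
        // Newer releases (3.4+) deprecate this key in favour of "statestore.cache.max.bytes".
        props.put("cache.max.bytes.buffering", Long.toString(CACHE_BYTES));
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```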
Topology:
KStream<Windowed<String>, String> windowedStringKStream = streamsBuilder
    .stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    // 10-second tumbling windows with no grace period
    .windowedBy(TimeWindows.of(Duration.ofSeconds(timeDuration)).grace(Duration.ofSeconds(0)))
    .reduce(Numbers::append, // appends each new number to the window's aggregate
        Materialized.<String, String, WindowStore<Bytes, byte[]>>as(storeName)
            .withCachingEnabled()
            .withRetention(Duration.ofSeconds(timeDuration))
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()))
    .toStream();
Code description:
The code appends numbers within 10-second windows. Records carrying incrementing numbers are sent to the input topic at exactly 1-second intervals.
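The windowed reduce can be modelled in plain Java without a broker. This is a minimal sketch, assuming a single key. The real Numbers::append is not shown, so the append method below is a hypothetical stand-in that joins the aggregate and the new value with a comma. As far as I know, Kafka Streams aligns tumbling windows to the epoch, so a record's window starts at timestamp - (timestamp % size), not at the first record:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class WindowedAppendSketch {
    static final long WINDOW_MS = 10_000L; // timeDuration = 10 s

    // Hypothetical stand-in for Numbers::append: joins aggregate and new value.
    static String append(String aggregate, String value) {
        return aggregate + "," + value;
    }

    // Epoch-aligned tumbling window: the start does not depend on when data begins.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Reduces (timestamp, value) records per window for a single key,
    // mirroring groupByKey().windowedBy(...).reduce(append).
    static Map<Long, String> reduce(long[] timestamps, String[] values) {
        Map<Long, String> windows = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            windows.merge(windowStart(timestamps[i]), values[i], WindowedAppendSketch::append);
        }
        return windows;
    }

    public static void main(String[] args) {
        int n = 15;
        long[] ts = new long[n];
        String[] vals = new String[n];
        for (int i = 0; i < n; i++) {
            ts[i] = 3_000L + i * 1_000L; // hypothetical: first record 3 s after a window boundary
            vals[i] = Integer.toString(i + 1);
        }
        reduce(ts, vals).forEach((start, agg) ->
            System.out.println("[" + start + ", " + (start + WINDOW_MS) + ") -> " + agg));
    }
}
```

With the first record arriving 3 seconds after a window boundary (a hypothetical offset), window [0, 10000) collects only the numbers 1 to 7 and window [10000, 20000) collects 8 to 15.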
Output:
Problem:
The commit interval is set to 10 seconds and the cache size to 72 MB (the value is in bytes). The state store has caching enabled. According to the documentation, Kafka Streams forwards data downstream when either the cache fills up or the commit interval elapses, whichever happens first. In my experiments, however, commits happen twice a minute. My observation is that the commit interval starts counting when the application starts, whereas windowing starts when data begins arriving. As seen in the image, both intermediate and final window results are pushed downstream.
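The double emission can be reproduced with a simplified model of the record cache. This sketch rests on three assumptions: the cache never fills, so it is flushed only on commit; record timestamps follow wall-clock time; and commits fire every 10 seconds counted from application start. The class and method names are hypothetical. Because commit ticks and window boundaries are offset from each other, a window that is still open at one commit is forwarded twice: once with a partial aggregate and again with the complete one at the next commit:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CommitFlushSketch {
    static final long WINDOW_MS = 10_000L;
    static final long COMMIT_INTERVAL_MS = 10_000L;

    // Simplified model (assumption): the cache is flushed only on commit, and
    // commits fire every COMMIT_INTERVAL_MS after appStartMs, up to untilMs.
    // Returns one "windowStart -> aggregate" line per downstream update.
    static List<String> emitted(long appStartMs, long[] timestamps, String[] values, long untilMs) {
        Map<Long, String> store = new LinkedHashMap<>(); // windowed state store
        Map<Long, String> dirty = new LinkedHashMap<>(); // cached entries not yet forwarded
        List<String> out = new ArrayList<>();
        int next = 0;
        for (long commit = appStartMs + COMMIT_INTERVAL_MS; commit <= untilMs; commit += COMMIT_INTERVAL_MS) {
            // Process every record that arrives before this commit.
            while (next < timestamps.length && timestamps[next] < commit) {
                long start = timestamps[next] - (timestamps[next] % WINDOW_MS);
                String agg = store.merge(start, values[next], (a, v) -> a + "," + v);
                dirty.put(start, agg);
                next++;
            }
            // On commit, forward the latest value of every window touched since the last flush.
            dirty.forEach((start, agg) -> out.add(start + " -> " + agg));
            dirty.clear();
        }
        return out;
    }

    public static void main(String[] args) {
        int n = 29;
        long[] ts = new long[n];
        String[] vals = new String[n];
        for (int i = 0; i < n; i++) {
            ts[i] = 5_500L + i * 1_000L; // hypothetical: one record per second, first at 5.5 s
            vals[i] = Integer.toString(i + 1);
        }
        // Hypothetical: the application starts at 5 s, halfway through window [0, 10000).
        emitted(5_000L, ts, vals, 35_000L).forEach(System.out::println);
    }
}
```

With the application starting 5 seconds into a window, the model forwards window 10000 twice, first as 6,7,8,9,10 and then as 6,7,...,15. This matches the intermediate-plus-final pattern described above.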
Using suppress() is not an option for my use case, because it does not flush the data if no new data arrives on the topic.
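The suppress() limitation follows from how stream time advances. This simplified model assumes suppress(Suppressed.untilWindowCloses(...)) with the zero grace period from the topology. Stream time is the largest record timestamp seen so far (Kafka Streams tracks it per partition), and a window is released only once stream time reaches its end plus grace. The class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SuppressSketch {
    static final long WINDOW_MS = 10_000L;
    static final long GRACE_MS = 0L; // grace(Duration.ofSeconds(0)) from the topology

    // Simplified model: stream time only advances when a new record arrives,
    // and window [start, start + size) closes once streamTime >= start + size + grace.
    static List<Long> closedWindows(long[] timestamps) {
        long streamTime = Long.MIN_VALUE;
        List<Long> seenStarts = new ArrayList<>();
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % WINDOW_MS);
            if (!seenStarts.contains(start)) seenStarts.add(start);
        }
        List<Long> closed = new ArrayList<>();
        for (long start : seenStarts) {
            if (streamTime >= start + WINDOW_MS + GRACE_MS) closed.add(start);
        }
        return closed;
    }

    public static void main(String[] args) {
        long[] upTo19s = new long[20];
        for (int i = 0; i < 20; i++) upTo19s[i] = i * 1_000L; // one record per second, 0 s to 19 s
        // Input stops at 19 s, so window [10000, 20000) never closes.
        System.out.println(closedWindows(upTo19s));
        // One more record at 20 s advances stream time and closes it.
        long[] oneMore = Arrays.copyOf(upTo19s, 21);
        oneMore[20] = 20_000L;
        System.out.println(closedWindows(oneMore));
    }
}
```

If input stops at 19 seconds, only [0, 10000) counts as closed. Window [10000, 20000) is released only after a record with timestamp 20000 or later arrives, which is exactly the situation the question describes.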
Any help would be appreciated. If anyone is facing the same issue or wants to reproduce it, please let me know.