Kafka Streams: очистка промежуточных оконных результатов, поскольку интервал фиксации и время окна не синхронизированы - PullRequest
0 голосов
/ 21 января 2020

Конфигурация для потоков Кафки:

threads = 1;
replicationFactor = 1;
ktableCommitInterval= 10000;
ktableMemory=72000000;
timeDuration=10;

Топология:

KStream<Windowed<String>,String> windowedStringKStream =
                         streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(),Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(),Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(timeDuration)).grace(Duration.ofSeconds(0)))
                        .reduce(Numners::append,Materialized.<String, String, WindowStore<Bytes,byte[]>>as(storeName).withCachingEnabled().withRetention(Duration.ofSeconds(timeDuration)).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
.toStream();

Код Описание:

Code appends numbers in a 10 Second window. Incremental Number records are sent exactly at a interval of 1 second into input topic.

Выход: Output topic results with timestamp from kafka tool. As we can see every alternate output is a intermediate window result

Проблема:

Commit interval is set to 10 seconds. Cache size is set to 72 mb. Data is in bytes. State store has caching enabled. Documentation states that the operating semantics of kafka streams pushing data to downstream is dependent on cache size or commit interval whatever happens first. But as per the experiments, commit is happening two times in a minute. The observation is Commit interval is starting when application starts but windowing starts when data starts coming. As seen in the image, intermediate window results are pushed and final window results are also pushed.

Using Suppress() is not possible for the usecase i am working on as it will not flush the data if there is no new data coming on the topic.

Any help would be appreciated. If somebody is facing this or wants to reproduce this let me know.

1 Ответ

1 голос
/ 24 января 2020

Единственным решением было бы создание пользовательского transform(), который реализует пользовательскую версию «подавления», которая позволяет генерировать данные, даже если не поступают новые входные данные (например, пунктуация времени на настенных часах может помочь в его реализации). ).

В настоящее время (начиная с версии Apache Kafka 2.4) встроенная поддержка отсутствует. Если вы не используете suppress(), оконная агрегация всегда может выдать некоторый промежуточный результат до того, как окно действительно закроется, и нет способа настроить потоки Kafka по-другому.

...