Поток Kafka - определите политику хранения для журнала изменений - PullRequest
0 голосов
/ 12 февраля 2019

Я использую Kafka Streams для некоторых агрегатов TimeWindow.Меня интересует только конечный результат каждого окна, поэтому я использую функцию .suppress (), которая создает раздел журнала изменений для его состояния.

Конфигурация политики хранения для этого раздела журнала изменений определена как «компактная»."который, насколько я понимаю, сохранит по крайней мере последнее событие для каждого ключа в прошлом.

Проблема в моем приложении заключается в том, что ключи часто меняются.Это означает, что тема будет расти бесконечно (каждое окно будет приносить новые ключи, которые никогда не будут удалены).

Поскольку агрегация выполняется для каждого окна, после агрегации мне не нужно "старый""keys.

Есть ли способ сообщить Kafka Streams об удалении ключей из предыдущих окон?

В этом отношении я думаю, что настройка политики хранения тем журнала изменений на" compact, delete "подойдетзадание (которое доступно в кафке в соответствии с этим: KIP-71 , KAFKA-4015 .

Но возможно ли изменить политику хранения, используяAPI Kafka Streams?

1 Ответ

0 голосов
/ 12 февраля 2019
Оператор

suppress() отправляет сообщения-захоронения в раздел журнала изменений, если запись удалена из ее буфера и отправлена ​​в нисходящем направлении.Таким образом, вам не нужно беспокоиться о неограниченном росте темы.Изменение политики сжатия может фактически нарушить гарантии, которые предоставляет оператор, и вы можете потерять данные.

...