Удержание Kafka Streams Rocksdb не удаляло старые данные с помощью оконной функции - PullRequest
0 голосов
/ 20 сентября 2019

Я запускаю приложение потоков Kafka с оконной функцией.Но после 24 часов работы использование локального диска увеличилось с 5G до 20G и продолжает расти.Из того, что я гуглил, как только я ввел windowedBy, он должен удалить старые данные автоматически.

Моя топология выглядит следующим образом:

stream.selectKey(selectKey A)
.groupByKey(..)
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce((value1,value2) -> value2)
.suppress.toStreams()
.selectKey(selectKey B).mapValues().filter()
.groupByKey().reduce.toStream().to()

Одна вещь, которую я не могу понять, это то, чтоВ этой топологии будут созданы две темы внутреннего перераспределения, такие как repartition-03 и repartition-14 для двух действий groupBy.С диска все машины, на которых выполняются задачи repartition-03, имеют высокую загрузку диска и, похоже, никогда не удаляют старые данные, в то время как машины, на которых выполняются задачи repartition-14, всегда находятся под низким использованием диска.

Когда я захожу на машины, я нашел разные пути для этих двух машин, как показано ниже:

/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014
/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000

Почему у них разные пути?2_40 предназначен для repartition-14 задач и имеет rocksdb в пути, в то время как другой не содержит rocksdb.Тем временем, taks 1_4 хранит пару папок, например KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000, но с другим суффиксом

Я думал, что однажды, введя функцию windowedBy, rockdb удалит старые данные после истечения срока действия окна?И почему выше две темы внутреннего перераспределения имеют разный путь и поведение хранения?

Любая помощь высоко ценится!Спасибо!

1 Ответ

1 голос
/ 22 сентября 2019

Срок хранения по умолчанию - 24 часа.Вы можете уменьшить его с помощью

.reduce(..., Materialized.with(...).withRetention(...));
...