Я запускаю приложение потоков Kafka с оконной функцией.Но после 24 часов работы использование локального диска увеличилось с 5G до 20G и продолжает расти.Из того, что я гуглил, как только я ввел windowedBy
, он должен удалить старые данные автоматически.
Моя топология выглядит следующим образом:
stream.selectKey(selectKey A)
.groupByKey(..)
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce((value1,value2) -> value2)
.suppress.toStreams()
.selectKey(selectKey B).mapValues().filter()
.groupByKey().reduce.toStream().to()
Одна вещь, которую я не могу понять, это то, чтоВ этой топологии будут созданы две темы внутреннего перераспределения, такие как repartition-03
и repartition-14
для двух действий groupBy
.С диска все машины, на которых выполняются задачи repartition-03
, имеют высокую загрузку диска и, похоже, никогда не удаляют старые данные, в то время как машины, на которых выполняются задачи repartition-14
, всегда находятся под низким использованием диска.
Когда я захожу на машины, я нашел разные пути для этих двух машин, как показано ниже:
/tmp/kafka-streams/test-group/2_40/rocksdb/KSTREAM-REDUCE-STATE-STORE-0000000014
/tmp/kafka-streams/test-group/1_4/KSTREAM-REDUCE-STATE-STORE-0000000003/KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000
Почему у них разные пути?2_40
предназначен для repartition-14
задач и имеет rocksdb
в пути, в то время как другой не содержит rocksdb
.Тем временем, taks 1_4
хранит пару папок, например KSTREAM-REDUCE-STATE-STORE-0000000003.1568808000000
, но с другим суффиксом
Я думал, что однажды, введя функцию windowedBy, rockdb удалит старые данные после истечения срока действия окна?И почему выше две темы внутреннего перераспределения имеют разный путь и поведение хранения?
Любая помощь высоко ценится!Спасибо!