Мы агрегируем в окнах сеансов, используя следующий код:
.windowedBy(SessionWindows.with(...))
.aggregate(..., ..., ...)
Хранилище состояний, которое создается для нас автоматически, поддерживается разделом журнала изменений с cleanup.policy=compact
.
При повторном развертывании нашей топологии мы обнаружили, что восстановление хранилища состояний заняло гораздо больше времени, чем ожидалось (более 10 минут).Похоже, что объяснение таково, что хотя сессия была закрыта, она все еще присутствует в теме журнала изменений.
Мы заметили, что окна сеансов по умолчанию имеют продолжительность обслуживания один день, но даже после продолжительности бездействия + поддержкипревышено, не похоже, что сообщения удаляются из темы журнала изменений.
a) Нужно ли вручную удалять «старые» (по нашему определению) сообщения, чтобы держать размер темы журнала изменений под контролем?(Это может быть случай, на который намекает [1].)
b) Можно ли каким-то образом создать тему журнала изменений, созданную с помощью cleanup.policy=compact,delete
, и будет ли это вообще иметь смысл?
[1] Похоже, что хранилище сеансов создается внутри UnwindowedChangelogTopicConfig
Kafka Stream (а не WindowedChangelogTopicConfig
), что может сделать этот комментарий из Kafka Streams - сокращение объема памяти для больших хранилищ состояний :«Для хранилища без окон нет политики хранения. Основная тема сжата только. Таким образом, если вы знаете, что вам больше не нужна запись, вам нужно будет удалить ее через надгробную плиту. Но это немногосложно достичь ... - Матиас Дж. Сакс 27 июня '17 в 22:07 "