Вот простое окно сеанса с использованием потоков Кафки:
stream
.groupBy()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)).grace(Duration.ofMinutes(0)))
.aggregate(...) // implementation of aggregate function
Используя следующий фрагмент кода, мы можем настроить хранилище состояний:
Materialized
.as(Stores.persistentSessionStore(storeName, Duration.ofHours(2))
.withCachingEnabled()
.withLoggingEnabled()
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
Документация состояния:
Обратите внимание, что период хранения должен быть как минимум достаточно длинным, чтобы содержать весь жизненный цикл оконных данных, от начала окна до конца окна, и для всего льготного периода.
Мы не применяем льготный период. Но рассмотрим такой сценарий: окно сеанса заканчивается до периода хранения, а разрыв неактивности заканчивается после периода хранения. Я хотел бы знать, есть ли вероятность потери данных сеанса? Насколько агрессивно проводится очистка?