Когда вы join
и используете JoinWindows.of(JOIN_WINDOW)
, вы неявно определили метаданные основного хранилища состояний.
Из Javadoc JoinWindows.of :
Указывает, что записи одного и того же ключа являются присоединяемыми, если их временные метки находятся в пределах timeDifference, то есть временная метка записи из вторичного потока имеет значение max timeDifference раньше или позже, чем временная метка записи из первичного потока.
Так называемый срок хранения (он же длительность сохранения окна ) был ранее (до Kafka Streams 2.1.0), заданным с помощью до :
Установите длительность сохранения окна (время хранения) в миллисекундах.Это время хранения является гарантированной нижней границей того, как долго будет поддерживаться окно.
Поскольку по умолчанию срок хранения составляет 1 день (в данный момент не удается найти ссылку), это является причинойисключение.
Начиная с Kafka Streams 2.1.0, вы должны использовать Материализованный API:
Используется для описания того, как должен создаваться StateStore.Вы можете предоставить пользовательский бэкэнд StateStore с помощью одного из предоставленных методов, принимающих поставщика, или использовать бэкэнды RocksDB по умолчанию, указав только имя магазина.
Materialized
дает вам полный контроль над базовымхранилище состояний для соединения и дает withRetention (java.time.Duration retention) :
Настройка периода хранения для оконных и сеансовых хранилищ.
Обратите внимание, что сохранениепериод должен быть по крайней мере достаточно длинным, чтобы содержать весь жизненный цикл оконных данных, от начала окна до конца окна, и для всего льготного периода.