В задании Flink я хочу удалить состояние в памяти через 24 часа после его создания. Я проверил это post и установил время жизни (ttl), но, как упоминалось в этой статье, удаление состояний лениво / пассивно, что может привести к утечке памяти.
Например, через 23 часа 57 минут я получил последнее сообщение для ключа («США», «Мужской», 2018), и после этого больше не приходит сообщение для этого ключа. Тогда я не смогу вызвать функцию и т. Д. Состояния для этой клавиши («США», «Мужской», 2018), тогда она будет сохранена в памяти навсегда.
Эта статья упоминается с использованием таймера:
The idea is to register a timer with the TTL per state value and access. When the timer elapses, the state can be cleared if no other state access happened since the timer was registered.
но я не могу понять, как это сделать.
Я думаю об использовании ProcessFunction
, у которого есть метод onTimer()
. Мой план состоит в том, чтобы зарегистрировать ProcessingTimeTimer
в его методе open()
и удалить состояние в onTimer()
, но я не знаю, пассивен ли этот таймер, то есть он не сработает, если нет вызова ProcessFunction
даже через 24 часа.