Как активно удалить состояние в Flink после определенного промежутка времени? - PullRequest
0 голосов
/ 24 января 2019

В задании Flink я хочу удалить состояние в памяти через 24 часа после его создания. Я проверил это post и установил время жизни (ttl), но, как упоминалось в этой статье, удаление состояний лениво / пассивно, что может привести к утечке памяти.

Например, через 23 часа 57 минут я получил последнее сообщение для ключа («США», «Мужской», 2018), и после этого больше не приходит сообщение для этого ключа. Тогда я не смогу вызвать функцию и т. Д. Состояния для этой клавиши («США», «Мужской», 2018), тогда она будет сохранена в памяти навсегда.

Эта статья упоминается с использованием таймера: The idea is to register a timer with the TTL per state value and access. When the timer elapses, the state can be cleared if no other state access happened since the timer was registered. но я не могу понять, как это сделать.

Я думаю об использовании ProcessFunction, у которого есть метод onTimer(). Мой план состоит в том, чтобы зарегистрировать ProcessingTimeTimer в его методе open() и удалить состояние в onTimer(), но я не знаю, пассивен ли этот таймер, то есть он не сработает, если нет вызова ProcessFunction даже через 24 часа.

1 Ответ

0 голосов
/ 24 января 2019

Использование ProcessFunction для этого хорошая идея.ProcessFunction обязательно будет содержать состояние ключа, о котором идет речь, и будет в курсе всех операций чтения и записи в состояние, которые вы можете использовать для создания и удаления таймеров любым способом, который имеет смысл для вашего приложения.

Таймеры имеют ключ (так же, как состояние), и таймер времени обработки будет запускаться по расписанию независимо от активности потока или неактивности для этого ключа (или других ключей, в этом отношении).Если работа по какой-либо причине не выполняется в запланированное время, таймеры обработки, которые должны были сработать во время простоя, сработают при восстановлении задания.

...