Как Flink очищает состояние ключа? - PullRequest
0 голосов
/ 29 апреля 2020

Когда я размышляю над действием ключа, я обычно думаю о аналогии, когда все события, соответствующие ключу, выбрасываются в одно и то же ведро. Как вы можете себе представить, когда приложение Flink начинает обрабатывать большое количество данных, то, что вы выбираете для ввода, начинает становиться важным, потому что вы хотите убедиться, что вы хорошо очищаете состояние. Это приводит меня к моему вопросу, как именно Флинк очищает эти "ведра"? Если корзина пуста (все MapStates и ValueStates пусты), Flink закрывает эту область ключевого пространства и удаляет корзину?

Пример:

Формат входящих данных: {userId, computerId, amountOfTimeLoggedOn}

Ключ: UserId / ComputerId

Текущее пространство ключа:

  • Алиса, компьютер 10: в нем 2 события. Оба события хранятся в состоянии.
  • Боб, компьютер 11: в нем нет событий. Ничто не сохраняется в состоянии.

Придет ли Флинк и в конце концов удалит Боба, Компьютер 11 из Ключевого пространства, или он просто живет вечно, потому что в какой-то момент в нем было событие?

Ответы [ 2 ]

1 голос
/ 30 апреля 2020

Flink не хранит никаких данных для ключей состояния, которые не имеют никакого пользовательского значения, связанного с ними, по крайней мере, в существующих бэкэндах состояния: Heap (в памяти) или RocksDB.

Пространство ключей является виртуальным в Flink Flink не делает никаких предположений о том, какие конкретные ключи могут потенциально существовать. Для каждого ключа или подмножества ключей нет заранее выделенных сегментов. Только когда пользовательское приложение записывает какое-то значение для некоторого ключа, оно занимает хранилище.

Общая идея состоит в том, что все записи с одним и тем же ключом обрабатываются на одной и той же машине (что-то вроде того, как вы говорите) , Локальное состояние для определенного ключа также всегда сохраняется на одном компьютере (если оно вообще сохраняется). Это не относится к контрольным точкам.

Например, если какое-то значение было записано для [Bob, Computer 11] в какой-то момент времени, а затем впоследствии удалено, Flink полностью удалит его с помощью ключа.

0 голосов
/ 01 мая 2020

Короткий ответ

Он очищается с помощью функции Time To Live (TTL) функции состояния Flink и Java Сборщик мусора (G C) . Функция TTL удалит любую ссылку на запись состояния, а G C вернет выделенную память.

Длинный ответ

Ваш вопрос можно разделить на 3 подвопроса:

Я постараюсь быть максимально кратким.

Как Flink разбивает данные на основе ключа?

Для оператора через поток с ключом Flink разбивает данные на ключ с помощью согласованного алгоритма хеширования . Создает max_parallelism количество сегментов. Каждому экземпляру оператора назначается одно или несколько из этих сегментов. Всякий раз, когда данные должны отправляться в нисходящем направлении, ключ назначается одному из этих сегментов и, следовательно, отправляется соответствующему экземпляру оператора. Здесь ключ не сохраняется, поскольку диапазоны рассчитываются математически. Следовательно, ни одна область не очищается или корзина не удаляется в любое время . Вы можете создать любой тип ключа, который вы хотите. Это не повлияет на память с точки зрения пространства ключей или диапазонов.

Как Flink сохраняет состояние с помощью ключа?

Все экземпляры операторов имеют хранилище состояний уровня экземпляра. Это хранилище определяет контекст состояния этого экземпляра оператора и может хранить несколько хранилищ именованных состояний, например, «count», «sum», «some-name» et c. Эти хранилища named-state являются хранилищами Key-Value, которые могут хранить значения на основе ключа данных.

Эти хранилища KV создаются, когда мы инициализируем состояние с помощью дескриптора состояния в функции оператора open(). то есть getRuntimeContext().getValueState().

Эти хранилища KV будут хранить данные только тогда, когда что-то необходимо сохранить в состоянии. (как HashMap.put(k,v)). Таким образом, ключ или значение не сохраняются, пока не будут вызваны методы обновления состояния (например, update, add, put).

Итак,

  • Если Флинк не видел ключ, для этого ключа ничего не сохраняется.
  • Если Flink видел ключ, но не вызывал методы обновления состояния, для этого ключа ничего не сохраняется.
  • Если для ключа вызывается метод обновления состояния, значение ключа пара будет сохранена в хранилище KV.

Как Flink очищает состояние для ключа?

Flink не удаляет состояние, если это не требуется пользователем или не выполнено пользователем вручную. Как упоминалось ранее, Flink имеет функцию TTL для государства. Этот TTL отметит истечение срока действия состояния и удалит его при вызове стратегии очистки. Эти стратегии очистки различаются в зависимости от типа бэкэнда и времени очистки. Для Backend состояния кучи, он удалит запись из таблицы состояний, т.е. удалит любую ссылку на запись. Память, занятая этой несвязанной записью, будет очищена Java G C. Для бэкэнда состояния RocksDB он просто вызывает собственный метод delete для RocksDB.

...