Размер состояния окна Flink и управление состоянием - PullRequest
3 голосов
/ 19 марта 2019

Прочитав документацию по flink и обыскав вокруг, я не мог полностью понять, как дескрипторы flink находятся в его окнах.Допустим, у меня есть почасовое окно с функцией агрегации, которая накапливает сообщения в некоторый класс java pojo или scala.Будет ли размер этого окна привязан к количеству событий, поступающих в это окно за один час, или он будет просто привязан к классу pojo / case, так как я собираю события в этом объекте.(Например, если считать 10000 msgs в целое число, будет ли размер близок к 10000 * msg size или size для int?) Также, если я использую pojos или case-классы, flink обрабатывает состояние для меня (выливается на диск, если памятьисчерпан / сохраняет состояние в контрольных точках и т. д.) или я должен использовать для этого объекты состояния flink?

Спасибо за вашу помощь!

1 Ответ

6 голосов
/ 19 марта 2019

Размер состояния окна зависит от типа функции, которую вы применяете.Если вы применяете ReduceFunction или AggregateFunction, поступающие данные немедленно агрегируются, и окно содержит только агрегированное значение.Если вы применяете ProcessWindowFunction или WindowFunction, Flink собирает все входные записи и применяет функцию, когда время (время события или обработки в зависимости от типа окна) превышает время окончания окна.

Вы также можете комбинировать оба типа функций, т. Е. Иметь AggregateFunction, а затем ProcessWindowFunction.В этом случае поступающие записи немедленно агрегируются, а когда окно закрывается, результат агрегации передается в виде ProcessWindowFunction как одно значение.Это полезно, потому что у вас есть инкрементная агрегация (из-за ReduceFunction / AggregateFunction), но также есть доступ к метаданным окна, таким как метка времени начала и конца (из-за ProcessWindowFunction).

Как управляется состояниезависит от выбранного состояния бэкэнда.Если вы сконфигурируете FsStateBackend, все локальное состояние будет сохранено в куче TaskManager, а процесс JVM будет остановлен с помощью OutOfMemoryError, если состояние станет слишком большим.Если вы сконфигурируете состояние RocksDBStateBackend, оно будет отправлено на диск.Это связано с расходами на де / сериализацию для каждого доступа к состоянию, но дает гораздо больше памяти для состояния.

...