Несколько моментов, которые я добровольно предложу:
- Я новичок во Flink (работаю с ним около месяца)
- Я использую Kinesis Аналитика (AWS размещенное решение Flink). По общему мнению, это на самом деле не ограничивает универсальность Flink или опций отказоустойчивости, но я все равно буду ее называть.
У нас есть довольно прямолинейное приложение со скользящим окном. Поток с ключами организует события по определенному ключу, например, по IP-адресу, а затем обрабатывает их в ProcessorFunction. Мы в основном используем это, чтобы отслеживать количество вещей. Например, сколько логинов для определенного IP-адреса за последние 24 часа. Каждые 30 секунд мы подсчитываем события в окне для каждого ключа и сохраняем это значение во внешнем хранилище данных. Состояние также обновляется, чтобы отразить события в этом окне, так что старые события истекают и не занимают память.
Интересно, что кардинальность не является проблемой. Если в течение 24 часов у нас заходит 200 тыс. Человек, все идеально. Ситуация начинает раздражаться, когда один IP-адрес входит в систему 200 раз за 24 часа. На этом этапе контрольные точки начинают занимать все больше и больше времени. Средняя контрольная точка занимает 2-3 секунды, но при таком поведении пользователя контрольные точки начинают занимать 5 минут, затем 10, затем 15, затем 30, затем 40, et c et c.
Приложение может работать без сбоев некоторое время, что удивительно. Возможно 10 или 12 часов. Но рано или поздно контрольные точки полностью терпят неудачу, и затем наш максимальный возраст итератора начинает всплывать, и никакие новые события не обрабатываются и т. Д. c и c.
В этой точке я несколько раз пробовал :
- Бросать больше металла при проблеме (также включено автоматическое масштабирование)
- Суетиться с CheckpointingInterval и MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
- Рефакторинг для уменьшения занимаемого нами состояния
(1) на самом деле мало что сделал. (2) Это, казалось, помогло, но затем еще один гораздо больший всплеск трафика c, чем то, что мы видели раньше, уничтожил любое из преимуществ (3) Неясно, помогло ли это. Я думаю, что объем памяти нашего приложения довольно мал по сравнению с тем, что вы можете себе представить от Yelp или Airbnb, которые используют кластеры Flink для массовых приложений, поэтому я не могу представить, что мое состояние действительно проблематично c.
Я скажу, что надеюсь, что нам не нужно сильно менять ожидания вывода приложения. Это скользящее окно - действительно ценная часть данных.
РЕДАКТИРОВАТЬ: Кто-то спросил о том, как выглядит мое состояние, это ValueState [FooState]
case class FooState(
entityType: String,
entityID: String,
events: List[BarStateEvent],
tableName: String,
baseFeatureName: String,
)
case class BarStateEvent(target: Double, eventID: String, timestamp: Long)
РЕДАКТИРОВАТЬ: Я хочу выделить то, что сказал пользователь Дэвид Андерсон в комментариях:
Один из подходов, иногда используемых для реализации скольжения windows, заключается в использовании MapState, где ключами являются временные метки для срезов, а значениями являются списки событий.
Это было существенный. Для всех, кто пытается идти по этому пути, я не смог найти работоспособного решения, которое бы не объединяло события в определенный промежуток времени. Мое окончательное решение состоит в том, чтобы объединить события в пакеты по 30 секунд, а затем записать их в состояние карты, как предложил Дэвид. Это, кажется, делает трюк. Для наших высоких периодов нагрузки контрольные точки остаются на 3 МБ, и они всегда заканчиваются sh в секунду.