Как остановить высокую нагрузку от сбоя контрольной точки Flink - PullRequest
2 голосов
/ 08 марта 2020

Несколько моментов, которые я добровольно предложу:

  1. Я новичок во Flink (работаю с ним около месяца)
  2. Я использую Kinesis Аналитика (AWS размещенное решение Flink). По общему мнению, это на самом деле не ограничивает универсальность Flink или опций отказоустойчивости, но я все равно буду ее называть.

У нас есть довольно прямолинейное приложение со скользящим окном. Поток с ключами организует события по определенному ключу, например, по IP-адресу, а затем обрабатывает их в ProcessorFunction. Мы в основном используем это, чтобы отслеживать количество вещей. Например, сколько логинов для определенного IP-адреса за последние 24 часа. Каждые 30 секунд мы подсчитываем события в окне для каждого ключа и сохраняем это значение во внешнем хранилище данных. Состояние также обновляется, чтобы отразить события в этом окне, так что старые события истекают и не занимают память.

Интересно, что кардинальность не является проблемой. Если в течение 24 часов у нас заходит 200 тыс. Человек, все идеально. Ситуация начинает раздражаться, когда один IP-адрес входит в систему 200 раз за 24 часа. На этом этапе контрольные точки начинают занимать все больше и больше времени. Средняя контрольная точка занимает 2-3 секунды, но при таком поведении пользователя контрольные точки начинают занимать 5 минут, затем 10, затем 15, затем 30, затем 40, et c et c.

Приложение может работать без сбоев некоторое время, что удивительно. Возможно 10 или 12 часов. Но рано или поздно контрольные точки полностью терпят неудачу, и затем наш максимальный возраст итератора начинает всплывать, и никакие новые события не обрабатываются и т. Д. c и c.

В этой точке я несколько раз пробовал :

  1. Бросать больше металла при проблеме (также включено автоматическое масштабирование)
  2. Суетиться с CheckpointingInterval и MinimumPauseBetweenCheckpoints https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CheckpointConfiguration.html
  3. Рефакторинг для уменьшения занимаемого нами состояния

(1) на самом деле мало что сделал. (2) Это, казалось, помогло, но затем еще один гораздо больший всплеск трафика c, чем то, что мы видели раньше, уничтожил любое из преимуществ (3) Неясно, помогло ли это. Я думаю, что объем памяти нашего приложения довольно мал по сравнению с тем, что вы можете себе представить от Yelp или Airbnb, которые используют кластеры Flink для массовых приложений, поэтому я не могу представить, что мое состояние действительно проблематично c.

Я скажу, что надеюсь, что нам не нужно сильно менять ожидания вывода приложения. Это скользящее окно - действительно ценная часть данных.

РЕДАКТИРОВАТЬ: Кто-то спросил о том, как выглядит мое состояние, это ValueState [FooState]

case class FooState(
                         entityType: String,
                         entityID: String,
                         events: List[BarStateEvent],
                         tableName: String,
                         baseFeatureName: String,
                       )

case class BarStateEvent(target: Double, eventID: String, timestamp: Long)

РЕДАКТИРОВАТЬ: Я хочу выделить то, что сказал пользователь Дэвид Андерсон в комментариях:

Один из подходов, иногда используемых для реализации скольжения windows, заключается в использовании MapState, где ключами являются временные метки для срезов, а значениями являются списки событий.

Это было существенный. Для всех, кто пытается идти по этому пути, я не смог найти работоспособного решения, которое бы не объединяло события в определенный промежуток времени. Мое окончательное решение состоит в том, чтобы объединить события в пакеты по 30 секунд, а затем записать их в состояние карты, как предложил Дэвид. Это, кажется, делает трюк. Для наших высоких периодов нагрузки контрольные точки остаются на 3 МБ, и они всегда заканчиваются sh в секунду.

1 Ответ

0 голосов
/ 08 марта 2020

Если у вас есть скользящее окно, которое длится 24 часа, и оно скользит на 30 секунд, то каждый логин назначается каждому из 2880 отдельных windows. Это верно, скользящий Флинк windows делает копии. В этом случае 24 * 60 * 2 копии.

Если вы просто подсчитываете события входа в систему, то нет необходимости фактически буферизовать события входа в систему до закрытия windows. Вместо этого вы можете использовать ReduceFunction для выполнения инкрементного агрегирования .

Я предполагаю, что вы не пользуетесь этой оптимизацией, и, следовательно, когда у вас есть горячая клавиша (IP-адрес), затем экземпляр, обрабатывающий эту горячую клавишу, имеет непропорциональный объем данных и занимает много времени для проверки.

С другой стороны, если вы уже выполняете инкрементное агрегирование и контрольные точки столь же проблематичны, как вы описали, 1034 *, то стоит попытаться понять, почему.

Одним из возможных исправлений будет реализация собственного скользящего windows с использованием ProcessFunction. Таким образом вы можете избежать поддержки 2880 отдельных windows и использовать более эффективную структуру данных.

РЕДАКТИРОВАТЬ (на основе обновленного вопроса):

Я думаю, что проблема заключается в следующем: Когда используя бэкэнд состояния RocksDB, состояние живет как сериализованные байты. Каждое состояние доступа и обновления должно go через ser / de. Это означает, что ваш List[BarStateEvent] подвергается десериализации и повторной сериализации каждый раз, когда вы его модифицируете. Для IP-адреса с 200k событий в списке это будет очень дорого.

Вместо этого вы должны использовать ListState или MapState. Эти типы состояний оптимизированы для RocksDB. Бэкэнд состояния RocksDB может добавляться к ListState без десериализации списка. А с MapState каждая пара ключ / значение на карте представляет собой отдельный объект RocksDB, позволяющий осуществлять эффективный поиск и модификации.

Один из подходов, иногда используемых для реализации скольжения windows, заключается в использовании MapState, где ключи - это временные метки для срезов, а значения - это списки событий.

Или, если ваше состояние может поместиться в памяти, вы можете использовать FsStateBackend. Тогда все ваше состояние будет объектами в куче JVM, и ser / de вступит в игру только во время контрольных точек и восстановления.

...