Мигание контрольных точек вызывает противодавление - PullRequest
1 голос
/ 19 апреля 2020

У меня есть данные для обработки задания Flink со скоростью около 200 тыс. Кадров в секунду. Без контрольных точек работа работает нормально. Но когда я попытался добавить контрольные точки (с интервалом 50 минут), это вызывает обратное давление при первой задаче, которая добавляет ключевое поле к каждой записи, отставание данных также постоянно увеличивается. Отставание двух моих тем о Кафке, в первой половине были включены контрольные точки, отставание идет очень быстро. вторая часть (при очень низкой задержке были отключены контрольные точки, где отставание находится в пределах миллисекунд) enter image description here

Я использую по крайней мере once checkpoint mode, что должно быть асинхронным процессом. Кто-нибудь может предложить? Моя настройка контрольной точки

    env.enableCheckpointing(1800000,
          CheckpointingMode.AT_LEAST_ONCE);
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
      env.getCheckpointConfig()
          .enableExternalizedCheckpoints(
              CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      env.getCheckpointConfig()
          .setCheckpointTimeout(10min);
      env.getCheckpointConfig()
          .setFailOnCheckpointingErrors(
              jobConfiguration.getCheckpointConfig().getFailOnCheckpointingErrors());

моя работа содержит 128 контейнеров.

Со временем контрольной точки 10 минут следующая статистика: enter image description here

I я пытаюсь использовать контрольную точку 30 минут и вижу

Я пытался настроить использование памяти, но, похоже, не работает. my settings

Но в диспетчере задач все равно: enter image description here

Ответы [ 2 ]

4 голосов
/ 24 апреля 2020

TLDR; иногда трудно проанализировать проблему. У меня есть две удачные догадки / снимки - если вы используете бэкэнд состояния RocksDB, вы можете переключиться на FsStateBackend - обычно это быстрее, и RocksDB имеет больше смысла с большими размерами состояний, которые не помещаются в память (или если вам действительно нужна функция дополнительных контрольных точек ). Во-вторых, возиться с параллелизмом, увеличивающимся или уменьшающимся.

Я бы заподозрил то же, что написал @ArvidHeise. У вас размер контрольной точки не велик, но он также не тривиален. Это может добавить дополнительные накладные расходы, чтобы перенести работу за порог едва поспевания за трафиком c, чтобы не отставать и вызывать противодавление. Если вы находитесь под обратным давлением, задержка будет только накапливаться, поэтому даже изменение пары% дополнительных служебных данных может иметь значение между сквозными задержками в миллисекундах и неограниченным постоянно растущим значением.

Если вы можете не просто добавить больше ресурсов, вы должны проанализировать, что именно добавляет эти дополнительные издержки и какой ресурс является узким местом.

  1. Это процессор? Проверьте использование процессора в кластере. Если это ~ 100%, это то, что вам нужно оптимизировать.
  2. Это IO? Проверьте использование ввода-вывода в кластере и сравните его с максимальной пропускной способностью / числом запросов в секунду, которое вы можете достичь.
  3. Если использование ЦП и ввода-вывода низкое, вы можете попытаться увеличить параллелизм, но. ..
  4. Имейте в виду перекос данных. Противодавление может быть вызвано одной задачей, и в этом случае сложно проанализировать проблему, так как это будет один узкий поток (на IO или CPU), а не целая машина.

После выяснения того, какой ресурс является узким местом, возникает следующий вопрос: почему? Это может быть сразу видно, как только вы его увидите, или может потребоваться копаться, например, проверять журналы G C, прикреплять profiler et c.

Ответы на эти вопросы могут либо дать вам информацию о том, что вы можете попытаться оптимизировать в своей работе, либо позволить вам настроить конфигурацию, либо дать нам (разработчикам Flink) дополнительную точку данных, которую мы могли бы попытаться оптимизировать на Flink. сторона.

2 голосов
/ 21 апреля 2020

Любой тип контрольных точек добавляет вычислительные затраты. Большая часть контрольных точек выполняется асинхронно (как вы уже сказали), но все равно добавляет общие операции ввода-вывода. Этот дополнительный запрос ввода / вывода может, например, ограничить ваш доступ к внешним системам. Также, если вы включите контрольные точки, Flink необходимо отслеживать больше информации (новые или уже проверенные).

Вы пытались добавить больше ресурсов к своей работе? Не могли бы вы поделиться всей своей конфигурацией контрольных точек?

...