непрерывное развертывание приложения flink с отслеживанием состояния apache на кубернетах - PullRequest
0 голосов
/ 06 августа 2020

Я хочу запустить потоковое приложение apache flink (1.11.1) на кубернетах. С сохранением состояния файловой системы в s3. Контрольная точка для s3 работает

args:
  - "standalone-job"
    - "-s"
    - "s3://BUCKET_NAME/34619f2862ce3e5fc91d80eae13a434a/chk-4/_metadata"
    - "--job-classname"
    - "com.abc.def.MY_JOB"
    - "--kafka-broker"
    - "KAFKA_HOST:9092"

Итак, проблема, с которой я столкнулся:

  • Мне нужно выбрать предыдущий каталог состояния вручную. Есть ли возможность сделать его лучше?
  • Задание увеличивает chk dir, но не использует контрольную точку. Означает, что я генерирую новое событие, когда впервые вижу событие, и сохраняю его в ListState<String> всякий раз, когда я развертываю через Gitlab более новую версию моего приложения, оно снова генерирует это событие.
  • Почему я должен явно включать контрольную точку в моем коде, когда я определил state.backend для файловой системы? env.enableCheckpointing(Duration.ofSeconds(60).toMillis()); и env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);

Ответы [ 2 ]

2 голосов
/ 06 августа 2020
  • Возможно, вам больше понравится Платформа Ververica: Community Edition , которая поднимает уровень абстракции до такой степени, что вам не нужно иметь дело с деталями на этом уровне. У него есть API, который был разработан с учетом CI / CD.
  • Я не уверен, что понимаю ваш второй момент, но это нормально, что ваша работа будет перематывать и повторно обрабатывать некоторые данные во время восстановления. Flink не гарантирует точно однократную обработку, а скорее семантику только один раз: каждое событие влияет на состояние, которым управляет Flink, ровно один раз. Это выполняется путем отката к смещениям в самой последней контрольной точке и отката всего другого состояния к тому, каким оно было после использования всех данных до этих смещений.
  • Наличие серверной части состояния - это необходимо как место для хранения рабочего состояния вашего задания во время его выполнения. Если вы не включите контрольную точку, то рабочее состояние не будет проверяться и не может быть восстановлено. Однако, начиная с Flink 1.11, вы можете включить контрольную точку через файл конфигурации, используя
execution.checkpointing.interval: 60000
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
0 голосов
/ 07 августа 2020

существует несколько способов развертывания рабочих нагрузок в кубернетах, простых файлах YAML, Helm Chart и Operator.

Обновление задания Flink с отслеживанием состояния не так просто, как обновление службы без отслеживания состояния, вам нужно только обновить двоичный файл и перезапустите.

Обновление задания Flink вам нужно взять точку сохранения или получить последний каталог контрольной точки, а затем обновить двоичный файл и, наконец, повторно отправить свое задание, в этом случае я думаю, что простые файлы YAML и Helm Chart не могут помочь Чтобы достичь этого, вам следует подумать о внедрении оператора Flink для выполнения обновления.

https://github.com/GoogleCloudPlatform/flink-on-k8s-operator

...