Apache Flink - обработка дублирующихся сообщений во время развертываний заданий, с ActiveMQ в качестве источника - PullRequest
1 голос
/ 05 марта 2020

Учитывая,

У меня есть задание Flink, которое читает из источника ActiveMQ и записывает в базу данных mysql с ключом идентификатора. Я включил контрольные точки для этой работы каждую секунду. Я указываю контрольные точки на экземпляр Minio, я проверил, что контрольные точки работают с jobid. Я развертываю эту работу как Openshift (под Kubernetes) - я могу увеличивать / уменьшать эту работу по мере необходимости.

Проблема

Когда задание развернуто (выполняется) или задание было прервано из-за ошибки / ошибки, а также в случае наличия каких-либо неиспользованных сообщений в ActiveMQ или неподтвержденных сообщения в Flink (но записанные в базу данных), когда задание восстанавливается (или развертывается новое задание), процесс обрабатывает уже обработанные сообщения, в результате чего дубликаты записей вставляются в базу данных.

Вопрос

  • Разве контрольные точки не должны помочь заданию восстановиться с того места, где оно осталось?
  • Должен ли я взять контрольную точку до того, как я (развертывание) разверную новую работу?
  • Что произойдет, если работа завершится из-за ошибки или сбоя кластера?
  • Как jobid постоянно меняется при каждом развертывании, как происходит восстановление?
  • Редактировать Поскольку я не могу ожидать идемпотентности от базы данных, чтобы избежать дубликатов, сохраненных в базу данных (Exactly-Once), можно ли написать запрос спецификации базы данных c (upsert) для обновления, если данная запись присутствует, и вставить, если нет?

1 Ответ

0 голосов
/ 05 марта 2020

JDB C в настоящее время поддерживает только один раз, что означает, что вы получите повторяющиеся сообщения после восстановления. В настоящее время существует черновой вариант добавления поддержки ровно один раз , который, вероятно, будет выпущен с 1.11.

Разве контрольные точки не должны помочь восстановлению задания с того места, где оно осталось?

Да, но время между последними успешными контрольными точками и восстановлением может привести к наблюдаемым дубликатам. Я дал более подробный ответ по несколько связанным темам c.

Должен ли я пройти контрольную точку до того, как я (развернут) развернул новое задание?

Абсолютно. На самом деле вы должны использовать отмену с точкой сохранения. Это единственный надежный способ изменить топологию. Кроме того, отмена с помощью точек сохранения позволяет избежать дублирования данных, поскольку оно корректно завершает работу.

Что произойдет, если работа завершится с ошибкой или сбоем кластера?

It должен автоматически перезагрузиться (в зависимости от настроек перезапуска). Это будет использовать последнюю контрольную точку для восстановления. Это наверняка приведет к дубликатам.

Как происходит восстановление при каждом развертывании, как происходит восстановление?

Обычно вы явно указываете на один и тот же каталог контрольных точек (на S3?).

Поскольку я не могу ожидать идемпотентность от базы данных, разве это единственный способ добиться обработки точно-однократно?

В настоящее время я этого не делаю увидеть способ обойти это. Должно измениться с 1.11.

...