Отказоустойчивость в файле Flink Sink - PullRequest
1 голос
/ 22 апреля 2020

Я использую потоковую передачу Flink с потребительским соединителем Kafka (FlinkKafkaConsumer) и файлом Sink (StreamingFileSink) в режиме кластера с единовременной политикой. Приемник файлов записывает файлы на локальный диск. Я заметил, что в случае сбоя задания и автоматического перезапуска c диспетчеры задач ищут оставшиеся файлы последнего сбойного задания (скрытые файлы). Очевидно, что поскольку задачи могут назначаться различным менеджерам задач, это снова и снова суммирует новые сбои. Единственное решение, которое я нашел, - это удалить скрытые файлы и повторно отправить задание. Если я правильно понял (и, пожалуйста, поправьте меня, если я ошибаюсь), события в скрытых файлах не были зафиксированы на сервере bootstrap, поэтому потеря данных не происходит.

Есть ли способ, заставляющий Flink игнорировать уже записанные файлы? Или, может быть, есть лучший способ реализовать решение (может быть, как-то с помощью точек сохранения)?

1 Ответ

0 голосов
/ 28 апреля 2020

Я получил очень подробный ответ в списке рассылки Flink. TLDR, чтобы реализовать ровно один раз, мне нужно использовать какую-то распределенную ФС.

Полный ответ:

Локальная файловая система не является правильным выбором для того, чего вы пытаетесь достичь. Я не думаю, что вы можете достичь истинной политики в этой ситуации. Позвольте мне объяснить, почему. Интересно, как он ведет себя на контрольно-пропускных пунктах. Поведение контролируется RollingPolicy. Поскольку вы еще не сказали, какой формат вы используете, давайте сначала предположим, что вы используете формат строки. Для формата строки используемая по умолчанию скользящая политика (при изменении файла с выполняемого на ожидающий) - она ​​будет свернута, если размер файла достигнет 128 МБ, файл старше 60 se c или не был записан для 60 se c. Это не катит на контрольно-пропускном пункте. Более того, StreamingFileSink рассматривает файловую систему как долговечный приемник, к которому можно получить доступ после восстановления. Это означает, что он попытается добавить этот файл при восстановлении из контрольной точки / точки сохранения.

Даже если вы прокатили файлы на каждой контрольной точке, вы все равно можете столкнуться с проблемой, что у вас могут быть некоторые остатки, потому что StreamingFileSink перемещает файлы, ожидающие завершения, после завершения контрольной точки. Если произойдет сбой между завершением контрольной точки и перемещением файлов, он не сможет переместить их после восстановления (он сделает это, если у него будет доступ).

Наконец, завершенная контрольная точка будет содержать смещения записей, которые были обработаны успешно сквозной, что означает записи, которые предположительно зафиксированы StreamingFileSink. Это могут быть записи, записанные в текущий файл с указателем в метаданных контрольной точки StreamingFileSink, записи в «ожидающем» файле с записью в метаданных контрольной точки StreamingFileSink о завершении этого файла или записи в «готовых» файлах. [ 1]

Поэтому, как вы можете видеть, есть несколько сценариев ios, когда StreamingFileSink должен получить доступ к файлам после перезапуска.

Последнее, что вы упомянули, «принятие в» bootstrap -server ". Имейте в виду, что Flink не использует смещения, переданные обратно в Kafka, для гарантии согласованности. Он может записывать эти смещения обратно, но только для целей мониторинга / отладки. Flink сохраняет / восстанавливает обработанные смещения из своих контрольных точек. [3 ]

Дайте мне знать, если это помогло. Я старался изо всех сил;) Кстати, я настоятельно рекомендую прочитать связанные источники, поскольку они пытаются описать все это в более структурированном виде. Я также cc IN Kostas кто знает больше о StreamingFileSink, чем я, поэтому он может исправить меня где-то.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka -потребителей-смещение-фиксация-поведение -конфигурация

...