Flink Kinesis Consumer не сохраняет последнюю успешно обработанную последовательность номеров - PullRequest
0 голосов
/ 22 февраля 2019

Мы используем Flink Kinesis Consumer для получения данных из потока Kinesis в наше приложение Flink.

Библиотека KCL использует таблицу DynamoDB для хранения последних успешно обработанных номеров последовательностей потоков Kinesis.так что при следующем запуске приложения оно возобновляется с того места, на котором оно было остановлено.

Но, похоже, что Flink Kinesis Consumer не поддерживает никаких таких порядковых номеров.в любом постоянном магазине.В результате нам нужно полагаться на ShardIteratortype (trim_horizen, последняя версия и т. Д.), Чтобы решить, где возобновить обработку приложения Flink после перезапуска приложения.

Возможное решение этой проблемы может заключаться в использовании механизма контрольных точек Flink, ноэто работает только тогда, когда приложение возобновляет работу после сбоя, а не тогда, когда приложение было намеренно отменено, и его необходимо перезапустить из последней успешно использованной последовательности потока Kinesis №.

Нужно ли хранить эти последние успешно использованные последовательностимы сами?

1 Ответ

0 голосов
/ 22 февраля 2019

Рекомендуется использовать Flink с контрольными точками и точками сохранения, поскольку они создают непротиворечивые снимки, которые содержат смещения в очередях сообщений (в данном случае порядковые номера потока Kinesis) вместе со всеми состояниями в остальной части графика работ, которыев результате использования данных до этих смещений.Это позволяет восстановить или перезапустить без потери или дублирования данных.

Контрольные точки Flink представляют собой моментальные снимки, сделанные самим Flink для восстановления после сбоев, и имеют формат, оптимизированный для быстрого восстановления. Точки сохранения используют тот же базовый механизм моментальных снимков, но они запускаются вручную, и их формат больше касается эксплуатационной гибкости, чем производительности.

Точки сохранения - это то, что вы ищете.В частности, отмена с точкой сохранения и возобновление с точки сохранения очень полезны.

Другой вариант - использовать сохраненные контрольные точки с ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION.

...