Приложение Flink не получает и не обрабатывает события от соединителя Kinesis, сгенерированные, когда он был недоступен - PullRequest
0 голосов
/ 02 мая 2018

Проблема: приложение Flink не получает и не обрабатывает события от соединителя Kinesis, сгенерированные, когда он был отключен (из-за перезапуска)

У нас есть следующая настройка Flink env

env.enableCheckpointing(1000ms);
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause); 
env.getCheckpointConfig().setCheckpointTimeout(timeOut); 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

и Kinesis имеет следующую начальную конфигурацию

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "LATEST");

Интересно, когда я изменяю конфигурацию Kinesis, чтобы ответить на событие, т.е.

 kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "TRIM_HORIZON");

Flink получает все буферизованные записи (включая события, сгенерированные до, во время и после события, когда приложение Flink было закрыто) из Kinesis и обрабатывает его. Таким образом, это поведение нарушает свойство «Ровно один раз» приложения Flink.

Может кто-нибудь указать на некоторые очевидные вещи, которые мне не хватает?

1 Ответ

0 голосов
/ 03 мая 2018

Соединитель Flink Kinesis сохраняет порядковые номера сегментов в состоянии для точной однократной обработки.

Из вашего описания кажется, что на вашей работе "перезагрузка", состояние контрольной точки не соблюдается.

Просто чтобы сначала устранить очевидное: Как ваша работа восстанавливается после перезагрузки? Вы возобновляете с точки сохранения или этот автоматический перезапуск выполняется с предыдущей контрольной точки?

...