Проблема: приложение Flink не получает и не обрабатывает события от соединителя Kinesis, сгенерированные, когда он был отключен (из-за перезапуска)
У нас есть следующая настройка Flink env
env.enableCheckpointing(1000ms);
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause);
env.getCheckpointConfig().setCheckpointTimeout(timeOut);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
и Kinesis имеет следующую начальную конфигурацию
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST");
Интересно, когда я изменяю конфигурацию Kinesis, чтобы ответить на событие, т.е.
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
Flink получает все буферизованные записи (включая события, сгенерированные до, во время и после события, когда приложение Flink было закрыто) из Kinesis и обрабатывает его. Таким образом, это поведение нарушает свойство «Ровно один раз» приложения Flink.
Может кто-нибудь указать на некоторые очевидные вещи, которые мне не хватает?