Есть ли способ программно проверить, запущено ли потоковое задание Flink с точки сохранения, перед выполнением потока? - PullRequest
0 голосов
/ 12 октября 2019

Прежде чем вызвать execute на StreamExecutionEnvironment и запустить потоковое задание, есть ли способ программно узнать, было ли задание восстановлено из точки сохранения? Мне нужно знать такую ​​информацию, чтобы я мог установить смещение источника Кафки в зависимости от него при построении графика задания.

Кажется, что класс FlinkConnectorKafkaBase, который имеет метод initializeState, имеет доступ ктакая информация ( код ). Однако нет способа перехватить FunctionInitializationContext и получить значение isRestored(), поскольку initializeState является методом final. Кроме того, метод initializeState вызывается после выполнения графа заданий, и поэтому я не думаю, что с ним связано возможное решение.

Еще одна попытка, которую я предпринял, состояла в том, чтобы найти параметр задания Flink, который указываетбыло ли задание запущено из точки сохранения. Однако я не думаю, что такой параметр существует.

1 Ответ

0 голосов
/ 13 октября 2019

Вы можете получить искомый эффект, просто выполнив следующее:

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest(); 

Если вы используете setStartFromEarliest, тогда Flink проигнорирует смещения, сохраненные в Kafka, и вместо этого начнет чтение с самой ранней записи. Более того, даже если вы используете setStartFromEarliest, если Flink возобновляет работу с контрольной точки или точки сохранения, он вместо этого будет использовать смещения, сохраненные в этом снимке.

Обратите внимание, что Flink выполняет свое собственное управление смещениями Kafka и при восстановлении изконтрольная точка игнорирует смещения, сохраненные в Kafka. Flink делает это как часть предоставления точно однократных гарантий, для чего необходимо точно знать, сколько входных данных было использовано для получения результатов, присутствующих в остальной части состояния, захваченного в контрольной точке или точке сохранения. По этой причине Flink всегда сохраняет смещения как часть каждого снимка состояния (контрольная точка или точка сохранения).

Это задокументировано здесь и здесь .

Что касается вашего исходного вопроса о initializeState, он доступен, если вы реализуете интерфейс CheckpointedFunction, но на самом деле это редко требуется.

...