Flink sql для государственного контрольно-пропускного пункта - PullRequest
0 голосов
/ 28 декабря 2018

Когда я использую данные процесса flink sql api.

Перезапуск приложения , sum результат не сохраняется в контрольной точке. Он все еще начинается с 1.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new FsStateBackend("file:///D:/d_backup/github/flink-best-practice/checkpoint");
env.enableCheckpointing(1000 * 60);
env.setStateBackend(stateBackend);

Table table = tableEnv.sqlQuery("select sum(area_id) from rtc_warning_gmys where area_id = 1 group by character_id,area_id,group_id,platform");

//   convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
dsRow.map(new MapFunction<Tuple2<Boolean,Row>, Object>() {
	@Override
	public Object map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
		if(booleanRowTuple2.f0) {
			System.out.println(booleanRowTuple2.f1.toString());
			return booleanRowTuple2.f1;
		}
		return null;
	}
});

env.execute("Kafka table select");

Войти как:

1 2 3 ... ... 100

Перезапустить приложение, оно все еще запускается: 1 2 3...

Я думаю, что значение суммы будет сохранено в файле контрольной точки, и приложение перезапуска может прочитать последний результат из контрольной точки, например:

101 102 103 ... 120

1 Ответ

0 голосов
/ 28 декабря 2018

Некоторые возможности:

  • Работало ли задание достаточно долго для выполнения контрольной точки?Тот факт, что задание выдает результат, не означает, что контрольная точка завершена.Я вижу, что у вас есть контрольные точки, настроенные на выполнение раз в минуту, и выполнение контрольных точек может занять некоторое время.

  • Как работа была остановлена?Если они не были выведены на внешний уровень, контрольные точки удаляются при отмене задания.

  • Как перезапустить задание?Восстановился ли он (автоматически) из контрольной точки, или он был восстановлен из внешней контрольной точки или точки сохранения, или он был перезапущен с нуля?

Этот вид эксперимента проще всего выполнить из командной строки.Например, вы можете

  1. написать приложение, которое использует контрольные точки и имеет стратегию перезапуска (например, env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000)))
  2. запустить локальный кластер
  3. "flink run -d app.jar", чтобы запустить задание
  4. дождаться завершения хотя бы одной контрольной точки
  5. "kill -9 task-manager-PID " длявызвать сбой
  6. "taskmanager.sh start", чтобы разрешить возобновление работы с контрольной точки
...