Когда я использую данные процесса flink sql api.
Перезапуск приложения , sum
результат не сохраняется в контрольной точке. Он все еще начинается с 1.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new FsStateBackend("file:///D:/d_backup/github/flink-best-practice/checkpoint");
env.enableCheckpointing(1000 * 60);
env.setStateBackend(stateBackend);
Table table = tableEnv.sqlQuery("select sum(area_id) from rtc_warning_gmys where area_id = 1 group by character_id,area_id,group_id,platform");
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
dsRow.map(new MapFunction<Tuple2<Boolean,Row>, Object>() {
@Override
public Object map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
if(booleanRowTuple2.f0) {
System.out.println(booleanRowTuple2.f1.toString());
return booleanRowTuple2.f1;
}
return null;
}
});
env.execute("Kafka table select");
Войти как:
1 2 3 ... ... 100
Перезапустить приложение, оно все еще запускается: 1 2 3...
Я думаю, что значение суммы будет сохранено в файле контрольной точки, и приложение перезапуска может прочитать последний результат из контрольной точки, например:
101 102 103 ... 120