Я новичок в Apache Flink, и я попытался манипулировать набором данных с помощью таблицы SQL Флинка.
После чтения входного файла через API-интерфейсы flink, и я настроил некоторые настройки flink, как показано ниже.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)));
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
А потом я прочитал файл csv через CsvTableSource
и выбрал нужные параметры. и наконец я попытался запросить оператор sql, используя sqlQuery
, как показано ниже, но была найдена ошибка, указывающая, что Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "table <EOF>" at line 1, column 22.
. Я пытался решить мою проблему, но не смог. Я приложил остальную часть кода.
CsvTableSource tableSource = CsvTableSource.builder()
.path(sourceFile)
.ignoreFirstLine()
.fieldDelimiter(",")
.fieldDelimiter(",")
.field("assetId", STRING)
.field("timestring", STRING)
.field("PARAM001", DOUBLE)
.field("PARAM002", DOUBLE)
.field("PARAM003", DOUBLE)
.field("PARAM004", DOUBLE)
.field("PARAM005", DOUBLE)
.field("PARAM006", DOUBLE)
.field("PARAM007", DOUBLE)
.field("PARAM008", DOUBLE)
.build();
Table table0 = tEnv
.fromTableSource(tableSource)
.select("assetId, PARAM001, PARAM002, PARAM003, PARAM004, PARAM005, PARAM006, PARAM007, PARAM008");
DataStream<Row> row = tEnv.toAppendStream(table0, Row.class)
.process(new ProcessFunction<Row, Row>() {
@Override
public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
Thread.sleep(sleepLength);
out.collect(value);
}
})
.returns(new RowTypeInfo(STRING, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE));
Table table = tEnv.fromDataStream(row,
"assetId, PARAM001, PARAM002, PARAM003, PARAM004, PARAM005, PARAM006, PARAM007, PARAM008,time.proctime");
Table sql = tEnv.sqlQuery("SELECT PARAM001 from table"); //the error occurred
tEnv.toAppendStream(sql,Row.class).print();
Буду признателен, если вы решите мою проблему.
Спасибо.