SqlParseException произошло, когда я использовал таблицу Flink - PullRequest
0 голосов
/ 14 апреля 2020

Я новичок в Apache Flink, и я попытался манипулировать набором данных с помощью таблицы SQL Флинка.

После чтения входного файла через API-интерфейсы flink, и я настроил некоторые настройки flink, как показано ниже.

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)));
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

А потом я прочитал файл csv через CsvTableSource и выбрал нужные параметры. и наконец я попытался запросить оператор sql, используя sqlQuery, как показано ниже, но была найдена ошибка, указывающая, что Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "table <EOF>" at line 1, column 22.. Я пытался решить мою проблему, но не смог. Я приложил остальную часть кода.

CsvTableSource tableSource = CsvTableSource.builder()
                .path(sourceFile)
                .ignoreFirstLine()
                .fieldDelimiter(",")
                .fieldDelimiter(",")
                .field("assetId", STRING)
                .field("timestring", STRING)
                .field("PARAM001", DOUBLE)
                .field("PARAM002", DOUBLE)
                .field("PARAM003", DOUBLE)
                .field("PARAM004", DOUBLE)
                .field("PARAM005", DOUBLE)
                .field("PARAM006", DOUBLE)
                .field("PARAM007", DOUBLE)
                .field("PARAM008", DOUBLE)
                .build();


        Table table0 = tEnv
                .fromTableSource(tableSource)
                .select("assetId, PARAM001, PARAM002, PARAM003, PARAM004, PARAM005, PARAM006, PARAM007, PARAM008");


        DataStream<Row> row = tEnv.toAppendStream(table0, Row.class)
                .process(new ProcessFunction<Row, Row>() {
                    @Override
                    public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
                        Thread.sleep(sleepLength);
                        out.collect(value);
                    }
                })
                .returns(new RowTypeInfo(STRING, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE));

        Table table = tEnv.fromDataStream(row,
                "assetId, PARAM001, PARAM002, PARAM003, PARAM004, PARAM005, PARAM006, PARAM007, PARAM008,time.proctime");

        Table sql = tEnv.sqlQuery("SELECT PARAM001 from table");  //the error occurred
        tEnv.toAppendStream(sql,Row.class).print();

Буду признателен, если вы решите мою проблему.

Спасибо.

...