Ошибка преобразования таблицы Flink CEP в объект Dataset <Row> - PullRequest
0 голосов
/ 21 октября 2019

Я создал BatchTableEnviroment в Apache Flink и создал Table объект, в который я загрузил данные. Теперь я хочу найти некоторые шаблоны. Я делаю это с Detecting patterns in Tables CEP library. Задача запроса ниже состоит в том, чтобы найти самый длинный период mGroup, для которого avgResult не опускался ниже определенного порога. Где mGroup является значением Integer, таким как 100, 200, 300 и т. Д. Avgresult является двойным значением. Когда я компилирую часть запроса, я не получаю никакой ошибки. Я получаю сообщение об ошибке при преобразовании Table в DataSet<row>. Ниже запроса вы можете увидеть сообщение об ошибке.

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);

Table trendTable = tableEnv.sqlQuery(
             "  SELECT * " +
             " FROM tableAvg " +
             " MATCH_RECOGNIZE(" +
             "   PARTITION BY mType " +
             "   ORDER BY mGroup  " +
             "   MEASURES " +
             "       FIRST(A.mGroup) as startGr, " +
             "       LAST(A.mGroup) as endGr, " +
                   "  A.avgResult as avgRes" +
             " ONE ROW PER MATCH " +
             " AFTER MATCH SKIP PAST LAST ROW " +
             " PATTERN (A+ B) " +
             " DEFINE " +
             "   A AS A.avgResult < 50 " +
                     ") "
     );

tableEnv.registerTable("TrendTable", trendTable);

DataSet<Row> result = tableEnv.toDataSet(trendTable, Row.class);

/////////////////////ERROR MESSAGE BELOW
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalMatch(partition=[[$1]], order=[[2]], outputFields=[[mType, startGr, endGr, avgRes]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[<(PREV(A.$0, 0), 50)]], inputFields=[[sumResult, mType, EXPR$2]])
FlinkLogicalSort(sort0=[$2], dir0=[ASC])
 FlinkLogicalCalc(expr#0..5=[{inputs}], expr#6=[/($t2, $t3)], expr#7=[99], expr#8=[>($t5, $t7)], sumResult=[$t6], mType=[$t1], EXPR$2=[$t4], $condition=[$t8])
   FlinkLogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], agg#1=[SUM($3)], agg#2=[MAX($4)], agg#3=[COUNT()])
     FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[-($t0, $t3)], expr#5=[100], expr#6=[/($t4, $t5)], expr#7=[1.0:DECIMAL(2, 1)], $f0=[$t6], mType=[$t1], mValue=[$t2], $f3=[$t7], mID=[$t0])
       FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, H]], fields=[mID, dateTime, mValue, unixDateTime, mType], source=[CsvTableSource(read fields: mID, mType, mValue)])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
 at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
 at org.apache.flink.table.plan.Optimizer.optimizePhysicalPlan(Optimizer.scala:170)
 at org.apache.flink.table.plan.BatchOptimizer.optimize(BatchOptimizer.scala:57)
 at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:280)
 at org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:71)
 at StreamTableEnv.main(StreamTableEnv.java:169)

1 Ответ

1 голос
/ 21 октября 2019

Библиотека CEP и MATCH_RECOGNIZE работают только поверх потокового API (а не пакетного), что означает, что вам нужно использовать StreamTableEnvironment вместо BatchTableEnviroment.

...