Я намерен зарегистрировать таблицу с БД и информацией о таблице соответственно как:
tableEnv.registerDataStream("dbA.tableA", flattenEventSrc, clientLogToRowWithRawBytesSchema.getEventFieldExpressions());
Table result = tableEnv.sqlQuery("select * from dbA.tableA where x = 1");
Но на это жалуется ошибка:
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 15 to line 1, column 24: Object 'dbA' not found
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:91)
at org.apache.flink.table.api.TableEnvironment.sqlQuery(TableEnvironment.scala:651)
at com.kuaishou.dp.stream.worker.portal.ClientLogProtobufToProtobufPOC2.main(ClientLogProtobufToProtobufPOC2.java:67)
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 24: Object 'dbA' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:803)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:788)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4706)
at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:172)
at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:928)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2975)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:2960)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3219)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:947)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:928)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:226)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:903)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:613)
at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:87)
... 2 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'dbA' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 21 more
Есть ли способ поддержки db
и table
в API таблиц Flink, чтобы таблицы могли хорошо управляться нашей собственной системой. Спасибо.