Я использую flink table api, используя kafka в качестве источника ввода и json в качестве схемы таблицы.Я получил эту ошибку при отправке моей программы: `` `Программа завершила со следующим исключением:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=localhost:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=localhost:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_table_test
connector.type=kafka
connector.version=0.10
format.fail-on-missing-field=true
format.json-schema={\t'type': 'object',\t'properties': {\t\t'uid': {\t\t\t'type': 'number'\t\t},\t\t'name': {\t\t\t'type': 'string'\t\t},\t\t'age': {\t\t\t'type': 'number'\t\t},\t\t'timestamp': {\t\t\t'type': 'long'\t\t}\t}}
format.property-version=1
format.type=json
schema.0.name=uid
schema.0.type=BIGINT
schema.1.name=name
schema.1.type=VARCHAR
schema.2.name=age
schema.2.type=INT
schema.3.name=ts
schema.3.type=TIMESTAMP
update-mode=append
The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:81)
at org.apache.flink.table.factories.TableFactoryUtil$.findAndCreateTableSource(TableFactoryUtil.scala:49)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:44)
at com.akulaku.data.flink.QueryTable$.main(QueryTable.scala:53)
at com.akulaku.data.flink.QueryTable.main(QueryTable.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
... 9 more
` ``
Я уже добавил flink-json-1.6.1.Джар в Flink путь к библиотеке.
Может ли кто-нибудь помочь?