I am trying to register a Flink table source using the code snippet below, but it fails with the following exception.
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at test.flink.BatchJob.main(BatchJob.java:58)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: No context matches.
The following properties are requested:
connector.path=file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result
connector.property-version=1
connector.type=filesystem
format.avro-schema={\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"User\",\n \"fields\": [\n {\"name\": \"name\", \"type\": \"string\"},\n {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n ]\n}
format.property-version=1
format.type=avro
schema.0.name=name
schema.0.type=VARCHAR
schema.1.name=favorite_number
schema.1.type=INT
schema.2.name=favorite_color
schema.2.type=VARCHAR
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.formats.avro.AvroRowFormatFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
The code that registers the source:

```java
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

    // Avro schema of the records stored in the input file
    String schemaString = "{\"namespace\": \"example.avro\",\n" +
            " \"type\": \"record\",\n" +
            " \"name\": \"User\",\n" +
            " \"fields\": [\n" +
            " {\"name\": \"name\", \"type\": \"string\"},\n" +
            " {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},\n" +
            " {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
            " ]\n" +
            "}";

    // Filesystem connector + Avro format + table schema; this call throws the exception above
    tEnv
        .connect(
            new FileSystem()
                .path("file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result")
        )
        .withFormat(new Avro().avroSchema(schemaString))
        .withSchema(
            new Schema()
                .field("name", Types.STRING)
                .field("favorite_number", Types.INT)
                .field("favorite_color", Types.STRING)
        )
        .registerTableSource("FirstTableSource");

    env.execute();
}
```
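To make the "No context matches" reason in the exception concrete, here is a toy model of how I understand the lookup: a factory is only accepted if every key/value pair it requires appears in the requested properties. This is a sketch, not Flink's actual code. The class `ContextMatchSketch` and the method `contextMatches` are hypothetical names, and the required context I give for the CSV source factory (`format.type=csv`) is my assumption about what the listed `Csv*TableSourceFactory` classes declare.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContextMatchSketch {

    // True when every key/value pair the factory requires is present in the requested properties.
    static boolean contextMatches(Map<String, String> required, Map<String, String> requested) {
        return required.entrySet().stream()
                .allMatch(e -> e.getValue().equals(requested.get(e.getKey())));
    }

    public static void main(String[] args) {
        // Subset of the "requested properties" printed in the exception message.
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.type", "filesystem");
        requested.put("format.type", "avro");

        // ASSUMPTION: the CSV table source factory requires a filesystem connector with CSV format.
        Map<String, String> csvSourceContext = new LinkedHashMap<>();
        csvSourceContext.put("connector.type", "filesystem");
        csvSourceContext.put("format.type", "csv");

        // The connector matches, but format.type is "avro" rather than "csv", so the check fails.
        System.out.println("CSV source factory matches: "
                + contextMatches(csvSourceContext, requested)); // prints "CSV source factory matches: false"
    }
}
```

If that model is right, none of the source factories on my classpath accepts the combination of `connector.type=filesystem` and `format.type=avro`, which would explain the error.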