Невозможно зарегистрировать Flink TableSource с внешним соединителем - PullRequest
0 голосов
/ 22 декабря 2019

Я пытаюсь зарегистрировать источник flink с помощью приведенного ниже фрагмента кода. Но произошел сбой за исключением.

Исключение в потоке "main" org.apache.flink.table.api.TableException: findAndCreateTableSource не удалось. в org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource (TableFactoryUtil.java:67) в org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource (TableFactoryUtil.java:54) в org.aptable.ache.descriptors.ConnectTableDescriptor.registerTableSource (ConnectTableDescriptor.java:69) в test.flink.BatchJob.main (BatchJob.java:58) Причина: org.apache.flink.table.api.NoMatchingTableFactoryException: Не удалось найти подходящую фабрику таблицдля 'org.apache.flink.table.factories.TableSourceFactory' в пути к классам.

Reason: No context matches.

The following properties are requested:
connector.path=file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result
connector.property-version=1
connector.type=filesystem
format.avro-schema={\"namespace\": \"example.avro\",\n \"type\": \"record\",\n \"name\": \"User\",\n \"fields\": [\n     {\"name\": \"name\", \"type\": \"string\"},\n     {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n ]\n}
format.property-version=1
format.type=avro
schema.0.name=name
schema.0.type=VARCHAR
schema.1.name=favorite_number
schema.1.type=INT
schema.2.name=favorite_color
schema.2.type=VARCHAR

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.formats.avro.AvroRowFormatFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)

            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            String schemaString = "{\"namespace\": \"example.avro\",\n" +
                    " \"type\": \"record\",\n" +
                    " \"name\": \"User\",\n" +
                    " \"fields\": [\n" +
                    "     {\"name\": \"name\", \"type\": \"string\"},\n" +
                    "     {\"name\": \"favorite_number\",  \"type\": [\"int\", \"null\"]},\n" +
                    "     {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}\n" +
                    " ]\n" +
                    "}";

            tEnv
                    .connect(
                            new FileSystem()
                                    .path("file://D:\\Work\\flink\\flink-quickstart\\table-api\\src\\main\\resources\\result")
                    )
                    .withFormat(new Avro().avroSchema(schemaString))
                    .withSchema(
                            new Schema()
                                    .field("name", Types.STRING)
                                    .field("favorite_number", Types.INT)
                                    .field("favorite_color", Types.STRING)
                    )
                    .registerTableSource("FirstTableSource");

            env.execute();
        }```
...