Не найдено подходящей TableFactory - PullRequest
0 голосов
/ 27 марта 2020

Сильно следуя инструкциям руководства Flink по настройке API таблиц, я сталкиваюсь со следующим исключением:

Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.reeeliance.flink.StreamingTableL3.main(StreamingTableL3.java:234)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=<ip>
connector.properties.1.key=group.id
connector.properties.1.value=test-consumer-group
connector.properties.2.key=transaction.timeout.ms
connector.properties.2.value=900000
connector.properties.3.key=bootstrap.servers
connector.properties.3.value=<ip>
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=<topic>
connector.type=kafka
connector.version=2.3.1
format.avro-schema=<avroschema>
format.property-version=1
format.type=avro

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
    ... 17 more

Код, который я использую для создания соединения:

                final StreamTableEnvironment envT = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build());

                envT.connect(new Kafka()
                                .version("2.3.1")
                                .topic(topic)
                                .startFromEarliest()
                                .property("zookeeper.connect", "<ip>")
                                .property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress)
                                .property(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
                                .property(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000")
                                )
                    .withFormat(new Avro().avroSchema(schema))
                    .registerTableSourceAndSink("test");

Мой пом. xml включает

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

и

<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>com.reeeliance.flink.StreamingTableL3</mainClass>
</transformer>

, чтобы сделать его жирной банкой. Я даже попытался добавить ServicesResourceTransformer (как предложено в другом посте SO), который тоже не работал.

Вы также можете видеть, что рассматривается KafkaTableSourceSinkFactory, но это не Фабрика, которую он ищет. Переключение с registerTableSourceAndSink() на registerTableSource() также не имело значения. В еще одном посте SO я обнаружил, что свойство connector.type должно быть установлено правильно, но, как я вижу, оно правильно установлено на kafka.

Я предполагаю, что это связано с Maven, а не уверен, что даже может быть проблема несовместимости версий, но я использую Flink 1.9.1 практически для всего.

Почему рассматриваемый KafkaTableSourceSinkFactory не удовлетворяет критериям и как я могу это исправить?

...