Сильно следуя инструкциям руководства Flink по настройке API таблиц, я сталкиваюсь со следующим исключением:
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at com.reeeliance.flink.StreamingTableL3.main(StreamingTableL3.java:234)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=<ip>
connector.properties.1.key=group.id
connector.properties.1.value=test-consumer-group
connector.properties.2.key=transaction.timeout.ms
connector.properties.2.value=900000
connector.properties.3.key=bootstrap.servers
connector.properties.3.value=<ip>
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=<topic>
connector.type=kafka
connector.version=2.3.1
format.avro-schema=<avroschema>
format.property-version=1
format.type=avro
The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
... 17 more
Код, который я использую для создания соединения:
final StreamTableEnvironment envT = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build());
envT.connect(new Kafka()
.version("2.3.1")
.topic(topic)
.startFromEarliest()
.property("zookeeper.connect", "<ip>")
.property(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerAddress)
.property(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
.property(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000")
)
.withFormat(new Avro().avroSchema(schema))
.registerTableSourceAndSink("test");
Мой пом. xml включает
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
и
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.reeeliance.flink.StreamingTableL3</mainClass>
</transformer>
, чтобы сделать его жирной банкой. Я даже попытался добавить ServicesResourceTransformer
(как предложено в другом посте SO), который тоже не работал.
Вы также можете видеть, что рассматривается KafkaTableSourceSinkFactory
, но это не Фабрика, которую он ищет. Переключение с registerTableSourceAndSink()
на registerTableSource()
также не имело значения. В еще одном посте SO я обнаружил, что свойство connector.type
должно быть установлено правильно, но, как я вижу, оно правильно установлено на kafka
.
Я предполагаю, что это связано с Maven, а не уверен, что даже может быть проблема несовместимости версий, но я использую Flink 1.9.1 практически для всего.
Почему рассматриваемый KafkaTableSourceSinkFactory
не удовлетворяет критериям и как я могу это исправить?