ошибка flink Не удалось найти подходящую фабрику таблиц для 'org. apache .flink.table.factories.BatchTableSourceFactory' в classpath - PullRequest
0 голосов
/ 24 февраля 2020

Я новичок в Apache Flink, и я пытаюсь прочитать файл Avro следующим образом,

val schema = new Schema()
  .field("tconst", "string")
  .field("titleType", "string")
  .field("primaryTitle", "string")
  .field("originalTitle", "string")
  .field("isAdult", "int")
  .field("startYear", "string")
  .field("endYear", "string")
  .field("runtimeMinutes", "int")
  .field("genres", "string")

val avroFormat: Avro = new Avro()
  .avroSchema(
    "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\" : [" +
      "    {\"name\": \"tconst\", \"type\": \"string\"}," +
      "    {\"name\": \"titleType\", \"type\": \"string\"}" +
      "    {\"name\": \"primaryTitle\", \"type\": \"string\"}" +
      "    {\"name\": \"originalTitle\", \"type\": \"string\"}" +
      "    {\"name\":   \"isAdult\", \"type\": \"int\"}" +
      "    {\"name\": \"startYear\", \"type\": \"string\"}" +
      "    {\"name\": \"endYear\", \"type\": \"string\"}" +
      "    {\"name\": \"runtimeMinutes\", \"type\": \"int\"}" +
      "    {\"name\": \"genres\", \"type\": \"string\"}" +
      "  ]" +
      "}"
  )

tableEnv.connect(new FileSystem().path("/Users/x/Documents/test_1.avro"))
  .withSchema(schema)
  .withFormat(avroFormat)
  .registerTableSource("sink")

Но когда я запускаю это, я получаю следующую ошибку.

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.path=/Users/x/Documents/test_1.avro
connector.property-version=1
connector.type=filesystem
format.avro-schema=.... // above schema
format.property-version=1
format.type=avro
schema.0.name=tconst
schema.0.type=string
schema.1.name=titleType
schema.1.type=string
schema.2.name=primaryTitle
schema.2.type=string
schema.3.name=originalTitle
schema.3.type=string
schema.4.name=isAdult
schema.4.type=int
schema.5.name=startYear
schema.5.type=string
schema.6.name=endYear
schema.6.type=string
schema.7.name=runtimeMinutes
schema.7.type=int
schema.8.name=genres
schema.8.type=string

The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

В этом файле Avro он имеет Flink Dataset и использует AvroOutputFormat для записи файла.

val avroOutputFormat = new AvroOutputFormat[Row](classOf[Row])
flinkDatase.write(avroOutputFormat, "/Users/x/Documents/test_1.avro").setParallelism(1)

Я думаю, если это неправильный тип данных, который может привести к упомянутому ошибка. Есть ли способ определить точную проблему этого?

1 Ответ

0 голосов
/ 24 февраля 2020

Извините, что ввел вас в заблуждение На данный момент коннектор файловой системы, к сожалению, не поддерживает Avro.

Так что нет другого выбора, кроме как использовать API набора данных. Я рекомендую использовать avrohugger для создания соответствующего scala класса для вашей схемы avro.

// convert to your scala class
val dsTuple: DataSet[User] = tableEnv.toDataSet[User](table)
// write out
val avroOutputFormat = new AvroOutputFormat<>(User.class)
avroOutputFormat.setCodec(Codec.SNAPPY)
avroOutputFormat.setSchema(User.SCHEMA$)
specificUser.write(avroOutputFormat, outputPath1)
...