Застрял на: Не удалось найти подходящую столовую фабрику - PullRequest
0 голосов
/ 29 февраля 2020

Играя с Flink, я пытался перенести данные в Elasticsearch. У меня есть эта ошибка на моем STDOUT:

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.hosts=http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200
connector.index=transfers-sum
connector.key-null-literal=n/a
connector.property-version=1
connector.type=elasticsearch
connector.version=6
format.json-schema={      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=curr_careUnit
schema.1.data-type=FLOAT
schema.1.name=sum
update-mode=upsert

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.Kafka09TableSourceSinkFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
...

Вот то, что у меня есть в моем scala Коде Flink:

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data4", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV (with a header row per Kafka event into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("6")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
        .index("transfers-sum")
        .keyNullLiteral("n/a")
      .withFormat(new Json().jsonSchema("{      \"curr_careUnit\": {\"type\": \"text\"},      \"sum\": {\"type\": \"float\"}    }"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.FLOAT())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

Я создаю толстую банку и загружаю ее к моему удаленному экземпляру Flink. Вот мои зависимости build.gradle:

compile 'org.scala-lang:scala-library:2.11.12'
compile 'org.apache.flink:flink-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-streaming-scala_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-kafka-0.10_2.11:1.10.0'
compile 'org.apache.flink:flink-table-api-scala-bridge_2.11:1.10.0'
compile 'org.apache.flink:flink-connector-elasticsearch6_2.11:1.10.0'
compile 'org.apache.flink:flink-json:1.10.0'
compile 'com.fasterxml.jackson.core:jackson-core:2.10.1'
compile 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.1'
compile 'org.json4s:json4s-jackson_2.11:3.7.0-M1'

Вот как команда farJar создана для gradle:

jar {
    from {
        (configurations.compile).collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
    manifest {
        attributes("Main-Class": "main" )
    }
}
task fatJar(type: Jar) {
    zip64 true
    manifest {
        attributes 'Main-Class': "flinkNamePull.Demo"
    }
    baseName = "${rootProject.name}"
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

Может кто-нибудь помочь мне увидеть, что мне не хватает? Я довольно новичок в Flink и потоке данных в целом. Хе-хе

Заранее спасибо!

Ответы [ 2 ]

1 голос
/ 03 марта 2020

Вы должны использовать плагин shadow для создания толстой банки вместо того, чтобы делать это вручную.

В частности, вы хотите объединить дескрипторы услуг .

1 голос
/ 02 марта 2020

Список в The following factories have been considered: завершен? Содержит ли оно Elasticsearch6UpsertTableSinkFactory? Если не так далеко, насколько я могу судить, существует проблема с зависимостями обнаружения служб.

Как вы отправляете свою работу? Можете ли вы проверить, есть ли у вас файл META-INF/services/org.apache.flink.table.factories.TableFactory в uber jar с записью для Elasticsearch6UpsertTableSinkFactory?

При использовании maven вам необходимо добавить преобразователь для правильного слияния служебных файлов:

<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

Я не знаю, как вы делаете это в Gradle.


РЕДАКТИРОВАТЬ: Благодаря Arvid Heise В Gradle при использовании shadowJar подключите вас можно объединить служебные файлы с помощью:

// Merging Service Files
shadowJar {
  mergeServiceFiles()
}
...