Соединитель Flinkasticsearch завершается неудачно с неизвестной настройкой исключения [bulk.flush.max.size.mb] - PullRequest
0 голосов
/ 04 июля 2018

Соединитель Flink 1.5.0 ElasticSearch Страница имеет следующий код

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

Похоже, что bulk.flush.max.actions устарели или удалены. Как только программа запускается, я получаю следующее исключение.

Вот мой точный код (слегка измененная версия этого файла )

    @throws[Exception]
      override def open(parameters: Configuration) {
        val config = new util.HashMap[String, String]
        config.put("bulk.flush.max.size.mb", "1")
        config.put("cluster.name", cluster)

        val settings = Settings.builder()
          .put(config)
          .build()

        client = new PreBuiltTransportClient(settings)
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port))


}

и это исключение, которое я получаю

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.IllegalArgumentException: unknown setting [bulk.flush.max.size.mb] please check that any required plugins are installed, or check the breaking changes documentation for removed settings
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:121)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at org.myorg.quickstart.StreamingKafkaClient$.main(StreamingKafkaClient.scala:63)
    at org.myorg.quickstart.StreamingKafkaClient.main(StreamingKafkaClient.scala)

Кроме того, похоже, что разъем Fink не работает с ES 6.x, мне пришлось удалить и вернуться к ES 5.x, чтобы подключение Flink к ES работало. Я хочу использовать Наверх *

Я использую Flink 1.5.0, и соответствующая выдержка из pom.xml выглядит следующим образом

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
        <version>1.5.0</version>
    </dependency>
...