Flink Elastic Search Sink Ошибка типа несоответствия - PullRequest
0 голосов
/ 12 ноября 2018

Я подключаю Flink и Elastic Search.С помощью документации .Я создал приемник с использованием Scala.

def getEsSink:ElasticsearchSink[CustomerSegementation] = {
    val httpHosts = new util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"))
    val esSinkBuilder = new ElasticsearchSink.Builder[CustomerSegementation](httpHosts, new ElasticsearchSinkFunction[CustomerSegementation]() {
      def createIndexRequest(element: CustomerSegementation): IndexRequest = {
        val json = new java.util.HashMap[String, String]
        json.put("_id", element._id)
        json.put("last_ordered_date", element.last_ordered_date.toString)
        Requests.indexRequest.index("customerSegementation").`type`("test_type").source(json)
      }

      @Override
      def process(element: CustomerSegementation, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        indexer.add(createIndexRequest(element))
      }
    })
    esSinkBuilder.setBulkFlushMaxActions(1)
    esSinkBuilder.build()
  }

Затем я пытаюсь добавить приемник в поток

stream.addSink(getEsSink())

Я получаю сообщение об ошибке enter image description here

Я не уверен, где именно я ошибаюсь.Я использую Flink версии 1.6.0 и ElasticSearch версии 6 и Scala версии 2.11.

Пожалуйста, помогите мне.

...