Я подключаю Flink и Elastic Search.С помощью документации .Я создал приемник с использованием Scala.
def getEsSink:ElasticsearchSink[CustomerSegementation] = {
val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[CustomerSegementation](httpHosts, new ElasticsearchSinkFunction[CustomerSegementation]() {
def createIndexRequest(element: CustomerSegementation): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("_id", element._id)
json.put("last_ordered_date", element.last_ordered_date.toString)
Requests.indexRequest.index("customerSegementation").`type`("test_type").source(json)
}
@Override
def process(element: CustomerSegementation, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
indexer.add(createIndexRequest(element))
}
})
esSinkBuilder.setBulkFlushMaxActions(1)
esSinkBuilder.build()
}
Затем я пытаюсь добавить приемник в поток
stream.addSink(getEsSink())
Я получаю сообщение об ошибке
Я не уверен, где именно я ошибаюсь.Я использую Flink версии 1.6.0 и ElasticSearch версии 6 и Scala версии 2.11.
Пожалуйста, помогите мне.