Моя проблема примерно такая, как описано здесь . Часть кода (фактически взята с apache сайта) выглядит следующим образом:
val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[String](
httpHosts,
new ElasticsearchSinkFunction[String] {
def createIndexRequest(element: String): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("data", element)
return Requests.indexRequest()
.index("my-index")
.`type`("my-type")
.source(json)
Если я добавляю эти три утверждения, я получаю ошибку, как показано ниже
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
Ошибка, которую я получаю
object elasticsearch is not a member of package org.apache.flink.streaming.connectors
object elasticsearch6 is not a member of package org.apache.flink.streaming.connectors
Если я не добавлю эти операторы импорта, я получу ошибку, как показано ниже
Compiling 1 Scala source to E:\sar\scala\practice\readstbdata\target\scala-2.11\classes ...
[error] E:\sar\scala\practice\readstbdata\src\main\scala\example\readcsv.scala:35:25: not found: value ElasticsearchSink
[error] val esSinkBuilder = new ElasticsearchSink.Builder[String](
[error] ^
[error] E:\sar\scala\practice\readstbdata\src\main\scala\example\readcsv.scala:37:7: not found: type ElasticsearchSinkFunction
[error] new ElasticsearchSinkFunction[String] {
[error] ^
[error] two errors found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed 10 Feb, 2020 2:15:04 PM
Вопрос о стеке, о котором я говорил выше, некоторые функции были расширены. Насколько я понимаю, flink.streaming.connectors.elasticsearch необходимо расширить в библиотеки REST. 1) Верно ли мое понимание 2) Если да, могу ли я иметь полные расширения 3) Если мое понимание неверно, пожалуйста, дайте мне решение.
Примечание: я добавил следующие утверждения в build.sbt
libraryDependencies += "org.elasticsearch.client" % "elasticsearch-rest-high-level-client" % "7.5.2" ,
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "7.5.2",