Ошибка компилятора при использовании примера, приведенного в документации Flink.Документация Flink предоставляет пример кода Scala для установки параметров фабрики клиента REST при обращении к Elasticsearch, https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html. При попытке этого кода я получаю ошибку компилятора в IntelliJ, которая говорит: «Не удается разрешить символ restClientBuilder».
Я обнаружил следующее ТАК, что ТОЧНО моя проблема, за исключением того, что это на Java, и я делаю это в Scala. Apache Flink (v1.6.0) аутентифицирует Elasticsearch Sink (v6.4)
Я попытался скопировать код решения, предоставленный в приведенном выше SO, в IntelliJ, в автоконвертированном коде также есть компиляторошибки.
// provide a RestClientFactory for custom configuration on the internally created REST client
// i only show the setMaxRetryTimeoutMillis for illustration purposes, the actual code will use HTTP cutom callback
esSinkBuilder.setRestClientFactory(
restClientBuilder -> {
restClientBuilder.setMaxRetryTimeoutMillis(10)
}
)
Затем я попытался (автоматически сгенерированный код Java в Scala от IntelliJ)
// provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client
import org.apache.http.auth.AuthScope
import org.apache.http.auth.UsernamePasswordCredentials
import org.apache.http.client.CredentialsProvider
import org.apache.http.impl.client.BasicCredentialsProvider
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.elasticsearch.client.RestClientBuilder
// provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory((restClientBuilder) => {
def foo(restClientBuilder) = restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = { // elasticsearch username and password
val credentialsProvider = new BasicCredentialsProvider
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
}
})
foo(restClientBuilder)
})
Исходный фрагмент кода выдает ошибку «не удается разрешить RestClientFactory», а затем Java в Scalaпоказывает несколько других ошибок.
Так что в основном мне нужно найти версию решения Scala, описанную в Apache Flink (v1.6.0) аутентифицировать Elasticsearch Sink (v6.4)
Обновление 1 : мне удалось добиться некоторого прогресса с некоторой помощью IntelliJ.Следующий код компилируется и запускается, но есть другая проблема.
esSinkBuilder.setRestClientFactory(
new RestClientFactory {
override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
// elasticsearch username and password
val credentialsProvider = new BasicCredentialsProvider
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
httpClientBuilder.setSSLContext(trustfulSslContext)
}
})
}
}
Проблема в том, что я не уверен, должен ли я делать новый объект RestClientFactory.Что происходит, так это то, что приложение подключается к кластеруasticsearch, но затем обнаруживает, что SSL CERT недействителен, поэтому мне пришлось поставить trustfullSslContext (как описано здесь https://gist.github.com/iRevive/4a3c7cb96374da5da80d4538f3da17cb),, это помогло мне справиться с проблемой SSL, но теперь ESREST Client выполняет тест ping, и ping завершается неудачно, он генерирует исключение и приложение закрывается. Я подозреваю, что ping не работает из-за ошибки SSL и, возможно, он не использует настройку trustfulSslContext i как часть нового RestClientFactory, и это делаетя подозреваю, что я не должен был делать новое, должен быть простой способ обновить существующий объект RestclientFactory, и в основном все это происходит из-за моего недостатка знаний Scala.