Аутентифицироваться с помощью встроенного поискового элемента ECE ElasticSearch от Apache Fink (код Scala) - PullRequest
0 голосов
/ 26 мая 2019

Ошибка компилятора при использовании примера, приведенного в документации Flink.Документация Flink предоставляет пример кода Scala для установки параметров фабрики клиента REST при обращении к Elasticsearch, https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html. При попытке этого кода я получаю ошибку компилятора в IntelliJ, которая говорит: «Не удается разрешить символ restClientBuilder».

Я обнаружил следующее ТАК, что ТОЧНО моя проблема, за исключением того, что это на Java, и я делаю это в Scala. Apache Flink (v1.6.0) аутентифицирует Elasticsearch Sink (v6.4)

Я попытался скопировать код решения, предоставленный в приведенном выше SO, в IntelliJ, в автоконвертированном коде также есть компиляторошибки.

      // provide a RestClientFactory for custom configuration on the internally created REST client
      // i only show the setMaxRetryTimeoutMillis for illustration purposes, the actual code will use HTTP cutom callback
      esSinkBuilder.setRestClientFactory(
        restClientBuilder -> {
          restClientBuilder.setMaxRetryTimeoutMillis(10)
        }
      )

Затем я попытался (автоматически сгенерированный код Java в Scala от IntelliJ)

// provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client
      import org.apache.http.auth.AuthScope
      import org.apache.http.auth.UsernamePasswordCredentials
      import org.apache.http.client.CredentialsProvider
      import org.apache.http.impl.client.BasicCredentialsProvider
      import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
      import org.elasticsearch.client.RestClientBuilder
      // provide a RestClientFactory for custom configuration on the internally created REST client// provide a RestClientFactory for custom configuration on the internally created REST client

      esSinkBuilder.setRestClientFactory((restClientBuilder) => {
        def foo(restClientBuilder) = restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
          override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = { // elasticsearch username and password
            val credentialsProvider = new BasicCredentialsProvider
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
          }
        })

        foo(restClientBuilder)
      })

Исходный фрагмент кода выдает ошибку «не удается разрешить RestClientFactory», а затем Java в Scalaпоказывает несколько других ошибок.

Так что в основном мне нужно найти версию решения Scala, описанную в Apache Flink (v1.6.0) аутентифицировать Elasticsearch Sink (v6.4)


Обновление 1 : мне удалось добиться некоторого прогресса с некоторой помощью IntelliJ.Следующий код компилируется и запускается, но есть другая проблема.

esSinkBuilder.setRestClientFactory(
          new RestClientFactory {
            override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
              restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
                  // elasticsearch username and password
                  val credentialsProvider = new BasicCredentialsProvider
                  credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(es_user, es_password))
                  httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                  httpClientBuilder.setSSLContext(trustfulSslContext)
                }
              })
            }
          }

Проблема в том, что я не уверен, должен ли я делать новый объект RestClientFactory.Что происходит, так это то, что приложение подключается к кластеруasticsearch, но затем обнаруживает, что SSL CERT недействителен, поэтому мне пришлось поставить trustfullSslContext (как описано здесь https://gist.github.com/iRevive/4a3c7cb96374da5da80d4538f3da17cb),, это помогло мне справиться с проблемой SSL, но теперь ESREST Client выполняет тест ping, и ping завершается неудачно, он генерирует исключение и приложение закрывается. Я подозреваю, что ping не работает из-за ошибки SSL и, возможно, он не использует настройку trustfulSslContext i как часть нового RestClientFactory, и это делаетя подозреваю, что я не должен был делать новое, должен быть простой способ обновить существующий объект RestclientFactory, и в основном все это происходит из-за моего недостатка знаний Scala.

1 Ответ

0 голосов
/ 29 мая 2019

Рад сообщить, что это решено.Код, который я разместил в Обновление 1 , верен.Пинг для ECE не работал по двум причинам:

  1. Сертификат должен включать в себя полную цепочку, включая корневой CA, промежуточный CA и сертификат для ECE.Это помогло избавиться от всего материала trustfulSslContext.

  2. ECE сидел за ha-proxy, и прокси выполнял сопоставление имени хоста в HTTP-запросе с фактическим именем кластера развертывания.в ЕЭК.эта логика сопоставления не учитывала, что клиент высокого уровня Java REST использует класс org.apache.httphost, который создает имя хоста в качестве имени хоста: номер_порта, даже когда номер порта равен 443. Поскольку он не нашел сопоставление из-за 443поэтому ЕЭК вернула ошибку 404 вместо 200 в порядке (единственный способ найти это - посмотреть на незашифрованные пакеты на ha-proxy).Как только логика сопоставления в ha-proxy была исправлена, сопоставление было найдено, и эхо-запросы теперь успешны.

...