How to connect PySpark to Elasticsearch over SSL with certificate verification set to False?
0 votes
/ 07 February 2019

I previously connected to an Elasticsearch cluster directly from Python with the following code:

import ssl

from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context

# Disable hostname checking and certificate verification (self-signed certs)
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

es = Elasticsearch(
    ES_HOST,
    http_auth=(ES_USERNAME, ES_PASSWORD),
    scheme="https",
    port=ES_PORT,
    use_ssl=True,
    verify_certs=False,
    ssl_context=ssl_context,
    ca_certs=False
)
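For reference, a minimal way to confirm that a client built this way actually reaches the cluster is to call the standard info endpoint of elasticsearch-py (this check is not part of the original post):

# Sanity check: should print the cluster name and version info
print(es.info())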

Now I am trying to connect to the same cluster using the PySpark-to-Elasticsearch connector. Spark is set up at version 2.4.0 with Hadoop 2.7, and I am using elasticsearch-hadoop-6.1.1 to connect the two.

I am using the following configuration to connect PySpark to ES:

es_live_conf = {
    "es.nodes" : ES_HOST,
    "es.port" : ES_PORT,
    "es.resource" : 'testindex/testdoc',
    "es.net.http.auth.user" : ES_USERNAME,
    "es.net.http.auth.pass" : ES_PASSWORD,
    "es.net.ssl" : "true",
    "es.nodes.resolve.hostname" : "false",
    "es.net.ssl.cert.allow.self.signed" : "true"
}

Then this code to initiate the connection:

from pyspark import SparkContext

sc = SparkContext(appName="PythonSparkStreaming")
sc.setLogLevel("WARN")

# Read from Elasticsearch via the elasticsearch-hadoop input format
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_live_conf)
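The RDD is lazy, so nothing actually contacts the cluster until an action runs. A minimal way to force the connection, consistent with the RDD.take frame visible in the stack trace below, might be:

# Trigger evaluation so the connector actually contacts the cluster;
# the first document (if any) comes back as a (key, value) pair.
print(es_rdd.take(1))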

With TRACE logging enabled, the call above produces the following output and error:

19/02/07 07:00:31 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:31 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:31 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:31 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:31 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:31 TRACE CommonsHttpTransport:Tx [HEAD]@[xx.xx.xx.xx:xxxxx][testindex]?[null] w/ payload [null]
19/02/07 07:00:31 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:31 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:35 TRACE CommonsHttpTransport: Rx @[xx.xx.xx.xx] [404-Not Found] [null]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:35 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:35 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:35 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][]?[null] w/ payload [null]
19/02/07 07:00:35 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:35 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:39 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{
  "name" : "iad1esapp2vz742",
  "cluster_name" : "68fc89bc2c36e7188782e4f226ed3948",
  "cluster_uuid" : "7xk6py25R_mRgLgLTYeavg",
  "version" : {
    "number" : "5.6.5",
    "build_hash" : "6a37571",
    "build_date" : "2017-12-04T07:50:10.466Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:39 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:39 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:39 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:39 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:39 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:43 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{"_nodes":{"total":14,"successful":14,"failed":0},"cluster_name":"68fc89bc2c36e7188782e4f226ed3948","nodes":{"VJl4scTeQbuPmnXcsb5MOA":{"name":"iad1esdn20vz166","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn20"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"MEq2hOGkSdeBMldoBiv2AQ":{"name":"iad1esdn23vz187","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn23"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"AHDBAMeQTg2LMyhvnxYxeQ":{"name":"iad1esmst4vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst4"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"nJjtbzRuTP2Bkp9E0EOLBw":{"name":"iad1esapp0vz755","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp0"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5PrI09xWQRuNHPPUYCKOcQ":{"name":"iad1esdn31vz12","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn31"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"f1l6borzQt6-d_QEy9hY9Q":{"name":"iad1esapp1vz782","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp1"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"bO6AoGXFSgGzrISe-pCW8g":{"name":"iad1esdn21vz99","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn21"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"z7UA-JHlQ7SCNyUYfbwR1A":{"name":"iad1esmst5vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst5"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"53RAotd0QbiE28swz3fyOg":{"name":"iad1esapp2vz742","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp2"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5Lje4zCgSryonTouZMBqyA":{"name":"iad1esdn22vz156","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn22"},"http":{"bound_ad
dress":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"YHFxFv5SQzCt9OesKD_a-g":{"name":"iad1esapp3vz748","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp3"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"yZOAHjgoSz-Qi_eNO7HHxg":{"name":"iad1esmst3vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst3"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"kg9NYEJiRy60InOxIh796A":{"name":"iad1esdn1vz163","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn1"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"PvQ0PobMTaSNnzw0l03z4A":{"name":"iad1esdn13vz80","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn13"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}}}}]
19/02/07 07:00:43 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:43 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:43 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:43 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:43 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:43 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:43 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
[I 07:01:55.454 NotebookApp] Saving file at /work/sth-baseline.ipynb
[W 07:01:55.455 NotebookApp] Notebook work/sth-baseline.ipynb is not trusted
19/02/07 07:05:48 TRACE NetworkClient: Caught exception while performing request [xx.xx.xx.xx:xxxxx][_nodes/http] - falling back to the next node in line...
java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:666)
    at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:471)
    at sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:153)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSocket(SSLSocketFactory.java:129)
    at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
    at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
    at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
    at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:478)
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:112)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:466)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:430)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:155)
    at org.elasticsearch.hadoop.rest.RestClient.getHttpNodes(RestClient.java:112)
    at org.elasticsearch.hadoop.rest.RestClient.getHttpDataNodes(RestClient.java:129)
    at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:157)
    at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:223)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:405)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:386)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1343)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:302)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
19/02/07 07:05:48 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:05:48 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:05:48 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:05:48 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 ERROR NetworkClient: Node [xx.xx.xx.xx:xxxxx] failed (Connection refused (Connection refused)); selected next node [xx.xx.xx.xx:xxxxx]

It is worth noting that the same code works successfully with a configuration that points at a local ES cluster, so the failure is specific to the SSL connection to the remote cluster. I can reach the cluster from the client environment with curl --insecure --user ES_USER:ES_PASS -XGET 'https://ES_HOST:ES_PORT/', so the client machine does appear to be able to connect.

How do I configure the Spark-to-ES connection with the correct SSL settings? I have tried removing the es.net.ssl.cert.allow.self.signed and es.nodes.resolve.hostname properties from the configuration, but I still get the same error.

1 Answer

0 votes
/ 07 February 2019

OK, figured it out.

The detailed answer is here: https://www.elastic.co/guide/en/elasticsearch/hadoop/master/cloud.html

In short, as the posted logs show, the client can connect to the cluster and get back a list of nodes. Once it has that list, it tries to connect to those nodes directly, but they are all unreachable because they sit on a private network. Essentially, you need to disable direct connections to the cluster nodes, although the documentation notes this can hurt performance. The setting for this is "es.nodes.wan.only": "true". I also set "es.nodes.discovery": "false" while figuring this out, so the discovery setting is probably unnecessary, but I'll leave it here in case it is useful.
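For completeness, this is roughly what the configuration from the question looks like with those two settings added; everything else is unchanged, so treat it as a sketch rather than a canonical config:

# Same configuration as in the question, with WAN-only mode enabled so the
# connector talks only to the address it was given and never to the
# publish addresses returned by _nodes/http.
es_live_conf = {
    "es.nodes" : ES_HOST,
    "es.port" : ES_PORT,
    "es.resource" : 'testindex/testdoc',
    "es.net.http.auth.user" : ES_USERNAME,
    "es.net.http.auth.pass" : ES_PASSWORD,
    "es.net.ssl" : "true",
    "es.nodes.resolve.hostname" : "false",
    "es.net.ssl.cert.allow.self.signed" : "true",
    "es.nodes.wan.only" : "true",       # the fix: don't connect to nodes directly
    "es.nodes.discovery" : "false"      # probably redundant with wan.only
}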
