Невозможно подключиться кasticsearch из pyspark, но может сделать это из Hive - PullRequest
9 голосов
/ 19 сентября 2019

Я использую приведенный ниже фрагмент для подключения и загрузки данных из Hive вasticsearch (v 6.2) без каких-либо проблем.

ADD JAR file:///<>/elasticsearch-hadoop-hive-6.2.2.jar;
ADD FILE file:///<>/mycerts.jks;

CREATE EXTERNAL TABLE if not exists my_db.my_es_table
(
col1 int,
col2 string,
col3 string,
col4 timestamp,
key_id string
)
COMMENT 'data into ES'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'index1/type1',
'es.index.auto.create'='true',
'es.nodes'='<vip_name>',
'es.port'='9200',
'es.net.http.auth.user'='<user>',
'es.net.http.auth.pass'='pwd',
'es.net.ssl.protocol'='SSL',
'es.net.ssl'='TRUE',
'es.net.ssl.truststore.location'='mycerts.jks',
'es.net.ssl.truststore.pass'='<pwd>',
'es.mapping.id'='key_id'
);

INSERT OVERWRITE TABLE my_db.my_es_table
SELECT
col1,
col2,
col3,
col4,
key_id
FROM my_db.stagging_data;

Но, когда я пытаюсь перенести один и тот же фрагмент в py-spark, онвыдает исключения

   org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI

Ниже приведен фрагмент кода, который я пробовал для spark

df_delta=sqlContext.table('my_db.stagging_data')
status=df_delta.rdd.map(lambda row:(None,row.asDict())).saveAsNewAPIHadoopFile(path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",keyClass="org.apache.hadoop.io.NullWritable",valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",conf={'es.resource' : 'index1/type1','es.index.auto.create':'true','es.nodes':'<vip_name>','es.port':'9200','es.net.http.auth.user':'<user>','es.net.http.auth.pass':'<pwd>','es.net.ssl':'true','es.net.ssl.truststore.location':'file:///<path>/mycerts.jks','es.net.ssl.truststore.pass':'<pwd>','es.mapping.id' : 'key_id'})

Я вызываю оболочку с помощью команды ниже -

pyspark --jars <path>/elasticsearch-spark-20_2.11-6.2.2.jar --py-files <path>/mycerts.jks

Ниже я добавляю весь журнал

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI.
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadKeyStore(SSLSocketFactory.java:193)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadTrustManagers(SSLSocketFactory.java:224)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:171)
        ... 31 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1609)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1597)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1596)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1596)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1830)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1779)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1768)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
        at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
        ... 26 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:155)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cannot initialize SSL - Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI.
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:173)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.getSSLContext(SSLSocketFactory.java:158)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSocket(SSLSocketFactory.java:127)
        at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
        at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
        at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
        at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
        at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
        at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:478)
        at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:112)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:380)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:344)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:348)
        at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:158)
        at org.elasticsearch.hadoop.rest.RestClient.getHttpNodes(RestClient.java:115)
        at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:92)
        at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:579)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.init(EsOutputFormat.java:173)
        at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:149)
        at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:356)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:130)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1413)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139)
        ... 8 more
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Expected to find keystore file at [file:///<path>/mycerts.jks] but was unable to. Make sure that it is available on the classpath, or if not, that you have specified a valid URI.
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadKeyStore(SSLSocketFactory.java:193)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.loadTrustManagers(SSLSocketFactory.java:224)
        at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSSLContext(SSLSocketFactory.java:171)
        ... 31 more

Я могу прочитать и распечатать файл jks после подключения к py-spark.Невозможно решить эту проблему.Может кто-нибудь, пожалуйста, предложите.

Ответы [ 3 ]

3 голосов
/ 22 сентября 2019

Я думаю, что вы используете неправильную опцию

Для Python вы можете использовать --py-files аргумент spark-submit для добавления .py, .zip или .egg файлов будет распространяться вместе с вашим приложением

Вместо этого вы хотите --files

--files FILES: Comma-separated list of files to be placed in the working directory of each executor. File paths of these files in executors can be accessed via SparkFiles.get(fileName)

Чтобы разместить его надругой путь внутри исполнителя, тогда вы можете использовать # разделитель

spark-submit ... --files mycerts.jks#/<path>/mycerts.jks
0 голосов
/ 24 сентября 2019

Как указано выше в cricket_007, вы используете неверную опцию --py-files.Вместо этого используйте опцию --files для загрузки файлов сертификата.

Кроме того, эти файлы загружаются не в локальной файловой системе этих исполнителей, а в HDFS.Таким образом, путь, который вы передаете для файла сертификата в вашем коде искры, также неверен, поскольку он указывает на локальную файловую систему.

'es.net.ssl.truststore.location':'file:///<path>/mycerts.jks'

Вы можете использовать опцию # разделитель с файлами в вашей команде spark submitзагрузите файл на указанный в HDFS

spark-submit ... --files mycerts.jks#/<path>/mycerts.jks

, а затем используйте тот же путь в вашем искровом коде для доступа к файлу.

'es.net.ssl.truststore.location':'/<path>/mycerts.jks'
0 голосов
/ 23 сентября 2019

Судя по вашему стековому сообщению, кажется, что <path явно неверен.Ваше значение path после --jars и --py-files должно быть точным, но, увы, на данный момент это не так.

Ожидается найти файл хранилища ключей в [file: ////mycerts.jks], но не удалось.Убедитесь, что он доступен в classpath или, если нет, указан правильный URI.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...