Кто-нибудь был в состоянии использоватьasticsearch xpack sql со Spark? - PullRequest
0 голосов
/ 31 января 2019

Используя PySpark, я пытаюсь прочитать данные изasticsearch.Обычно я устанавливаю запрос на что-нибудь в строке (см. Запрос ниже) и задаю для индекса es.resource индекс, такой как «my_index / doc», и я могу читать данные в spark:

q ="""{
          "query": {
              "match_all": {}
          }  
      }"""

Однако недавно я попробовал _xpack / sql с kibana и JDBC с другими клиентами SQL, и они очень хорошо работают для получения данных.Однако когда я пытаюсь сослаться на _xpack в своем коде pyspark, я получаю следующую ошибку:

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: 
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: 
invalid_index_name_exception: Invalid index name [_xpack], must not start with '_'.
null

Кто-нибудь пробовал использовать _xpack или знает, как выполнять SQL-запросы Elasticsearch из плагина Elasticsearch hadoop?

Ниже вы найдете отрывок из моего кода, который я пытаюсь использовать для выполнения через pyspark, заранее спасибо!

q = """{"query": "select * from eg_flight limit 1"}"""

es_read_conf = {
    "es.nodes" : "192.168.1.71,192.168.1.72,192.168.1.73",
    "es.port" : "9200",
    "es.resource" :  "_xpack/sql",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

1 Ответ

0 голосов
/ 31 января 2019

Я не думаю, что эта функция поддерживается.Альтернативным решением в PySpark было бы использование драйвера JDBC, который я попробовал.Я попробовал следующее:

es_df = spark.read.jdbc(url="jdbc:es://http://192.168.1.71:9200", table = "(select * from eg_flight) mytable")

и получил следующую ошибку:

Py4JJavaError: An error occurred while calling o2488.jdbc.
: java.sql.SQLFeatureNotSupportedException: Found 1 problem(s)
line 1:8: Unexecutable item

...

В качестве альтернативы можно было бы сделать это с использованием ядра Python и запросить его как таковой, но я не будурекомендую его для больших наборов данных.

import requests as r
import json


es_template = {
    "query": "select * from eg_flight"
}

es_link = "http://192.168.1.71:9200/_xpack/sql"
headers = {'Content-type': 'application/json'}


if __name__ == "__main__":

    load = r.post(es_link, data=json.dumps(es_template), headers=headers)
    if load.status_code == 200:
        load = load.json()
        #do something with it
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...