Как выполнить запрос к Elasticsearch, используя PySpark, не запрашивая каждый узел? - PullRequest
0 голосов
/ 13 февраля 2019

Моя конечная цель - использовать PySpark для эффективной индексации большого объема данных в Elasticsearch (ES), а затем выполнить огромное количество запросов к индексу и записать статистику по результатам.

Elasticsearch version 5.6.5
Spark version 2.4.0
Hadoop version 2.7
Elasticsearch-Hadoop python library version: 6.6.0

Рассмотрим следующий код:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

# create our Spark Context  
sc_conf = SparkConf().setAll((
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")

sc = SparkContext(conf=sc_conf)

sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "match_all": {}
  }  
}"""

es_live_conf["es.query"] = q

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_live_conf)

sqlContext.createDataFrame(es_rdd).limit(1).collect()

Я просто пытаюсь выполнить запрос на сопоставление всех по индексу, и мне нужен только лучший результат.Я попытался выразить ограничение в запросе ES, но, видимо, Spark игнорирует это, поэтому вместо этого я выразил его с помощью фильтра фрейма данных.

Я настроил Spark следующим образом:

es_live_conf = {

# specify the node that we are sending data to (this should be the master)
"es.nodes" : 'xxx.xxxx.com',

# specify the port in case it is not the default port
"es.port" : ES_PORT,

# specify a resource in the form 'index/doc-type'
"es.resource" : 'xxxxxx/document',

"es.net.http.auth.user" : ES_USERNAME,

"es.net.http.auth.pass" : ES_PASSWORD,

"es.net.ssl":"true",

"es.net.ssl.cert.allow.self.signed": "true",

"es.nodes.discovery": "false",

"es.nodes.wan.only": "true",

"es.index.read.missing.as.empty": "true",

}

Я получаю доступ к кластеру ES за VPC, поэтому у меня есть доступ только к клиентским узлам и ни к одному из внутренних узлов данных и т. Д.Вот почему для wan.only установлено значение true.

При такой настройке Spark, по-видимому, запрашивает каждый отдельный узел с полным соответствием, а затем в конечном итоге сводится к единственному результату, который я на самом деле хочу.Он невероятно медленный (50 осколков, 30 миллионов документов) и полностью исключает способность ES эффективно сокращать результаты для каждого узла.Даже если я изменю запрос на поиск по отдельному идентификатору документа, он запускает запрос для каждого отдельного сегмента через главный узел, указывая конкретный идентификатор сегмента при каждом вызове.Я попытался установить es.nodes.client.only в значение true, но это жалуется, что настройка конфликтует с wan.only.Если я включу client.only и отключу wan.only, я больше не смогу подключиться к кластеру, потому что он пытается напрямую подключиться к каждому узлу, который недоступен.

Что я здесь не так делаю?Как использовать PySpark для запуска запроса к ES один раз, а не один раз для каждого шарда.Кроме того, как мне использовать такие вещи, как from, size и rescore в моих запросах, если PySpark все равно пытается выполнить полный запрос на каждом фрагменте, а затем, по-видимому, после этого обрабатывает результаты?

1 Ответ

0 голосов
/ 11 июля 2019

Я не смог найти способ решить эту проблему с помощью библиотеки ES Hadoop.Похоже, что он больше подходит для использования Spark, когда вам нужно выполнить очень длинный и очень сложный шаг сокращения результатов, возвращаемых одним запросом Elasticsearch, вместо того, чтобы запускать миллионы быстрых запросов ES и агрегировать результаты.Чтобы решить эту проблему, я использовал этот плагин: https://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest

На самом деле я разработал его так, чтобы каждое ядро ​​могло использовать несколько потоков для параллельного выполнения еще большего количества запросов.По сути, он не только позволяет DDOS кластеру ES, но и любой остальной конечной точке для любой платформы, которая может потребоваться вам для обработки и агрегирования большого объема запросов.

Если я смогу получить все ясно, яопубликовать многопоточную версию, которую я публично создал, на github.

...