Моя конечная цель - использовать PySpark для эффективной индексации большого объема данных в Elasticsearch (ES), а затем выполнить огромное количество запросов к индексу и записать статистику по результатам.
Elasticsearch version 5.6.5
Spark version 2.4.0
Hadoop version 2.7
Elasticsearch-Hadoop python library version: 6.6.0
Рассмотрим следующий код:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
# create our Spark Context
sc_conf = SparkConf().setAll((
("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")
sc = SparkContext(conf=sc_conf)
sqlContext = SQLContext(sc)
q ="""{
"query": {
"match_all": {}
}
}"""
es_live_conf["es.query"] = q
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_live_conf)
sqlContext.createDataFrame(es_rdd).limit(1).collect()
Я просто пытаюсь выполнить запрос на сопоставление всех по индексу, и мне нужен только лучший результат.Я попытался выразить ограничение в запросе ES, но, видимо, Spark игнорирует это, поэтому вместо этого я выразил его с помощью фильтра фрейма данных.
Я настроил Spark следующим образом:
es_live_conf = {
# specify the node that we are sending data to (this should be the master)
"es.nodes" : 'xxx.xxxx.com',
# specify the port in case it is not the default port
"es.port" : ES_PORT,
# specify a resource in the form 'index/doc-type'
"es.resource" : 'xxxxxx/document',
"es.net.http.auth.user" : ES_USERNAME,
"es.net.http.auth.pass" : ES_PASSWORD,
"es.net.ssl":"true",
"es.net.ssl.cert.allow.self.signed": "true",
"es.nodes.discovery": "false",
"es.nodes.wan.only": "true",
"es.index.read.missing.as.empty": "true",
}
Я получаю доступ к кластеру ES за VPC, поэтому у меня есть доступ только к клиентским узлам и ни к одному из внутренних узлов данных и т. Д.Вот почему для wan.only
установлено значение true.
При такой настройке Spark, по-видимому, запрашивает каждый отдельный узел с полным соответствием, а затем в конечном итоге сводится к единственному результату, который я на самом деле хочу.Он невероятно медленный (50 осколков, 30 миллионов документов) и полностью исключает способность ES эффективно сокращать результаты для каждого узла.Даже если я изменю запрос на поиск по отдельному идентификатору документа, он запускает запрос для каждого отдельного сегмента через главный узел, указывая конкретный идентификатор сегмента при каждом вызове.Я попытался установить es.nodes.client.only
в значение true, но это жалуется, что настройка конфликтует с wan.only
.Если я включу client.only
и отключу wan.only
, я больше не смогу подключиться к кластеру, потому что он пытается напрямую подключиться к каждому узлу, который недоступен.
Что я здесь не так делаю?Как использовать PySpark для запуска запроса к ES один раз, а не один раз для каждого шарда.Кроме того, как мне использовать такие вещи, как from
, size
и rescore
в моих запросах, если PySpark все равно пытается выполнить полный запрос на каждом фрагменте, а затем, по-видимому, после этого обрабатывает результаты?