У меня есть кластер ES, из которого я хотел бы извлечь все идентификаторы сеанса (настраиваемое поле) с помощью Spark и записать их в виде одного столбца Dataframe в некоторые файлы Parquet.
Я создал Dataframe и хотел проверить с помощью take(1)
. План:
= Physical Plan ==
CollectLimit 1
+- *Scan ElasticsearchRelation(Map(
es.net.http.auth.user -> app,
es.net.ssl.cert.allow.self.signed -> true,
es.query -> {"query": {"bool": {"filter": [{"range": {"eventTime": {"lt": "2017-12-01 00:00:00", "gte": "2019-05-15 00:00:00", "time_zone": "UTC", "format": "yyyy-MM-dd HH:mm:ss"}}}]}}},
es.net.http.auth.pass -> 123456,
es.read.field.include -> sessionId,
es.port -> 9200,
es.resource -> sessions/session,
es.nodes -> 192.168.150.12,
es.net.ssl -> true),
org.apache.spark.sql.SQLContext@121e4c9e,None) [sessionId#0] ReadSchema: struct<sessionId:string>
Это постоянно приводит к сбою моей JVM.
Однако, когда я немного изменяю диапазон дат, он работает нормально. то есть вместо того, чтобы брать от 2017/12/01
до 2019/05/15
, диапазон от 2017/12/31
до 2019/05/15
, кажется, работает нормально.
Поскольку я использую предложение limit
, я думаю, диапазон дат не должен иметь значения. Тем не менее, я проверил план, чтобы увидеть, возможно, он как-то изменился:
= Physical Plan ==
CollectLimit 1
+- *Scan ElasticsearchRelation(Map(
es.net.http.auth.user -> app,
es.net.ssl.cert.allow.self.signed -> true,
es.query -> {"query": {"bool": {"filter": [{"range": {"eventTime": {"lt": "2017-12-31 00:00:00", "gte": "2019-05-15 00:00:00", "time_zone": "UTC", "format": "yyyy-MM-dd HH:mm:ss"}}}]}}},
es.net.http.auth.pass -> 123456,
es.read.field.include -> sessionId,
es.port -> 9200,
es.resource -> sessions/session,
es.nodes -> 192.168.150.12,
es.net.ssl -> true),
org.apache.spark.sql.SQLContext@121e4c9e,None) [sessionId#0] ReadSchema: struct<sessionId:string>
Выглядит так же (с измененными датами).
Я ожидаю, что это не имеет значения, но я добавлю, что объем данных в дополнительном временном интервале (от 2017-12-01
до 2017-12-31
) в значительной степени распределен аналогично остальной части диапазона дат. Я также хотел убедиться, что нет проблем с данными в этом конкретном диапазоне, поэтому я также выполнил аналогичный запрос в диапазоне дат от 2017-12-01
до 2017-12-31
, и он также был успешным.
Что я делаю не так? Почему объем данных должен влиять в этом случае (когда я использую take(1)
)?
Я использую PySpark, ElasticSearch 6 и последнюю версию драйвера / разъема для ES-spark (org.elasticsearch:elasticsearch-spark-20_2.11:6.7.2
).