Сбой Spark JVM при дубле (1) из кластера ES - PullRequest
0 голосов
/ 16 мая 2019

У меня есть кластер ES, из которого я хотел бы извлечь все идентификаторы сеанса (настраиваемое поле) с помощью Spark и записать их в виде одного столбца Dataframe в некоторые файлы Parquet.

Я создал Dataframe и хотел проверить с помощью take(1). План:

= Physical Plan ==
CollectLimit 1
+- *Scan ElasticsearchRelation(Map(
es.net.http.auth.user -> app,
es.net.ssl.cert.allow.self.signed -> true,
es.query -> {"query": {"bool": {"filter": [{"range": {"eventTime": {"lt": "2017-12-01 00:00:00", "gte": "2019-05-15 00:00:00", "time_zone": "UTC", "format": "yyyy-MM-dd HH:mm:ss"}}}]}}}, 
es.net.http.auth.pass -> 123456, 
es.read.field.include -> sessionId, 
es.port -> 9200, 
es.resource -> sessions/session, 
es.nodes -> 192.168.150.12,
es.net.ssl -> true),
org.apache.spark.sql.SQLContext@121e4c9e,None) [sessionId#0] ReadSchema: struct<sessionId:string>

Это постоянно приводит к сбою моей JVM.

Однако, когда я немного изменяю диапазон дат, он работает нормально. то есть вместо того, чтобы брать от 2017/12/01 до 2019/05/15, диапазон от 2017/12/31 до 2019/05/15, кажется, работает нормально.

Поскольку я использую предложение limit, я думаю, диапазон дат не должен иметь значения. Тем не менее, я проверил план, чтобы увидеть, возможно, он как-то изменился:

= Physical Plan ==
CollectLimit 1
+- *Scan ElasticsearchRelation(Map(
es.net.http.auth.user -> app,
es.net.ssl.cert.allow.self.signed -> true,
es.query -> {"query": {"bool": {"filter": [{"range": {"eventTime": {"lt": "2017-12-31 00:00:00", "gte": "2019-05-15 00:00:00", "time_zone": "UTC", "format": "yyyy-MM-dd HH:mm:ss"}}}]}}}, 
es.net.http.auth.pass -> 123456, 
es.read.field.include -> sessionId, 
es.port -> 9200, 
es.resource -> sessions/session, 
es.nodes -> 192.168.150.12,
es.net.ssl -> true),
org.apache.spark.sql.SQLContext@121e4c9e,None) [sessionId#0] ReadSchema: struct<sessionId:string>

Выглядит так же (с измененными датами).

Я ожидаю, что это не имеет значения, но я добавлю, что объем данных в дополнительном временном интервале (от 2017-12-01 до 2017-12-31) в значительной степени распределен аналогично остальной части диапазона дат. Я также хотел убедиться, что нет проблем с данными в этом конкретном диапазоне, поэтому я также выполнил аналогичный запрос в диапазоне дат от 2017-12-01 до 2017-12-31, и он также был успешным.

Что я делаю не так? Почему объем данных должен влиять в этом случае (когда я использую take(1))?

Я использую PySpark, ElasticSearch 6 и последнюю версию драйвера / разъема для ES-spark (org.elasticsearch:elasticsearch-spark-20_2.11:6.7.2).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...