Чтение вложенных данных из ElasticSearch через Spark Scala - PullRequest
1 голос
/ 28 февраля 2020

Я пытаюсь прочитать данные из Elasticsearch через Spark Scala:

Scala 2.11.8, Spark 2.3.0, Elasticsearch 5.6.8

Для подключения - spark2-shell --jars elasticsearch-spark-20_2.11-5.6.8.jar

val df = spark.read.format("org.elasticsearch.spark.sql").option("es.nodes", "xxxxxxx").option("es.port", "xxxx").option("es.net.http.auth.user","xxxxx").option("spark.serializer", "org.apache.spark.serializer.KryoSerializer").option("es.net.http.auth.pass", "xxxxxx").option("es.net.ssl", "true").option("es.nodes.wan.only", "true").option("es.net.ssl.cert.allow.self.signed", "true").option("es.net.ssl.truststore.location", "xxxxx").option("es.net.ssl.truststore.pass", "xxxxx").option("es.read.field.as.array.include","true").option("pushdown", "true").option("es.read.field.as.array.include","a4,a4.a41,a4.a42,a4.a43,a4.a43.a431,a4.a43.a432,a4.a44,a4.a45").load("<index_name>") 

Схема, как показано ниже

 |-- a1: string (nullable = true)
 |-- a2: string (nullable = true)
 |-- a3: struct (nullable = true)
 |    |-- a31: integer (nullable = true)
 |    |-- a32: struct (nullable = true)
 |-- a4: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a41: string (nullable = true)
 |    |    |-- a42: string (nullable = true)
 |    |    |-- a43: struct (nullable = true)
 |    |    |    |-- a431: string (nullable = true)
 |    |    |    |-- a432: string (nullable = true)
 |    |    |-- a44: string (nullable = true)
 |    |    |-- a45: string (nullable = true)
 |-- a8: string (nullable = true)
 |-- a9: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a91: string (nullable = true)
 |    |    |-- a92: string (nullable = true)
 |-- a10: string (nullable = true)
 |-- a11: timestamp (nullable = true)

Хотя я могу читать данные из прямых столбцов и вложенной схемы уровня 1 (т.е. столбцы a9 или a3) с помощью команды:

df.select(explode($"a9").as("exploded")).select("exploded.*").show

Возникает проблема, когда я пытаюсь прочитать элементы a4, так как это выдает мне ошибку ниже:

    [Stage 18:>                                                         (0 + 1) / 1]20/02/28 02:43:23 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 18.0 (TID 54, xxxxxxx, executor 12): scala.MatchError: Buffer() (of class scala.collection.convert.Wrappers$JListWrapper)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:241)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:164)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/02/28 02:43:23 ERROR scheduler.TaskSetManager: Task 0 in stage 18.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 57, xxxxxxx, executor 12): scala.MatchError: Buffer() (of class scala.collection.convert.Wrappers$JListWrapper)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:241)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)

Что-то я делаю неправильно или какие-то действия мне не хватает? Пожалуйста, помогите

1 Ответ

1 голос
/ 28 февраля 2020

В глубине души, эта ошибка возникает, когда схема, угаданная соединителем spark / ElasticSearch, фактически не совместима с читаемыми данными.

Имейте в виду, что ES не имеет схемы и Spark SQL имеет "жесткую" схему . Преодоление этого пробела не всегда возможно, поэтому все это просто лучшее усилие.

При соединении двух соединитель выбирает документы и пытается угадать схему: «поле A - это строка, поле B - это структура объекта с двумя подполями: B.1 является датой, а B.2 является массивом строк, ... что угодно ".

Если он угадан (обычно: данный столбец / подколонка угадывается как Будучи строкой, но в некоторых документах это на самом деле массив или число), преобразование JSON в Spark SQL выдает такие ошибки.

В словах документация гласит:

Elasticsearch обрабатывает поля с одним или несколькими значениями одинаково; на самом деле, сопоставление не дает никакой информации об этом. Как клиент, это означает, что никто не может сказать, является ли поле однозначным или нет, пока не будет прочитано. В большинстве случаев это не является проблемой, иasticsearch-had oop автоматически создает необходимый список / массив на лету. Однако в средах со строгой схемой, такой как Spark SQL, изменение фактического значения поля по сравнению с его объявленным типом не допускается. Хуже того, эта информация должна быть доступна еще до чтения данных. Так как сопоставление не является достаточно убедительным, иметь в наличии FlexibleSearch oop позволяет пользователю указывать дополнительную информацию через информацию о полях, в частности, es.read.field.as.array.include и es.read.field.as.array.exclude .

Поэтому я бы рекомендовал вам проверить, что схема, о которой вы сообщили в своем вопросе (схема, предполагаемая Spark), действительно действительна для всех ваших документов или нет.

Если это не так, у вас есть несколько вариантов на будущее:

  1. Исправьте сопоставление индивидуально. Если проблема связана с типом массива, который не распознается как таковой, вы можете сделать это, используя опции конфигурации . Вы можете увидеть параметр es.read.field.as.array.include (соответственно .exclude) (который используется, чтобы активно сообщать Spark, какие свойства в документах являются массивом (соответственно, не массивом). Если поле не используется, es.read.field.exclude - это параметр, который исключит данное поле из Spark в целом, минуя возможную схему для него.

  2. Если нет способа предоставить допустимую схему для всех случаев в ElasticSearch (например, некоторые поля иногда число, иногда строка, и нет никакого способа сказать), тогда, по сути, вы застряли, чтобы вернуться на уровень RDD (и, если необходимо, go вернуться к Dataset / Dataframe, как только схема будет хорошо определена) .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...