Как прочитать 1M записей из Elasticsearch в PySpark? - PullRequest
0 голосов
/ 29 января 2020

У меня проблема с чтением данных из Elasticsearch в кластер Spark (я использую среду Zeppelin, поэтому все параметры подключения настраиваются в настройках интерпретатора Zeppelin).

Сначала я попытался прочитать это с PySpark:

%pyspark
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

df = spark.read.format("org.elasticsearch.spark.sql").load("index")
df = df.limit(100).drop('tags').drop('a.b')
# if 'tags' field is not dropped, pyspark cannot map scala field and throws an exception.
# If the limit is not set, pyspark will probably try to get the whole index at once
# if "a.b" is not dropped, the dot in the field name causes mapping error: https://github.com/elastic/elasticsearch-hadoop/issues/853

df = df.cache()
z.show(df)

К сожалению, в этом случае у меня много проблем с отображением. Потому что у меня есть много полей, содержащих точки в наборе данных, я решил дать Scala попытку прочитать данные (чтобы потом обработать их в PySpark):

%spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark
import org.elasticsearch.spark.sql
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoder

val conf = new SparkConf()

conf.set("spark.es.mapping.date.rich", "false");
conf.set("spark.serializer", classOf[KryoSerializer].getName)

val EsReadRDD = sc.esRDD("index")

Однако даже с Scala Я могу получить только небольшое количество записей, например

EsReadRDD.take(10).foreach(println)

. По какой-то причине метод collect () не работает:

val esdf = EsReadRDD.collect() //does not work probably because data are too large

Ошибка:

Job aborted due to stage failure: Task 0 in stage 833.0 failed 4 times, most recent failure: Lost task 0.3 in stage 833.0 (TID 479, 10.10.11.37, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

Я также пробовал преобразование в DF, но получаю сообщение об ошибке:

val esdf = EsReadRDD.toDF()

java.lang.UnsupportedOperationException: No Encoder found for scala.AnyRef
- map value class: "java.lang.Object"
- field (class: "scala.collection.Map", name: "_2")
- root class: "scala.Tuple2"

У вас есть идеи, как с этим справиться?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...