Как экспортировать все данные из индекса эластичного поиска в файл в формате JSON с указанным полем _id? - PullRequest
1 голос
/ 09 июля 2019

Я новичок как в Spark, так и в Scala.Я пытаюсь прочитать все данные из определенного индекса в Elastic Search в RDD и использовать эти данные для записи в базу данных Mongo.

Я загружаю данные поиска Elastic в esJsonRDD, и когда я пытаюсьраспечатайте содержимое СДР в следующем формате:

(1765770532{"FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"})

Ожидаемый формат,

{_id:"1765770532","FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"}

Как получить результат от упругого поиска, чтобыотформатирован таким образом?.

Буду признателен за любую помощь.

Данные, полученные в результате упругого поиска, имеют следующий формат:

(1765770532{"FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"})

Ожидаемый формат:,

{_ id: "1765770532", "FirstName": ABC, "LastName": "DEF", почтовый индекс ":" 36905 "," City ":" PortAdam "," StateCode ":"AR"}

    object readFromES {

    def main(args: Array[String]) {

        val conf = new SparkConf().setAppName("readFromES")
        .set("es.nodes", Config.ES_NODES)
        .set("es.nodes.wan.only", Config.ES_NODES_WAN_ONLY)
        .set("es.net.http.auth.user", Config.ES_NET_HTTP_AUTH_USER)
        .set("es.net.http.auth.pass", Config.ES_NET_HTTP_AUTH_PASS)
        .set("es.net.ssl", Config.ES_NET_SSL)
        .set("es.output.json","true")

        val sc = new SparkContext(conf)
        val RDD =  EsSpark.esJsonRDD(sc, "userdata/user")
        //RDD.coalesce(1).saveAsTextFile(args(0))
        RDD.take(5).foreach(println)
        }
       }

Я бы хотел, чтобы вывод RDD был записан в файл в следующем формате JSON (одна строка на документ),

{_id:"1765770532","FirstName":ABC,"LastName":"DEF",Zipcode":"36905","City":"PortAdam","StateCode":"AR"}
{_id:"1765770533","FirstName":DEF,"LastName":"DEF",Zipcode":"35525","City":"PortWinchestor","StateCode":"AI"}

1 Ответ

0 голосов
/ 09 июля 2019

"_id" является частью метаданных, для доступа к ним необходимо добавить .config("es.read.metadata", true) в конфигурацию.

Тогда вы можете получить к нему доступ двумя способами, вы можете использовать

val RDD =  EsSpark.esJsonRDD(sc, "userdata/user") 

и вручную добавьте поле _id в json

Или проще читать как фрейм данных

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .load("userdata/user")
  .withColumn("_id", $"_metadata".getItem("_id"))
  .drop("_metadata")

// Записать как json в файл

df.write.json("output folder ")

Здесь искра - это сеанс искры, созданный как

val spark = SparkSession.builder().master("local[*]").appName("Test")
  .config("spark.es.nodes","host")
  .config("spark.es.port","ports")
  .config("spark.es.nodes.wan.only","true")
  .config("es.read.metadata", true) //for enabling metadata
  .getOrCreate()

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...