Как использовать конкретный запрос ElasticSearch в соответствии с сообщением SparkStreaming-Kafka - PullRequest
0 голосов
/ 25 марта 2019

Я использую SparkStreaming-Kafka и хочу поддержать поиск в режиме реального времени ElasticSearch с помощью конкретного запроса, полученного из сообщения Kafka.
Код как ниже:

def creatingFuncTest():StreamingContext={

  val ssc = new StreamingContext(sc, Seconds(duration.toInt))
  ssc.checkpoint(checkpointDir)
  val kafkaParams = KafkaUtil.getKafkaParam(brokers, appName)

  val topics = actionTopicList.split(",").toSet

  val foodMessages = KafkaUtils
    .createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    topics
  )


  val foodBatch: DStream[(String,Float, Float)] =
    foodMessages
      .filter(_._2.nonEmpty)
      .map { msg =>
        try {
          println("___________ msg :" + msg._2)
          val gson = new Gson()
          val vo = gson.fromJson(msg._2, classOf[PoiMsg])
          (vo.person_id.toString, vo.latitude.toFloat,vo.longitude.toFloat)
        } catch {
          case e: Exception =>
            println("____________" + e.getMessage)
            ("", 0.0f,0.0f)
        }
      }
      .filter(_._1.nonEmpty)



  foodBatch.foreachRDD(row =>{
    row.foreach(t =>{
      var lat = t._2
      var lon = t._3


      val query:String =s"""{
                                    "filter" : {
                                        "geo_distance" : {//...
                                            "distance" : "200km",
                                            "pin.location" : {"lat" : "{$lat}", "lon" : "{#lon}" } }
                                    }
                }"""

      val rdd = sc.esRDD("recommend_diet_menu/fooddocument", query)

      println(rdd.count())

    })
  })
  ssc

}

Я знаю, что в RDD неправильно создавать новые RDD, но каков правильный путь?

...