Конвертировать List [List [String]] в Spark DataFrame при потоковой передаче Scala - PullRequest
0 голосов
/ 07 мая 2018

В моем приложении я получаю Array[String] из потока и передаю его методу (в качестве параметра запроса), который, в свою очередь, запускает для него запрос Neo4j. Я хочу преобразовать результат запроса (a List[List[String]]) в Spark DataFrame. Как я могу сделать это при запуске Spark Streaming?

Вот мой метод выполнения запроса Neo4j:

  def execNeo4jSearchQuery(neo4jSession: Session, data: Array[String]) = {
    neo4jSession.run(neo4jQueries.searchQueryWithParams, paramsMap.asJava)
      .list()
      .asScala
      .map(toRow(_, fieldsToRetrieve))
      .toList
  }

Это основной метод самой потоковой передачи:

def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setMaster(args(0))
      .setAppName("SparkStreaming")
    neo4jConfigs.setNeo4jSparkConfig(args(1), sparkConf)

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
    val sparkContext = streamingContext.sparkContext
    sparkContext.setLogLevel("ERROR")

    val sqlContext = new SQLContext(sparkContext)

    val numStreams = 2
    val topics = Array("topic1")

    def kafkaParams(i: Int) = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group2",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val lines = (1 to numStreams).map(i => KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams(i))
    ))

    val messages = streamingContext.union(lines)

    val values = messages
      .map(record => record.value().toString)
    val wordsArrays = values.map(_.split(", "))

    wordsArrays.foreachRDD(rdd => rdd.foreach(
      data => {
        execNeo4jSearchQuery(getNeo4jConfig(), data)
      }
    ))

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
...