В моем приложении я получаю Array[String]
из потока и передаю его методу (в качестве параметра запроса), который, в свою очередь, запускает для него запрос Neo4j. Я хочу преобразовать результат запроса (a List[List[String]]
) в Spark DataFrame
. Как я могу сделать это при запуске Spark Streaming?
Вот мой метод выполнения запроса Neo4j:
def execNeo4jSearchQuery(neo4jSession: Session, data: Array[String]) = {
neo4jSession.run(neo4jQueries.searchQueryWithParams, paramsMap.asJava)
.list()
.asScala
.map(toRow(_, fieldsToRetrieve))
.toList
}
Это основной метод самой потоковой передачи:
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setMaster(args(0))
.setAppName("SparkStreaming")
neo4jConfigs.setNeo4jSparkConfig(args(1), sparkConf)
val sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
val sparkContext = streamingContext.sparkContext
sparkContext.setLogLevel("ERROR")
val sqlContext = new SQLContext(sparkContext)
val numStreams = 2
val topics = Array("topic1")
def kafkaParams(i: Int) = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group2",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val lines = (1 to numStreams).map(i => KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams(i))
))
val messages = streamingContext.union(lines)
val values = messages
.map(record => record.value().toString)
val wordsArrays = values.map(_.split(", "))
wordsArrays.foreachRDD(rdd => rdd.foreach(
data => {
execNeo4jSearchQuery(getNeo4jConfig(), data)
}
))
streamingContext.start()
streamingContext.awaitTermination()
}
}