Spark Streaming передает параметры запроса в Neo4j Scala - PullRequest
0 голосов
/ 02 мая 2018

Я пытаюсь выполнить запрос Cypher, используя соединитель Spark-Neo4j. Я хочу передать параметры в этот запрос из потока данных, созданных Kafka. И результат запроса Cypher должен отображаться в виде полей фрейма данных. Соединение с Neo4j успешно установлено, и мой запрос работает нормально с простым контекстом искры. Однако тот же код не работает для потокового контекста. Есть ли какая-то разница с конфигурацией соединения Neo4j при использовании Spark Streaming?

Вот код для потокового контекста. Я не использую Кафку в качестве производителя, и данные для параметров определены в массиве данных для тестирования соединения и самого запроса:

val sparkSession = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .master("local[*]")
      .getOrCreate()

    val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))

    sparkSession.conf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
    sparkSession.conf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
    sparkSession.conf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))

    val neo = Neo4j(streamingContext.sparkContext)
    val data = Array("18731", "41.84000015258789", "-87.62999725341797")

    val query = "MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic), (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue) WHERE mt.topic_id = gt.topic_id AND distance(point({ longitude: {lon}, latitude: {lat}}),point({ longitude: v.lon, latitude: v.lat })) < 4000 AND mt.member_id = {id} RETURN distinct g.group_name as group_name, e.event_name as event_name, v.venue_name as venue_name"



    val paramsMap = Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt)

    val df = neo.cypher(query, paramsMap).loadDataFrame("group_name" -> "string", "event_name" -> "string", "venue_name" -> "string")
    df.show()

    streamingContext.start()
    streamingContext.awaitTermination()

1 Ответ

0 голосов
/ 02 мая 2018

Я решил проблему, предоставив SparkConfig необходимые параметры для Neo4j для SparkSession. Вот код:

val config = "neo4j_local"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
    sparkConf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
        sparkConf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
...