Я пытаюсь выполнить запрос Cypher, используя соединитель Spark-Neo4j. Я хочу передать параметры в этот запрос из потока данных, созданных Kafka. И результат запроса Cypher должен отображаться в виде полей фрейма данных. Соединение с Neo4j успешно установлено, и мой запрос работает нормально с простым контекстом искры. Однако тот же код не работает для потокового контекста. Есть ли какая-то разница с конфигурацией соединения Neo4j при использовании Spark Streaming?
Вот код для потокового контекста. Я не использую Кафку в качестве производителя, и данные для параметров определены в массиве данных для тестирования соединения и самого запроса:
val sparkSession = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.master("local[*]")
.getOrCreate()
val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))
sparkSession.conf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
sparkSession.conf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
sparkSession.conf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
val neo = Neo4j(streamingContext.sparkContext)
val data = Array("18731", "41.84000015258789", "-87.62999725341797")
val query = "MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic), (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue) WHERE mt.topic_id = gt.topic_id AND distance(point({ longitude: {lon}, latitude: {lat}}),point({ longitude: v.lon, latitude: v.lat })) < 4000 AND mt.member_id = {id} RETURN distinct g.group_name as group_name, e.event_name as event_name, v.venue_name as venue_name"
val paramsMap = Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt)
val df = neo.cypher(query, paramsMap).loadDataFrame("group_name" -> "string", "event_name" -> "string", "venue_name" -> "string")
df.show()
streamingContext.start()
streamingContext.awaitTermination()