Я использую Kafka Producer и Spark Consumer. Я хочу передать некоторые данные в теме в виде массива потребителю и выполнить запрос Neo4j с этими данными в качестве параметров. Сейчас я хочу протестировать этот запрос с набором данных.
Проблема в том, что когда я пытаюсь запустить своего потребителя, я получаю исключение:
org.neo4j.driver.v1.exceptions.AuthenticationException: Unsupported authentication token, scheme 'none' is only allowed when auth is disabled.
Вот мой основной метод с конфигами Spark и Neo4j:
def main(args: Array[String]) {
val sparkSession = SparkSession
.builder()
.appName("KafkaSparkStreaming")
.master("local[*]")
.getOrCreate()
val sparkConf = sparkSession.conf
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
streamingContext.sparkContext.setLogLevel("ERROR")
val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))
sparkConf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
sparkConf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
sparkConf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))
val arr = Array("18731", "41.84000015258789", "-87.62999725341797")
execNeo4jSearchQuery(arr, sparkSession.sparkContext)
streamingContext.start()
streamingContext.awaitTermination()
}
И это метод, в котором я запускаю свой запрос:
def execNeo4jSearchQuery(data: Array[String], sc: SparkContext) = {
println("Id: " + data(0) + ", Lat: " + data(1) + ", Lon: " + data(2))
val neo = Neo4j(sc)
val sqlContext = new SQLContext(sc)
val query = "MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic), (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue) WHERE mt.topic_id = gt.topic_id AND distance(point({ longitude: {lon}, latitude: {lat}}),point({ longitude: v.lon, latitude: v.lat })) < 4000 AND mt.member_id = {id} RETURN g.group_name, e.event_name, v.venue_name"
val df = neo.cypher(query).params(Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt))
.partitions(4).batch(25)
.loadDataFrame
}
Я проверил запрос, он отлично работает в Neo4j. Так что может вызвать это исключение?