AuthenticationException Neo4j Spark потоковой передачи - PullRequest
0 голосов
/ 27 апреля 2018

Я использую Kafka Producer и Spark Consumer. Я хочу передать некоторые данные в теме в виде массива потребителю и выполнить запрос Neo4j с этими данными в качестве параметров. Сейчас я хочу протестировать этот запрос с набором данных. Проблема в том, что когда я пытаюсь запустить своего потребителя, я получаю исключение:

org.neo4j.driver.v1.exceptions.AuthenticationException: Unsupported authentication token, scheme 'none' is only allowed when auth is disabled.

Вот мой основной метод с конфигами Spark и Neo4j:

def main(args: Array[String]) {

    val sparkSession = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .master("local[*]")
      .getOrCreate()

     val sparkConf = sparkSession.conf

    val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
    streamingContext.sparkContext.setLogLevel("ERROR")


 val neo4jLocalConfig = ConfigFactory.parseFile(new File("configs/local_neo4j.conf"))
      sparkConf.set("spark.neo4j.bolt.url", neo4jLocalConfig.getString("neo4j.url"))
      sparkConf.set("spark.neo4j.bolt.user", neo4jLocalConfig.getString("neo4j.user"))
      sparkConf.set("spark.neo4j.bolt.password", neo4jLocalConfig.getString("neo4j.password"))

val arr = Array("18731", "41.84000015258789", "-87.62999725341797")
    execNeo4jSearchQuery(arr, sparkSession.sparkContext)

    streamingContext.start()
    streamingContext.awaitTermination()

}

И это метод, в котором я запускаю свой запрос:

def execNeo4jSearchQuery(data: Array[String], sc: SparkContext) = {
      println("Id: " + data(0) + ", Lat: " + data(1) + ", Lon: " + data(2))
      val neo = Neo4j(sc)
      val sqlContext = new SQLContext(sc)
      val query = "MATCH (m:Member)-[mtg_r:MT_TO_MEMBER]->(mt:MemberTopics)-[mtt_r:MT_TO_TOPIC]->(t:Topic), (t1:Topic)-[tt_r:GT_TO_TOPIC]->(gt:GroupTopics)-[tg_r:GT_TO_GROUP]->(g:Group)-[h_r:HAS]->(e:Event)-[a_r:AT]->(v:Venue) WHERE mt.topic_id = gt.topic_id AND distance(point({ longitude: {lon}, latitude: {lat}}),point({ longitude: v.lon, latitude: v.lat })) < 4000 AND mt.member_id = {id} RETURN g.group_name, e.event_name, v.venue_name"

    val df = neo.cypher(query).params(Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt))
      .partitions(4).batch(25)
      .loadDataFrame
}

Я проверил запрос, он отлично работает в Neo4j. Так что может вызвать это исключение?

1 Ответ

0 голосов
/ 02 мая 2018

Я исследовал и попробовал различные варианты, которые помогли мне найти ответ. Как я понял, это исключение происходит, потому что я неправильно устанавливаю параметры SparkConfig для Neo4j. И решения будут заключаться в том, чтобы предоставить SparkConfig в качестве одного из атрибутов SparkSession. В SparkConfig уже должны быть установлены все атрибуты Neo4j

...