NotSerializableException с Neo4j Spark Streaming Scala - PullRequest
0 голосов
/ 02 мая 2018

Я пытаюсь выполнить запрос в Neo4j, используя соединитель Neo4j-Spark. Я хочу передать значения из потока (созданного Kafka в виде строки) в мой запрос. Однако я получаю исключение сериализации:

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@54688d9f)
    - field (class: consumer.SparkConsumer$$anonfun$processingLogic$2, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class consumer.SparkConsumer$$anonfun$processingLogic$2, <function1>)
    - field (class: consumer.SparkConsumer$$anonfun$processingLogic$2$$anonfun$apply$3, name: $outer, type: class consumer.SparkConsumer$$anonfun$processingLogic$2)
    - object (class consumer.SparkConsumer$$anonfun$processingLogic$2$$anonfun$apply$3, <function1>)

Вот код для основной функции и логики запросов:

object SparkConsumer {
    def main(args: Array[String]) {
        val config = "neo4j_local"
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
        setNeo4jSparkConfig(config, sparkConf)

        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()

        val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(3))
        streamingContext.sparkContext.setLogLevel("ERROR")
        val sqlContext = new SQLContext(streamingContext.sparkContext)
        val numStreams = 2
        val topics = Array("member_topic1")

        def kafkaParams(i: Int) = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "group2",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val lines = (1 to numStreams).map(i => KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams(i))
        ))

        val messages = streamingContext.union(lines)
        val wordsArrays = values.map(_.split(","))

        wordsArrays.foreachRDD(rdd => rdd.foreach(
          data => execNeo4jSearchQuery(data)(streamingContext.sparkContext)
        ))

        streamingContext.start()
        streamingContext.awaitTermination()
      }

    def execNeo4jSearchQuery(data: Array[String])(implicit sc: SparkContext) = {
        val neo = Neo4j(sc)
        val query = "my query"

        val paramsMap = Map("lat" -> data(1).toDouble, "lon" -> data(2).toDouble, "id" -> data(0).toInt)
        val df = neo.cypher(query, paramsMap).loadDataFrame("group_name" -> "string", "event_name" -> "string", "venue_name" -> "string", "distance" -> "double")
        println("\ndf:")
        df.show()
      }
}

1 Ответ

0 голосов
/ 02 мая 2018

Не разрешен доступ к SparkContext, SparkSession или создание распределенных структур данных от исполнителя. Поэтому:

wordsArrays.foreachRDD(rdd => rdd.foreach(
  data => execNeo4jSearchQuery(data)(streamingContext.sparkContext)
))

, где execNeo4jSearchQuery звонки:

neo.cypher(query, paramsMap).loadDataFrame

недействительный код Spark.

Если вы хотите получить доступ к Neo4j напрямую из RDD.foreach, вам нужно использовать стандартный клиент ( AnormCypher , кажется, предоставляет очень элегантный API), без преобразования в распределенные структуры Spark.

Немного не связанное примечание - вы могли бы рассмотреть возможность использования одного соединения для набора записей с foreachPartition (также SPARK Стоимость инициализации соединения с базой данных в контексте map / mapPartitions ) .

...