Как мне передавать данные в Neo4j с помощью Spark - PullRequest
1 голос
/ 23 мая 2019

Я пытаюсь записать потоковые данные в Neo4j, используя Spark, и у меня возникли некоторые проблемы (я очень новичок в Spark).

Я попытался настроить поток счетчиков слов и могу записать это в Postgres, используя пользовательский ForeachWriter, как в примере здесь . Поэтому я думаю, что понимаю основной поток.

Затем я попытался воспроизвести это и отправить данные в Neo4j вместо этого, используя neo4j-spark-connector. Я могу отправить данные в Neo4j, используя пример в блокноте Zeppelin здесь . Итак, я попытался перенести этот код в ForeachWriter, но у меня возникла проблема - sparkContext недоступен в ForeachWriter, и из того, что я прочитал, его нельзя передавать, поскольку он работает на драйвере, пока код foreach работает на исполнителях. Может кто-нибудь помочь с тем, что я должен делать в этой ситуации?

Sink.scala:

val spark = SparkSession
  .builder()
  .appName("Neo4jSparkConnector")
  .config("spark.neo4j.bolt.url", "bolt://hdp1:7687")
  .config("spark.neo4j.bolt.password", "pw")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

wordCounts.printSchema()

val writer = new Neo4jSink()

import org.apache.spark.sql.streaming.ProcessingTime

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("append")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()

Neo4jSink.scala:

class Neo4jSink() extends ForeachWriter[Row]{

  def open(partitionId: Long, version: Long):Boolean = {
    true
  }

  def process(value: Row): Unit = {

    val word = ("Word", Seq("value"))
    val word_count = ("WORD_COUNT", Seq.empty)
    val count = ("Count", Seq("count"))
    Neo4jDataFrame.mergeEdgeList(sparkContext, value, word, word_count, count)

  }

  def close(errorOrNull:Throwable):Unit = {
  }
}
...