Spark задание не подключается к удаленной базе данных neo4j в emr - PullRequest
0 голосов
/ 03 июля 2019

У меня проблемы с подключением к моей базе данных neo4j с помощью задания зажигания EMR. Несмотря на то, что я указал в aws emr add-steps и spark-defaults.conf для подключения к удаленной базе данных, он все равно возвращает ошибку org.neo4j.driver.v1.exceptions.ServiceUnavailableException: Unable to connect to localhost:7687. Когда я вызываю spark-submit в главном узле EMR, он фактически подключается к удаленной базе данных.

Подключение к localhost происходит, когда я пытаюсь добавить шаг. Я предполагаю, что это означает, что где-то в cli SparkConf происходит повреждение.

Я собрал файл фляги, используя sbt assembly.

build.sbt выглядит так:

ThisBuild / scalaVersion := "2.11.12"
ThisBuild / organization := "com.example"
logLevel := Level.Error
cancelable in Global := true

lazy val hello = (project in file("."))
  .settings(
    name := "Hello",
    libraryDependencies += "com.typesafe.play" %% "play-json" % "2.6.9",
    libraryDependencies += "com.eed3si9n" %% "gigahorse-okhttp" % "0.3.1",
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test,
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0" % "provided",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided",
    libraryDependencies += "org.apache.spark" %% "spark-graphx" % "2.4.0" % "provided",
    libraryDependencies += "org.neo4j" % "neo4j" % "3.5.6" % "provided",
    resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven",
    libraryDependencies += "neo4j-contrib" % "neo4j-spark-connector" % "2.1.0-M4" % "provided",
    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
  )       

Я не думаю, что версии являются проблемой, потому что, используя тот же файл jar, я назвал spark-submit --class Main --master yarn --packages neo4j-contrib:neo4j-spark-connector:2.4.0-M6 here-is-the-jar-file.jar, который читает часть графика, вызывает PageRank из graphx и записывает результаты обратно в базу данных. , В итоге мне удалось найти результаты в базе данных через браузер neo4j.

Но почему-то, когда я звоню aws emr add-steps --cluster-id x-xxxxxxxxxxxx --steps Type=Spark,Name=xxx,Args=[--deploy-mode,cluster,--master,yarn,--driver-memory,10G,--packages,neo4j-contrib:neo4j-spark-connector:2.4.0-M6,--class,Main,--conf,spark.neo4j.bolt.url=bolt://this.is.ip.address:7687,--conf,spark.neo4j.bolt.user=username,--conf,spark.neo4j.bolt.password=password,s3://this/is/the/jar/file.jar,--jars,/home/hadoop/dependencies/neo4j-spark-connector_2.11-full-2.1.0-M4.jar],ActionOnFailure=CONTINUE, работа не выполняется. Я включил сообщение об ошибке в конце.

Вот мой код scala:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.neo4j.spark._

object Main{
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("app").set("spark.neo4j.bolt.url", "bolt://this.is.ip.address:7687").set("spark.neo4j.bolt.user", "username").set("spark.neo4j.bolt.password", "password")
        val sc = new SparkContext(conf)
        val neo = Neo4j(sc)
        val graphQuery = """xxxxxxx"""
        val graph: Graph[Long, String] =  neo.rels(graphQuery).partitions(10).batch(200).loadGraph
        val g = PageRank.run(graph, 5)
        Neo4jGraph.saveGraph(sc, g, "rank")
    }
}

Дополнительный файл конфигурации json для кластера emr:

[
  {
     "Classification": "spark-defaults",
     "Properties": {
         "spark.neo4j.bolt.url": "bolt://this.is.ip.address:7687",
         "spark.neo4j.bolt.user": "username",
         "spark.neo4j.bolt.password": "password",
      }
  },
  {
     "Classification": "spark-env",
     "Configurations": [
       {
         "Classification": "export",
         "Properties": {
            "PYSPARK_PYTHON": "/usr/bin/python3"
          }
       }
    ]
  }
]

Эти изменения действительно присутствуют в /etc/spark/conf/spark-defaults.conf

Поэтому я установил спецификации neo4j в файле конфигурации emr, в коде scala и в скрипте add-step. Однако кластер все еще не подключается к указанному IP-адресу и подключается к localhost:

19/07/02 21:03:37 ERROR Executor: Exception in task 7.0 in stage 2.0 (TID 27)
org.neo4j.driver.v1.exceptions.ServiceUnavailableException: Unable to connect to localhost:7687, ensure the database is
running and that there is a working network connection to it.
        at org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:122)
        at org.neo4j.driver.internal.DriverFactory.verifyConnectivity(DriverFactory.java:346)
        at org.neo4j.driver.internal.DriverFactory.newInstance(DriverFactory.java:93)
        at org.neo4j.driver.v1.GraphDatabase.driver(GraphDatabase.java:136)
        at org.neo4j.driver.v1.GraphDatabase.driver(GraphDatabase.java:119)
        at org.neo4j.spark.Neo4jConfig.driver(Neo4jConfig.scala:15)
        at org.neo4j.spark.Neo4jConfig.driver(Neo4jConfig.scala:19)
        at org.neo4j.spark.Executor$.execute(Neo4j.scala:394)
        at org.neo4j.spark.Neo4jRDD.compute(Neo4j.scala:458)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        Suppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause
                at org.neo4j.driver.internal.async.ChannelConnectedListener.databaseUnavailableError(ChannelConnectedLis
tener.java:76)
                at org.neo4j.driver.internal.async.ChannelConnectedListener.operationComplete(ChannelConnectedListener.j
ava:70)
                at org.neo4j.driver.internal.async.ChannelConnectedListener.operationComplete(ChannelConnectedListener.j
ava:37)
                at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromi
se.java:511)
                at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultProm
ise.java:504)
                at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPr
omise.java:483)
                at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromi
se.java:424)
                at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.ja
va:121)
                at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillCon
nectPromise(AbstractNioChannel.java:327)
                at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConn
ect(AbstractNioChannel.java:343)
                at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.ja
va:633)
                at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEv
entLoop.java:580)
                at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.j
ava:497)
                at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
                at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThrea
dEventExecutor.java:886)
                at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocal
Runnable.java:30)
                ... 1 more
Caused by: org.neo4j.driver.internal.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:7687
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.neo4j.driver.internal.shaded.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChanne
l.java:325)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(Abst
ractNioChannel.java:340)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.
java:580)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at org.neo4j.driver.internal.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventEx
ecutor.java:886)
        at org.neo4j.driver.internal.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable
.java:30)
        ... 1 more
Caused by: java.net.ConnectException: Connection refused
        ... 11 more

Я не думаю, что это проблема подключения кластера emr к удаленной базе данных, поскольку, как я уже упоминал, когда я ssh мастер-узел и вызывал spark-submit, там код работает.

...