Я использую MongoDB версии 3.4.7, Spark версии 1.6.3 и MongoDB-Spark Connector версии 1.1.0.
У меня есть скрипт pyspark, который извлекает данные из коллекции MongoDB для создания фрейма данных.Я заметил, что мой spark-submit не работает после закрытия соединения.(см. журнал ниже).
19/02/18 23:47:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, node1.dev.qwerty.asdf.io, partition 0,ASDF_LOCAL, 2476 bytes)
19/02/18 23:47:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on node1.dev.qwerty.asdf.io:45779 (size: 2.7 KB, free: 2.7 GB)
19/02/18 23:47:25 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on node1.dev.qwerty.asdf.io:45779 (size: 497.0 B, free: 2.7 GB)
19/02/18 23:47:29 INFO MongoClientCache: Closing MongoClient: [mongoconfig-001.zxcv.prod.rba.company.net:27017]
19/02/18 23:47:29 INFO connection: Closed connection [connectionId{localValue:2}] to mongoconfig-001.zxcv.prod.rba.company.net:27017 because the pool has been closed.
19/02/18 23:47:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, node1.dev.qwerty.asdf.io): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongoconfig-001.zxcv.prod.rba.company.net:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
at com.mongodb.connection.BaseCluster.getDescription(BaseCluster.java:163)
at com.mongodb.Mongo.getClusterDescription(Mongo.java:411)
at com.mongodb.Mongo.getServerAddressList(Mongo.java:404)
at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
at com.mongodb.spark.connection.MongoClientCache$$anonfun$logClient$1.apply(MongoClientCache.scala:161)
at com.mongodb.spark.LoggingTrait$class.logInfo(LoggingTrait.scala:48)
at com.mongodb.spark.Logging.logInfo(Logging.scala:24)
at com.mongodb.spark.connection.MongoClientCache.logClient(MongoClientCache.scala:161)
at com.mongodb.spark.connection.MongoClientCache.acquire(MongoClientCache.scala:56)
at com.mongodb.spark.MongoConnector.acquireClient(MongoConnector.scala:239)
at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:141)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:247)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Я не уверен, что здесь происходит.Может ли кто-нибудь помочь мне здесь?
В настоящее время я использую приведенный ниже код.
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://<USER>:<PASSWORD>@<HOST>:<PORT>/db.<COLLECTION>?ssl=true&authSource=<DATABASENAME>").load()
Я вызываю вышеуказанный сценарий, используя приведенный ниже запрос spark-submit
spark-submit --master yarn --verbose --jars mongo-java-driver-3.4.2.jar,mongo-spark-connector_2.10-1.1.0.jar --py-files pymongo_spark.py test.py