PySpark cannot connect to MongoDB, but the command line can - PullRequest
0 votes
/ March 1, 2020

I am trying to load a MongoDB collection into a PySpark DataFrame. First of all, I can connect from the command line on the NameNode:

mongo mongodb://USER:PASSWORD@HOST/DB_NAME

MongoDB shell version v3.6.3
connecting to: mongodb://HOST/DB_NAME
MongoDB server version: 3.6.3
> 

I run the script on the cluster as follows:

spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 3 \
--num-executors 10 \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 \
load_from_mongo.py

Next, I create a SparkSession:

  spark = SparkSession.builder \
            .appName("TestMongoLoad") \
            .config("spark.mongodb.input.uri", 'mongodb://USER:PASSWORD@HOST:27017') \
            .config("spark.mongodb.input.database", DB_NAME) \
            .config("spark.mongodb.input.collection", COLLECTION_NAME) \
            .getOrCreate()

Then I try to read the collection into a DataFrame:

    df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
              .load()
    df.show(5, truncate=False)

As a result, authentication fails. I am clearly passing something incorrectly:

Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
:: loading settings :: url = jar:file:/home/ubuntu/server/spark-2.4.4-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-caedf270-dd43-42f2-a39e-e3d1b7134046;1.0
    confs: [default]
    found org.mongodb.spark#mongo-spark-connector_2.11;2.4.1 in central
    found org.mongodb#mongo-java-driver;3.10.2 in central
    [3.10.2] org.mongodb#mongo-java-driver;[3.10,3.11)
:: resolution report :: resolve 1129ms :: artifacts dl 4ms
    :: modules in use:
    org.mongodb#mongo-java-driver;3.10.2 from central in [default]
    org.mongodb.spark#mongo-spark-connector_2.11;2.4.1 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   1   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-caedf270-dd43-42f2-a39e-e3d1b7134046
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/7ms)
20/02/29 21:26:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/home/ubuntu/server/load_from_mongo.py", line 124, in <module>
    main(args)
  File "/home/ubuntu/server/load_from_mongo.py", line 102, in main
    keyword_df = getKeywordCorpus(args.begin_dt, args.end_dt)
  File "/home/ubuntu/server/load_from_mongo.py", line 79, in getKeywordCorpus
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
  File "/home/ubuntu/server/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/home/ubuntu/server/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/ubuntu/server/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/ubuntu/server/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o57.load.
: com.mongodb.MongoSecurityException: Exception authenticating MongoCredential{mechanism=SCRAM-SHA-1, userName='USER', source='admin', password=<hidden>, mechanismProperties={}}
    at com.mongodb.internal.connection.SaslAuthenticator.wrapException(SaslAuthenticator.java:173)
    at com.mongodb.internal.connection.SaslAuthenticator.access$300(SaslAuthenticator.java:40)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:70)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:47)
    at com.mongodb.internal.connection.SaslAuthenticator.doAsSubject(SaslAuthenticator.java:179)
    at com.mongodb.internal.connection.SaslAuthenticator.authenticate(SaslAuthenticator.java:47)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.authenticateAll(InternalStreamConnectionInitializer.java:152)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:63)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:127)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.open(UsageTrackingInternalConnection.java:50)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.open(DefaultConnectionPool.java:390)
    at com.mongodb.internal.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:106)
    at com.mongodb.internal.connection.DefaultConnectionPool.get(DefaultConnectionPool.java:92)
    at com.mongodb.internal.connection.DefaultServer.getConnection(DefaultServer.java:85)
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.getConnection(ClusterBinding.java:115)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:212)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:206)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:116)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:109)
    at com.mongodb.operation.CommandReadOperation.execute(CommandReadOperation.java:56)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:179)
    at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:184)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:153)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:148)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:157)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:237)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
    at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
    at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.mongodb.MongoCommandException: Command failed with error 18 (AuthenticationFailed): 'Authentication failed.' on server HOST:27017. The full response is {"ok": 0.0, "errmsg": "Authentication failed.", "code": 18, "codeName": "AuthenticationFailed"}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:179)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:255)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.SaslAuthenticator.sendSaslStart(SaslAuthenticator.java:130)
    at com.mongodb.internal.connection.SaslAuthenticator.access$100(SaslAuthenticator.java:40)
    at com.mongodb.internal.connection.SaslAuthenticator$1.run(SaslAuthenticator.java:54)
    ... 48 more

1 Answer

0 votes
/ March 1, 2020

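As @Lamanus pointed out above, the fix was to change the URI slightly so that it includes the database name. Without a database in the URI, the driver authenticated against admin (note source='admin' in the exception), whereas the working shell command authenticated against DB_NAME: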

  MONGO_URL = "mongodb://USER:PASSWORD@HOST:27017/DB_NAME"

  spark = SparkSession.builder \
            .appName('TestMongoLoad') \
            .config('spark.mongodb.input.uri', MONGO_URL) \
            .config('spark.mongodb.input.collection', COLLECTION) \
            .getOrCreate()
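
To make the difference between the two URIs concrete, here is a minimal sketch in plain Python (no Spark required). The helper names `build_mongo_uri` and `effective_auth_db` are hypothetical and are not part of PySpark or the MongoDB connector. The second helper only approximates the standard connection-string rule: an explicit `authSource` option takes precedence, then the database in the URI path, and otherwise `admin`.

```python
from urllib.parse import parse_qs, quote_plus, urlsplit


def build_mongo_uri(user, password, host, db_name, port=27017, auth_source=None):
    """Build a MongoDB URI that names the database explicitly.

    Hypothetical helper for illustration only.
    """
    # Percent-encode the credentials so characters such as '@' or ':'
    # in a password do not break the URI.
    uri = "mongodb://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, db_name
    )
    # Optionally name the authentication database if the user was
    # created in a different database than the one being read.
    if auth_source:
        uri += "?authSource=" + quote_plus(auth_source)
    return uri


def effective_auth_db(uri):
    """Approximate which database the credentials are checked against."""
    parts = urlsplit(uri)
    query = parse_qs(parts.query)
    if "authSource" in query:
        return query["authSource"][0]
    # Fall back to the database in the path, then to 'admin'.
    return parts.path.lstrip("/") or "admin"


# The URI from the question has no database, so authentication targets 'admin'.
print(effective_auth_db("mongodb://USER:PASSWORD@HOST:27017"))
# The URI from the answer names DB_NAME, matching the working shell command.
print(effective_auth_db(build_mongo_uri("USER", "PASSWORD", "HOST", "DB_NAME")))
```

If the user had been created in a database other than the one being read, passing `auth_source` to the helper (which appends `?authSource=...`) would presumably be the variant to try.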