Spark connects to a sharded MongoDB cluster, but no data is fetched
0 votes
/ 07 January 2019

Environment:

  1. Four Debian 9 servers (named visa0, visa1, visa2, visa3)
  2. Spark (v2.4.0) cluster on 4 nodes (visa1: master, visa0..3: slaves)
  3. MongoDB (v3.2.11) sharded cluster on 4 nodes (config server replica set on visa1..3, mongos on visa1, shard servers on visa0..3)
  4. I am using the MongoDB Spark connector, installed with "spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
  5. and Jupyter Notebook with Python 3 (pyspark v2.4.0)

Problem:

I can create a SparkSession connected to the master and load a DataFrame with the full contents of the Mongo collection. In fact, I get the DataFrame schema correctly. But calling .count() or .show() on the DataFrame returns 0 results.

Python / PySpark code:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 6g --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 pyspark-shell'

import pyspark
sparkSession = pyspark.sql.SparkSession \
  .builder \
  .master('spark://visa1:7077') \
  .appName("myApp") \
  .config("spark.executor.memory", "4g") \
  .config("spark.mongodb.input.uri", "mongodb://visa1/email.emails") \
  .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
  .getOrCreate()

df = sparkSession.read.format("com.mongodb.spark.sql.DefaultSource") \
  .option("uri", "mongodb://visa1/email.emails").load()

df.printSchema()
# gets the schema correctly

df.count()
# returns 0, even though the collection contains more than 750,000 documents
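
One debugging step worth trying at this point (my suggestion, not from the original post): the connector ships several partitioners besides MongoShardedPartitioner, and a correct schema combined with a zero count can mean the chosen partitioner produced empty partitions. A minimal sketch using MongoSamplePartitioner, run in a fresh session (sparkCheck and dfCheck are hypothetical names):

import pyspark

# Sketch: same read as above, but with MongoSamplePartitioner. A non-zero
# count here would isolate the problem to MongoShardedPartitioner.
sparkCheck = pyspark.sql.SparkSession \
  .builder \
  .master('spark://visa1:7077') \
  .appName("partitionerCheck") \
  .config("spark.mongodb.input.uri", "mongodb://visa1/email.emails") \
  .config("spark.mongodb.input.partitioner", "MongoSamplePartitioner") \
  .getOrCreate()

dfCheck = sparkCheck.read.format("com.mongodb.spark.sql.DefaultSource") \
  .option("uri", "mongodb://visa1/email.emails").load()

dfCheck.count()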

Observations:

  • The same test, connecting with the same code to a standalone mongod server, works fine (df.count() returns the correct count).
  • When connecting to the mongos, db.emails.count() returns the correct count.
  • The config server replica set looks fine (checked with rs.status() on the primary).
  • Sharding looks fine (checked with sh.status() on the mongos).
  • On the Spark executors, stderr shows the following:

    Spark Executor Command: "/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java" "-cp" "/root/spark/conf/:/root/spark/jars/*" "-Xmx4096M" "-Dspark.driver.port=36511" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@visa1:36511" "--executor-id" "2" "--hostname" "visa2" "--cores" "6" "--app-id" "app-20190106213435-0003" "--worker-url" "spark://Worker@visa2:46705"
    
  • on the Spark executors' stdout I get the following (please note the repeated "cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out" lines):

    2019-01-06 21:34:35 INFO  CoarseGrainedExecutorBackend:2566 - Started daemon with process name: 18812@visa2
    2019-01-06 21:34:35 INFO  SignalUtils:54 - Registered signal handler for TERM
    2019-01-06 21:34:35 INFO  SignalUtils:54 - Registered signal handler for HUP
    2019-01-06 21:34:35 INFO  SignalUtils:54 - Registered signal handler for INT
    2019-01-06 21:34:36 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    2019-01-06 21:34:36 INFO  SecurityManager:54 - Changing view acls to: root
    2019-01-06 21:34:36 INFO  SecurityManager:54 - Changing modify acls to: root
    2019-01-06 21:34:36 INFO  SecurityManager:54 - Changing view acls groups to: 
    2019-01-06 21:34:36 INFO  SecurityManager:54 - Changing modify acls groups to: 
    2019-01-06 21:34:36 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    2019-01-06 21:34:37 INFO  TransportClientFactory:267 - Successfully created connection to visa1/1.1.241.71:36511 after 103 ms (0 ms spent in bootstraps)
    2019-01-06 21:34:37 INFO  SecurityManager:54 - Changing view acls to: root
    2019-01-06 21:34:37 INFO  SecurityManager:54 - Changing modify acls to: root
    2019-01-06 21:34:37 INFO  SecurityManager:54 - Changing view acls groups to: 
    2019-01-06 21:34:37 INFO  SecurityManager:54 - Changing modify acls groups to: 
    2019-01-06 21:34:37 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    2019-01-06 21:34:37 INFO  TransportClientFactory:267 - Successfully created connection to visa1/1.1.241.71:36511 after 2 ms (0 ms spent in bootstraps)
    2019-01-06 21:34:37 INFO  DiskBlockManager:54 - Created local directory at /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/blockmgr-411ce01c-f631-45b5-9b60-b7d6c124d289
    2019-01-06 21:34:37 INFO  MemoryStore:54 - MemoryStore started with capacity 2004.6 MB
    2019-01-06 21:34:37 INFO  CoarseGrainedExecutorBackend:54 - Connecting to driver: spark://CoarseGrainedScheduler@visa1:36511
    2019-01-06 21:34:37 INFO  WorkerWatcher:54 - Connecting to worker spark://Worker@1.1.237.142:46705
    2019-01-06 21:34:37 INFO  TransportClientFactory:267 - Successfully created connection to /1.1.237.142:46705 after 2 ms (0 ms spent in bootstraps)
    2019-01-06 21:34:37 INFO  WorkerWatcher:54 - Successfully connected to spark://Worker@1.1.237.142:46705
    2019-01-06 21:34:37 INFO  CoarseGrainedExecutorBackend:54 - Successfully registered with driver
    2019-01-06 21:34:37 INFO  Executor:54 - Starting executor ID 2 on host 1.1.237.142
    2019-01-06 21:34:37 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44735.
    2019-01-06 21:34:37 INFO  NettyBlockTransferService:54 - Server created on 1.1.237.142:44735
    2019-01-06 21:34:37 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    2019-01-06 21:34:37 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(2, 1.1.237.142, 44735, None)
    2019-01-06 21:34:37 INFO  BlockManagerMaster:54 - Registered BlockManager BlockManagerId(2, 1.1.237.142, 44735, None)
    2019-01-06 21:34:37 INFO  BlockManager:54 - Initialized BlockManager: BlockManagerId(2, 1.1.237.142, 44735, None)
    2019-01-06 21:35:17 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 1
    2019-01-06 21:35:17 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 5
    2019-01-06 21:35:17 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 9
    2019-01-06 21:35:17 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 13
    2019-01-06 21:35:17 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 17
    2019-01-06 21:35:17 INFO  CoarseGrainedExecutorBackend:54 - Got assigned task 21
    2019-01-06 21:35:17 INFO  Executor:54 - Running task 16.0 in stage 1.0 (TID 17)
    2019-01-06 21:35:17 INFO  Executor:54 - Running task 8.0 in stage 1.0 (TID 9)
    2019-01-06 21:35:17 INFO  Executor:54 - Running task 0.0 in stage 1.0 (TID 1)
    2019-01-06 21:35:17 INFO  Executor:54 - Running task 20.0 in stage 1.0 (TID 21)
    2019-01-06 21:35:17 INFO  Executor:54 - Running task 4.0 in stage 1.0 (TID 5)
    2019-01-06 21:35:17 INFO  Executor:54 - Running task 12.0 in stage 1.0 (TID 13)
    2019-01-06 21:35:17 INFO  Executor:54 - Fetching spark://visa1:36511/files/org.mongodb_mongo-java-driver-3.9.0.jar with timestamp 1546806874832
    2019-01-06 21:35:17 INFO  TransportClientFactory:267 - Successfully created connection to visa1/1.1.241.71:36511 after 5 ms (0 ms spent in bootstraps)
    2019-01-06 21:35:17 INFO  Utils:54 - Fetching spark://visa1:36511/files/org.mongodb_mongo-java-driver-3.9.0.jar to /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/fetchFileTemp6501978500036245382.tmp
    2019-01-06 21:35:18 INFO  Utils:54 - Copying /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/-13359565311546806874832_cache to /root/spark/work/app-20190106213435-0003/2/./org.mongodb_mongo-java-driver-3.9.0.jar
    2019-01-06 21:35:18 INFO  Executor:54 - Fetching spark://visa1:36511/files/org.mongodb.spark_mongo-spark-connector_2.11-2.4.0.jar with timestamp 1546806874820
    2019-01-06 21:35:18 INFO  Utils:54 - Fetching spark://visa1:36511/files/org.mongodb.spark_mongo-spark-connector_2.11-2.4.0.jar to /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/fetchFileTemp205676444589226484.tmp
    2019-01-06 21:35:18 INFO  Utils:54 - Copying /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/8587355671546806874820_cache to /root/spark/work/app-20190106213435-0003/2/./org.mongodb.spark_mongo-spark-connector_2.11-2.4.0.jar
    2019-01-06 21:35:18 INFO  Executor:54 - Fetching spark://visa1:36511/jars/org.mongodb.spark_mongo-spark-connector_2.11-2.4.0.jar with timestamp 1546806874797
    2019-01-06 21:35:18 INFO  Utils:54 - Fetching spark://visa1:36511/jars/org.mongodb.spark_mongo-spark-connector_2.11-2.4.0.jar to /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/fetchFileTemp2003659413222858965.tmp
    2019-01-06 21:35:18 INFO  Utils:54 - /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/-10843728141546806874797_cache has been previously copied to /root/spark/work/app-20190106213435-0003/2/./org.mongodb.spark_mongo-spark-connector_2.11-2.4.0.jar
    2019-01-06 21:35:18 INFO  Executor:54 - Adding file:/root/spark/work/app-20190106213435-0003/2/./org.mongodb.spark_mongo-spark-connector_2.11-2.4.0.jar to class loader
    2019-01-06 21:35:18 INFO  Executor:54 - Fetching spark://visa1:36511/jars/org.mongodb_mongo-java-driver-3.9.0.jar with timestamp 1546806874798
    2019-01-06 21:35:18 INFO  Utils:54 - Fetching spark://visa1:36511/jars/org.mongodb_mongo-java-driver-3.9.0.jar to /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/fetchFileTemp1454666184402659399.tmp
    2019-01-06 21:35:18 INFO  Utils:54 - /tmp/spark-ae02f35d-8340-4cda-ba6f-8d8b7138e803/executor-b6a0e407-de5a-420a-a528-96573fcd9700/spark-1f05e532-25da-492b-8e52-3e5da3fd9617/20228089061546806874798_cache has been previously copied to /root/spark/work/app-20190106213435-0003/2/./org.mongodb_mongo-java-driver-3.9.0.jar
    2019-01-06 21:35:18 INFO  Executor:54 - Adding file:/root/spark/work/app-20190106213435-0003/2/./org.mongodb_mongo-java-driver-3.9.0.jar to class loader
    2019-01-06 21:35:18 INFO  TorrentBroadcast:54 - Started reading broadcast variable 2
    2019-01-06 21:35:18 INFO  TransportClientFactory:267 - Successfully created connection to /1.1.241.71:38095 after 4 ms (0 ms spent in bootstraps)
    2019-01-06 21:35:18 INFO  MemoryStore:54 - Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.5 KB, free 2004.6 MB)
    2019-01-06 21:35:18 INFO  TorrentBroadcast:54 - Reading broadcast variable 2 took 182 ms
    2019-01-06 21:35:18 INFO  MemoryStore:54 - Block broadcast_2 stored as values in memory (estimated size 15.8 KB, free 2004.6 MB)
    2019-01-06 21:35:18 INFO  TorrentBroadcast:54 - Started reading broadcast variable 0
    2019-01-06 21:35:18 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 396.0 B, free 2004.6 MB)
    2019-01-06 21:35:18 INFO  TorrentBroadcast:54 - Reading broadcast variable 0 took 14 ms
    2019-01-06 21:35:18 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 200.0 B, free 2004.6 MB)
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster created with settings {hosts=[visa1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster created with settings {hosts=[visa1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster created with settings {hosts=[visa1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster created with settings {hosts=[visa1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster created with settings {hosts=[visa1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster created with settings {hosts=[visa1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
    2019-01-06 21:35:19 INFO  cluster:71 - Cluster description not yet available. Waiting for 30000 ms before timing out
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:5}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:3}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:6}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:1}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:4}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:2}] to visa1:27017
    2019-01-06 21:35:19 INFO  cluster:71 - Monitor thread successfully connected to server with description ServerDescription{address=visa1:27017, type=SHARD_ROUTER, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 11]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=2389159}
    2019-01-06 21:35:19 INFO  cluster:71 - Monitor thread successfully connected to server with description ServerDescription{address=visa1:27017, type=SHARD_ROUTER, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 11]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=3296820}
    2019-01-06 21:35:19 INFO  cluster:71 - Monitor thread successfully connected to server with description ServerDescription{address=visa1:27017, type=SHARD_ROUTER, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 11]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=3158622}
    2019-01-06 21:35:19 INFO  cluster:71 - Monitor thread successfully connected to server with description ServerDescription{address=visa1:27017, type=SHARD_ROUTER, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 11]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=2556701}
    2019-01-06 21:35:19 INFO  cluster:71 - Monitor thread successfully connected to server with description ServerDescription{address=visa1:27017, type=SHARD_ROUTER, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 11]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=2174393}
    2019-01-06 21:35:19 INFO  cluster:71 - Monitor thread successfully connected to server with description ServerDescription{address=visa1:27017, type=SHARD_ROUTER, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 11]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=null, roundTripTimeNanos=7550692}
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Creating MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Creating MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Creating MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Closing MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Creating MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Closing MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Creating MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Closing MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Creating MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Closing MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  MongoClientCache:48 - Closing MongoClient: [visa1:27017]
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:11}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:9}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:12}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:8}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:10}] to visa1:27017
    2019-01-06 21:35:19 INFO  connection:71 - Opened connection [connectionId{localValue:7}] to visa1:27017
    2019-01-06 21:35:19 INFO  CodeGenerator:54 - Code generated in 259.273212 ms
    2019-01-06 21:35:20 INFO  Executor:54 - Finished task 12.0 in stage 1.0 (TID 13). 1586 bytes result sent to driver
    2019-01-06 21:35:20 INFO  Executor:54 - Finished task 8.0 in stage 1.0 (TID 9). 1586 bytes result sent to driver
    2019-01-06 21:35:20 INFO  Executor:54 - Finished task 16.0 in stage 1.0 (TID 17). 1586 bytes result sent to driver
    2019-01-06 21:35:20 INFO  Executor:54 - Finished task 0.0 in stage 1.0 (TID 1). 1586 bytes result sent to driver
    2019-01-06 21:35:20 INFO  Executor:54 - Finished task 20.0 in stage 1.0 (TID 21). 1586 bytes result sent to driver
    2019-01-06 21:35:20 INFO  Executor:54 - Finished task 4.0 in stage 1.0 (TID 5). 1586 bytes result sent to driver
    2019-01-06 21:35:25 INFO  MongoClientCache:48 - Closing MongoClient: [visa1:27017]
    2019-01-06 21:35:25 INFO  connection:71 - Closed connection [connectionId{localValue:9}] to visa1:27017 because the pool has been closed.
    2019-01-06 21:35:25 INFO  connection:71 - Closed connection [connectionId{localValue:7}] to visa1:27017 because the pool has been closed.
    2019-01-06 21:35:25 INFO  connection:71 - Closed connection [connectionId{localValue:10}] to visa1:27017 because the pool has been closed.
    2019-01-06 21:35:25 INFO  connection:71 - Closed connection [connectionId{localValue:11}] to visa1:27017 because the pool has been closed.
    2019-01-06 21:35:25 INFO  connection:71 - Closed connection [connectionId{localValue:12}] to visa1:27017 because the pool has been closed.
    2019-01-06 21:35:25 INFO  connection:71 - Closed connection [connectionId{localValue:8}] to visa1:27017 because the pool has been closed.
    

Update (thanks to @kk1957's answer)

After further testing, I am now confident the problem is related to how the SparkSession object is initialized in the Jupyter Notebook:

  • when I run the pyspark shell, everything works fine as long as I use the "spark" session object created by pyspark;
  • but if I create a new SparkSession, I reproduce the empty results (a check on this is sketched after the second transcript below).

Using the default Spark session:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 08:41:30 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df.count()
1162              

But when creating my own SparkSession object:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> 
>>> spark2 = SparkSession \
...     .builder \
...     .master('spark://visa1:7077') \
...     .appName("myApp") \
...     .config("spark.executor.memory", "4g") \
...     .config("spark.mongodb.input.uri", "mongodb://visa1/email.emails") \
...     .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...     .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0") \
...     .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 09:18:04 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
0
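
One thing worth checking at this point (my note, not part of the original post): inside the pyspark shell a session already exists, and SparkSession.builder.getOrCreate() reuses the running SparkContext, so spark2 shares the shell's context and the effective difference between the two runs is mainly the spark.mongodb.input.partitioner setting. A small check, under that assumption (df3 is a hypothetical name):

# Sketch: confirm that spark2 reuses the shell's SparkContext, then retry the
# same read while overriding the partitioner on the read itself.
spark2.sparkContext is spark.sparkContext   # True would mean one shared context

df3 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
  .option("uri", "mongodb://visa1/email.emails") \
  .option("partitioner", "MongoSamplePartitioner") \
  .load()
df3.count()   # a non-zero count here would point at MongoShardedPartitioner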

The same code, hitting a single (non-sharded) MongoDB instance, works fine:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> 
>>> spark2 = SparkSession \
...                     .builder \
...                     .appName("myApp") \
...                     .config("spark.mongodb.input.uri", "mongodb://singleMongoDB/email.emails") \
...                     .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...                     .getOrCreate()
>>> 
>>> df = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://singleMongoDB/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 09:04:58 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df.count()
2019-01-07 09:05:03 WARN  MongoShardedPartitioner:60 - Collection 'email.emails' does not appear to be sharded, continuing with a single partition. To split the collections into multiple partitions connect to the MongoDB node directly
1162

Question:

I am fairly sure the problem lies in how the SparkSession object is created in the Jupyter notebook when it targets the sharded MongoDB cluster.

Could you help me debug this issue?

Thanks in advance.

1 Answer

0 votes
/ 07 January 2019

A few suggestions:

1) Have you tried connecting to MongoDB from the master machine, just to make sure nothing sits between Mongo and the master? A quick check is sketched below.
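
For instance (my sketch, not part of the original answer; it assumes pymongo is installed on the master), a quick reachability and count check against the mongos from visa1:

# Sketch: from the Spark master, verify that the mongos is reachable and that
# the collection reports the expected document count.
from pymongo import MongoClient

client = MongoClient("mongodb://visa1:27017/")
print(client["email"]["emails"].count_documents({}))  # should match db.emails.count()
client.close()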

2) Try running the cluster in a simpler configuration (no extra executors, or a single executor) and see whether that helps pin down the root cause; one way to do this is sketched below.
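
One way to do that (my sketch; the resource limits are illustrative, and spark_single is a hypothetical name) is to cap the standalone cluster at a single core, so the job runs with one executor slot:

import pyspark

# Sketch: build a session capped at one core on the standalone cluster, so the
# read runs in a single executor slot.
spark_single = pyspark.sql.SparkSession \
  .builder \
  .master('spark://visa1:7077') \
  .appName("singleExecutorTest") \
  .config("spark.cores.max", "1") \
  .config("spark.executor.cores", "1") \
  .config("spark.mongodb.input.uri", "mongodb://visa1/email.emails") \
  .getOrCreate()

df = spark_single.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.count()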

...