В целях тестирования я настроил кластер из 4 узлов, каждый из которых имеет Spark Worker и MongoDB Shard. Это детали:
- Четыре сервера Debian 9 (с именами visa0, visa1, visa2, visa3)
- кластер Spark (v2.4.0) на 4 узла (visa1: master, visa0..3: slaves)
- MongoDB (v3.2.11) сегментированный кластер с 4 узлами (реплика сервера конфигурации установлена на visa1..3, mongos на visa1, серверы shard: visa0.3)
- Я использую соединитель MongoDB Spark, установленный с "spark-shell --packages"
org.mongodb.spark: mongo-spark-connector_2.11: 2.4.0 "
При настройке SparkSession с MongoShardedPartitioner
каждый загруженный из базы данных фрейм данных пуст, хотя схема фрейма данных выбрана правильно.
Воспроизводится, либо конфигурация выполняется в файле spark-defaults.conf
или с помощью .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner")
в конструкторе SparkSession.
С MongoShardedPartitioner
, df.count () == 0:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>
>>> df2.count()
0
Но работает правильно без указания разделителя:
./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .getOrCreate()
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162
Вопросы:
- Как узнать, какой разделитель настроен по умолчанию?
- Как можно использовать
MongoShardedPartitioner
в этом сценарии?
Заранее спасибо
13 января 2019 г.: рекомендуемое решение
Как показано ниже, кажется, что MongoShardedPartitioner
не поддерживает хешированные индексы как индекс шарда. Однако мне нужен хеш-индекс для равномерного распределения чанков по моим узлам независимо от времени (я полагаю, использование _id будет распределять в хронологическом порядке).
Мой обходной путь - создать новое поле в базе данных с вычисленным хэшем md5 блока даты, индексировать его (как обычный индекс) и использовать его в качестве индекса шарда.
Теперь код работает нормально:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>>
>>>
>>> spark2 = SparkSession \
... .builder \
... .appName("myApp") \
... .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
... .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
... .getOrCreate()
>>>
>>>
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-13 11:19:31 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>>
>>> df2.count()
1162