Разъем Spark Mongo, MongoShardedPartitioner не работает - PullRequest
0 голосов
/ 08 января 2019

В целях тестирования я настроил кластер из 4 узлов, каждый из которых имеет Spark Worker и MongoDB Shard. Это детали:

  • Четыре сервера Debian 9 (с именами visa0, visa1, visa2, visa3)
  • кластер Spark (v2.4.0) на 4 узла (visa1: master, visa0..3: slaves)
  • MongoDB (v3.2.11) сегментированный кластер с 4 узлами (реплика сервера конфигурации установлена ​​на visa1..3, mongos на visa1, серверы shard: visa0.3)
  • Я использую соединитель MongoDB Spark, установленный с "spark-shell --packages" org.mongodb.spark: mongo-spark-connector_2.11: 2.4.0 "

При настройке SparkSession с MongoShardedPartitioner каждый загруженный из базы данных фрейм данных пуст, хотя схема фрейма данных выбрана правильно.

Воспроизводится, либо конфигурация выполняется в файле spark-defaults.conf или с помощью .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") в конструкторе SparkSession.

С MongoShardedPartitioner, df.count () == 0:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
>>>                                                                             
>>> df2.count()
0  

Но работает правильно без указания разделителя:

./pyspark --master "spark://visa1:7077" --packages "org.mongodb.spark:mongo-spark-connector_2.11:2.4.0"

...

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:27:03)
SparkSession available as 'spark'.
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .getOrCreate()
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()
2019-01-07 22:7:33 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162  

Вопросы:

  • Как узнать, какой разделитель настроен по умолчанию?
  • Как можно использовать MongoShardedPartitioner в этом сценарии?

Заранее спасибо

13 января 2019 г.: рекомендуемое решение

Как показано ниже, кажется, что MongoShardedPartitioner не поддерживает хешированные индексы как индекс шарда. Однако мне нужен хеш-индекс для равномерного распределения чанков по моим узлам независимо от времени (я полагаю, использование _id будет распределять в хронологическом порядке).

Мой обходной путь - создать новое поле в базе данных с вычисленным хэшем md5 блока даты, индексировать его (как обычный индекс) и использовать его в качестве индекса шарда.

Теперь код работает нормально:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.5.3 (default, Sep 27 2018 17:25:39)
SparkSession available as 'spark'.
>>> 
>>> 
>>> spark2 = SparkSession \
...   .builder \
...   .appName("myApp") \
...   .config("spark.mongodb.input.partitioner" ,"MongoShardedPartitioner") \
...   .config("spark.mongodb.input.partitionerOptions.shardkey", "datebuckethash") \
...   .getOrCreate()
>>> 
>>> 
>>> df2 = spark2.read.format("com.mongodb.spark.sql.DefaultSource") \
... .option("uri", "mongodb://visa1/email.emails") \
... .option("pipeline", '[ {"$match": {"mailbox": /^\/root\/pst_export\/albert_meyers_000_1_1.export/}} ]') \
... .load()

2019-01-13 11:19:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
>>> 
>>> df2.count()
1162   

1 Ответ

0 голосов
/ 10 января 2019

Извините, Хосе, я слышал, что у вас проблема с разъемом.

Как узнать, какой разделитель настроен по умолчанию?

Информацию о разделителях можно найти на сайте документации Spark Connector . Пожалуйста, отправьте заявку в Документы Jira Project , если вы чувствуете, что что-то отсутствует или неясно, это действительно может помочь будущим пользователям!

Разделителем по умолчанию является тонкая оболочка вокруг MongoSamplePartitioner. Он разбивает коллекцию на разделы по размерам на основе статистической выборки коллекции.

Как можно использовать MongoShardedPartitioner в этом сценарии?

MongoShardedPartitioner использует shardKey для генерации разделов. По умолчанию он будет использовать _id в качестве ключа. Возможно, вам придется настроить это значение.

Примечание: Хешированные shardkeys не поддерживаются MongoShardedPartitioner, поскольку в настоящее время нет способа запросить коллекцию по хеш-значению - поэтому при получении разделов она не сможет вернуть результаты. Я добавил DOCS-12345 для обновления документации.

Похоже, в вашей настройке есть проблема, когда MongoShardedPartitioner не может разделить коллекцию, как ожидалось, и возвращает 0 результатов. Вывод схемы по-прежнему будет работать из-за того, как он запрашивает коллекцию. Если проблема не в config / hashed shardkey, тогда проблема, пожалуйста, сообщите об ошибке в проекте Spark jira , и я могу помочь определить причину и выпустить для вас исправление.

...