Spark DataFrame фильтрация дает противоречивый вывод - PullRequest
0 голосов
/ 31 мая 2018

У меня есть коллекция MongoDB с 26 000 записей, которые я читаю в DataFrame.У него есть столбец column_1 со строковым значением column_1_value во всех записях.Я пытаюсь отфильтровать DataFrame и получить счетчик следующим образом

val readConfig = ReadConfig(Map("collection" -> collectionName,"spark.mongodb.input.readPreference.name" -> "primaryPreferred", "spark.mongodb.input.database" -> dataBaseName, "spark.mongodb.input.uri" -> hostName))
val dataFrame = MongoSpark.load(spark, readConfig)
df.filter(df.col("column_1") === "column_1_value").count()

Где spark - это экземпляр SparkSession.

Структура записи MongoDB выглядит примерно так.

{
    "_id" : ObjectId("SOME_ID"),
    "column_1" : "column_1_value",
    "column_2" : SOME_VALUE,
    "column_3" : SOME_OTHER_VALUE
}

Нет вложенной структуры, и все записи имеют одинаковый набор полей.Я не получаю доступ к БД в течение всего времени, пока Spark работает

Поскольку все записи имеют одинаковое значение column_1, я ожидал получить сам размер DataFrame в качестве вывода, но вместо этого я получаюменьшее значение.Мало того, я получаю разные результаты каждый раз, когда я запускаю выше.Результат обычно варьируется от 15 000 до 24 000.

Но тот же подход, похоже, работает, когда размер коллекции меньше, около 5000.

Я пробовал следующие подходы безуспешно

  • Используется equalTo вместо ===
  • Используется $column_1
  • Используетсяisin
  • Используется df.where вместо df.filter
  • Используется createOrReplaceTempView и выполняется SQL-запрос

Единственное, что работает, это либоdf.cache() или df.persist(), ни один из которых, я думаю, не будет хорошим для общей производительности при работе с большими данными.

Что может быть возможной причиной такого поведения и путей, с помощью которых это можно решить?

Моя версия Spark - 2.2.0, версия MongoDB - 3.4.Я использую Spark в локальном режиме с 16 ГБ ОЗУ и 8-ядерным процессором.

Редактировать 1

Попытка изменить политику разбиения коннектора Mongo Spark следующим образом, но безуспешно.

val readConfig = ReadConfig(Map("collection" -> collectionName,"spark.mongodb.input.readPreference.name" -> "primaryPreferred", "spark.mongodb.input.database" -> dataBaseName, "spark.mongodb.input.uri" -> hostName, "spark.mongodb.input.partitionerOptions" -> "MongoPaginateBySizePartitioner"))
val dataFrame = MongoSpark.load(spark, readConfig)
df.count()
df.filter(df.col("column_1") === "column_1_value").count()

При первом подсчете возвращается правильное значение, даже при использовании стратегии секционирования по умолчанию, из-за чего я полагаю, что Mongo Connector работает нормально.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...