У меня есть коллекция MongoDB с 26 000 записей, которые я читаю в DataFrame.У него есть столбец column_1
со строковым значением column_1_value
во всех записях.Я пытаюсь отфильтровать DataFrame и получить счетчик следующим образом
val readConfig = ReadConfig(Map("collection" -> collectionName,"spark.mongodb.input.readPreference.name" -> "primaryPreferred", "spark.mongodb.input.database" -> dataBaseName, "spark.mongodb.input.uri" -> hostName))
val dataFrame = MongoSpark.load(spark, readConfig)
df.filter(df.col("column_1") === "column_1_value").count()
Где spark
- это экземпляр SparkSession
.
Структура записи MongoDB выглядит примерно так.
{
"_id" : ObjectId("SOME_ID"),
"column_1" : "column_1_value",
"column_2" : SOME_VALUE,
"column_3" : SOME_OTHER_VALUE
}
Нет вложенной структуры, и все записи имеют одинаковый набор полей.Я не получаю доступ к БД в течение всего времени, пока Spark работает
Поскольку все записи имеют одинаковое значение column_1
, я ожидал получить сам размер DataFrame в качестве вывода, но вместо этого я получаюменьшее значение.Мало того, я получаю разные результаты каждый раз, когда я запускаю выше.Результат обычно варьируется от 15 000 до 24 000.
Но тот же подход, похоже, работает, когда размер коллекции меньше, около 5000.
Я пробовал следующие подходы безуспешно
- Используется
equalTo
вместо ===
- Используется
$column_1
- Используется
isin
- Используется
df.where
вместо df.filter
- Используется
createOrReplaceTempView
и выполняется SQL-запрос
Единственное, что работает, это либоdf.cache()
или df.persist()
, ни один из которых, я думаю, не будет хорошим для общей производительности при работе с большими данными.
Что может быть возможной причиной такого поведения и путей, с помощью которых это можно решить?
Моя версия Spark - 2.2.0, версия MongoDB - 3.4.Я использую Spark в локальном режиме с 16 ГБ ОЗУ и 8-ядерным процессором.
Редактировать 1
Попытка изменить политику разбиения коннектора Mongo Spark следующим образом, но безуспешно.
val readConfig = ReadConfig(Map("collection" -> collectionName,"spark.mongodb.input.readPreference.name" -> "primaryPreferred", "spark.mongodb.input.database" -> dataBaseName, "spark.mongodb.input.uri" -> hostName, "spark.mongodb.input.partitionerOptions" -> "MongoPaginateBySizePartitioner"))
val dataFrame = MongoSpark.load(spark, readConfig)
df.count()
df.filter(df.col("column_1") === "column_1_value").count()
При первом подсчете возвращается правильное значение, даже при использовании стратегии секционирования по умолчанию, из-за чего я полагаю, что Mongo Connector работает нормально.