В двух разных программах, которые используют разные типы записи в коллекцию MongoDB, в эту коллекцию иногда записываются дополнительные записи. Из-за размера коллекции (более 200 млн. Записей) и обработки загрузки для них, выяснение, почему происходит эта (казалось бы, недетерминированная) дополнительная загрузка, сэкономит нам большое количество времени на повторную загрузку процесса.
Некоторая дополнительная информация:
В обеих программах я проверяю перед записью, что правильное количество записей было создано и существует (216,45 млн. Выходных записей в журналах MapReduce, 216,45 млн. Записей, когда я выполняю подсчет на СДР, содержащем документы bson, которые нужно записать). При последней загрузке в Mongo было загружено 239,8 млн записей. Средний размер документов в этой коллекции с дополнительными записями точно такой же, как средний размер документов в правильно загруженной коллекции, единственное различие заключается в количестве документов и общем размере.
Прямо сейчас я занимаюсь разделением здесь, чтобы создание документов bson было записано в папку hdfs, а затем другая программа загрузит его в Mongo. Таким образом, мне не нужно проходить все процессы присоединения и воссоздания документов каждый раз, когда мне нужно проверить это.
В какой-то момент, если я не смогу найти какие-либо решения, я напишу программу для анализа различий между этими двумя коллекциями, чтобы увидеть, пишутся ли они просто как дубликаты записей. Это было мое предположение из-за того, что средний размер документов был одинаковым, поэтому я еще не выделил на это свое время.
В Spark, вот процесс записи.
writeMeToMongo - это СДР документов в формате json, проверенное на правильное количество документов. Внутренний код каким-то образом иногда записывает в Mongo очень большое количество дополнительных документов.
// numMongoThreads is traditionally ~200
writeMeToMongo.repartition(numMongoThreads).foreachPartition(jsonsToWrite => {
if(outputDB == null || StringUtils.isEmpty(outputDB)) {
throw new Exception("Output DB was never set.")
}
// connectionString is a Broadcast[String]
val connectionURI: MongoClientURI = new MongoClientURI(connectionString.value)
val mongoClient: MongoClient = new MongoClient(connectionURI)
val mongoDatabase: MongoDatabase = mongoClient.getDatabase(outputDB)
// to setup the lazily-evaluated mongoDatabase
mongoDatabase.getCollection("dummy").find.first
if(mongoDatabase.listCollectionNames == null) {
throw new Exception("No collections in this mongo database.")
}
//validatedArgs is a Broadcast containing some arguments (such as collectionName)
val collectionConnector: MongoCollection[Document] = mongoDatabase.getCollection(validatedArgs.value.collection)
// buffer up to maxMongoDocsPerWrite # of documents before doing a bulk write
// I have used 1500 for this for testing, please feel free to recommend a different value
var documentsBuffer: ListBuffer[Document] = new ListBuffer[Document]
var docCounter = 0
while(jsonsToWrite.hasNext && docCounter < validatedArgs.value.maxMongoDocsPerWrite) {
documentsBuffer += Document.parse(jsonsToWrite.next)
docCounter = docCounter + 1
if(docCounter == validatedArgs.value.maxMongoDocsPerWrite) {
val lotsODocs = documentsBuffer.toList.asJava
collectionConnector.insertMany(lotsODocs)
docCounter = 0
documentsBuffer = new ListBuffer[Document]
}
}
val leftoverDocuments = documentsBuffer.toList.asJava
if(!leftoverDocuments.isEmpty) {
collectionConnector.insertMany(leftoverDocuments)
}
})
Для MapReduce код сложнее кратко воспроизвести здесь, поэтому я не включил его.
Мне известно, что существуют другие методы для записи в MongoDB из RDD, содержащей документы, но я полагаю, что из-за некоторых требований мы должны использовать этот метод, поскольку у нас есть некоторые специальные ограничения на строку подключения, и мы должны проверить подключение строка, поэтому мы не можем использовать параметры для этого.