Загрузка дополнительных записей в MongoDB как в MapReduce, так и в Spark (с использованием другой структуры записи) - PullRequest
1 голос
/ 05 июня 2019

В двух разных программах, которые используют разные типы записи в коллекцию MongoDB, в эту коллекцию иногда записываются дополнительные записи. Из-за размера коллекции (более 200 млн. Записей) и обработки загрузки для них, выяснение, почему происходит эта (казалось бы, недетерминированная) дополнительная загрузка, сэкономит нам большое количество времени на повторную загрузку процесса.

Некоторая дополнительная информация: В обеих программах я проверяю перед записью, что правильное количество записей было создано и существует (216,45 млн. Выходных записей в журналах MapReduce, 216,45 млн. Записей, когда я выполняю подсчет на СДР, содержащем документы bson, которые нужно записать). При последней загрузке в Mongo было загружено 239,8 млн записей. Средний размер документов в этой коллекции с дополнительными записями точно такой же, как средний размер документов в правильно загруженной коллекции, единственное различие заключается в количестве документов и общем размере.

Прямо сейчас я занимаюсь разделением здесь, чтобы создание документов bson было записано в папку hdfs, а затем другая программа загрузит его в Mongo. Таким образом, мне не нужно проходить все процессы присоединения и воссоздания документов каждый раз, когда мне нужно проверить это.

В какой-то момент, если я не смогу найти какие-либо решения, я напишу программу для анализа различий между этими двумя коллекциями, чтобы увидеть, пишутся ли они просто как дубликаты записей. Это было мое предположение из-за того, что средний размер документов был одинаковым, поэтому я еще не выделил на это свое время.

В Spark, вот процесс записи. writeMeToMongo - это СДР документов в формате json, проверенное на правильное количество документов. Внутренний код каким-то образом иногда записывает в Mongo очень большое количество дополнительных документов.

// numMongoThreads is traditionally ~200
writeMeToMongo.repartition(numMongoThreads).foreachPartition(jsonsToWrite => {

      if(outputDB == null || StringUtils.isEmpty(outputDB)) {
        throw new Exception("Output DB was never set.")
      }

      // connectionString is a Broadcast[String]
      val connectionURI: MongoClientURI = new MongoClientURI(connectionString.value)
      val mongoClient: MongoClient = new MongoClient(connectionURI)
      val mongoDatabase: MongoDatabase = mongoClient.getDatabase(outputDB)
      // to setup the lazily-evaluated mongoDatabase
      mongoDatabase.getCollection("dummy").find.first

      if(mongoDatabase.listCollectionNames == null) {
        throw new Exception("No collections in this mongo database.")
      }

      //validatedArgs is a Broadcast containing some arguments (such as collectionName)
      val collectionConnector: MongoCollection[Document] = mongoDatabase.getCollection(validatedArgs.value.collection)

      // buffer up to maxMongoDocsPerWrite # of documents before doing a bulk write
      // I have used 1500 for this for testing, please feel free to recommend a different value
      var documentsBuffer: ListBuffer[Document] = new ListBuffer[Document]
      var docCounter = 0
      while(jsonsToWrite.hasNext && docCounter < validatedArgs.value.maxMongoDocsPerWrite) {
        documentsBuffer += Document.parse(jsonsToWrite.next)
        docCounter = docCounter + 1
        if(docCounter == validatedArgs.value.maxMongoDocsPerWrite) {
          val lotsODocs = documentsBuffer.toList.asJava
          collectionConnector.insertMany(lotsODocs)
          docCounter = 0
          documentsBuffer = new ListBuffer[Document]
        }
      }

      val leftoverDocuments = documentsBuffer.toList.asJava
      if(!leftoverDocuments.isEmpty) {
        collectionConnector.insertMany(leftoverDocuments)
      }
    })

Для MapReduce код сложнее кратко воспроизвести здесь, поэтому я не включил его.

Мне известно, что существуют другие методы для записи в MongoDB из RDD, содержащей документы, но я полагаю, что из-за некоторых требований мы должны использовать этот метод, поскольку у нас есть некоторые специальные ограничения на строку подключения, и мы должны проверить подключение строка, поэтому мы не можем использовать параметры для этого.

...