Fast or Bulk Upsert в пимонго - PullRequest
32 голосов
/ 14 марта 2011

Как я могу сделать массовую вставку в пимонго? Я хочу обновить кучу записей и делать их по одной очень медленно.

Ответ на почти идентичный вопрос находится здесь: Массовое обновление / вставка в MongoDB?

Принятый ответ на самом деле не отвечает на вопрос. Он просто дает ссылку на CLI Mongo для импорта / экспорта.

Я также был бы открыт для того, чтобы кто-то объяснил, почему выполнение массовой загрузки невозможно / не рекомендуется, но, пожалуйста, объясните, какое предпочтительное решение проблемы такого рода.

Ответы [ 6 ]

30 голосов
/ 25 марта 2014

MongoDB 2.6+ поддерживает массовые операции.Это включает в себя массовые вставки, вставки, обновления и т. Д. Суть этого состоит в том, чтобы уменьшить / устранить задержки из-за задержек при передаче данных «запись за записью» («документ за документом» должен быть правильным).

Итак, как это работает?Пример на Python, потому что это то, над чем я работаю.

>>> import pymongo
>>> pymongo.version
'2.7rc0'

Чтобы использовать эту функцию, мы создаем объект «навальный», добавляем к нему документы, затем вызываем на нем команду execute, и он отправит всеобновления сразу.Предостережения: размер BSON собранных операций (сумма размеров bsonsizes) не может превышать ограничение размера документа в 16 МБ.Конечно, количество операций может значительно варьироваться. Ваш пробег может меняться.

Пример операции Pymongo of Bulk:

import pymongo
conn = pymongo.MongoClient('myserver', 8839)
db = conn['mydbname']
coll = db.myCollection
bulkop = coll.initialize_ordered_bulk_op()
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':1}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':2}})
retval = bulkop.find({'field1':1}).upsert().update({'$push':{'vals':3}})
retval = bulkop.execute()

Это основной метод.Дополнительная информация доступна по адресу:

http://api.mongodb.org/python/2.7rc1/examples/bulk.html

Редактировать: - начиная с версии 3.5 драйвера питона, initialize_ordered_bulk_op устарела.Вместо этого используйте bulk_write ().[http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write]

26 голосов
/ 25 марта 2016

Современные выпуски pymongo (больше 3.x) объединяют массовые операции в единый интерфейс, который понижает версию, когда выпуск сервера не поддерживает массовые операции. Это теперь согласуется с официально поддерживаемыми драйверами MongoDB.

Таким образом, предпочтительный метод кодирования - вместо этого использовать bulk_write(), где вместо этого вы используете UpdateOne другое другое соответствующее действие операции. И теперь, конечно, предпочтительнее использовать списки естественных языков, а не конкретный компоновщик

Прямой перевод старой документации:

from pymongo import UpdateOne

operations = [
    UpdateOne({ "field1": 1},{ "$push": { "vals": 1 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 2 } },upsert=True),
    UpdateOne({ "field1": 1},{ "$push": { "vals": 3 } },upsert=True)
]

result = collection.bulk_write(operations)

Или классический цикл преобразования документов:

import random
from pymongo import UpdateOne

random.seed()

operations = []

for doc in collection.find():
    # Set a random number on every document update
    operations.append(
        UpdateOne({ "_id": doc["_id"] },{ "$set": { "random": random.randint(0,10) } })
    )

    # Send once every 1000 in batch
    if ( len(operations) == 1000 ):
        collection.bulk_write(operations,ordered=False)
        operations = []

if ( len(operations) > 0 ):
    collection.bulk_write(operations,ordered=False)

Возвращенный результат имеет значение BulkWriteResult, которое будет содержать счетчики совпадающих и обновленных документов, а также возвращенные значения _id для любых возникающих "upserts".

Существует некоторое неправильное представление о размере массива массовых операций. Фактический запрос, отправленный на сервер, не может превышать предел BSON в 16 МБ, поскольку этот предел также применяется к «запросу», отправляемому на сервер, который также использует формат BSON.

Однако это не влияет на размер массива запросов, который вы можете построить, так как фактические операции все равно будут отправляться и обрабатываться только партиями по 1000 штук. Единственным реальным ограничением является то, что сами эти 1000 рабочих инструкций фактически не создают документ BSON, размер которого превышает 16 МБ. Это действительно довольно высокий заказ.

Общая концепция массовых методов - «меньше трафика», в результате отправки сразу нескольких вещей и обработки только одного ответа сервера. Сокращение этих накладных расходов, связанных с каждым запросом на обновление, экономит много времени.

5 голосов
/ 14 марта 2011

Ответ остается прежним: поддержка массовых рассылок не поддерживается.

1 голос
/ 14 марта 2011

Вы можете обновить все документы, соответствующие вашей спецификации запроса, используя multi = True.

Здесь есть ошибка , связанная с выполнением пакета команд так, как вы хотите.

0 голосов
/ 25 января 2018

если у вас много данных, и вы хотите использовать «_id» для оценки, если данные существуют,

вы можете попробовать ...

import pymongo
from pymongo import UpdateOne
client = pymongo.MongoClient('localhost', 27017)
db=client['sampleDB']

collectionInfo = db.sample

#sample data
datas=[
    {"_id":123456,"name":"aaa","N":1,"comment":"first sample","lat":22,"lng":33},
    {"_id":234567,"name":"aaa","N":1,"comment":"second sample","lat":22,"lng":33},
    {"_id":345678,"name":"aaa","N":1,"comment":"xxx sample","lat":22,"lng":33},
    {"_id":456789,"name":"aaa","N":1,"comment":"yyy sample","lat":22,"lng":33},
    {"_id":123456,"name":"aaaaaaaaaaaaaaaaaa","N":1,"comment":"zzz sample","lat":22,"lng":33},
    {"_id":11111111,"name":"aaa","N":1,"comment":"zzz sample","lat":22,"lng":33}
]

#you should split judge item and other data 
ids=[data.pop("_id") for data in datas]

operations=[UpdateOne({"_id":idn},{'$set':data},upsert=True) for idn ,data in zip(ids,datas)]

collectionInfo.bulk_write(operations)

Мой английский очень плохой,если вы не можете понять, что я говорю, извините

0 голосов
/ 07 декабря 2016

Самое быстрое массовое обновление с Python 3.5+, двигателем и asyncio:

import asyncio
import datetime
import logging
import random
import time

import motor.motor_asyncio
import pymongo.errors


async def execute_bulk(bulk):
    try:
        await bulk.execute()
    except pymongo.errors.BulkWriteError as err:
        logging.error(err.details)


async def main():
    cnt = 0
    bulk = db.initialize_unordered_bulk_op()
    tasks = []
    async for document in db.find({}, {}, no_cursor_timeout=True):
        cnt += 1
        bulk.find({'_id': document['_id']}).update({'$set': {"random": random.randint(0,10)}})
        if not cnt % 1000:
            task = asyncio.ensure_future(execute_bulk(bulk))
            tasks.append(task)
            bulk = db.initialize_unordered_bulk_op()
    if cnt % 1000:
        task = asyncio.ensure_future(bulk.execute(bulk))
        tasks.append(task)
    logging.info('%s processed', cnt)
    await asyncio.gather(*tasks)


logging.basicConfig(level='INFO')    
db = motor.motor_asyncio.AsyncIOMotorClient()['database']['collection']
start_time = time.time()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    execution_time = time.time() - start_time
    logging.info('Execution time: %s', datetime.timedelta(seconds=execution_time))
...