PyMon go: Как выполнить массовое обновление огромных JSON данных в MongoDB - PullRequest
5 голосов
/ 17 января 2020

Я извлекаю JSON данных из API, с выводом, подобным этому:

[[{'employeeId': 1, 'lastName': 'Smith'}, {'employeeId': 2, 'lastName': 'Flores'}]]

В списке около 250k объектов. Я могу перебирать объекты в списке и делать update_one через PyMon go таким образом:

json_this = json.dumps(json_list[0])
json_that = json.loads(json_this)
for x in json_that:
    collection.update_one({"employeeId": x['employeeId']},{"$set": x},upsert=True)

Но с 250k записи это занимает много времени. Я пытаюсь использовать update_many, но не могу понять, как правильно преобразовать / отформатировать этот список JSON для использования функции update_many. Любое руководство будет оценено.

1 Ответ

1 голос
/ 17 января 2020

Обновление / вставка 250K документов в базу данных может быть сложной задачей. Вы не можете использовать update_many, так как запрос фильтра и значения обновления меняются между каждым словарем. Таким образом, с помощью приведенного ниже запроса вы можете избежать нескольких обращений к базе данных, но я не совсем уверен, насколько хорошо это работает для вашего сценария. Обратите внимание, что я новичок в python, и это базовый c код, который даст вам идея:

Лучшее, что вы можете сделать для массовых операций, это PyMon go -bulk из-за ограничений .bulkWrite () , которые мы разделяем 250K записывает на куски:

from pymongo import UpdateOne
from pprint import pprint
import sys

json_this = json.dumps(json_list[0])
json_that = json.loads(json_this)

primaryBulkArr = []
secondaryBulkArr = []
thirdBulkArr = []

## Here we're splicing 250K records into 3 arrays, in case if we want to finish a chunk at a time,
 # No need to splice all at once - Finish end - to - end for one chunk & restart the process for another chunk from the index of the list where you left previously

for index, x in enumerate(json_that):
    if index < 90000:
        primaryBulkArr.append(
            UpdateOne({"employeeId": x['employeeId']}, {'$set': x}, upsert=True))
    elif index > 90000 and index < 180000:
        secondaryBulkArr.append(
            UpdateOne({"employeeId": x['employeeId']}, {'$set': x}, upsert=True))
    else:
        thirdBulkArr.append(
            UpdateOne({"employeeId": x['employeeId']}, {'$set': x}, upsert=True))

## Reason why I've spliced into 3 arrays is may be you can run below code in parallel if your DB & application servers can take it,
# At the end of the day irrespective of time taken only 3 DB calls are needed & this bulk op is much efficient.
try:
    result = collection.bulk_write(bulkArr)
    ## result = db.test.bulk_write(bulkArr, ordered=False)
    # Opt for above if you want to proceed on all dictionaries to be updated, even though an error occured in between for one dict
    pprint(result.bulk_api_result)
except:
    e = sys.exc_info()[0]
    print("An exception occurred ::", e) ## Get the ids failed if any & do re-try
...