ежедневное обновление sh Коллекции MongoDB с Вставкой и обновлением - PullRequest
0 голосов
/ 12 апреля 2020

У меня есть коллекция MongoDB, где данные нужно обновлять для определенных полей каждую ночь. Коллекция Target имеет 3 дополнительных настраиваемых поля, которые используются конечным пользователем для ввода соответствующих документов.

Таким образом, когда ежедневное обновление sh происходит в течение ночи, источник данных может отправлять новые документы или обновлять данные существующих документов. Документы могут быть до 10000.

Я использую Pymon go и MongoDB для достижения этой цели. Моя проблема в том, как определить, какую запись необходимо обновить, а какую запись нужно вставить с этими 3 дополнительными настраиваемыми полями, не влияя на данные конечного пользователя.

Например:

Источник данных:

Manufacture Name       Model        Year    Units
BMW                    5Series      2019      10
BMW                    5Series      2020       5
AUDI                   A4           2020      20 
AUDI                   A7           2019       3
TOYOTA                 COROLLA      2020       5
TOYOTA                 CAMRY        2020       6
HONDA                  ACCORD       2020       10
HONDA                  PILOT        2019       15  
HONDA                  CRV          2019       20 

После загрузки таблица приложения содержит 1 настраиваемый столбец (расположение) для пользовательского ввода

Manufacture Name       Model        Year     Location   Units
BMW                    5Series      2019     London       10 
BMW                    5Series      2020     New York     5  
AUDI                   A4           2020     Melbourne    20
AUDI                   A7           2019     London       3
TOYOTA                 COROLLA      2020     New York     5
TOYOTA                 CAMRY        2020     London       6  
HONDA                  ACCORD       2020     Sydney       10 
HONDA                  PILOT        2019     Tokyo        15
HONDA                  CRV          2019

Во второй день мы получаем новые данные, как показано ниже

Manufacture Name       Model        Year    Units
BMW                    5Series      2019      10
BMW                    5Series      2020       **35**
**BMW                    7Series      2020       12**
AUDI                   A4           2020      20 
AUDI                   A7           2019       3
**AUDI                   A6           2019       1**
TOYOTA                 COROLLA      2020       5
TOYOTA                 CAMRY        2020       6
HONDA                  ACCORD       2020       10
HONDA                  PILOT        2019       15  
*HONDA                  CRV          2019       20*       *-- deleted -- in second refresh*

Данные могут быть 10000 записей. Как этого добиться с помощью Pymon go или MongodB? Я написал код в PyMon go до получения исходных данных и сохранения курсора в словаре. Не уверен, как действовать после этого, используя MongoDB Upsert или массовую запись, а также сохранять / обновлять данные столбца Location для существующих записей и назначать значения NULL для новых записей.

Спасибо

1 Ответ

0 голосов
/ 20 апреля 2020

наконец это достигается как показано ниже:

import pymongo
from pymongo import UpdateOne


# Define MongoDB connection
client = pymongo.MongoClient(server, username='User', password='password', authSource='DBName', authMechanism='SCRAM-SHA-256')

#Source table    
collection2 = db['cars2']

count123 = collection2.count_documents({})

#print("New Cars2 Data - Count before Insert:",count123)

source_cursor = collection2.find()

print("Cars2 - Count in Cursor:",source_cursor.count())

#Target Table    
collection = db['cars']

tgt_count = collection.count_documents({})

print("Cars Collection - Count before Insert:",tgt_count)

sourcedata = []

#since this is a MongoDB Cursor object, we need push to List using list()
sourcedata = list(source_cursor)

source_cursor.close()



# ADD new columns to the Data before inserting in MongoDB
for item in sourcedata:    

    item['Location'] = None        
    item['last_refresh'] =  datetime.now()


#sourcedata = source_cursor

ops = []

if tgt_count == 0:

    print("Loading for First time:")
    for rec in sourcedata:        
        #Load Data for first time with new fields
        ops.append( UpdateOne({'name': rec['name'],'model':rec['model']}, {'$set': {'name':rec['name'],'model':rec['model'],'year':rec['year'],'units':rec['units'], 'Location': rec['Location'],'last_refresh':rec['last_refresh']}}, upsert=True))

    #print(ops)

    result = collection.bulk_write(ops)


    print("Inserted Count:", result.inserted_count)
    print("Matched Count:", result.matched_count)
    print("Modified Count:", result.modified_count)
    print("Upserted Count:", result.upserted_count)

elif tgt_count > 0:

    print("Updating the Load:")
    for rec in sourcedata:        
        #No Location field is included to avoid replacing the values of this field by NULL
        ops.append( UpdateOne({'name': rec['name'],'model':rec['model']}, {'$set': {'name':rec['name'],'model':rec['model'],'year':rec['year'],'units':rec['units'], 'last_refresh':rec['last_refresh']}}, upsert=True))

    result = collection.bulk_write(ops)

    print("Inserted Count:", result.inserted_count)
    print("Matched Count:", result.matched_count)
    print("Modified Count:", result.modified_count)
    print("Upserted Count:", result.upserted_count)

 #because I didn’t include “Location” field above, the new UPSERT records DO NOT have “Location” field anymore. So I have to update the collection one more time to include “Location” field        

    nullfld_result = collection.update_many({'Location':None},{ '$set':{'Location':None}})


count2 = collection.count_documents({})

print("Count after Insert:",count2)
...