Pymongo объемное время постоянно увеличивается - PullRequest
0 голосов
/ 29 марта 2019

Я работаю над проектом, в котором мне нужно обработать огромное количество данных.

В рамках обработки я выполняю следующие шаги -

  1. Чтение строки CSV
  2. Перевести строку CSV в словарь
  3. Массовое сохранение 1000 строк в MongoDB
  4. Итерация до конца файла CSV

Логика довольно проста инормально работает без проблем для миллионов записей.

Однако время bulk_write постоянно увеличивается.

Первый вызов занимает 0,55 секунды и начинает увеличиваться в течениеследующие партии.

Статистика

enter image description here

РЕДАКТИРОВАТЬ - Добавление журналов базы данных

Добавление базы данныхжурналы, чтобы доказать, что время базы данных постоянно увеличивается (показано в конце строки журнала)

2019-03-29T10:55:43.726+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 4439, w: 4439 } }, Database: { acquireCount: { w: 4438, W: 1 } }, Collection: { acquireCount: { w: 4438 } } } protocol:op_query 531ms

2019-03-29T10:55:45.214+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 12250, w: 12250 } }, Database: { acquireCount: { w: 12250 } }, Collection: { acquireCount: { w: 12250 } } } protocol:op_query 1040ms

2019-03-29T10:55:47.041+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 20059, w: 20059 } }, Database: { acquireCount: { w: 20059 } }, Collection: { acquireCount: { w: 20059 } } } protocol:op_query 1406ms

2019-03-29T10:55:49.390+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 27870, w: 27870 } }, Database: { acquireCount: { w: 27870 } }, Collection: { acquireCount: { w: 27870 } } } protocol:op_query 1733ms

2019-03-29T10:55:52.099+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 35679, w: 35679 } }, Database: { acquireCount: { w: 35679 } }, Collection: { acquireCount: { w: 35679 } } } protocol:op_query 2209ms

2019-03-29T10:55:55.046+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 43503, w: 43503 } }, Database: { acquireCount: { w: 43503 } }, Collection: { acquireCount: { w: 43503 } } } protocol:op_query 2453ms

2019-03-29T10:55:58.724+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 51325, w: 51325 } }, Database: { acquireCount: { w: 51325 } }, Collection: { acquireCount: { w: 51325 } } } protocol:op_query 3146ms

2019-03-29T10:56:02.565+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 59127, w: 59127 } }, Database: { acquireCount: { w: 59127 } }, Collection: { acquireCount: { w: 59127 } } } protocol:op_query 3413ms

2019-03-29T10:56:06.820+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 66941, w: 66941 } }, Database: { acquireCount: { w: 66941 } }, Collection: { acquireCount: { w: 66941 } } } protocol:op_query 3868ms

2019-03-29T10:56:11.890+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 74747, w: 74747 } }, Database: { acquireCount: { w: 74747 } }, Collection: { acquireCount: { w: 74747 } } } protocol:op_query 4612ms

2019-03-29T10:56:17.461+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 82567, w: 82567 } }, Database: { acquireCount: { w: 82567 } }, Collection: { acquireCount: { w: 82567 } } } protocol:op_query 5118ms

2019-03-29T10:56:23.417+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 90397, w: 90397 } }, Database: { acquireCount: { w: 90397 } }, Collection: { acquireCount: { w: 90397 } } } protocol:op_query 5502ms

2019-03-29T10:56:30.232+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 98209, w: 98209 } }, Database: { acquireCount: { w: 98209 } }, Collection: { acquireCount: { w: 98209 } } } protocol:op_query 6377ms

2019-03-29T10:56:37.326+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 106014, w: 106014 } }, Database: { acquireCount: { w: 106014 } }, Collection: { acquireCount: { w: 106014 } } } protocol:op_query 6606ms

2019-03-29T10:56:44.692+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 113818, w: 113818 } }, Database: { acquireCount: { w: 113818 } }, Collection: { acquireCount: { w: 113818 } } } protocol:op_query 6970ms

2019-03-29T10:56:53.036+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 121635, w: 121635 } }, Database: { acquireCount: { w: 121635 } }, Collection: { acquireCount: { w: 121635 } } } protocol:op_query 7678ms

2019-03-29T10:57:01.974+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 129463, w: 129463 } }, Database: { acquireCount: { w: 129463 } }, Collection: { acquireCount: { w: 129463 } } } protocol:op_query 8385ms

2019-03-29T10:57:10.969+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 137277, w: 137277 } }, Database: { acquireCount: { w: 137277 } }, Collection: { acquireCount: { w: 137277 } } } protocol:op_query 8517ms

2019-03-29T10:57:20.760+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 145089, w: 145089 } }, Database: { acquireCount: { w: 145089 } }, Collection: { acquireCount: { w: 145089 } } } protocol:op_query 9189ms

2019-03-29T10:57:30.568+0000 I COMMAND  [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 152891, w: 152891 } }, Database: { acquireCount: { w: 152891 } }, Collection: { acquireCount: { w: 152891 } } } protocol:op_query 9294ms

код

# save records in database
def insert_car_records(db, records, header):
    try:
        if db and header and records:
            a = datetime.datetime.now()
            coll = db.get_collection("car_records")
            db_cars = convert_csv_row(header, records)
            rq = []
            for car in db_cars:
                obj = { "$set": car }
                rq.append(UpdateOne({"uid": car["uid"]}, obj, upsert=True))
            bulkop = coll.bulk_write(rq, ordered=False)
            b = datetime.datetime.now()
            delta = b - a
            print("nInserted count - {}".format(bulkop.bulk_api_result["nInserted"]))
            print("nMatched count - {}".format(bulkop.bulk_api_result["nMatched"]))
            print("nModified count - {}".format(bulkop.bulk_api_result["nModified"]))
            print("nRemoved count - {}".format(bulkop.bulk_api_result["nRemoved"]))
            print("nUpserted count - {}".format(bulkop.bulk_api_result["nUpserted"]))
            print("time is seconds - {}".format(delta.total_seconds()))
    except BulkWriteError as bwe:
        print(bwe.details)
        raise

Как показано, вышеупомянутая функция вызывается один раз 1000 строк CSVобрабатываются.

Код для генерации 1000 строк прост, как следуетs -

def init():
    quotechar = '"'
    paginated_rows = []
    page_size = 1000
    for line in request.iter_lines(decode_unicode=True):
        if len(paginated_rows) == page_size:
            insert_car_records(database, paginated_rows)
            paginated_rows = []
        #  prepare new row
        paginated_rows.append(generate_row(line, quotechar))

Вы можете игнорировать синтаксические ошибки в коде, так как я изменил его, прежде чем вставлять его здесь

Я знаком с mongo и понимаю, как массовые операции могут повысить производительность.

Тем не менее, я новичок в Python, и я думаю, что мне здесь чего-то не хватает.

Спасибо.

...