Я работаю над проектом, в котором мне нужно обработать огромное количество данных.
В рамках обработки я выполняю следующие шаги -
- Чтение строки CSV
- Перевести строку CSV в словарь
- Массовое сохранение 1000 строк в MongoDB
- Итерация до конца файла CSV
Логика довольно проста инормально работает без проблем для миллионов записей.
Однако время bulk_write
постоянно увеличивается.
Первый вызов занимает 0,55 секунды и начинает увеличиваться в течениеследующие партии.
Статистика
![enter image description here](https://i.stack.imgur.com/bDs4h.png)
РЕДАКТИРОВАТЬ - Добавление журналов базы данных
Добавление базы данныхжурналы, чтобы доказать, что время базы данных постоянно увеличивается (показано в конце строки журнала)
2019-03-29T10:55:43.726+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 4439, w: 4439 } }, Database: { acquireCount: { w: 4438, W: 1 } }, Collection: { acquireCount: { w: 4438 } } } protocol:op_query 531ms
2019-03-29T10:55:45.214+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 12250, w: 12250 } }, Database: { acquireCount: { w: 12250 } }, Collection: { acquireCount: { w: 12250 } } } protocol:op_query 1040ms
2019-03-29T10:55:47.041+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 20059, w: 20059 } }, Database: { acquireCount: { w: 20059 } }, Collection: { acquireCount: { w: 20059 } } } protocol:op_query 1406ms
2019-03-29T10:55:49.390+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 27870, w: 27870 } }, Database: { acquireCount: { w: 27870 } }, Collection: { acquireCount: { w: 27870 } } } protocol:op_query 1733ms
2019-03-29T10:55:52.099+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 35679, w: 35679 } }, Database: { acquireCount: { w: 35679 } }, Collection: { acquireCount: { w: 35679 } } } protocol:op_query 2209ms
2019-03-29T10:55:55.046+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 43503, w: 43503 } }, Database: { acquireCount: { w: 43503 } }, Collection: { acquireCount: { w: 43503 } } } protocol:op_query 2453ms
2019-03-29T10:55:58.724+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 51325, w: 51325 } }, Database: { acquireCount: { w: 51325 } }, Collection: { acquireCount: { w: 51325 } } } protocol:op_query 3146ms
2019-03-29T10:56:02.565+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 59127, w: 59127 } }, Database: { acquireCount: { w: 59127 } }, Collection: { acquireCount: { w: 59127 } } } protocol:op_query 3413ms
2019-03-29T10:56:06.820+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 66941, w: 66941 } }, Database: { acquireCount: { w: 66941 } }, Collection: { acquireCount: { w: 66941 } } } protocol:op_query 3868ms
2019-03-29T10:56:11.890+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 74747, w: 74747 } }, Database: { acquireCount: { w: 74747 } }, Collection: { acquireCount: { w: 74747 } } } protocol:op_query 4612ms
2019-03-29T10:56:17.461+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 82567, w: 82567 } }, Database: { acquireCount: { w: 82567 } }, Collection: { acquireCount: { w: 82567 } } } protocol:op_query 5118ms
2019-03-29T10:56:23.417+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 90397, w: 90397 } }, Database: { acquireCount: { w: 90397 } }, Collection: { acquireCount: { w: 90397 } } } protocol:op_query 5502ms
2019-03-29T10:56:30.232+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 98209, w: 98209 } }, Database: { acquireCount: { w: 98209 } }, Collection: { acquireCount: { w: 98209 } } } protocol:op_query 6377ms
2019-03-29T10:56:37.326+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 106014, w: 106014 } }, Database: { acquireCount: { w: 106014 } }, Collection: { acquireCount: { w: 106014 } } } protocol:op_query 6606ms
2019-03-29T10:56:44.692+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 113818, w: 113818 } }, Database: { acquireCount: { w: 113818 } }, Collection: { acquireCount: { w: 113818 } } } protocol:op_query 6970ms
2019-03-29T10:56:53.036+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 121635, w: 121635 } }, Database: { acquireCount: { w: 121635 } }, Collection: { acquireCount: { w: 121635 } } } protocol:op_query 7678ms
2019-03-29T10:57:01.974+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 129463, w: 129463 } }, Database: { acquireCount: { w: 129463 } }, Collection: { acquireCount: { w: 129463 } } } protocol:op_query 8385ms
2019-03-29T10:57:10.969+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 137277, w: 137277 } }, Database: { acquireCount: { w: 137277 } }, Collection: { acquireCount: { w: 137277 } } } protocol:op_query 8517ms
2019-03-29T10:57:20.760+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 145089, w: 145089 } }, Database: { acquireCount: { w: 145089 } }, Collection: { acquireCount: { w: 145089 } } } protocol:op_query 9189ms
2019-03-29T10:57:30.568+0000 I COMMAND [conn38] command test_db.$cmd command: update { update: "car_records", ordered: false, updates: 1000 } numYields:0 reslen:37964 locks:{ Global: { acquireCount: { r: 152891, w: 152891 } }, Database: { acquireCount: { w: 152891 } }, Collection: { acquireCount: { w: 152891 } } } protocol:op_query 9294ms
код
# save records in database
def insert_car_records(db, records, header):
try:
if db and header and records:
a = datetime.datetime.now()
coll = db.get_collection("car_records")
db_cars = convert_csv_row(header, records)
rq = []
for car in db_cars:
obj = { "$set": car }
rq.append(UpdateOne({"uid": car["uid"]}, obj, upsert=True))
bulkop = coll.bulk_write(rq, ordered=False)
b = datetime.datetime.now()
delta = b - a
print("nInserted count - {}".format(bulkop.bulk_api_result["nInserted"]))
print("nMatched count - {}".format(bulkop.bulk_api_result["nMatched"]))
print("nModified count - {}".format(bulkop.bulk_api_result["nModified"]))
print("nRemoved count - {}".format(bulkop.bulk_api_result["nRemoved"]))
print("nUpserted count - {}".format(bulkop.bulk_api_result["nUpserted"]))
print("time is seconds - {}".format(delta.total_seconds()))
except BulkWriteError as bwe:
print(bwe.details)
raise
Как показано, вышеупомянутая функция вызывается один раз 1000 строк CSVобрабатываются.
Код для генерации 1000 строк прост, как следуетs -
def init():
quotechar = '"'
paginated_rows = []
page_size = 1000
for line in request.iter_lines(decode_unicode=True):
if len(paginated_rows) == page_size:
insert_car_records(database, paginated_rows)
paginated_rows = []
# prepare new row
paginated_rows.append(generate_row(line, quotechar))
Вы можете игнорировать синтаксические ошибки в коде, так как я изменил его, прежде чем вставлять его здесь
Я знаком с mongo и понимаю, как массовые операции могут повысить производительность.
Тем не менее, я новичок в Python, и я думаю, что мне здесь чего-то не хватает.
Спасибо.