Контекст
Здравствуйте! У меня есть сценарий, который преобразует документы Mongo в JSON, записанный в файл.Для запуска скрипта на 1000 документов потребуется примерно 45 секунд.
Предсказание
Однако в коллекции Монго содержится около 18 миллионов документов, поэтому на просмотр всей коллекции потребуется около 200 часов.
ЯСейчас я пытаюсь оптимизировать код, чтобы значительно сократить время, и хотел бы узнать, какие варианты есть для этого.
Возможные решения
Я знаю, что есть варианты аппаратного обеспечения, такие каккак работающий в кластере с более быстрыми жесткими дисками, но мне также было интересно, какие оптимизации я могу сделать с алгоритмом и другими областями кода.
Глядя на профилировщик, большую часть времени тратят на {method 'recv' of '_socket.socket' objects}
(Не уверен, что это делает) и encoder.py:212(iterencode)
(Преобразование в JSON).Это встроенные функции, но остальные трудоемкие функции определяются пользователем.
Вот некоторые возможные решения, которые я рассматриваю: 1. Копирование списка строк с использованием writeline 2. Компиляция в код C с использованием пакетанапример, Cython 3. Использование более быстрых структур данных и алгоритмического процесса.
Что касается 3. Я не уверен, как реализовать более быстрый алгоритм, поскольку мне приходится проходить через вложенные слои для каждого документа для создания записей.
Специально для каждого документа есть поддокументы, обозначенные ключом «d».В каждом поддокументе также есть ключи с AT, например AT432.Я должен создать отдельную запись для каждого 'AT', чтобы один документ мог привести к 50 отдельным записям.
Помощь Добро пожаловать!
Я открыт для любых идейпоскольку я действительно хотел бы сократить время, затрачиваемое на конвертацию каждого документа.Спасибо за ваш ввод!
Файл 1
from pymongo import MongoClient
import cProfile
import sys
import time
from mapping_functions import create_files
from datetime import datetime
from date_functions import get_array_of_dates
pr = cProfile.Profile()
pr.enable()
start_time = time.time()
# A. Connect to a collection in MongoDB
client = MongoClient('mongo-host-ip', 27017)
db = client.db_name
collection = db.collection_name
# Set time range for query in year, month, day, hour
start = datetime(2017, 12, 29, 10)
end = datetime(2017, 12, 29, 11)
delta_in_minutes = 60
num_documents = 1000
dates = get_array_of_dates(start, end, delta_in_minutes)
create_files(collection, 'collection_name', dates, start_time, num_documents)
pr.disable()
pr.print_stats(sort='time')
Файл 2
import dateutil.parser
import calendar
import datetime
import time
import json
from bson import json_util
import os
# creates new folder
def createFolder(directory):
try:
if not os.path.exists(directory):
os.makedirs(directory)
except OSError:
print ('Error: Creating directory. ' + directory)
# creates a file with the entries for each hour
def create_files(collection_cursor, collection_name, dates, start_time,
num_documents):
createFolder('./data_hour/' + collection_name + '/')
for string_time in dates:
python_datetime = dateutil.parser.parse(string_time)
documents = collection_cursor.find({'T': python_datetime}).limit(num_documents)
text_file = open('./data_hour/' + collection_name + '/' + string_time +
".json", "a")
document_parser(collection_name, documents, text_file)
text_file.close()
# exports sql entries to a file
def document_parser(collection_name, documents, text_file):
list = []
for doc in documents:
list_of_subdocuments = subdocument_parser(doc['d'])
for subdoc in list_of_subdocuments:
actiontypes = actiontype_parser(subdoc)
for actiontype in actiontypes:
sql_entry = nosql_to_sql(collection_name, doc, subdoc,
actiontype)
text_file.write(json.dumps(sql_entry,
separators=(',', ':'),
default=json_util.default) + '\n')
return
# returns a list of subdocuments
def subdocument_parser(subdocuments):
subdocs = []
for subdoc in subdocuments:
subdocs.append(subdoc)
return subdocs
# returns a list of action types
def actiontype_parser(subdocument):
actiontypes = []
# Find all of the keys that represent action types
keys = subdocument.keys()
for key in keys:
if key[:2] == 'AT':
actiontypes.append(key)
return actiontypes
def handle_arrays(json, document, key, full_key):
sorted_array = sorted(document[key])
comma_separated_string = ','.join(map(str, sorted_array))
json[full_key] = comma_separated_string
return
def handle_time(json, document, key, full_key):
epoch = document[key]
seconds = calendar.timegm(epoch.utctimetuple())
json[full_key] = seconds
return
# Copy all keys in the document except 'd'
def copy_document_keys(json, mapping, collection_name, document,
document_keys):
for key in document_keys:
if key in prohibited:
continue
if key != 'd':
full_key = mapping[key]
# sort array and convert to csv
if key == 'MP' and collection_name in inventory_or_auction:
full_key += 's'
handle_arrays(json, document, key, full_key)
continue
if key == 'T':
handle_time(json, document, key, full_key)
continue
json[full_key] = document[key]
return
# Copy all the keys in a subdocument except for action types
def copy_subdocument_keys(json, mapping, subdocument, subdocument_keys):
for key in subdocument_keys:
if key in prohibited:
continue
if key[:2] != 'AT':
full_key = mapping[key]
# sort array and convert to csv
if key == 'UI' or key == 'DA' or key == 'F':
handle_arrays(json, subdocument, key, full_key)
continue
json[full_key] = subdocument[key]
return
# copy misc. keys
def copy_misc_keys(json, subdocument, actiontype):
# If there is no recordType key, then put default value
if 'recordType' not in json:
json['recordType'] = 1
# Copy the action type
json['actionType'] = int(actiontype[2:])
json['count'] = int(subdocument[actiontype])
return
# return a sql entry
def nosql_to_sql(collection_name, document, subdocument, actiontype):
json = {}
# Copy all keys in the document except 'd'
document_keys = document.keys()
copy_document_keys(json, mapping, collection_name, document, document_keys)
# Copy all the keys in a subdocument except for action types
subdocument_keys = subdocument.keys()
copy_subdocument_keys(json, mapping, subdocument, subdocument_keys)
# copy action types
copy_misc_keys(json, subdocument, actiontype)
return json
Mongo Document
{
"_id" : ObjectId("7dfgdftew564324546ff3"),
"T" : ISODate("2011-10-13T07:00:00Z"),
"MP" : [
40,
16,
13,
11,
1
],
"P" : 3881,
"PB" : 12285,
"d" : [
{
"D" : 32,
"DL" : 0,
"ST" : 1007,
"AT315" : NumberLong(5),
"AT328" : NumberLong(14),
"AT331" : NumberLong(19),
"AT306" : NumberLong(19),
"AT100331" : NumberLong(431),
"AT500" : 0
},
{
"D" : 16,
"DL" : 0,
"ST" : 1007,
"AT328" : NumberLong(28),
"AT315" : NumberLong(8),
"AT331" : NumberLong(36),
"AT306" : NumberLong(36),
"AT100331" : NumberLong(953),
"AT500" : 0
},
{
"D" : 1,
"DL" : 0,
"ST" : 1007,
"AT315" : NumberLong(29),
"AT331" : NumberLong(34),
"AT328" : NumberLong(5),
"AT306" : NumberLong(34),
"AT100331" : NumberLong(803),
"AT500" : 0
},
{
"D" : 2,
"DL" : 0,
"ST" : 1007,
"AT328" : NumberLong(1),
"AT100331" : NumberLong(82),
"AT306" : NumberLong(1),
"AT331" : NumberLong(1),
"AT500" : 0
}
],
"bn" : NumberLong(21137)
}
Результирующие записи, записанные в файл
{"count":254,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":100331,"recordType":1}
{"count":7,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":306,"recordType":1}
{"count":7,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":331,"recordType":1}
{"count":6,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":315,"recordType":1}
{"count":1,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":328,"recordType":1}
{"count":0,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":500,"recordType":1} ...
Результат от профилирования
30567605 function calls (30567594 primitive calls) in 48.483 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
5476 6.992 0.001 6.992 0.001 {method 'recv' of '_socket.socket' objects}
1194719 5.489 0.000 5.489 0.000 encoder.py:212(iterencode)
1194719 4.869 0.000 4.869 0.000 mapping_functions.py:180(copy_subdocument_keys)
1194719 4.254 0.000 11.878 0.000 mapping_functions.py:160(copy_document_keys)
1194719 3.223 0.000 13.123 0.000 __init__.py:193(dumps)
2 3.175 1.587 48.480 24.240 mapping_functions.py:99(document_parser)
1194719 2.032 0.000 2.032 0.000 mapping_functions.py:195(copy_misc_keys)
1194719 1.810 0.000 21.245 0.000 mapping_functions.py:207(nosql_to_sql)
1194719 1.726 0.000 8.666 0.000 encoder.py:186(encode)
6 1.606 0.268 1.611 0.268 {bson._cbson.decode_all}
1194719 1.438 0.000 1.438 0.000 {method 'utctimetuple' of 'datetime.datetime' objects}
1194719 1.320 0.000 1.438 0.000 calendar.py:611(timegm)
1194719 1.261 0.000 3.619 0.000 mapping_functions.py:145(handle_arrays)
1194719 1.234 0.000 1.234 0.000 encoder.py:101(__init__)
1194720 1.227 0.000 1.227 0.000 {map}
1194719 1.130 0.000 4.006 0.000 mapping_functions.py:152(handle_time)
2389438 1.025 0.000 1.025 0.000 {method 'join' of 'str' objects}
1194719 0.929 0.000 0.929 0.000 {sorted}
1194719 0.818 0.000 0.818 0.000 {method 'write' of 'file' objects}
2555978 0.718 0.000 0.718 0.000 {method 'keys' of 'dict' objects}
2391505 0.628 0.000 0.628 0.000 {isinstance}
166540 0.573 0.000 0.788 0.000 mapping_functions.py:134(actiontype_parser)
2558008 0.339 0.000 0.339 0.000 {time.time}
1361313 0.165 0.000 0.165 0.000 {method 'append' of 'list' objects}
7 0.123 0.018 0.123 0.018 {time.sleep}
12 0.121 0.010 7.118 0.593 network.py:166(_receive_data_on_socket)
1194719 0.118 0.000 0.118 0.000 {method 'toordinal' of 'datetime.date' objects}
2000 0.037 0.000 0.050 0.000 mapping_functions.py:125(subdocument_parser)
1 0.035 0.035 0.035 0.035 {method 'connect' of '_socket.socket' objects}
6 0.030 0.005 0.031 0.005 message.py:953(unpack)
2002 0.010 0.000 8.942 0.004 cursor.py:1165(next)
9567 0.006 0.000 0.006 0.000 {len}
2000 0.004 0.000 0.004 0.000 database.py:402(_fix_outgoing)
2001 0.004 0.000 0.005 0.000 objectid.py:68(__init__)
6 0.002 0.000 7.151 1.192 network.py:143(receive_message)
7 0.001 0.000 8.926 1.275 cursor.py:1057(_refresh)
1 0.001 0.001 0.001 0.001 {_socket.getaddrinfo}
2022 0.001 0.000 0.001 0.000 collection.py:306(database)
2 0.001 0.000 0.001 0.000 {method 'close' of 'file' objects}