Я пытаюсь вставить около 8 миллионов записей в Mongo, и кажется, что они вставляются со скоростью 1000 записей в секунду, что очень медленно.
Код написан на python, поэтомуможет быть проблема питона, но я в этом сомневаюсь.Вот код:
def str2datetime(str):
return None if (not str or str == r'\N') else datetime.strptime(str, '%Y-%m-%d %H:%M:%S')
def str2bool(str):
return None if (not str or str == r'\N') else (False if str == '0' else True)
def str2int(str):
return None if (not str or str == r'\N') else int(str)
def str2float(str):
return None if (not str or str == r'\N') else float(str)
def str2float2int(str):
return None if (not str or str == r'\N') else int(float(str) + 0.5)
def str2latin1(str):
return unicode(str, 'latin-1')
_ = lambda x: x
converters_map = {
'test_id': str2int,
'android_device_id': str2int,
'android_fingerprint': _,
'test_date': str2datetime,
'client_ip_address': _,
'download_kbps': str2int,
'upload_kbps': str2int,
'latency': str2int,
'server_name': _,
'server_country': _,
'server_country_code': _,
'server_latitude': str2float,
'server_longitude': str2float,
'client_country': _,
'client_country_code': _,
'client_region_name': str2latin1,
'client_region_code': _,
'client_city': str2latin1,
'client_latitude': str2float,
'client_longitude': str2float,
'miles_between': str2float2int,
'connection_type': str2int,
'isp_name': _,
'is_isp': str2bool,
'network_operator_name': _,
'network_operator': _,
'brand': _,
'device': _,
'hardware': _,
'build_id': _,
'manufacturer': _,
'model': str2latin1,
'product': _,
'cdma_cell_id': str2int,
'gsm_cell_id': str2int,
'client_ip_id': str2int,
'user_agent': _,
'client_net_speed': str2int,
'iphone_device_id': str2int,
'carrier_name': _,
'iso_country_code': _,
'mobile_country_code': str2int,
'mobile_network_code': str2int,
'model': str2latin1,
'version': _,
'server_sponsor_name': _,
}
def read_csv_zip(path):
with ZipFile(path) as z:
with z.open(z.namelist()[0]) as input:
r = csv.reader(input)
header = r.next()
converters = tuple((title if title != 'test_id' else '_id', converters_map[title]) for title in header)
for row in r:
row = {converter[0]:converter[1](value) for converter, value in zip(converters, row)}
yield row
argv = [x for x in argv if not x == '']
if len(argv) == 1:
print("Usage: " + argv[0] + " zip-file")
exit(1)
zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]
print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
db = connection.db
collection = db.__getattr__(collection_name)
i = 0;
try:
start = time()
for item in read_csv_zip(zip_file):
i += 1
if (i % 1000) == 0:
stdout.write("\r%d " % i)
stdout.flush()
try:
collection.insert(item)
except Exception as exc:
print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
print exc
print("Elapsed time = {0} seconds, {1} records.".format(time() - start, i))
raw_input("Press ENTER to exit")
except Exception as exc:
print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
print exc
exit(1)
Чтобы вставить 262796 записей (один файл csv), требуется 350 секунд.
Сервер mongo работает на той же машине, и никто его не использует.Итак, я мог бы написать напрямую в файл базы данных, если бы был способ.
Меня не интересует шардинг, потому что 8 миллионов записей не должны требовать шардинга, не так ли?
Мой вопрос: что я делаю не так?Может быть, мой выбор БД неправильный?Типичный процесс заключается в том, что раз в месяц записи обновляются, а затем к базе данных делаются только запросы.
Спасибо.
РЕДАКТИРОВАТЬ
ЭтоОказывается, что узким местом является не монго, а чтение zip-файла.Я изменил код, чтобы прочитать zip-файл кусками по 1000 строк, а затем передать их монго за один вызов Collection.insert
.Это почтовый файл, который занимает все время.Вот модифицированный код:
def insert_documents(collection, source, i, batch_size):
count = 0;
while True:
items = list(itertools.islice(source, batch_size))
if len(items) == 0:
break;
old_i = i
count += len(items)
i += len(items)
if (old_i / 1000) != (i / 1000):
sys.stdout.write("\r%d " % i)
sys.stdout.flush()
try:
collection.insert(items)
except Exception as exc:
print("Failed at some record between #{0} (id = {1}) and #{2} (id = {3})".format(old_i,items[0]['_id'],i,items[-1]['_id']))
print exc
return count
def main():
argv = [x for x in sys.argv if not x == '']
if len(argv) == 1:
print("Usage: " + argv[0] + " zip-file")
exit(1)
zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]
print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
ookla = connection.ookla
collection = ookla.__getattr__(collection_name)
i = 0;
start = time()
count = insert_documents(collection, read_csv_zip(zip_file), i, 1000)
i += count
print("Elapsed time = {0} seconds, {1} records.".format(time() - start, count))
raw_input("Press ENTER to exit")
if __name__ == "__main__":
main()
Получается, что большую часть времени занимает items = list(itertools.islice(source, batch_size))
.
Есть идеи, как его улучшить?