Оптимизация кода Python итерация по миллионам документов - PullRequest
0 голосов
/ 12 июня 2018

Контекст

Здравствуйте! У меня есть сценарий, который преобразует документы Mongo в JSON, записанный в файл.Для запуска скрипта на 1000 документов потребуется примерно 45 секунд.

Предсказание

Однако в коллекции Монго содержится около 18 миллионов документов, поэтому на просмотр всей коллекции потребуется около 200 часов.

ЯСейчас я пытаюсь оптимизировать код, чтобы значительно сократить время, и хотел бы узнать, какие варианты есть для этого.

Возможные решения

Я знаю, что есть варианты аппаратного обеспечения, такие каккак работающий в кластере с более быстрыми жесткими дисками, но мне также было интересно, какие оптимизации я могу сделать с алгоритмом и другими областями кода.

Глядя на профилировщик, большую часть времени тратят на {method 'recv' of '_socket.socket' objects} (Не уверен, что это делает) и encoder.py:212(iterencode) (Преобразование в JSON).Это встроенные функции, но остальные трудоемкие функции определяются пользователем.

Вот некоторые возможные решения, которые я рассматриваю: 1. Копирование списка строк с использованием writeline 2. Компиляция в код C с использованием пакетанапример, Cython 3. Использование более быстрых структур данных и алгоритмического процесса.

Что касается 3. Я не уверен, как реализовать более быстрый алгоритм, поскольку мне приходится проходить через вложенные слои для каждого документа для создания записей.

Специально для каждого документа есть поддокументы, обозначенные ключом «d».В каждом поддокументе также есть ключи с AT, например AT432.Я должен создать отдельную запись для каждого 'AT', чтобы один документ мог привести к 50 отдельным записям.

Помощь Добро пожаловать!

Я открыт для любых идейпоскольку я действительно хотел бы сократить время, затрачиваемое на конвертацию каждого документа.Спасибо за ваш ввод!

Файл 1

from pymongo import MongoClient
import cProfile
import sys
import time
from mapping_functions import create_files
from datetime import datetime
from date_functions import get_array_of_dates

pr = cProfile.Profile()
pr.enable()

start_time = time.time()

# A. Connect to a collection in MongoDB
client = MongoClient('mongo-host-ip', 27017)
db = client.db_name
collection = db.collection_name

# Set time range for query in year, month, day, hour
start = datetime(2017, 12, 29, 10)
end = datetime(2017, 12, 29, 11)
delta_in_minutes = 60

num_documents = 1000
dates = get_array_of_dates(start, end, delta_in_minutes)

create_files(collection, 'collection_name', dates, start_time, num_documents)
pr.disable()
pr.print_stats(sort='time')

Файл 2

import dateutil.parser
import calendar
import datetime
import time
import json
from bson import json_util
import os


# creates new folder
def createFolder(directory):
    try:
        if not os.path.exists(directory):
            os.makedirs(directory)
    except OSError:
        print ('Error: Creating directory. ' + directory)


# creates a file with the entries for each hour
def create_files(collection_cursor, collection_name, dates, start_time, 
                 num_documents):
    createFolder('./data_hour/' + collection_name + '/')
    for string_time in dates:
        python_datetime = dateutil.parser.parse(string_time)
        documents = collection_cursor.find({'T': python_datetime}).limit(num_documents)
        text_file = open('./data_hour/' + collection_name + '/' + string_time +
                         ".json", "a")
        document_parser(collection_name, documents, text_file)
        text_file.close()


# exports sql entries to a file
def document_parser(collection_name, documents, text_file):
    list = []
    for doc in documents:
        list_of_subdocuments = subdocument_parser(doc['d'])
        for subdoc in list_of_subdocuments:
            actiontypes = actiontype_parser(subdoc)
            for actiontype in actiontypes:
                sql_entry = nosql_to_sql(collection_name, doc, subdoc,
                                         actiontype)
                text_file.write(json.dumps(sql_entry,
                                           separators=(',', ':'),
                                           default=json_util.default) + '\n')
    return


# returns a list of subdocuments
def subdocument_parser(subdocuments):
    subdocs = []

    for subdoc in subdocuments:
        subdocs.append(subdoc)
    return subdocs


# returns a list of action types
def actiontype_parser(subdocument):
    actiontypes = []

    # Find all of the keys that represent action types
    keys = subdocument.keys()
    for key in keys:
        if key[:2] == 'AT':
            actiontypes.append(key)
    return actiontypes


def handle_arrays(json, document, key, full_key):
    sorted_array = sorted(document[key])
    comma_separated_string = ','.join(map(str, sorted_array))
    json[full_key] = comma_separated_string
    return


def handle_time(json, document, key, full_key):
    epoch = document[key]
    seconds = calendar.timegm(epoch.utctimetuple())
    json[full_key] = seconds
    return


# Copy all keys in the document except 'd'
def copy_document_keys(json, mapping, collection_name, document,
                       document_keys):
    for key in document_keys:
        if key in prohibited:
            continue
        if key != 'd':
            full_key = mapping[key]
            # sort array and convert to csv
            if key == 'MP' and collection_name in inventory_or_auction:
                full_key += 's'
                handle_arrays(json, document, key, full_key)
                continue
            if key == 'T':
                handle_time(json, document, key, full_key)
                continue
            json[full_key] = document[key]
    return


# Copy all the keys in a subdocument except for action types
def copy_subdocument_keys(json, mapping, subdocument, subdocument_keys):
    for key in subdocument_keys:
        if key in prohibited:
            continue
        if key[:2] != 'AT':
            full_key = mapping[key]
            # sort array and convert to csv
            if key == 'UI' or key == 'DA' or key == 'F':
                handle_arrays(json, subdocument, key, full_key)
                continue
            json[full_key] = subdocument[key]
    return


# copy misc. keys
def copy_misc_keys(json, subdocument, actiontype):
    # If there is no recordType key, then put default value
    if 'recordType' not in json:
        json['recordType'] = 1

    # Copy the action type
    json['actionType'] = int(actiontype[2:])
    json['count'] = int(subdocument[actiontype])
    return


# return a sql entry
def nosql_to_sql(collection_name, document, subdocument, actiontype):
    json = {}

    # Copy all keys in the document except 'd'
    document_keys = document.keys()
    copy_document_keys(json, mapping, collection_name, document, document_keys)

    # Copy all the keys in a subdocument except for action types
    subdocument_keys = subdocument.keys()
    copy_subdocument_keys(json, mapping, subdocument, subdocument_keys)

    # copy action types
    copy_misc_keys(json, subdocument, actiontype)

    return json

Mongo Document

{
    "_id" : ObjectId("7dfgdftew564324546ff3"),
    "T" : ISODate("2011-10-13T07:00:00Z"),
    "MP" : [
        40,
        16,
        13,
        11,
        1
    ],
    "P" : 3881,
    "PB" : 12285,
    "d" : [
        {
            "D" : 32,
            "DL" : 0,
            "ST" : 1007,
            "AT315" : NumberLong(5),
            "AT328" : NumberLong(14),
            "AT331" : NumberLong(19),
            "AT306" : NumberLong(19),
            "AT100331" : NumberLong(431),
            "AT500" : 0
        },
        {
            "D" : 16,
            "DL" : 0,
            "ST" : 1007,
            "AT328" : NumberLong(28),
            "AT315" : NumberLong(8),
            "AT331" : NumberLong(36),
            "AT306" : NumberLong(36),
            "AT100331" : NumberLong(953),
            "AT500" : 0
        },
        {
            "D" : 1,
            "DL" : 0,
            "ST" : 1007,
            "AT315" : NumberLong(29),
            "AT331" : NumberLong(34),
            "AT328" : NumberLong(5),
            "AT306" : NumberLong(34),
            "AT100331" : NumberLong(803),
            "AT500" : 0
        },
        {
            "D" : 2,
            "DL" : 0,
            "ST" : 1007,
            "AT328" : NumberLong(1),
            "AT100331" : NumberLong(82),
            "AT306" : NumberLong(1),
            "AT331" : NumberLong(1),
            "AT500" : 0
        }
    ],
    "bn" : NumberLong(21137)
}

Результирующие записи, записанные в файл

   {"count":254,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":100331,"recordType":1}
{"count":7,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":306,"recordType":1}
{"count":7,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":331,"recordType":1}
{"count":6,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":315,"recordType":1}
{"count":1,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":328,"recordType":1}
{"count":0,"publisher":730,"marketPlaces":"1,13","publication":6452,"deal":0,"hour":1514541600,"deviceType":1,"provider":1011,"actionType":500,"recordType":1} ...

Результат от профилирования

         30567605 function calls (30567594 primitive calls) in 48.483 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     5476    6.992    0.001    6.992    0.001 {method 'recv' of '_socket.socket' objects}
  1194719    5.489    0.000    5.489    0.000 encoder.py:212(iterencode)
  1194719    4.869    0.000    4.869    0.000 mapping_functions.py:180(copy_subdocument_keys)
  1194719    4.254    0.000   11.878    0.000 mapping_functions.py:160(copy_document_keys)
  1194719    3.223    0.000   13.123    0.000 __init__.py:193(dumps)
        2    3.175    1.587   48.480   24.240 mapping_functions.py:99(document_parser)
  1194719    2.032    0.000    2.032    0.000 mapping_functions.py:195(copy_misc_keys)
  1194719    1.810    0.000   21.245    0.000 mapping_functions.py:207(nosql_to_sql)
  1194719    1.726    0.000    8.666    0.000 encoder.py:186(encode)
        6    1.606    0.268    1.611    0.268 {bson._cbson.decode_all}
  1194719    1.438    0.000    1.438    0.000 {method 'utctimetuple' of 'datetime.datetime' objects}
  1194719    1.320    0.000    1.438    0.000 calendar.py:611(timegm)
  1194719    1.261    0.000    3.619    0.000 mapping_functions.py:145(handle_arrays)
  1194719    1.234    0.000    1.234    0.000 encoder.py:101(__init__)
  1194720    1.227    0.000    1.227    0.000 {map}
  1194719    1.130    0.000    4.006    0.000 mapping_functions.py:152(handle_time)
  2389438    1.025    0.000    1.025    0.000 {method 'join' of 'str' objects}
  1194719    0.929    0.000    0.929    0.000 {sorted}
  1194719    0.818    0.000    0.818    0.000 {method 'write' of 'file' objects}
  2555978    0.718    0.000    0.718    0.000 {method 'keys' of 'dict' objects}
  2391505    0.628    0.000    0.628    0.000 {isinstance}
   166540    0.573    0.000    0.788    0.000 mapping_functions.py:134(actiontype_parser)
  2558008    0.339    0.000    0.339    0.000 {time.time}
  1361313    0.165    0.000    0.165    0.000 {method 'append' of 'list' objects}
        7    0.123    0.018    0.123    0.018 {time.sleep}
       12    0.121    0.010    7.118    0.593 network.py:166(_receive_data_on_socket)
  1194719    0.118    0.000    0.118    0.000 {method 'toordinal' of 'datetime.date' objects}
     2000    0.037    0.000    0.050    0.000 mapping_functions.py:125(subdocument_parser)
        1    0.035    0.035    0.035    0.035 {method 'connect' of '_socket.socket' objects}
        6    0.030    0.005    0.031    0.005 message.py:953(unpack)
     2002    0.010    0.000    8.942    0.004 cursor.py:1165(next)
     9567    0.006    0.000    0.006    0.000 {len}
     2000    0.004    0.000    0.004    0.000 database.py:402(_fix_outgoing)
     2001    0.004    0.000    0.005    0.000 objectid.py:68(__init__)
        6    0.002    0.000    7.151    1.192 network.py:143(receive_message)
        7    0.001    0.000    8.926    1.275 cursor.py:1057(_refresh)
        1    0.001    0.001    0.001    0.001 {_socket.getaddrinfo}
     2022    0.001    0.000    0.001    0.000 collection.py:306(database)
        2    0.001    0.000    0.001    0.000 {method 'close' of 'file' objects}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...