Медленная загрузка данных в MongoDB из панд Dataframe - PullRequest
0 голосов
/ 17 декабря 2018

У меня есть 2 больших файла CSV, которые необходимо загрузить в коллекции Mongo.Во-первых, я считываю данные в pandas Dataframe, выполняю некоторую предварительную обработку, и после этого вставляю полученный dict в коллекцию Mongo.Проблема в том, что производительность очень низкая, потому что она выполняется последовательно, и загрузка данных во вторую коллекцию должна выполняться после того, как первая коллекция уже заполнена (чтобы обновить строки внешними ключами).Как я могу ускорить процесс загрузки?

import pymongo
import config
import pandas as pd
import numpy as np
from datetime import datetime
from config import logger


client = pymongo.MongoClient(config.IP)
try:
    client.server_info()
except pymongo.errors.ServerSelectionTimeoutError as e:
    logger.error("Unable to connect to %s. Error: %s" % (config.IP, e))
    client = None

# connect to database (or create if not exists)
mydb = client[config.DB_NAME]

# connect to collections (or create if not exists)
movie_collection = mydb[config.DB_MOVIE_COLLECTION]
actors_collection = mydb[config.DB_ACTOR_COLLECTION]


def read_data(file):
    '''
    returns Dataframe with read csv data
    '''
    df = pd.read_csv(file, sep='\t')
    df.replace('\\N', np.nan, inplace=True)
    return df


def insert_to_collection(collection, data):
    collection.insert(data)


def fill_movie_data():
    '''
    iterates over movie Dataframe
    process values and creates dict structure
    with specific attributes to insert into MongoDB movie collection
    '''


    # load data to pandas Dataframe
    logger.info("Reading movie data to Dataframe")
    data = read_data('datasets/title.basics.tsv')

    for index, row in data.iterrows():
        result_dict = {}

        id_ = row['tconst']
        title = row['primaryTitle']

        # check value of movie year (if not NaN)
        if not pd.isnull(row['endYear']) and not pd.isnull(row['startYear']):
            year = list([row['startYear'], row['endYear']])
        elif not pd.isnull(row['startYear']):
            year = int(row['startYear'])
        else:
            year = None

        # check value of movie duration (if not NaN)
        if not pd.isnull(row['runtimeMinutes']):
            try:
                duration = int(row['runtimeMinutes'])
            except ValueError:
                duration = None
        else:
            duration = None

        # check value of genres (if not NaN)
        if not pd.isnull(row['genres']):
            genres = row['genres'].split(',')
        else:
            genres = None

        result_dict['_id'] = id_
        result_dict['primary_title'] = title

        # if both years have values
        if isinstance(year, list):
            result_dict['year_start'] = int(year[0])
            result_dict['year_end'] = int(year[1])

        # if start_year has value
        elif year:
            result_dict['year'] = year

        if duration:
            result_dict['duration'] = duration

        if genres:
            result_dict['genres'] = genres

        insert_to_collection(movie_collection, result_dict)



def fill_actors_data():
    '''
    iterates over actors Dataframe
    process values, creates dict structure
    with new fields to insert into MongoDB actors collection
    '''


    logger.info("Inserting data to actors collection")
    # load data to pandas Dataframe
    logger.info("Reading actors data to Dataframe")
    data = read_data('datasets/name.basics.tsv')

    logger.info("Inserting data to actors collection")
    for index, row in data.iterrows():
        result_dict = {}

        id_ = row['nconst']
        name = row['primaryName']

        # if no birth year and death year value
        if pd.isnull(row['birthYear']):
            yob = None
            alive = False
        # if both birth and death year have value
        elif not pd.isnull(row['birthYear']) and not pd.isnull(row['deathYear']):
            yob = int(row['birthYear'])
            death = int(row['deathYear'])
            age = death - yob
            alive = False
        # if only birth year has value
        else:
            yob = int(row['birthYear'])
            current_year = datetime.now().year
            age = current_year - yob
            alive = True

        if not pd.isnull(row['knownForTitles']):
            movies = row['knownForTitles'].split(',')

        result_dict['_id'] = id_
        result_dict['name'] = name
        result_dict['yob'] = yob
        result_dict['alive'] = alive
        result_dict['age'] = age
        result_dict['movies'] = movies

        insert_to_collection(actors_collection, result_dict)

        # update movie documents with list of actors ids
        movie_collection.update_many({"_id": {"$in": movies}}, {"$push": { "people": id_}})



# if collections are empty, fill it with data
if movie_collection.count() == 0:
    fill_movie_data()

if actors_collection.count() == 0:
    fill_actors_data()

1 Ответ

0 голосов
/ 17 декабря 2018

TL; DR

Вместо вставки одной записи за раз, массовая вставка .

insert_many

В данный момент выиметь:

def insert_to_collection(collection: pymongo.collection.Collection, data: dict):
    collection.insert(data)

вы используете insert(), что, кстати, устарело.

То, что вы хотите иметь:

def insert_to_collection(collection: pymongo.collection.Collection, data: list):
    collection.insert_many(data)

Так что в вашемдве функции: fill_movie_data и fill_actors_data, вместо того, чтобы постоянно вызывать insert_to_collection() в цикле, вы можете вызывать его время от времени и вставлять массово.

Код

Ниже приведен код, который вы опубликовали с несколькими изменениями:

Добавьте max_bulk_size, который чем больше, тем лучше для вашей скорости, просто убедитесь, что он не превышает вашу оперативную память.

max_bulk_size = 500

Добавьте results_list и добавьте result_dict к нему.Как только размер списка достигнет max_bulk_size, сохраните его и очистите список.

def fill_movie_data():
    '''
    iterates over movie Dataframe
    process values and creates dict structure
    with specific attributes to insert into MongoDB movie collection
    '''


    # load data to pandas Dataframe
    logger.info("Reading movie data to Dataframe")
    data = read_data('datasets/title.basics.tsv')

    results_list = []

    for index, row in data.iterrows():
        result_dict = {}

        id_ = row['tconst']
        title = row['primaryTitle']

        # check value of movie year (if not NaN)
        if not pd.isnull(row['endYear']) and not pd.isnull(row['startYear']):
            year = list([row['startYear'], row['endYear']])
        elif not pd.isnull(row['startYear']):
            year = int(row['startYear'])
        else:
            year = None

        # check value of movie duration (if not NaN)
        if not pd.isnull(row['runtimeMinutes']):
            try:
                duration = int(row['runtimeMinutes'])
            except ValueError:
                duration = None
        else:
            duration = None

        # check value of genres (if not NaN)
        if not pd.isnull(row['genres']):
            genres = row['genres'].split(',')
        else:
            genres = None

        result_dict['_id'] = id_
        result_dict['primary_title'] = title

        # if both years have values
        if isinstance(year, list):
            result_dict['year_start'] = int(year[0])
            result_dict['year_end'] = int(year[1])

        # if start_year has value
        elif year:
            result_dict['year'] = year

        if duration:
            result_dict['duration'] = duration

        if genres:
            result_dict['genres'] = genres

        results_list.append(result_dict)

        if len(results_list) > max_bulk_size:
            insert_to_collection(movie_collection, results_list)
            results_list = []

То же самое с другим циклом.

def fill_actors_data():
    '''
    iterates over actors Dataframe
    process values, creates dict structure
    with new fields to insert into MongoDB actors collection
    '''


    logger.info("Inserting data to actors collection")
    # load data to pandas Dataframe
    logger.info("Reading actors data to Dataframe")
    data = read_data('datasets/name.basics.tsv')

    logger.info("Inserting data to actors collection")

    results_list = []

    for index, row in data.iterrows():
        result_dict = {}

        id_ = row['nconst']
        name = row['primaryName']

        # if no birth year and death year value
        if pd.isnull(row['birthYear']):
            yob = None
            alive = False
        # if both birth and death year have value
        elif not pd.isnull(row['birthYear']) and not pd.isnull(row['deathYear']):
            yob = int(row['birthYear'])
            death = int(row['deathYear'])
            age = death - yob
            alive = False
        # if only birth year has value
        else:
            yob = int(row['birthYear'])
            current_year = datetime.now().year
            age = current_year - yob
            alive = True

        if not pd.isnull(row['knownForTitles']):
            movies = row['knownForTitles'].split(',')

        result_dict['_id'] = id_
        result_dict['name'] = name
        result_dict['yob'] = yob
        result_dict['alive'] = alive
        result_dict['age'] = age
        result_dict['movies'] = movies

        results_list.append(result_dict)

        if len(results_list) > max_bulk_size:
            insert_to_collection(actors_collection, results_list)
            results_list = []

        # update movie documents with list of actors ids
        movie_collection.update_many({"_id": {"$in": movies}}, {"$push": { "people": id_}})
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...