OOM при чтении данных в Pandas из MongoDB с использованием клиента pymon go - PullRequest
1 голос
/ 30 января 2020

У меня (900k, 300) записей в пн go коллекция. Когда я пытаюсь прочитать данные в pandas, потребление памяти резко возрастает, пока процесс не будет уничтожен. Я должен упомянуть, что данные помещаются в память (1.5GB~), если я читаю их из файла CSV.

Моя машина имеет 32 ГБ ОЗУ и 16 процессоров Centos 7.

Мой простой код:

client = MongoClient(host,port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))

Мой многопроцессорный код:

def read_mongo_parallel(skipses):


    print("Starting process")
    client = MongoClient(skipses[4],skipses[5])
    db = client[skipses[2]]
    collection = db[skipses[3]]
    print("range of {} to {}".format(skipses[0],skipses[0]+skipses[1]))

    cursor = collection.find().skip(skipses[0]).limit(skipses[1])

    return list(cursor)

all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        for  rows in  executor.map(read_mongo_parallel, skipesess):
            all_lists.extend(rows)


df = pd.DataFrame(all_lists)   

Увеличение памяти обоими методами и уничтожение ядра,

Что я делаю, работая?

Ответы [ 5 ]

4 голосов
/ 08 февраля 2020

Проблема в использовании list при создании DataFrame. Курсор используется все сразу, составляя список со словарями по 900 тыс., Что занимает много памяти.

Этого можно избежать, если создать пустой DataFrame, а затем вытащить документы в пакетном режиме, несколько документов за раз, добавление их к DataFrame.

def batched(cursor, batch_size):
    batch = []
    for doc in cursor:
        batch.append(doc)
        if batch and not len(batch) % batch_size:
            yield batch
            batch = []

    if batch:   # last documents
        yield batch

df = pd.DataFrame()
for batch in batched(cursor, 10000):
    df = df.append(batch, ignore_index=True)

10000 кажется приемлемым размером пакета, но вы можете изменить его в соответствии с вашими ограничениями памяти: чем выше, чем быстрее это закончится, но тем больше памяти он будет использовать во время работы.

ОБНОВЛЕНИЕ : добавьте некоторый тест

Обратите внимание, что при таком подходе запрос не требуется длятся дольше, а скорее наоборот, так как на самом деле требуется время, чтобы вытащить документы из mongodb в качестве словарей и разместить их в списке.

Вот некоторые тесты с документами 300K, которые показывают, как этот подход с правым batch_size на самом деле даже быстрее, чем перетаскивать весь курсор в список:

  • Весь курсор nto list
%%time

df = pd.DataFrame(list(db.test.find().limit(300000)))

Процессорное время: пользователь 35,3 с, sys: 2,14 с, всего: 37,5 с Время настенного режима: 37,7 с

  • batch_size=10000 <- <strong>FASTEST
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 10000):
    df = df.append(batch, ignore_index=True)

Процессорное время: пользователь 29,5 с, sys: 1,23 с, всего: 30,7 с Время настенного режима: 30,8 с

  • batch_size=1000
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 1000):
    df = df.append(batch, ignore_index=True)

Время CPU: пользовательский 44,8 с, sys: 2,09 с, всего: 46,9 с Время наработки на стенку: 46,9 с

  • batch_size=100000
%%time

df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 100000):
    df = df.append(batch, ignore_index=True)

Время CPU: пользователь 34,6 с, sys: 1,15 с, всего: 35,8 с Время настенного режима: 36 с

1 голос
/ 09 февраля 2020

Загрузка данных кусками.

Использование iterator2dataframes из { ссылка }

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)

client = MongoClient(host,port)
collection = client[db_name][collection_name]
cursor = collection.find()

df = iterator2dataframes(cursor, 1000)
1 голос
/ 31 января 2020

Этот тестовый комплект создает 900 тыс. (Пусть и небольших) записей и отлично работает на моем стандартном ноутбуке. Попробуйте.

import pymongo
import pandas as pd

db = pymongo.MongoClient()['mydatabase']
db.mycollection.drop()
operations = []

for i in range(900000):
    operations.append(pymongo.InsertOne({'a': i}))

db.mycollection.bulk_write(operations, ordered=False)
cursor = db.mycollection.find({})
df = pd.DataFrame(list(cursor))
print(df.count())
0 голосов
/ 11 февраля 2020

Я нашел решение с многопроцессорностью, и оно является самым быстрым

def chunks(collection_size, n_cores=mp.cpu_count()):
    """ Return chunks of tuples """


    batch_size = round(collection_size/n_cores)
    rest = collection_size%batch_size 
    cumulative = 0
    for i in range(n_cores):
        cumulative += batch_size
        if i == n_cores-1:
            yield (batch_size*i,cumulative+rest)
        else:
           yield (batch_size*i,cumulative)


def parallel_read(skipses,host=HOST, port=PORT):


    print('Starting process on range of {} to {}'.format(skipses[0],skipses[1]))
    client = MongoClient(host,port)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]

    cursor = collection.find({},{ '_id': False } )
    _df = pd.DataFrame(list(cursor[skipses[0]:skipses[1]]))
    return _df



def read_mongo(colc_size,_workers=mp.cpu_count()):
    temp_df = pd.DataFrame()
    pool = mp.Pool(processes=_workers)
    results = [pool.apply_async(parallel_read, args=(chunk,))  for chunk in chunks(colc_size,n_cores=_workers)]
    output = [p.get() for p in results]
    temp_df = pd.concat(output)
    return temp_df


time_0 = time()
df = read_mongo(get_collection_size())
print("Reading database with  {} processes took {}".format(mp.cpu_count(),time()-time_0))



Starting process on range of 0 to 53866
Starting process on range of 323196 to 377062
Starting process on range of 430928 to 484794
Starting process on range of 538660 to 592526
Starting process on range of 377062 to 430928
Starting process on range of 700258 to 754124
Starting process on range of 53866 to 107732
Starting process on range of 484794 to 538660
Starting process on range of 592526 to 646392
Starting process on range of 646392 to 700258
Starting process on range of 215464 to 269330
Starting process on range of 754124 to 807990
Starting process on range of 807990 to 915714
Starting process on range of 107732 to 161598
Starting process on range of 161598 to 215464
Starting process on range of 269330 to 323196

Чтение базы данных с 16 процессами заняло 142.64860558509827

С одним из примеров выше (без многопроцессорности)

def iterator2dataframes(iterator, chunk_size: int):
  """Turn an iterator into multiple small pandas.DataFrame

  This is a balance between memory and efficiency
  """
  records = []
  frames = []
  for i, record in enumerate(iterator):
    records.append(record)
    if i % chunk_size == chunk_size - 1:
      frames.append(pd.DataFrame(records))
      records = []
  if records:
    frames.append(pd.DataFrame(records))
  return pd.concat(frames)

time_0 = time()
cursor = collection.find()
chunk_size = 1000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))

Чтение базы данных с размером chunksize = 10000 заняло 372.1170778274536

time_0 = time()
cursor = collection.find()
chunk_size = 10000
df = iterator2dataframes(cursor, chunk_size)
print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))

Чтение базы данных с размером chunksize = 10000 заняло 367.02637577056885

0 голосов
/ 08 февраля 2020

Вы можете попытаться получить данные из mongodb в чанке, используя индекс слайса, т.е. получить 100000 документов за раз из mongodb. Добавьте документы в фрейм данных, затем извлеките следующие 100000 документов и добавьте данные в фрейм данных.

client = MongoClient(host,port)
collection = client[db_name][collection_name]
maxrows=905679
        for i in range(0, maxrows, 100000):
            df2 = df2.iloc[0:0]
            if (i+100000<maxrows):
                cursor = collection.find()[i:i+100000]
            else:
                cursor = collection.find()[i:maxrows]
            df2= pd.DataFrame(list(cursor))
            df.append(df2, ignore_index=True)




См. Ссылку ниже, чтобы узнать больше об индексе срезов в mongodb.

https://api.mongodb.com/python/current/api/pymongo/cursor.html

...