Проблема выделения многопроцессорной памяти Python - PullRequest
0 голосов
/ 04 января 2019

Я использую multiprocessing модуль. Я читаю таблицы из базы данных MySQL и делаю это частями параллельно. Все отлично работает, пока я не увеличу размер каждого фрейма данных.

Суть: я перебираю каждое имя таблицы (tbl_name) в списке и читаю таблицу несколькими параллельными порциями. Я храню эти промежуточные куски в словаре. Наконец, я объединяю все кадры данных в словаре, используя pd.concat.

Как я уже сказал, проблема в Memory Allocation Issue, когда я набираю около 1 000 000 размеров строк, поэтому у меня есть 3 вопроса:

1) Где можно выполнить очистку памяти (например, del объекты, gc.collect())? То есть, где делаются копии моих данных в памяти и когда я могу их безопасно выпустить? Я попытался поместить оператор del в нескольких местах, но он кричит на меня, что объекты не существуют (потому что они живут и умирают только в параллельном цикле).

2) Поскольку я пытаюсь запустить Pool ресурсов и вызываю одну и ту же функцию для нескольких таблиц, хочу ли я дождаться Pool.close() до моего последнего цикла? Похоже, я излишне открываю и закрываю пулы ресурсов и создаю утечку памяти.

3) У меня есть функция обратного вызова, которая берет каждый промежуточный кадр данных и сохраняет его в словаре, а затем объединяет все эти кадры данных в конце после завершения параллельного выполнения. Должен ли я избежать этого и просто вернуть необработанный вывод и объединить в конце? Если да, то как мне pd.concat списков в кадры данных?

Мне пришлось скрыть часть кода, поскольку он чувствителен, но суть в том, что sql_to_dataframe() отправляет запрос ODBC в базу данных MySQL и возвращает фрейм данных в памяти.

Полный код:

import halosql
import pandas as pd
import datetime
import numpy as np

import time
import gc
from multiprocessing import Process, Queue, Pool

# parameters
limit_nbr = 500000
chunk_threshold = 100000
halo_open = datetime.date(2018, 2, 12)
extract_date = datetime.date(2018, 2, 13)
halo_close = datetime.date(9999, 12, 31)
ecf_id = 'ceedbb4e-c180-4ff1-9c77-d59fb65873c9'
ts_rowid_range = 20180213143418000000000001
TB_tbl_name = 'GLT0'
GL_lines_tbl_name = 'BSEG'

cm = halosql.Connections(cm_ipaddress, cm_dbname, cm_username, cm_userpass)

run_time = time.time() # PKB

def create_sql_chunk(tbl_name, chunk_nbr):
    df = cm.sql_to_dataframe(
        'SELECT * FROM {tbl_name} WHERE uid_row {query_filter}'.format(tbl_name=tbl_name,query_filter=chunk_dict[chunk_nbr]))
    # time.sleep(chunk_nbr*2)
    return df


results = {}
def collect_results(result):
    """Uses apply_async's callback to setup up a separate Queue for each process"""
    results[time.time()] = result


tbl_list = ['BSEG', 'SKAT', 'TCURX', 'SKA1', 'T009', 'T001', 'TBSLT', 'T003T', 'T003', 'TJ01T', 'TSTCT', 'TTYPT', 'USR02',
            'T881', 'BKPF', 'GLT0', 'ADRP', 'USR21', 'T004T', 'GLT0'] # load data

time_dict = {}
df_dict = {}

for tbl_name in tbl_list:
    read_time = time.time()  # PKB
    print('Reading tbl ' + tbl_name + ' from MemSQL')
    cm.execute('USE ' + cm_dbname + ';')
    uidrow_guide = cm.sql_to_dataframe('SELECT min(uid_row) as row_start, max(uid_row) as row_end FROM {tbl_name}'.format(tbl_name=tbl_name,limit_nbr=limit_nbr))
    total_row_size = int(str(uidrow_guide.row_end[0])[:-3]) if limit_nbr is None or int(str(uidrow_guide.row_end[0])[:-3]) < limit_nbr else limit_nbr
    row_size = total_row_size
    chunk_nbr = int(total_row_size / chunk_threshold)
    chunk_nbr = 1  if chunk_nbr == 0 else chunk_nbr
    chunk_size = int(row_size / chunk_nbr)
    if (chunk_size * chunk_nbr) < row_size:
        chunk_nbr += 1
    chunk_dict = {}
    row_start = 1
    row_end = row_start + chunk_size - 1
    uid_row_start = str(row_start) + '001'
    uid_row_end = str(row_end) + '001'
    for c in list(range(1,chunk_nbr+1)):
        tbl_read_time = time.time()
        if c < chunk_nbr:
            chunk_dict[c] = 'BETWEEN ' + uid_row_start + ' AND ' + uid_row_end
        else:
            chunk_dict[c] = 'BETWEEN ' + uid_row_start + ' AND ' + str(total_row_size) + '001'

        row_start += chunk_size
        row_end = row_start + chunk_size - 1
        uid_row_start = str(row_start) + '001'
        uid_row_end = str(row_end) + '001'

    if __name__ == "__main__":
        start_time = time.time()

        # Repeats the compute intensive operation on 10 data frames concurrently
        p = Pool(processes=chunk_nbr)
        for i in list(range(1,chunk_nbr+1)):
            p.apply_async(create_sql_chunk, args=(tbl_name,i,), callback=collect_results)
        print("--- %s seconds ---" % (time.time() - start_time))
        p.close()
        p.join()

        df_dict['df_' + tbl_name] = pd.concat(results.values())
        gc.collect()

    read_time = (time.time() - read_time) / 60
    time_dict['READ df_' + tbl_name] = read_time

print(df_dict)
print(time_dict)
run_time = (time.time() - run_time) / 60
print("--- %s minutes to finish ---" % run_time) # PKB
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...