Я использую multiprocessing
модуль. Я читаю таблицы из базы данных MySQL и делаю это частями параллельно. Все отлично работает, пока я не увеличу размер каждого фрейма данных.
Суть: я перебираю каждое имя таблицы (tbl_name
) в списке и читаю таблицу несколькими параллельными порциями. Я храню эти промежуточные куски в словаре. Наконец, я объединяю все кадры данных в словаре, используя pd.concat
.
Как я уже сказал, проблема в Memory Allocation Issue
, когда я набираю около 1 000 000 размеров строк, поэтому у меня есть 3 вопроса:
1) Где можно выполнить очистку памяти (например, del
объекты, gc.collect()
)? То есть, где делаются копии моих данных в памяти и когда я могу их безопасно выпустить? Я попытался поместить оператор del
в нескольких местах, но он кричит на меня, что объекты не существуют (потому что они живут и умирают только в параллельном цикле).
2) Поскольку я пытаюсь запустить Pool
ресурсов и вызываю одну и ту же функцию для нескольких таблиц, хочу ли я дождаться Pool.close()
до моего последнего цикла? Похоже, я излишне открываю и закрываю пулы ресурсов и создаю утечку памяти.
3) У меня есть функция обратного вызова, которая берет каждый промежуточный кадр данных и сохраняет его в словаре, а затем объединяет все эти кадры данных в конце после завершения параллельного выполнения. Должен ли я избежать этого и просто вернуть необработанный вывод и объединить в конце? Если да, то как мне pd.concat
списков в кадры данных?
Мне пришлось скрыть часть кода, поскольку он чувствителен, но суть в том, что sql_to_dataframe()
отправляет запрос ODBC в базу данных MySQL и возвращает фрейм данных в памяти.
Полный код:
import halosql
import pandas as pd
import datetime
import numpy as np
import time
import gc
from multiprocessing import Process, Queue, Pool
# parameters
limit_nbr = 500000
chunk_threshold = 100000
halo_open = datetime.date(2018, 2, 12)
extract_date = datetime.date(2018, 2, 13)
halo_close = datetime.date(9999, 12, 31)
ecf_id = 'ceedbb4e-c180-4ff1-9c77-d59fb65873c9'
ts_rowid_range = 20180213143418000000000001
TB_tbl_name = 'GLT0'
GL_lines_tbl_name = 'BSEG'
cm = halosql.Connections(cm_ipaddress, cm_dbname, cm_username, cm_userpass)
run_time = time.time() # PKB
def create_sql_chunk(tbl_name, chunk_nbr):
df = cm.sql_to_dataframe(
'SELECT * FROM {tbl_name} WHERE uid_row {query_filter}'.format(tbl_name=tbl_name,query_filter=chunk_dict[chunk_nbr]))
# time.sleep(chunk_nbr*2)
return df
results = {}
def collect_results(result):
"""Uses apply_async's callback to setup up a separate Queue for each process"""
results[time.time()] = result
tbl_list = ['BSEG', 'SKAT', 'TCURX', 'SKA1', 'T009', 'T001', 'TBSLT', 'T003T', 'T003', 'TJ01T', 'TSTCT', 'TTYPT', 'USR02',
'T881', 'BKPF', 'GLT0', 'ADRP', 'USR21', 'T004T', 'GLT0'] # load data
time_dict = {}
df_dict = {}
for tbl_name in tbl_list:
read_time = time.time() # PKB
print('Reading tbl ' + tbl_name + ' from MemSQL')
cm.execute('USE ' + cm_dbname + ';')
uidrow_guide = cm.sql_to_dataframe('SELECT min(uid_row) as row_start, max(uid_row) as row_end FROM {tbl_name}'.format(tbl_name=tbl_name,limit_nbr=limit_nbr))
total_row_size = int(str(uidrow_guide.row_end[0])[:-3]) if limit_nbr is None or int(str(uidrow_guide.row_end[0])[:-3]) < limit_nbr else limit_nbr
row_size = total_row_size
chunk_nbr = int(total_row_size / chunk_threshold)
chunk_nbr = 1 if chunk_nbr == 0 else chunk_nbr
chunk_size = int(row_size / chunk_nbr)
if (chunk_size * chunk_nbr) < row_size:
chunk_nbr += 1
chunk_dict = {}
row_start = 1
row_end = row_start + chunk_size - 1
uid_row_start = str(row_start) + '001'
uid_row_end = str(row_end) + '001'
for c in list(range(1,chunk_nbr+1)):
tbl_read_time = time.time()
if c < chunk_nbr:
chunk_dict[c] = 'BETWEEN ' + uid_row_start + ' AND ' + uid_row_end
else:
chunk_dict[c] = 'BETWEEN ' + uid_row_start + ' AND ' + str(total_row_size) + '001'
row_start += chunk_size
row_end = row_start + chunk_size - 1
uid_row_start = str(row_start) + '001'
uid_row_end = str(row_end) + '001'
if __name__ == "__main__":
start_time = time.time()
# Repeats the compute intensive operation on 10 data frames concurrently
p = Pool(processes=chunk_nbr)
for i in list(range(1,chunk_nbr+1)):
p.apply_async(create_sql_chunk, args=(tbl_name,i,), callback=collect_results)
print("--- %s seconds ---" % (time.time() - start_time))
p.close()
p.join()
df_dict['df_' + tbl_name] = pd.concat(results.values())
gc.collect()
read_time = (time.time() - read_time) / 60
time_dict['READ df_' + tbl_name] = read_time
print(df_dict)
print(time_dict)
run_time = (time.time() - run_time) / 60
print("--- %s minutes to finish ---" % run_time) # PKB