сбой dask в операции слияния, и причина - ошибка памяти - PullRequest
0 голосов
/ 03 сентября 2018

Сбой операции Dask в операции слияния, и причина в ошибке памяти, я попытался объединить два файла CSV, первый размер файла 6,5 ГБ с 48 столбцами атрибутов, второй 315 МБ с тремя столбцами атрибутов, мы провели эту операцию на сервере с 8 ГБ ОЗУ и 2 виртуальными ЦП (amazon EC2 t2.large linux redhat), после запуска задания память начинает увеличиваться до тех пор, пока ОС не завершит работу. после этого я добавил следующие два ключа в sysctl.conf, и новая ошибка стала «Memoey Error» и «Out of memory».

vm.overcommit_memory = 2 vm.overcommit_ratio = 90

фрагмент кода:

result_df =pd.merge(first_df, second_df, left_index=True , right_index=True)
result_df.to_csv('result*.csv', sep=';')

есть идеи о том, что я здесь что-то упустил? и если это вопрос ресурсов? или быстрая реализация? и если есть какие-либо ограничения на dask от того, насколько он может использовать диск стороне с памятью и какую оптимизацию я могу сделать, чтобы получить лучший результат.

ниже приведен полный код с использованием индекса и без использования индекса, но тот же результат, но, пожалуйста, обратите внимание, что я использую одну машину, а не кластер, и я понимаю, что dask должен записывать на диск в случае, если память не достаточно.

import dask.dataframe as pd 

def build_data_file(): 
first_df = pd.read_csv('/home/lion/du/data/first_df', sep=';', blocksize=10000000,dtype={'user_ID': str}, names=['user_ID', 'att1', 'att2', 'att3','att4','att5','att6','att7','att8','att9','att10','att11','att12','att13','att14', 'att15', 'att16', 'att17', 'att18','att19', 'att20', 'att21','att22','att23', 'att24','att25', 'att26', 'att27','att28', 'att29', 'att30', 'att31','att32', 'att33','att34', 'att35','att36', 'att37','att38','att39', 'att40','att41', 'att42', 'att43','att44','att45','att46', 'att47','att48','att49']) 

second_df = pd.read_csv('/home/lion/du/data/second_df', sep=';', blocksize=10000000,dtype={'user_ID': str}, names=['user_ID', 'att_S_1', 'att_S_2', 'att_S_3']) 

df_final = pd.merge(first_df, second_df, on='user_ID') 
df_final.to_csv('result*.csv', sep=';') 
if name == 'main': build_data_file()

**** с индексом

import dask.dataframe as pd
def build_data_file():
first_df = pd.read_csv('/home/lion/du/data/first_df', sep=';', blocksize=10000000,dtype={'user_ID': str},
                                    names=['user_ID', 'att1', 'att2',
                   'att3','att4','att5','att6','att7','att8','att9','att10','att11','att12','att13','att14', 'att15',
                                             'att16', 'att17', 'att18','att19', 'att20', 'att21','att22','att23', 'att24','att25', 'att26', 'att27','att28', 'att29',
                                                 'att30', 'att31','att32', 'att33','att34', 'att35','att36', 'att37','att38','att39', 'att40','att41', 'att42',
                                                  'att43','att44','att45','att46', 'att47','att48','att49']).set_index('user_ID')
    second_df = pd.read_csv('/home/lion/du/data/second_df', sep=';', blocksize=10000000,dtype={'user_ID': str},
                              names=['user_ID', 'att_S_1',
                                     'att_S_2', 'att_S_3']).set_index('user_ID')

    df_final =pd.merge(first_df, second_df, left_index=True , right_index=True)
    df_final.to_csv('result*.csv', sep=';')
if __name__ == '__main__':
    build_data_file()
...