Как эффективно переместить файл данных / Dask размером 67 ГБ, не загружая его полностью в память? - PullRequest
0 голосов
/ 16 января 2019

У меня есть 3 довольно больших файла (67 ГБ, 36 ГБ, 30 ГБ), на которых мне нужно обучать модели. Однако элементы представляют собой строки, а образцы - столбцы. Так как Dask не реализовал транспонирование и хранит DataFrames, разделенные на строки, мне нужно написать что-то, чтобы сделать это самостоятельно. Есть ли способ, которым я могу эффективно транспонировать без загрузки в память?

В моем распоряжении 16 ГБ оперативной памяти, и я использую ноутбук Jupyter. Я написал довольно медленный код, но был бы очень признателен за более быстрое решение. Скорость кода ниже займет месяц, чтобы закончить все файлы. Самый медленный шаг на несколько порядков - это awk.

import dask.dataframe as dd
import subprocess
from IPython.display import clear_output

df = dd.read_csv('~/VeryLarge.tsv')
with open('output.csv','wb') as fout:
    for i in range(1, len(df.columns)+1):
        print('AWKing')
        #read a column from the original data and store it elsewhere
        x = "awk '{print $"+str(i)+"}' ~/VeryLarge.tsv > ~/file.temp"
        subprocess.check_call([x], shell=True)

        print('Reading')
        #load and transpose the column
        col = pd.read_csv('~/file.temp')
        row = col.T
        display(row)

        print('Deleting')
        #remove the temporary file created
        !rm ../file.temp

        print('Storing')
        #store the row in its own csv just to be safe. not entirely necessary
        row.to_csv('~/columns/col_{:09d}'.format(i), header=False)

        print('Appending')
        #append the row (transposed column) to the new file
        with open('~/columns/col_{:09d}', 'rb') as fin:
            for line in fin:
                fout.write(line)

        clear_output()
        #Just a measure of progress
        print(i/len(df.columns))

Данные сами по себе составляют 10 миллионов строк (элементы) и 2000 столбцов (образцы). Это просто нужно транспонировать. В настоящее время это выглядит так: DataFrame

Ответы [ 2 ]

0 голосов
/ 17 января 2019

Я изменил свой оригинальный скрипт для развертывания на любом количестве процессоров. Это работало намного быстрее, так как я мог использовать несколько потоков и развертываться на AWS. Я использовал 96-ядерный компьютер, который выполнил задачу примерно за 8 часов. Я был очень удивлен, так как это почти линейное масштабирование! Идея состоит в том, чтобы сделать некоторые повторяющиеся задачи распространяемыми. После этого вы сможете назначать задачи процессору. Здесь распараллеливание выполняется командой pool.map().

Использование этого скрипта из командной строки довольно просто:

python3 transposer.py -i largeFile.tsv

Вы также можете указать другие аргументы, если требуется.

import argparse, subprocess
import numpy as np
import pandas as pd
import dask.dataframe as dd
from IPython.display import clear_output
from contextlib import closing
from os import cpu_count
from multiprocessing import Pool

parser = argparse.ArgumentParser(description='Transpose csv')
parser.add_argument('-i', '--infile', help='Path to input folder',
                    default=None)
parser.add_argument('-s', '--sep', help='input separator',
                    default='\t')

args = parser.parse_args()
infile = args.infile
sep = args.sep    
df = pd.read_csv(infile, sep='\t', nrows=3)    

def READ_COL(item):
    print(item)
    outfile = 'outfile{}.temp'.format(item)
    if item !=0:
                x = "awk '{print $"+str(item)+"}' "+infile+" > "+outfile
                subprocess.check_call([x], shell=True)
                col = pd.read_csv(outfile)
                row = col.T
                display(row)
                row.to_csv('col_{:09d}.csv'.format(item), header=False)
                subprocess.check_call(['rm '+outfile], shell=True)
                print(item/len(df.columns))

with closing(Pool(processes=cpu_count())) as pool:
    pool.map(READ_COL, list(range(1, len(df.columns)+1)))
0 голосов
/ 16 января 2019

Я бы создал промежуточный файл и использовал бы fp.seek, чтобы записать их в двоичном формате в новом порядке, прежде чем преобразовать его обратно в новый CSV. Если строка, столбец становится столбцом, строка - sys.float_info предоставит вам размер каждого элемента, позицию каждого элемента ((это столбец * old_row_length + строка) * размер с плавающей запятой).

Затем вы заново объединяете их в CSV, конвертируя их обратно в текст и читая old_count_rows на строку.

...