dask: увеличение скорости загрузки файла с несколькими файлами в один кадр данных - PullRequest
1 голос
/ 11 апреля 2019

Я объединяю несколько тысяч фреймов данных (~ 1 миллиона строк) разумного размера на довольно регулярной основе.Хотя я могу заставить панд работать с read_csv, это ужасное решение из-за очень больших накладных расходов.

Мне нужно более быстрое решение для этого, и dask, по-видимому, использует эту множественную CSV-функциональность в своих read_csv / read_table функциях.

Однако я не заметил значительного улучшения в скоростис этими решениями.

Есть ли способ увеличить скорость следующего типа процесса?:

import io
import re
import numpy as np
import dask.bag as dbag
import dask.dataframe as ddf

def filter_data(fp, ix_col = 'index_here', val_col = 'some_value'):
    dask_frame = ddf.read_table(fp)
    # filter to only one column and index (like a series)
    series = dask_frame[[ix_col, val_col]].set_index(ix_col)

    # Rename it to be the filename / file_id
    file_id = re.match("file_(.+)\.txt", fp)[1]
    series.columns = [file_id]
    return series

def get_dataframe(file_paths):
    # Make a collection
    dasks_bag = dbag.from_sequence(file_paths)

    # Open the files as dask frame and filter each to series-like frames
    filtered_dfs = dasks_bag.map(filter_data)

    # Compute pandas dataframe on each within the list
    filtered_dfs = filtered_dfs.compute()

    # concatenate them together
    df = ddf.concat(filtered_dfs, axis = 1)

    # Compute on concatenated again, so it becomes pandas dataframe
    return df.dropna(how = "all").compute()



# Just write some random files here
paths = ['file_120202021.txt', 'file_123.txt', 'file_12330.txt']
for fp in paths:
    with open(fp, 'w') as f:
        f.write('index_here\tsome_value\tother_cols\n')
        for row in range(0,1000):
            for val, other_col in np.random.rand(1, 2):
                f.write(str(row)+'\t'+str(val)+'\t'+str(other_col)+'\n')

# Make a dataframe with dask
get_dataframe(paths)

Редактировать: У меня есть небольшой скрипт, который показывает сбой dask: время, необходимое для dask на моем компьютере, составляет 1,87 секунд , в то время каквремя, требуемое для панд, составляет 0,29 секунды

Очевидно, что я делаю это неправильно, поскольку dask был специально создан для более быстрого вычисления на фреймах данных.

import io
import re
import numpy as np
import pandas as pd
import dask.bag as dbag
import dask.dataframe as ddf
import time

def get_dask_dataframe(file_paths,  ix_col = 'index_here', val_col = 'some_value'):
    # Make a collection
    dasks_bag = dbag.from_sequence(file_paths)

    # read and filter to data of interest
    dask_frames = ddf.read_table(file_paths, include_path_column = True)[[ix_col, val_col, 'path']]

    # Make pandas dataframe
    df = dask_frames.compute()

    # Pivot since read_table puts path in one column
    df = df.pivot_table(values = val_col, index = ix_col, columns = 'path')
    return df.dropna(how = "all")

def get_pandas_dataframe(file_paths, ix_col = 'index_here', val_col = 'some_value'):
    # Make a collection
    l = []
    for f in file_paths:
        series = pd.read_csv(f, sep = '\t')[[ix_col, val_col]].set_index(ix_col)
        # Rename it to be the filename / file_id
        file_id = re.match("file_(.+)\.txt", f)[1]
        series.columns = [file_id]
        l += [series]

    # concatenate them together
    df = pd.concat(l, axis = 1)
    return df.dropna(how = "all")


# Just write a whole bunch of random files
paths = ['file_'+str(i)+'.txt' for i in range(0, 100)]
for fp in paths:
    with open(fp, 'w') as f:
        f.write('index_here\tsome_value\tother_cols\n')
        for row in range(0,1000):
            for val, other_col in np.random.rand(1, 2):
                f.write(str(row)+'\t'+str(val)+'\t'+str(other_col)+'\n')

t0 = time.time()
# Make a dataframe with dask
df1 = get_dask_dataframe(paths)
t1 = time.time()
print(t1-t0)

t0 = time.time()
# Make a dataframe with dask
df2 = get_pandas_dataframe(paths)
t1 = time.time()
print(t1-t0)
...