Dask dataframe из отложенного zip csv - PullRequest
0 голосов
/ 19 октября 2018

Я пытаюсь создать dask-фрейм данных из набора сжатых CSV-файлов.Читая эту проблему, кажется, что dask нужно использовать dask.distributed delayed ()

import glob
import dask.dataframe as dd
import zipfile
import pandas as pd 
from dask.delayed import delayed

#Create zip_dict with key-value pairs for .zip & .csv names
file_list = glob.glob('my_directory/zip_files/')
zip_dict = {}
for f in file_list:
    key = f.split('/')[5][:-4]
    zip_dict[key] = zipfile.ZipFile(f)

пример содержимого zip_dict = {'log20160201': zipfile.ZipFile filename = '/ my_directory /zip_files / log20160201.zip 'mode =' r ',' log20160218 ': zipfile.ZipFile filename =' / my_directory / zip_files / log20160218.zip 'mode =' r '}

# Create list of delayed pd.read_csv()    
d_rows = []
for k, v in zip_dict.items():

    row = delayed(pd.read_csv)(v.open(k+'.csv'),usecols=['time','cik'])
    d_rows.append(row)
    v.close()

пример содержимого d_rows = [Задержка ('read_csv-c05dc861-79c3-4e22-8da6-927f5b7da123'), Задержка ('read_csv-4fe1c901-44b4-478b-9c11-4a80f7a639e2')]

1012 *

Возвращаемая ошибка: ValueError: Неверный путь к файлу или тип объекта буфера: класс 'список'

1 Ответ

0 голосов
/ 19 октября 2018

В этом случае, я не думаю, что вам на самом деле требуется, чтобы словарь zip_dict лениво читал в этих заархивированных файлах с помощью Pandas.

На основании этот очень похожий вопрос SO для чтения (.gz) сжатые *.csv файлы с использованием Dask (также показанные здесь здесь ), один из возможных подходов, который вы можете предпринять, это следующий

A.лениво читайте файлы, используя Pandas и dask.delayed (не забудьте указать имена столбцов, которые вы хотите сохранить) и создайте список отложенных объектов

B.Преобразовать в один кадр данных Dask, используя dd.from_delayed, указав dtype столбцов ( в соответствии с рекомендациями ) - нужно указать dtype только для 2 необходимых вам столбцов

import glob
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
from collections import OrderedDict

file_list = glob.glob('my_directory/zip_files/*.zip')

# Lazily reading files into Pandas DataFrames
dfs = [delayed(pd.read_csv)(f, compression='zip', usecols=['time','cik'])
    for f in file_list]

# Specify column dtypes for columns in Dask DataFrame (recommended)
my_dtypes = OrderedDict([("time",int), ("cik",int)])

# Combine into a single Dask DataFrame
ddf = dd.from_delayed(dfs, meta=my_dtypes)

print(type(ddf))
<class 'dask.dataframe.core.DataFrame'>
...