Использование Dask для чтения паркетных файлов из облачного хранилища Google - PullRequest
0 голосов
/ 24 сентября 2018

Я пытаюсь использовать Dask для чтения и записи из Google Bucket.Использование группы csv файлов работает, но неудобно (медленнее, не может сжимать, не может читать только некоторые столбцы), поэтому я попытался использовать формат apache parquet.

кажется, что запись работает нормально:

import dask.dataframe as dd
pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_google_bucket/test/")

Но когда я пытаюсь прочитать его обратно

read_again_df = dd.read_parquet("gcs://my_google_bucket/test/") 

, я получаю неосуществленную ошибку:

AttributeError                            Traceback (most recent call last)
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
    520     try:
--> 521         return fs._get_pyarrow_filesystem()
    522     except AttributeError:

AttributeError: 'DaskGCSFileSystem' object has no attribute '_get_pyarrow_filesystem'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-42-ef1fc41d04d5> in <module>()
----> 1 read_again = dd.read_parquet("gcs://my_google_bucket/test/")

~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, infer_divisions)
    991 
    992     return read(fs, fs_token, paths, columns=columns, filters=filters,
--> 993                 categories=categories, index=index, infer_divisions=infer_divisions)
    994 
    995 

~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/dataframe/io/parquet.py in _read_pyarrow(fs, fs_token, paths, columns, filters, categories, index, infer_divisions)
    505         columns = list(columns)
    506 
--> 507     dataset = pq.ParquetDataset(paths, filesystem=get_pyarrow_filesystem(fs))
    508     if dataset.partitions is not None:
    509         partitions = [n for n in dataset.partitions.partition_names

~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
    522     except AttributeError:
    523         raise NotImplementedError("Using pyarrow with a %r "
--> 524                                   "filesystem object" % type(fs).__name__)

NotImplementedError: Using pyarrow with a 'DaskGCSFileSystem' filesystem object

I 'Я предполагаю, что это означает, что dask по-прежнему не может читать файлы паркета напрямую из облачного сервиса Google.Есть ли какой-нибудь косвенный способ заставить эту работу, скажем, использовать pyarrow?

Что я хочу сохранить, так это возможность ленивой загрузки вещей, а затем использовать dask для преобразования данных.

Спасибо!

1 Ответ

0 голосов
/ 24 сентября 2018

Dask может, конечно, читать паркет из GCS с бэкэндом fastparquet (engine='fastparquet').Обратите внимание, что pyarrow не создал файл _metadata, которого ожидает fastparquet, поэтому вы можете либо записать свои данные с помощью fastparquet, создать файл из существующих файлов данных с помощью fastparquet, либо передать строку глобуса, указывающую на все файлы данных.вместо каталога.

То, что вы делаете, должно работать и с pyarrow, так как обычно pyarrow может принимать любой файлоподобный объект python, но в этом случае, похоже, вместо этого пытается создать файловую систему pyarrow.Ошибка, которую вы видите выше, вероятно, является ошибкой и должна быть исследована.

-edit-

Согласно комментариям от OP, следующее работает

pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]}) 
dask_df = dd.from_pandas(pandas_df, npartitions=2) 
dask_df.to_parquet("gcs://my_bucket/test", engine='fastparquet') 
read_again_df = dd.read_parquet("gcs://my_bucket/test/", engine='fastparquet')

Обратите внимание, что по какой-то ошибочной причине dask_df.to_parquet() необходимо вызывать с помощью "gcs: // my_bucket / test", без "/", в противном случае dd.read_parquet() не будет работать

...