Dask dataframe выдает ошибку при чтении файла паркета в s3 - PullRequest
0 голосов
/ 13 мая 2019

Я пытаюсь использовать dask для чтения паркетного стола в s3 следующим образом:

import dask.dataframe as dd

s3_path = "s3://my_bucket/my_table"
times = dd.read_parquet(
        s3_path,
        storage_options={
                          "client_kwargs": {
                              "endpoint_url": bucket_endpoint_url,
                          },
                          "profile_name": bucket_profile,
                        }
    )
result = times.groupby(['account', 'system_id'])['exec_time'].sum().nlargest(num_row).compute().reset_index().to_dict(orient='records')

У меня установлены только pyarrow и s3fs. Когда я читаю его с помощью LocalCluster, как показано ниже, он прекрасно работает

client = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)

Но когда я читаю его, используя истинный кластер, он выдает эту ошибку:

client = Client('master_ip:8786')

TypeError: ('Could not serialize object of type tuple.', "(<function apply at 0x7f9f9c9942f0>, <function _apply_chunk at 0x7f9f76ed1510>, [(<function _read_pyarrow_parquet_piece at 0x7f9f76eedea0>, <dask.bytes.s3.DaskS3FileSystem object at 0x7f9f5a83edd8>, ParquetDatasetPiece('my_bucket/my_table/0a0a6e71438a43cd82985578247d5c97.parquet', row_group=None, partition_keys=[]), ['account', 'system_id', 'upload_time', 'name', 'exec_time'], [], False, <pyarrow.parquet.ParquetPartitions object at 0x7f9f5a565278>, []), 'account', 'system_id'], {'chunk': <methodcaller: sum>, 'columns': 'exec_time'})")

distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/project_folder/lib64/python3.6/site-packages/distributed/batched.py", line 94, in _background_send
    on_error='raise')
  File "/project_folder/lib64/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/project_folder/lib64/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/project_folder/lib64/python3.6/site-packages/distributed/comm/tcp.py", line 224, in write
    'recipient': self._peer_addr})
  File "/project_folder/lib64/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/project_folder/lib64/python3.6/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/project_folder/lib64/python3.6/site-packages/distributed/comm/utils.py", line 50, in to_frames
    res = yield offload(_to_frames)
  File "/project_folder/lib64/python3.6/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib64/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/project_folder/lib64/python3.6/site-packages/distributed/comm/utils.py", line 43, in _to_frames
    context=context))
  File "/project_folder/lib64/python3.6/site-packages/distributed/protocol/core.py", line 54, in dumps
    for key, value in data.items()
  File "/project_folder/lib64/python3.6/site-packages/distributed/protocol/core.py", line 55, in <dictcomp>
    if type(value) is Serialize}
  File "/project_folder/lib64/python3.6/site-packages/distributed/protocol/serialize.py", line 164, in serialize
    raise TypeError(msg, str(x)[:10000])

Вы знаете, в чем может быть проблема?

Спасибо

1 Ответ

0 голосов
/ 13 мая 2019

Сериализация объектов pyarrow была проблематичной в pyarrow 0.13.0, что должно быть исправлено в следующем выпуске.Можете ли вы попробовать понизить версию Pyarrow?

...