TypeError: тип данных не понятен при использовании временного кластера EMR - PullRequest
0 голосов
/ 25 марта 2020

Я использую следующий очень простой код, который читает файлы CSV или паркета из корзины S3 и копирует ее в другую корзину S3.

def read_s3_file_as_raw(bucket_name, path_to_file):
    object = s3_client.get_object(Bucket=bucket_name, Key=path_to_file)
    response_body = object['Body'].read().decode(encoding="utf-8",errors="ignore")
    file_name = os.path.basename(path_to_file)
    print(file_name + ' read from S3 successfully.')

    return response_body

def read_s3_file_as_dataframe(bucket_name, path_to_file):
    '''
    read single csv or parquet file on s3 as dataframe
    '''
    file_name = os.path.basename(path_to_file)

    if path_to_file.endswith('.csv'):
        object = s3_client.get_object(Bucket=bucket_name, Key=path_to_file)
        df = pd.read_csv(object['Body'])
        print(file_name + ' read from S3 successfully.')
        return df

    elif path_to_file.endswith('.parquet'):
        fs = s3fs.S3FileSystem()
        p_dataset = pq.ParquetDataset(f"s3://{bucket_name}/{path_to_file}",filesystem=fs)
        df = p_dataset.read().to_pandas()
        print(file_name + ' read from S3 successfully.')
        return df

Когда я запускаю код на постоянном кластере, EC2 или даже на моей локальной машине, он работает отлично (как для csv, так и для паркета), но когда я пытаюсь запустить его через временный кластер EMR, я получаю следующую ошибку (только для паркета и без проблем с CSV-файлами):

 File "pyarrow/array.pxi", line 559, in pyarrow.lib._PandasConvertible.to_pandas
  File "pyarrow/table.pxi", line 1367, in pyarrow.lib.Table._to_pandas
  File "/usr/local/lib64/python3.6/site-packages/pyarrow/pandas_compat.py", line 769, in table_to_blockmanager
    return BlockManager(blocks, axes)
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 141, in __init__
    self._consolidate_check()
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 656, in _consolidate_check
    ftypes = [blk.ftype for blk in self.blocks]
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 656, in <listcomp>
    ftypes = [blk.ftype for blk in self.blocks]
  File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/blocks.py", line 349, in ftype
    return f"{dtype}:{self._ftype}"
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 54, in __str__
    return dtype.name
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 347, in _name_get
    if _name_includes_bit_suffix(dtype):
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 326, in _name_includes_bit_suffix
    elif np.issubdtype(dtype, np.flexible) and _isunsized(dtype):
  File "/usr/local/lib64/python3.6/site-packages/numpy/core/numerictypes.py", line 726, in issubdtype
    arg1 = dtype(arg1).type
TypeError: data type not understood
Command exiting with ret '1'

Я использую следующую команду для его выполнения:

aws emr create-cluster --applications Name=Hadoop Name=Spark \
--bootstrap-actions '[{"Path":"s3://propic-nonprod-datalake-force-transient/bootstrap3.sh","Name":"cluster_setup"}]' \
--service-role EMR_DefaultRole \
--release-label emr-5.20.0 \
--log-uri 's3n://propic-nonprod-datalake-logs/logs/emrtransientcluster/development/' \
--name 'emrtransientcluster-dataload-development' \
--instance-type m1.large --instance-count 1 \
--auto-terminate \
--steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://ap-southeast-2.elasticmapreduce/libs/script-runner/script-runner.jar,Args=["s3://propic-nonprod-datalake-force-transient/s3_file_transfer5.py"]

1 Ответ

0 голосов
/ 19 апреля 2020

Если вы не выполняете никаких преобразований данных, я бы предложил использовать встроенный s3-dist-cp вместо того, чтобы писать собственный код с нуля только для копирования данных между сегментами. Подробную информацию о том, как добавить его в качестве шага к работающему кластеру, можно найти здесь . Короче говоря, вам нужно изменить последнюю строку вашей команды на что-то вроде этого:

--steps Type=CUSTOM_JAR, Name="S3DistCp step", ActionOnFailure=CONTINUE, Jar="command-runner.jar", Args=["s3-dist-cp", "--s3Endpoint=s3.amazonaws.com", "--src=s3://src-bucket/dir/", "--dest=s3://dest-bucket/dir/"]
...