I am using the following very simple code, which reads CSV or Parquet files from an S3 bucket and copies them to another S3 bucket.
import os

import boto3
import pandas as pd
import pyarrow.parquet as pq
import s3fs

s3_client = boto3.client('s3')


def read_s3_file_as_raw(bucket_name, path_to_file):
    '''
    read a single file on s3 and return its body as a utf-8 string
    '''
    object = s3_client.get_object(Bucket=bucket_name, Key=path_to_file)
    response_body = object['Body'].read().decode(encoding="utf-8", errors="ignore")
    file_name = os.path.basename(path_to_file)
    print(file_name + ' read from S3 successfully.')
    return response_body


def read_s3_file_as_dataframe(bucket_name, path_to_file):
    '''
    read single csv or parquet file on s3 as dataframe
    '''
    file_name = os.path.basename(path_to_file)
    if path_to_file.endswith('.csv'):
        # csv: stream the object body straight into pandas
        object = s3_client.get_object(Bucket=bucket_name, Key=path_to_file)
        df = pd.read_csv(object['Body'])
        print(file_name + ' read from S3 successfully.')
        return df
    elif path_to_file.endswith('.parquet'):
        # parquet: read through pyarrow's ParquetDataset over s3fs
        fs = s3fs.S3FileSystem()
        p_dataset = pq.ParquetDataset(f"s3://{bucket_name}/{path_to_file}", filesystem=fs)
        df = p_dataset.read().to_pandas()
        print(file_name + ' read from S3 successfully.')
        return df
When I run the code on a persistent cluster, on EC2, or even on my local machine, it works perfectly (for both CSV and Parquet), but when I try to run it on a transient EMR cluster, I get the following error (only for Parquet; CSV files cause no problems):
File "pyarrow/array.pxi", line 559, in pyarrow.lib._PandasConvertible.to_pandas
File "pyarrow/table.pxi", line 1367, in pyarrow.lib.Table._to_pandas
File "/usr/local/lib64/python3.6/site-packages/pyarrow/pandas_compat.py", line 769, in table_to_blockmanager
return BlockManager(blocks, axes)
File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 141, in __init__
self._consolidate_check()
File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 656, in _consolidate_check
ftypes = [blk.ftype for blk in self.blocks]
File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/managers.py", line 656, in <listcomp>
ftypes = [blk.ftype for blk in self.blocks]
File "/usr/local/lib64/python3.6/site-packages/pandas/core/internals/blocks.py", line 349, in ftype
return f"{dtype}:{self._ftype}"
File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 54, in __str__
return dtype.name
File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 347, in _name_get
if _name_includes_bit_suffix(dtype):
File "/usr/local/lib64/python3.6/site-packages/numpy/core/_dtype.py", line 326, in _name_includes_bit_suffix
elif np.issubdtype(dtype, np.flexible) and _isunsized(dtype):
File "/usr/local/lib64/python3.6/site-packages/numpy/core/numerictypes.py", line 726, in issubdtype
arg1 = dtype(arg1).type
TypeError: data type not understood
Command exiting with ret '1'
I launch it with the following command:
aws emr create-cluster --applications Name=Hadoop Name=Spark \
--bootstrap-actions '[{"Path":"s3://propic-nonprod-datalake-force-transient/bootstrap3.sh","Name":"cluster_setup"}]' \
--service-role EMR_DefaultRole \
--release-label emr-5.20.0 \
--log-uri 's3n://propic-nonprod-datalake-logs/logs/emrtransientcluster/development/' \
--name 'emrtransientcluster-dataload-development' \
--instance-type m1.large --instance-count 1 \
--auto-terminate \
--steps Type=CUSTOM_JAR,Name=CustomJAR,ActionOnFailure=CONTINUE,Jar=s3://ap-southeast-2.elasticmapreduce/libs/script-runner/script-runner.jar,Args=["s3://propic-nonprod-datalake-force-transient/s3_file_transfer5.py"]
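For completeness, the same transient cluster can be created programmatically with boto3's run_job_flow; the sketch below just mirrors the CLI flags above (the region and JobFlowRole='EMR_EC2_DefaultRole' are assumptions on my side, everything else is taken from the command):

import boto3

emr = boto3.client('emr', region_name='ap-southeast-2')  # region assumed from the script-runner JAR path

response = emr.run_job_flow(
    Name='emrtransientcluster-dataload-development',
    ReleaseLabel='emr-5.20.0',
    LogUri='s3n://propic-nonprod-datalake-logs/logs/emrtransientcluster/development/',
    Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}],
    ServiceRole='EMR_DefaultRole',
    JobFlowRole='EMR_EC2_DefaultRole',  # assumption: default EC2 instance profile
    Instances={
        'MasterInstanceType': 'm1.large',
        'InstanceCount': 1,
        'KeepJobFlowAliveWhenNoSteps': False,  # mirrors --auto-terminate
    },
    BootstrapActions=[{
        'Name': 'cluster_setup',
        'ScriptBootstrapAction': {'Path': 's3://propic-nonprod-datalake-force-transient/bootstrap3.sh'},
    }],
    Steps=[{
        'Name': 'CustomJAR',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://ap-southeast-2.elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args': ['s3://propic-nonprod-datalake-force-transient/s3_file_transfer5.py'],
        },
    }],
)
print(response['JobFlowId'])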