Предполагая, что вы хотите упростить работу в среде AWS и не используя Spark (Glue / EMR), вы можете использовать AWS Athena следующим образом:
- Допустим, ваши файлы паркетанаходится в S3: // ведро / паркет /.
- Вы можете создать таблицу в Каталоге данных (т. Е. Используя Athena или Glue Crawler), указывая на это расположение паркета.Например, запустив что-то подобное в консоли SQL Athena:
CREATE EXTERNAL TABLE parquet_table (
col_1 string,
...
col_100 string)
PARTITIONED BY (date string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
LOCATION 's3://bucket/parquet/' ;
Как только вы сможете запросить таблицу parquet_table, которая будет читать файлы паркета, вы сможете создавать файлы CSV следующим образом, также используя Athena и выбирая только 4 интересующих вас столбца:
CREATE TABLE csv_table
WITH (
format = 'TEXTFILE',
field_delimiter = ',',
external_location = 's3://bucket/csv/'
)
AS SELECT col_1, col_2, col_3, col_4
FROM parquet_table ;
После этого вы можете фактически удалить временную таблицу csv и использовать только файлы CSV в каталоге s3: // bucket / csv / и делать больше, например, используя лямбду-триггер S3.работать и делать что-то еще или подобное.
Помните, что всего этого можно добиться с помощью лямбды, взаимодействующей с Афиной (пример здесь ), а также имейте в виду, что у нее есть разъем ODBC и PyAthenaиспользовать его из Python или других опций, поэтому использование Athena через Lambda или Консоль AWS - не единственный вариант, который вы можете использовать, если вы хотите автоматизировать это другим способом.
Надеюсь, это поможет.
Дополнительное редактирование, 25 сентября 2019 года. Отвечая на ваш вопрос о том, как это сделать в Pandas, я думаю, что лучшим способом было бы использовать Glue Python Shell, ноВы упомянули, что не хотите его использовать.Итак, если вы решите, вот основной пример того, как:
import pandas as pd
import boto3
from awsglue.utils import getResolvedOptions
from boto3.dynamodb.conditions import Key, Attr
args = getResolvedOptions(sys.argv,
['region',
's3_bucket',
's3_input_folder',
's3_output_folder'])
## @params and @variables: [JOB_NAME]
## Variables used for now. Job input parameters to be used.
s3Bucket = args['s3_bucket']
s3InputFolderKey = args['s3_input_folder']
s3OutputFolderKey = args['s3_output_folder']
## aws Job Settings
s3_resource = boto3.resource('s3')
s3_client = boto3.client('s3')
s3_bucket = s3_resource.Bucket(s3Bucket)
for s3_object in s3_bucket.objects.filter(Prefix=s3InputFolderKey):
s3_key = s3_object.key
s3_file = s3_client.get_object(Bucket=s3Bucket, Key=s3_key)
df = pd.read_csv(s3_file['Body'], sep = ';')
partitioned_path = 'partKey={}/year={}/month={}/day={}'.format(partKey_variable,year_variable,month_variable,day_variable)
s3_output_file = '{}/{}/{}'.format(s3OutputFolderKey,partitioned_path,s3_file_name)
# Writing file to S3 the new dataset:
put_response = s3_resource.Object(s3Bucket,s3_output_file).put(Body=df)
Карлос.