Лямбда-функция для преобразования csv в паркет из s3 - PullRequest
1 голос
/ 24 сентября 2019

У меня есть требование - 1. Преобразовать файл паркета, представленный в формате s3, в формат csv и поместить его обратно в формат s3.Процесс должен исключать использование ЭМИ.2. В файле Parquet содержится более 100 столбцов. Мне нужно просто извлечь 4 столбца из этого файла и создать файл CSV в s3.

У кого-нибудь есть решение?Примечание. Нельзя использовать клей EMR или AWS

.

Ответы [ 2 ]

1 голос
/ 24 сентября 2019

Предполагая, что вы хотите упростить работу в среде AWS и не используя Spark (Glue / EMR), вы можете использовать AWS Athena следующим образом:

  1. Допустим, ваши файлы паркетанаходится в S3: // ведро / паркет /.
  2. Вы можете создать таблицу в Каталоге данных (т. Е. Используя Athena или Glue Crawler), указывая на это расположение паркета.Например, запустив что-то подобное в консоли SQL Athena:
CREATE EXTERNAL TABLE parquet_table (
    col_1 string,
    ...
    col_100 string)
PARTITIONED BY (date string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
LOCATION 's3://bucket/parquet/' ;
Как только вы сможете запросить таблицу parquet_table, которая будет читать файлы паркета, вы сможете создавать файлы CSV следующим образом, также используя Athena и выбирая только 4 интересующих вас столбца:
 CREATE TABLE csv_table
 WITH (
     format = 'TEXTFILE', 
     field_delimiter = ',',
     external_location = 's3://bucket/csv/'
         ) 
 AS SELECT col_1, col_2, col_3, col_4
 FROM parquet_table ;

После этого вы можете фактически удалить временную таблицу csv и использовать только файлы CSV в каталоге s3: // bucket / csv / и делать больше, например, используя лямбду-триггер S3.работать и делать что-то еще или подобное.

Помните, что всего этого можно добиться с помощью лямбды, взаимодействующей с Афиной (пример здесь ), а также имейте в виду, что у нее есть разъем ODBC и PyAthenaиспользовать его из Python или других опций, поэтому использование Athena через Lambda или Консоль AWS - не единственный вариант, который вы можете использовать, если вы хотите автоматизировать это другим способом.

Надеюсь, это поможет.

Дополнительное редактирование, 25 сентября 2019 года. Отвечая на ваш вопрос о том, как это сделать в Pandas, я думаю, что лучшим способом было бы использовать Glue Python Shell, ноВы упомянули, что не хотите его использовать.Итак, если вы решите, вот основной пример того, как:

import pandas as pd
import boto3
from awsglue.utils import getResolvedOptions
from boto3.dynamodb.conditions import Key, Attr

args = getResolvedOptions(sys.argv,
                          ['region',
                           's3_bucket',
                           's3_input_folder',
                           's3_output_folder'])

## @params and @variables: [JOB_NAME]
##    Variables used for now. Job input parameters to be used.
s3Bucket = args['s3_bucket']
s3InputFolderKey = args['s3_input_folder']
s3OutputFolderKey = args['s3_output_folder']

## aws Job Settings
s3_resource = boto3.resource('s3')
s3_client = boto3.client('s3')
s3_bucket = s3_resource.Bucket(s3Bucket)

for s3_object in s3_bucket.objects.filter(Prefix=s3InputFolderKey):
    s3_key = s3_object.key
    s3_file = s3_client.get_object(Bucket=s3Bucket, Key=s3_key)

    df = pd.read_csv(s3_file['Body'], sep = ';')

    partitioned_path = 'partKey={}/year={}/month={}/day={}'.format(partKey_variable,year_variable,month_variable,day_variable)
    s3_output_file = '{}/{}/{}'.format(s3OutputFolderKey,partitioned_path,s3_file_name)

    # Writing file to S3 the new dataset:
    put_response = s3_resource.Object(s3Bucket,s3_output_file).put(Body=df)

Карлос.

0 голосов
/ 24 сентября 2019
  • Все зависит от требований вашего бизнеса, какие действия вы хотите предпринять, например, асинхронный или синхронный вызов.

  • Вы можете запускать лямбда пример github на корзине s3 асинхронно, когда файл паркета поступает в указанную корзину. aws s3 doc

  • Вы можете настроить службу s3 на отправку уведомления SNS или SQS, а также при добавлении / удалении объекта из корзины, которая, в свою очередь, можетзатем вызовите лямбду для обработки файла Запуск уведомления .

  • Асинхронно можно запускать лямбду каждые 5 минут, планируя события облачных часов aws Наилучшее разрешение с использованием выражения cron - минута.

  • Синхронный запуск лямбды через HTTPS (конечная точка API REST) ​​ с использованием API-шлюза.

  • Также стоит проверить, насколько велик ваш файл Parquet в виде лямбдыможет работать максимум 15 минут, т. е. 900 секунд.

  • Стоит также проверить эту страницу Использование AWS Lambda с другими службами

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...