Расположение AWS Glue Crawl Dynamic S3 Path - PullRequest
0 голосов
/ 04 апреля 2019

Я создаю задание ETL в AWS Glue, которое будет извлекать из местоположения S3 самые последние изменения или текущие данные для каждой сущности в хранилище. Данные в хранилище представляют собой исторический отчет обо всех изменениях для сущностей. Каждый день я запускаю ETL, и он записывает в другое место S3, т.е. Bucket / path / to / files / current_date / ..., где текущая дата является динамической и соответствует дате запуска ETL.

Проблема, с которой я сталкиваюсь, заключается в том, что я не могу программно удалить из S3 (организационное ограничение) или переместить файлы как копии и удалить их за кулисами, чтобы это также не удавалось, оставляя единственный путь для клея для сканирования. Я хотел бы настроить сканер таким образом, чтобы часть пути была динамической, но я не смог найти способ сделать это - кто-нибудь знает, возможно ли это?

Мои данные разделены на run_date (см. Текущую дату выше), а также на 6 других иерархических разделов. Я создаю сканеры и задания ETL через CloudFormation, язык yaml. Пути для сканеров хранятся в виде параметров ssm, определенных в сценариях CloudFormation.

Пример параметра SSM пути

S3CurrentPath:
    Type: AWS::SSM::Parameter
    Properties:
      Description: "Path in the S3 Lake where the current entity data is stored."
      Type: String
      Value: 'Data/Entities/Software/SoftwareCurrent'
      Name: "/org/member/local/s3/path/entityCurrent"

Код ресурса сканера:

GenericCrawler:
    Type: AWS::Glue::Crawler
    Properties:
      Role: !Ref RoleNAme
      Name: !Sub "${ProfileName}-crawler-${CrawlerName}"
      Configuration: !Sub |
        {
          "Version": 1.0,
          "CrawlerOutput": {
            "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" },
            "Tables": { "AddOrUpdateBehavior": "MergeNewColumns" }
          }
        }
      Description: !Ref CrawlerDescription
      DatabaseName: !Ref DatabaseName
      Targets:
        S3Targets:
          - Path: !Sub "s3://${S3DataBucket}/${S3Path}"

ETL DataSink Write Code:

# Write the joined dynamic frame out to a datasink
        datasink = glueContext.write_dynamic_frame.from_options(
                frame = final_dynamic_frame, connection_type = "s3",
                connection_options = {
                    'path': 's3://{lakeBucketName}/{lakePath}/'.format(
                        lakeBucketName=args['lakeBucketName'],
                        lakePath=args['lakeDestinationPath']),
                        "partitionKeys": ['run_date','location','year','month','day','hour','timestamp']},
                format = "parquet",
                transformation_ctx = "datasink")

Я надеюсь, что сканер будет просматривать самую последнюю дату в репозитории, то есть самую свежую "папку" раздела run_date, и сканировать ее, не оглядываясь на более старые данные.

Пожалуйста, дайте мне знать, если вы хотите увидеть больше кода - я буду рад провести санитарную обработку и предоставить.

1 Ответ

1 голос
/ 05 апреля 2019

Если честно, я не нашел способа чтения / записи данных в динамические пути с помощью AWS Glue.Что я обычно делаю, так это чтение / запись с использованием методов PySpark:

datasink.write.\
        format("com.databricks.spark.csv").\
        option("header", "true").\
        mode("overwrite").\
        save("s3://my-bucket/files/" + current_date + "*.csv")

Вы даже можете указать этому методу только чтение / запись файлов определенного типа (например, CSV).PySpark имеет больше опций и доступных методов, чем AWS Glue, поэтому здесь больше гибкости.Кроме того, я добавляю запись ключ / значение в таблицу DynamoDB, чтобы сохранить запись самой последней даты.

...