Я пытаюсь писать на S3, используя acceptRole через FileIO с ParquetIO - PullRequest
2 голосов
/ 18 июня 2020

Шаг 1: AssumeRole

public static AWSCredentialsProvider getCredentials() {
        if (roleARN.length() > 0) {
            STSAssumeRoleSessionCredentialsProvider credentialsProvider = new STSAssumeRoleSessionCredentialsProvider
                    .Builder(roleARN, Constants.SESSION_NAME)
                    .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient())
                    .build();
            return credentialsProvider;
        }
        return new ProfileCredentialsProvider();
    }

Шаг 2: Установите учетные данные для конвейера

credentials = getCredentials();
pipeline.getOptions().as(AwsOptions.class).setAwsRegion(Regions.US_WEST_2.getName());
pipeline.getOptions().as(AwsOptions.class).setAwsCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials(credentials.getCredentials().getAWSAccessKeyId(), credentials.getCredentials().getAWSAccessKeyId())));

Шаг 3: Запустите конвейер для записи в s3

PCollection<GenericRecord> parquetRecord = formattedEvent
        .apply("ParquetRecord", ParDo.of(new ParquetWriter()))
        .setCoder(AvroCoder.of(getOutput_schema()));

parquetRecord.apply(FileIO.<GenericRecord, GenericRecord>writeDynamic()
        .by(elm -> elm)
        .via(ParquetIO.sink(getOutput_schema()))
        .to(outputPath).withNumShards(1)
        .withNaming(type -> FileNaming.getNaming("part", ".snappy.parquet", "" + DateTime.now().getMillisOfSecond()))
        .withDestinationCoder(AvroCoder.of(getOutput_schema())));

I я использую 'org.apache.beam:beam-sdks-java-io-parquet:jar:2.22.0' и 'org.apache.beam:beam-sdks-java-io-amazon-web-services:jar:2.22.0'

Проблема : в настоящее время acceptRole кажется не работает.

Ошибки :

org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records.

или

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected IOException (of type java.io.IOException): Failed to serialize and deserialize property 'awsCredentialsProvider' with value 'com.amazonaws.auth.InstanceProfileCredentialsProvider@71262020'

Ответы [ 2 ]

0 голосов
/ 06 июля 2020

В недавнем выпуске луча (2.24.0) появилась возможность взять на себя роль.

0 голосов
/ 19 июня 2020

откуда вы запускаете этот конвейер (в учетной записи AWS?), Если да, то лучше предоставить предполагаемый ролевой доступ к роли, которая запускает конвейер, а затем из конвейера FileIO будет просто использовать значение по умолчанию AWS Клиент.

Лучше вынести операцию «Предполагаемая роль» из конвейера и просто предоставить разрешения S3 для роли, запускающей конвейер.

...