«Не найдена файловая система для схемы s3» при попытке чтения / записи с использованием Apache Beam - PullRequest
0 голосов
/ 27 августа 2018

Я начинаю использовать Apache Beam в проекте впервые, и я пытаюсь читать и записывать файлы Parquet в S3 и из S3 из кластера EMR в AWS.

Однако каждый раз, когда я пытаюсь выполнить свой код, я получаю только:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:119)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:140)
at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:152)
at org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn.process(FileIO.java:636)

В документации не приводится ни одного примера, поэтому я понятия не имею, нужно ли что-то инициализировать где-либо в моем коде.

Я попытался проверить исходный код Beam, но, насколько я понимаю, класс FileSystems должен регистрировать все модули файловой системы, а мой pom.xml содержит модуль Beam Amazon Web Services (который, в свою очередь, содержит модуль AWS S3).

Единственный блок инициализации, который я делаю сейчас:

val options = PipelineOptionsFactory.create()
options.runner = SparkRunner::class.java
val pipeline = Pipeline.create(options)
...
val runner = SparkRunner.fromOptions(options)
runner.run(pipeline).waitUntilFinish()

Spark начинает работать правильно, вплоть до исключения.

Есть предложения?

1 Ответ

0 голосов
/ 19 декабря 2018

Полагаю, вам нужно создать собственный класс для учетных данных AWS, который будет представлять параметры работы Apache Beam.

BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
YourCustomOptionsClass options = PipelineOptionsFactory.create().as(YourCustomOptionsClass.class);
options.as(AwsOptions.class).setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
options.as(AwsOptions.class).setAwsRegion(region);
options.setRunner(DataflowRunner.class);
options.setProject(projectId);

options.set... (All other options you need)

В моем коде YourCustomOptionClass реализует S3Options и DataflowPipelineOptions

Чтобы узнать больше о создании пользовательских опций, ознакомьтесь с документацией Apache Beam https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options

Другой полный пример, который может помочь:

https://github.com/asaharland/beam-pipeline-examples/tree/master/src/main/java/com/harland/example/batch

...