Apache Beam + Apache Flink + AWS S3 - PullRequest
       76

Apache Beam + Apache Flink + AWS S3

0 голосов
/ 11 июня 2018

Я пытаюсь написать простой пример подсчета слов в Apache Beam и запустить его, используя прямой бегунок, бегущий бегунок и искровой бегун.Мой ввод и вывод выполняются на AWS S3, и мой кластер Flink / Spark работает на локальном компьютере с использованием докера.

Код работает как для прямого, так и для искрового бегуна, но я получаю:

org.apache.

Мне не пришлось вносить абсолютно никаких изменений в конфигурацию для spark, и просто настройка AWSOptions в коде луча работала без проблем.Но тот же код дает вышеуказанную ошибку с Flink.Итак, я попытался настроить Flink для чтения / записи с S3, используя инструкции из https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended, но это не помогло.

Глядя на сообщение об ошибке, становится ясно, что Flink не настроен должным образом для файловой системы S3, но я пока не могу найти способ решить эту проблему.

Есть какие-нибудь указатели ???Вот мой основной метод:

public static void main( String[] args ) throws IOException
{
    S3WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(S3WordCountOptions.class);
    options.as(AwsOptions.class).setAwsCredentialsProvider(getAWSCredentialProvider(options.getAwsKey(), options.getAwsSecret()));
    options.as(AwsOptions.class).setAwsRegion(options.getAwsRegion());
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
    .apply(Filter.by((String word) -> !word.isEmpty()))
    .apply(Count.perElement())
    .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
}
...