Я пытаюсь написать простой пример подсчета слов в Apache Beam и запустить его, используя прямой бегунок, бегущий бегунок и искровой бегун.Мой ввод и вывод выполняются на AWS S3, и мой кластер Flink / Spark работает на локальном компьютере с использованием докера.
Код работает как для прямого, так и для искрового бегуна, но я получаю:
org.apache.
Мне не пришлось вносить абсолютно никаких изменений в конфигурацию для spark, и просто настройка AWSOptions в коде луча работала без проблем.Но тот же код дает вышеуказанную ошибку с Flink.Итак, я попытался настроить Flink для чтения / записи с S3, используя инструкции из https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html#shaded-hadooppresto-s3-file-systems-recommended, но это не помогло.
Глядя на сообщение об ошибке, становится ясно, что Flink не настроен должным образом для файловой системы S3, но я пока не могу найти способ решить эту проблему.
Есть какие-нибудь указатели ???Вот мой основной метод:
public static void main( String[] args ) throws IOException
{
S3WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(S3WordCountOptions.class);
options.as(AwsOptions.class).setAwsCredentialsProvider(getAWSCredentialProvider(options.getAwsKey(), options.getAwsSecret()));
options.as(AwsOptions.class).setAwsRegion(options.getAwsRegion());
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
.apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.perElement())
.apply(MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}