Я знаю, что это может быть моей проблемой, но я пытаюсь ее решить некоторое время.
Я пытаюсь запустить flink в кластере AWS EMR.
Моя настройка:
Событие временного ряда из Kinesis -> Flink Job -> Сохранить его на S3
DataStream<Event> kinesis =
env.addSource(new FlinkKinesisConsumer< (this.streamName, new EventSchema(), kinesisConsumerConfig)).name("source");
final StreamingFileSink<Event> streamingFileSink =
StreamingFileSink.<Event>forRowFormat(
new org.apache.flink.core.fs.Path("s3a://"+ this.bucketName + "/" + this.objectPrefix),
new SimpleStringEncoder<>("UTF-8"))
.withBucketAssignerAndPolicy(new OrgIdBucketAssigner(), DefaultRollingPolicy.create().build())
.build();
DataStream<Event> eventDataStream = kinesis
.rebalance()
.keyBy(createKeySelectorByChoosingOrgIdFromTheEvent())
.process(new KeyedProcessFunction<String, Event, Event>() {
@Override
public void processElement(Event value, Context ctx, Collector<DeviceEvent> out) throws Exception {
out.collect(value);
}
});
eventDataStream.addSink(streamingFileSink).name("streamingFileSink");
с одного из сайтов,
https://www.mail-archive.com/user@flink.apache.org/msg25039.html
UPDATE:
Я узнал, что для того, чтобы заставить StreamingFileSink работать, нужно поместить папку jar flink-s3-fs-hadoop-1.7.1.jar
в /usr/lib/flink/lib
.
Моя папка /usr/lib/flink/lib
в главном узле EMR выглядит следующим образом
-rw-r--r-- 1 root root 9924 Mar 20 01:06 slf4j-log4j12-1.7.15.jar
-rw-r--r-- 1 root root 42655628 Mar 20 01:06 flink-shaded-hadoop2-uber-1.7.1.jar
-rw-r--r-- 1 root root 483665 Mar 20 01:06 log4j-1.2.17.jar
-rw-r--r-- 1 root root 140172 Mar 20 01:06 flink-python_2.11-1.7.1.jar
-rw-r--r-- 1 root root 92070994 Mar 20 01:08 flink-dist_2.11-1.7.1.jar
-rw-r--r-- 1 root root 23451686 May 5 23:04 flink-s3-fs-hadoop-1.7.1.jar
Когда я пытаюсь запустить задание Flink, оно вызывает приведенное ниже исключение в подчиненных устройствах EMR.
2019-05-06 01:43:49,589 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Sink: streamingFileSink (3/4) (31000a186f6ab11f0066556116c669ba) switched from RUNNING to FAILED.
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:374)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:553)
at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:531)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.newAmazonS3Client(DefaultS3ClientFactory.java:80)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:54)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:256)
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Можете ли вы, пожалуйста, дайте мне знать, что является основным, что я скучаю?