Использование StreamingFileSink вызывает NoClassDefFoundError - PullRequest
0 голосов
/ 06 мая 2019

Я знаю, что это может быть моей проблемой, но я пытаюсь ее решить некоторое время. Я пытаюсь запустить flink в кластере AWS EMR.

Моя настройка: Событие временного ряда из Kinesis -> Flink Job -> Сохранить его на S3

    DataStream<Event> kinesis =
                env.addSource(new FlinkKinesisConsumer< (this.streamName, new EventSchema(), kinesisConsumerConfig)).name("source");
    final StreamingFileSink<Event> streamingFileSink =
                StreamingFileSink.<Event>forRowFormat(
                        new org.apache.flink.core.fs.Path("s3a://"+ this.bucketName + "/" + this.objectPrefix),
                        new SimpleStringEncoder<>("UTF-8"))
                        .withBucketAssignerAndPolicy(new OrgIdBucketAssigner(), DefaultRollingPolicy.create().build())
                        .build();

DataStream<Event> eventDataStream =  kinesis
                .rebalance()
                .keyBy(createKeySelectorByChoosingOrgIdFromTheEvent())
                .process(new KeyedProcessFunction<String, Event, Event>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<DeviceEvent> out) throws Exception {
                        out.collect(value);
                    }
                });
eventDataStream.addSink(streamingFileSink).name("streamingFileSink");

с одного из сайтов, https://www.mail-archive.com/user@flink.apache.org/msg25039.html

UPDATE: Я узнал, что для того, чтобы заставить StreamingFileSink работать, нужно поместить папку jar flink-s3-fs-hadoop-1.7.1.jar в /usr/lib/flink/lib. Моя папка /usr/lib/flink/lib в главном узле EMR выглядит следующим образом

-rw-r--r-- 1 root root     9924 Mar 20 01:06 slf4j-log4j12-1.7.15.jar
-rw-r--r-- 1 root root 42655628 Mar 20 01:06 flink-shaded-hadoop2-uber-1.7.1.jar
-rw-r--r-- 1 root root   483665 Mar 20 01:06 log4j-1.2.17.jar
-rw-r--r-- 1 root root   140172 Mar 20 01:06 flink-python_2.11-1.7.1.jar
-rw-r--r-- 1 root root 92070994 Mar 20 01:08 flink-dist_2.11-1.7.1.jar
-rw-r--r-- 1 root root 23451686 May  5 23:04 flink-s3-fs-hadoop-1.7.1.jar

Когда я пытаюсь запустить задание Flink, оно вызывает приведенное ниже исключение в подчиненных устройствах EMR.

2019-05-06 01:43:49,589 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Sink: streamingFileSink (3/4) (31000a186f6ab11f0066556116c669ba) switched from RUNNING to FAILED.
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:374)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:553)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:531)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.newAmazonS3Client(DefaultS3ClientFactory.java:80)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:54)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:256)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Можете ли вы, пожалуйста, дайте мне знать, что является основным, что я скучаю?

Ответы [ 2 ]

0 голосов
/ 10 мая 2019

Наконец я нашел причину. Хотя это очень и очень обманчиво. Flink использует AWS SDK версии 1.11.271. Класс S3ErrorResponseHandler, вызывающий NoClassDefFoundError ... имеет следующие статические переменные.

public class S3ErrorResponseHandler implements
        HttpResponseHandler<AmazonServiceException> {
    /** Shared logger for profiling information */
    private static final Log log = LogFactory
            .getLog(S3ErrorResponseHandler.class);

    /** Shared factory for creating XML event readers */
    private static final XMLInputFactory xmlInputFactory = XMLInputFactory
            .newInstance();

    private static enum S3ErrorTags {
        Error, Message, Code, RequestId, HostId
    };
   ....
   ...

В AWS SDK 1.11.272 инициализация XMLInputFactory была удалена. Это дало мне подсказку. Я перестроил библиотеку flink, переключившись на 1.11.272. Вуаля это начало работать. Это все еще оставляло мне несколько вопросов без ответа. Я выполнил тщательную отладку в Flink JVM, работающей в EMR. У classpath явно есть flink-s3-fs-hadoop-1.7.1.jar. Я написал код, чтобы прочитать эту банку и распечатать все ее записи, и я увидел S3ErrorResponseHandler. В моем операторе flink я мог бы инициализировать, как показано ниже -

XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance()

Загрузчик классов явно имел ссылки на jre libs. Тем не менее, Flink не смог инициализировать этот конкретный класс. Интересно, почему!! Это из-за того, как Флинк общается с операторами !!! Flink внизу сериализует операторов и передает их на подчиненные узлы. В подчиненных узлах операторы десериализуются, инициализируются и запускаются как задачи. Между этими различными фазами загрузчик классов Flink почему-то не имел доступа к реализации по умолчанию XMLInputFactory из JRE. Это очень странно! Также хотелось бы, чтобы JRE более конкретно указывал, какую статическую переменную он не мог инициализировать при загрузке класса. Должен ли я назвать это ошибкой во Flink? Никто не сообщил об этой ошибке в AWS EMR при использовании Flink?

Также отметили, что у flink relayse 1.7.1 есть ошибка с StreamingFlineSink. Если ваш кластер EMR имеет 2 монтирования, он пытается выполнить операцию чтения / записи на обоих, что приводит к сбою. Это было решено в 1.8.0 и выше. Однако AWS EMR по-прежнему использует версию 1.7.1 по умолчанию. Поэтому, пожалуйста, убедитесь, что вы поместили библиотеки 1.8.0 в / usr / lib / flink / lib

0 голосов
/ 06 мая 2019

Первое, что выглядело как неправильный взгляд, это то, что вы смешиваете JAR-файлы из разных версий Flink.Это из Flink 1.7.1

-rw-r--r-- 1 root root 42655628 Mar 20 01:06 flink-shaded-hadoop2-uber-1.7.1.jar
-rw-r--r-- 1 root root   140172 Mar 20 01:06 flink-python_2.11-1.7.1.jar
-rw-r--r-- 1 root root 92070994 Mar 20 01:08 flink-dist_2.11-1.7.1.jar

, в то время как это из Flink 1.8.0

-rw-r--r-- 1 root root 23451686 May  5 23:04 flink-s3-fs-hadoop-1.8.0.jar

Это не сработает;выбрать один релиз или другой.Обратите внимание, что 1.7.2 является последним выпуском исправления ошибок в серии 1.7.

Я также рекомендую прочитать этот раздел документации: Поток потокового файла: важные замечания для S3 .

...