Flink: java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema - PullRequest
0 голосов
/ 06 декабря 2018

Использование Flink для получения данных из AWS Kinesis.

Мои коды:

public class kinesisConsumer {
public static void main(String[] args) throws Exception {
    Properties kinesisConsumerConfig = new Properties();
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "cn-northwest-1");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aaa");
    kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "bbb");
    kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>(
            "test_data",
            new SimpleStringSchema(),
            kinesisConsumerConfig));

    kinesis.print();

    see.execute();
    }
}

Мой файл pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ne</groupId>
    <artifactId>kinesis</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>kinesisConsumer</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.5.2</version>
        </dependency>
    </dependencies>
</project>

Также я загружаю и добавляю flink-connector-kinesis_2.11-1.6.2.jar в библиотеку.

Затем я упаковываю ее, используя: mvn -DskipTests package.Но когда я запускаю флягу, она возвращает:

    Error: A JNI error has occurred, please check your installation and try again
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/common/serialization/DeserializationSchema
            at java.lang.Class.getDeclaredMethods0(Native Method)
            at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
            at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
            at java.lang.Class.getMethod0(Class.java:3018)
            at java.lang.Class.getMethod(Class.java:1784)
            at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
            at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema
            at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
            ... 7 more

Мои вопросы:

  1. Почему возникает эта ошибка и как ее устранить?

  2. Я потребляю данные, а затем распечатываю их, не сохраняя их на диск. Почему программа использует коды сериализации?

Любыепомощь приветствуется.

...