Flink - необработанное исключение в кафке-производителе-сети-потоке - PullRequest
0 голосов
/ 12 сентября 2018

Я создаю потоковое задание, которое читает две темы кафки и пишет в одной теме кафки.Я работаю с этими версиями: flink 1.4.1, kafka_2.11-1.0.0 и flink-connector-kafka-0.11_2.11.

Иногда (это не систематически), у меня есть этот журнал:

KafkaThread.6648.1 - - |43| Uncaught exception in kafka-producer-network-thread | agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943: 
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1 
        at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:583) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:705) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:443) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[blob_p-a8baadf640b9f0a5ac1e2f13f859e91bbe2e801d-ec64eb83511ae1aba321b0d9d09a4744:na]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_161] 
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.NetworkClient$1 
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_161] 
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_161] 
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) ~[flink-dist_2.11-1.4.1.jar:1.4.1]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_161] 
        ... 6 common frames omitted

Здесь agr-client-id-INH_INH.FRQ-AGR-20180911XXXX-1536659128943 - это group.id моего продюсера.

У меня есть несколько рабочих мест, которые работают одновременно.Этот журнал может отображаться для одной работы, а не для других.У меня нет проблем с потребителями.Я чувствую, что этот журнал может появиться, даже если задание не записано в теме Кафки.

Мой pom.xml выглядит так:

<properties>
    <flink.version>1.4.1</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc</artifactId>
        <version>${flink.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <phase>install</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>target/lib</outputDirectory>
                        <overWriteReleases>false</overWriteReleases>
                        <overWriteSnapshots>true</overWriteSnapshots>
                        <excludeTransitive>true</excludeTransitive>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>${maven.shade.version}</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder. Otherwise, 
                                        this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                    <exclude>logback.xml</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mypackage.MyClass</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Я хотел бы понять, почему этот журналпоявляется, его серьезность и как я могу это исправить ... Спасибо.

1 Ответ

0 голосов
/ 13 августа 2019

Скорее всего, эта ошибка произошла из-за невозможности подключения к брокеру Kafka, и причина может быть:

  • Ваш загрузочный сервер Kafka не работает.

  • Ваш загрузочный сервер Kafka находится на другом компьютере, и IP-адрес вашего компьютера не включен в белый список для сервера Kafka.

Попробуйте запустить локальноКафка и подключиться к нему, чтобы увидеть, если ошибка не устранена.Попробуйте запустить zookeeper на всех узлах, где есть Flink.

...