Получение ошибки "java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign" при попытке использования с использованием Flink Kafka Consumer - PullRequest
0 голосов
/ 18 мая 2018

Я пытался написать Kafka Consumer, который потребляет данные из темы.Но всякий раз, когда я пытаюсь запустить его, я получаю следующую ошибку.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:39)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:391)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:229)

Класс Java:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;


public class KafkaConsumer {
public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<String>("rdf-new", new SimpleStringSchema(), parameterTool.getProperties()));

    stream.print();
    env.execute();
}}

Я создал автономный проект (с собственным pom) в intellij с тем же кодом, и он работал нормально, но так как мне потребовалосьКод в другом проекте Я создал новый модуль Maven в уже существующем проекте, а затем попытался запустить его, и теперь он показывает мне эту ошибку.

Зависимости в pom.xml для модуля maven:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.4.2</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Единственное, что я заметил, было внутри модуля maven, я получал KafkaVersion как 1.1.0, ноу pom есть KafkaConnector "flink-connector-kafka-0.9_2.11"

2018-05-18 11:14:56,105 - AbstractConfig                           [WARN] - ConsumerConfig            - The configuration 'zookeeper.connect' was supplied but isn't a known config. 
2018-05-18 11:14:56,105 - AppInfoParser$AppInfo                    [INFO] - AppInfoParser             - Kafka version : 1.1.0                              
2018-05-18 11:14:56,105 - AppInfoParser$AppInfo                    [INFO] - AppInfoParser             - Kafka commitId : fdcf75ea326b8e07  

В то время как в автономном проекте (где потребитель работает должным образом), версия Kafka составляет 0.9.0.1.

11:32:19,537 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration zookeeper.connect = localhost:2181 was supplied but isn't a known config.
11:32:19,537 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.9.0.1
11:32:19,538 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : 23c69d62a0cabf06

Было бы очень полезно, если бы кто-то мог сказать мне, в чем может быть проблема?Это может быть из-за зависимостей в файле pom, но в автономном проекте также те же самые зависимости, которые я дал. Заранее спасибо.

1 Ответ

0 голосов
/ 21 мая 2018

Как вы уже поняли, проблема в том, что в вашем модуле версия kafka (1.0) не соответствует версии, которую ожидает соединитель flink (0.9).

Вы можете сделать:

mvn dependency:tree

в командной строке, чтобы выяснить, откуда исходит версия зависимости клиентов kafka.

В pom вашего модуля вы можете добавить раздел dependencyManagement, чтобы переопределить версию зависимости клиентской библиотеки kafka к той, которую вы используете.нужно, вот так:

<dependencyManagement>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
</dependencyManagement>
...