ошибка соединения флинка кафки показывает NoSuchMethodError - PullRequest
0 голосов
/ 23 января 2020

новая ошибка появилась, когда я перехожу с flinkkafkaconsumer09 на flinkkafkaconsumer Flink code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;

@SuppressWarnings("deprecation")
public class ReadFromKafka {


  public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test-consumer-group");


    DataStream<String> stream = env
            .addSource(new FlinkKafkaConsumer<String>("test4", new SimpleStringSchema(), properties));

    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;

      @Override
      public String map(String value) throws Exception {
        return "Stream Value: " + value;
      }
    }).print();

    env.execute();
  }


}

ОШИБКА: log4j: ПРЕДУПРЕЖДЕНИЕ. Не удалось найти никаких дополнений для регистратора (org. apache .flink.api. java .ClosureCleaner). log4j: WARN Пожалуйста, правильно инициализируйте систему log4j. log4j: WARN См. http://logging.apache.org/log4j/1.2/faq.html#noconfig для получения дополнительной информации. Исключение в потоке "главная" организация. apache .flink.runtime.client.JobExecutionException: Не удалось выполнить задание. в орг. apache .flink.runtime.jobmaster.JobResult.toJobExecutionResult (JobResult. java: 146) в орг. apache .flink.runtime.minicluster.MiniCluster.executeJobBlocking (MiniCluster. java: 6) в орг. apache .flink.streaming.api.environment.LocalStreamEnvironment.execute (LocalStreamEnvironment. java: 117) в орг. apache .flink.streaming.api.environment.StreamExecutionEnvironment.execute (StreamExecutionEnvironment. java: 1507) в орг. apache .flink.streaming.api.environment.StreamExecutionEnvironment.execute (StreamExecutionEnvironment. java: 1489) в ReadFromKafka.main (ReadFromKafka. java: 33) Вызывается: org. apache .kafka.common.errors.TimeoutException: истекло время ожидания при получении метаданных topi c

pom. xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.dataartisans</groupId>
  <artifactId>kafka-example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>kafkaex</name>
  <description>this is flink kafka example</description>
  <dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.9.1</version>
</dependency>

    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>  
</dependencies>
</project>

1 Ответ

0 голосов
/ 23 января 2020

flink-connector-kafka_2.12 не совместимо с FlinkKafkaConsumer09.

flink-connector-kafka_2.12 - это «универсальный» разъем kafka, скомпилированный для использования с Scala 2.12. Этот универсальный разъем можно использовать с любой версией Kafka начиная с версии 0.11.0.

FlinkKafkaConsumer09 предназначен для использования с Kafka 0.9.x. Если ваш брокер Kafka использует Kafka 0.9.x, вам потребуется flink-connector-kafka-0.9_2.11 или flink-connector-kafka-0.9_2.12, в зависимости от того, какую версию Scala вы хотите.

С другой стороны, если ваш брокер Kafka является запустив последнюю версию Kafka (0.11.0 или новее), придерживайтесь flink-connector-kafka_2.12 и используйте FlinkKafkaConsumer вместо FlinkKafkaConsumer09.

См. документацию для получения дополнительной информации.

...