Как исправить ошибку Flink: невозможно определить аргументы типа для FlinkKafkaConsumer011 <> ' - PullRequest
2 голосов
/ 23 мая 2019

Я следую примерам использования Flink с Kafka.Я нахожу только такие результаты, как эта страница , которые не компилируются правильно и дают сообщения об ошибках, которые трудно найти.

Обычно, когда я пытаюсь скомпилировать этот фрагмент, я получаю сообщение об ошибке:

import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public final class Main {

    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup ) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id",kafkaGroup);
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);

        return consumer;
    }
}

Вот мои зависимости и такие в build.gradle файле:

group 'myapp'
version '1.0-SNAPSHOT'

apply plugin: 'java'
sourceCompatibility = 1.8

repositories {
  jcenter()
}

dependencies {
  ecj 'org.eclipse.jdt.core.compiler:ecj:4.6.1'
  compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.2.0'
  compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.0'
  compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.0'
  compile group: 'org.apache.flink', name: 'flink-avro', version: '1.8.0'
  compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.0'
  compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.11_2.11', version: '1.5.0'

  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'

  compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
}

Вот ошибка при использовании инструмента сборки для запуска кода:

$ gradle build
> Task :compileJava FAILED
/Users/john/dev/john/flink-example/src/main/java/com/company/opi/flinkexample/Main.java:55: error: cannot infer type arguments for FlinkKafkaConsumer011<>
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
                ^
Note: /Users/john/dev/john/flink-example/src/main/java/com/company/opi/flinkexample/EnvironmentConfig.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 error


FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

* Get more help at https://help.gradle.org

BUILD FAILED in 3s
1 actionable task: 1 executed

Вот ссылка на его исходный код .

1 Ответ

0 голосов
/ 23 мая 2019

Одна проблема: все используемые вами библиотеки flink должны иметь одинаковый номер версии - вы, похоже, смешиваете версии 1.2.0, 1.5.0 и 1.8.0. Ниже приведены обновленные зависимости и исходный код, которые будут правильно скомпилированы.

(build.gradle)

group 'myapp'
version '1.0-SNAPSHOT'

apply plugin: 'java'
sourceCompatibility = 1.8

repositories {
  jcenter()
}

dependencies {
  compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.8.0'
  compile group: 'org.apache.flink', name: 'flink-java', version: '1.8.0'
  compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.8.0'
  compile group: 'org.apache.flink', name: 'flink-avro', version: '1.8.0'
  compile group: 'org.apache.flink', name: 'flink-core', version: '1.8.0'
  compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.12', version: '1.8.0'

  compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
}

(workingCode.java)

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public final class Main {

    public static FlinkKafkaConsumer<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup ) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id",kafkaGroup);
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(),props);

        return consumer;
    }
}

Кроме того, но это не связано с ошибкой компиляции, поскольку вы используете Kafka 1.1, вы также можете использовать более свежую версию соединителя Flink Kafka, а не ту, которая предназначена для Kafka 0.11. FlinkKafkaConsumer (класс без номера версии в названии) является подходящим соединителем для Kafka 1.0.0 и более поздних версий.

...