Некоторые зависимости требуют других зависимостей, неявно. Обычно мы используем некоторые менеджеры зависимостей, такие как maven или sbt , и когда мы добавляем некоторые зависимости в проект, менеджер зависимостей предоставляет свои неявные зависимости в фоновом режиме.
С другой стороны, когда вы используете оболочки, в которых нет менеджера зависимостей, вы несете ответственность за предоставление зависимостей вашего кода. При использовании соединителя Flink Kafka явно требуется jar Flink Connector Kafka
, но вы должны заметить, что Flink Connector Kafka
также нужны некоторые зависимости. Вы можете найти его зависимости внизу страницы , которая находится в разделе Зависимости компиляции . Итак, начиная с этого предисловия, я добавил следующие jar-файлы в каталог FLINK_HOME/lib
(Flink classpath):
flink-connector-kafka-0.11_2.11-1.4.2.jar
flink-connector-kafka-0.10_2.11-1.4.2.jar
flink-connector-kafka-0.9_2.11-1.4.2.jar
flink-connector-kafka-base_2.11-1.4.2.jar
flink-core-1.4.2.jar
kafka_2.11-2.1.1.jar
kafka-clients-2.1.0.jar
и я мог бы успешно использовать сообщения Kafka, используя следующий код в оболочке Flink:
scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
scala> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
scala> import java.util.Properties
import java.util.Properties
scala> val properties = new Properties()
properties: java.util.Properties = {}
scala> properties.setProperty("bootstrap.servers", "localhost:9092")
res0: Object = null
scala> properties.setProperty("group.id", "test")
res1: Object = null
scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091
scala> senv.execute("Kafka Consumer Test")
Submitting job with JobID: 23e3bb3466d914a2747ae5fed293a076. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:40093/user/jobmanager#1760995711] with leader session id 00000000-0000-0000-0000-000000000000.
03/11/2019 21:42:39 Job execution switched to status RUNNING.
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING
hello
hello
Кроме того, еще один способ добавить некоторые файлы jar в путь к классу Flink - это передать файлы jar в качестве аргументов для команды запуска оболочки Flink:
bin/start-scala-shell.sh local "--addclasspath <path/to/jar.jar>"
Тестовая среда:
Flink 1.4.2
Kafka 2.1.0
Java 1.8 201
Scala 2.11