Читайте из Кафки в Flink Scala Shell - PullRequest
1 голос
/ 11 марта 2019

Я пытаюсь подключиться и читать с Kafka (2.1) на моем локальном компьютере, в оболочке scala, которая поставляется с Flink (1.7.2).

Вот что я делаю:

:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

После последнего утверждения я получаю следующую ошибку:

scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
 cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
   var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

Я создал тему с именем "topic" и могу создавать и читать сообщения.от него через другого клиента правильно.Я использую Java версии 1.8.0_201 и следую инструкциям https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html.

Любая помощь, что может быть не так?

Ответы [ 2 ]

1 голос
/ 11 марта 2019

Некоторые зависимости требуют других зависимостей, неявно. Обычно мы используем некоторые менеджеры зависимостей, такие как maven или sbt , и когда мы добавляем некоторые зависимости в проект, менеджер зависимостей предоставляет свои неявные зависимости в фоновом режиме.

С другой стороны, когда вы используете оболочки, в которых нет менеджера зависимостей, вы несете ответственность за предоставление зависимостей вашего кода. При использовании соединителя Flink Kafka явно требуется jar Flink Connector Kafka, но вы должны заметить, что Flink Connector Kafka также нужны некоторые зависимости. Вы можете найти его зависимости внизу страницы , которая находится в разделе Зависимости компиляции . Итак, начиная с этого предисловия, я добавил следующие jar-файлы в каталог FLINK_HOME/lib (Flink classpath):

flink-connector-kafka-0.11_2.11-1.4.2.jar
flink-connector-kafka-0.10_2.11-1.4.2.jar    
flink-connector-kafka-0.9_2.11-1.4.2.jar   
flink-connector-kafka-base_2.11-1.4.2.jar  
flink-core-1.4.2.jar                                         
kafka_2.11-2.1.1.jar
kafka-clients-2.1.0.jar

и я мог бы успешно использовать сообщения Kafka, используя следующий код в оболочке Flink:

scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

scala> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

scala> import java.util.Properties
import java.util.Properties

scala> val properties = new Properties()
properties: java.util.Properties = {}

scala> properties.setProperty("bootstrap.servers", "localhost:9092")
res0: Object = null

scala> properties.setProperty("group.id", "test")
res1: Object = null

scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
warning: there was one deprecation warning; re-run with -deprecation for details
stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091

scala> senv.execute("Kafka Consumer Test")
Submitting job with JobID: 23e3bb3466d914a2747ae5fed293a076. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:40093/user/jobmanager#1760995711] with leader session id 00000000-0000-0000-0000-000000000000.
03/11/2019 21:42:39 Job execution switched to status RUNNING.
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
hello
hello

Кроме того, еще один способ добавить некоторые файлы jar в путь к классу Flink - это передать файлы jar в качестве аргументов для команды запуска оболочки Flink:

bin/start-scala-shell.sh local "--addclasspath <path/to/jar.jar>"

Тестовая среда:

Flink 1.4.2
Kafka 2.1.0
Java  1.8 201
Scala 2.11
0 голосов
/ 11 марта 2019

Скорее всего, вы должны импортировать имплициты Флинка по Scala перед добавлением источника:

import org.apache.flink.streaming.api.scala._
...