Как добавить коннектор Kafka, чтобы прошивать через sbt IntelliJ? Хочу добавить универсальный разъем для поддержки Кафки 2.3 - PullRequest
1 голос
/ 27 февраля 2020

Ниже приведена конфигурация SBT

// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge
libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner
libraryDependencies += "org.apache.flink" %% "flink-table-planner" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-table-common
libraryDependencies += "org.apache.flink" % "flink-table-common" % "1.9.1"

// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base
//libraryDependencies += "org.apache.flink" %% "flink-connector-kafka_2.11" % "1.9.1" -- This not working and throwing unable to connect error.

Поскольку я не смог добавить flink-connector-kafka через sbt, я скачал jar-файл и поместил его в папку lib (made lib) в моем sbt проект. Проект sbt создается через IntelliJ, и только я добавил папку lib вручную.

Теперь, когда я импортирую пакет соединителя kafka, т.е. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer работает нормально.

Теперь ниже мой код для использования из Kafka

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._

object KafkaFlink {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
//    properties.setProperty("bootstrap.servers", "localhost:9092")
//    // only required for Kafka 0.8
//    properties.setProperty("zookeeper.connect", "localhost:2181")
//    properties.setProperty("group.id", "test")
    val properties1 = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val topic = "flink-fault-testing"
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties1)

    val value: DataStream[String] = env.addSource(flinkKafkaConsumer)
  }

}

Я не могу скомпилировать, потому что получаю ошибку не могу разрешить перегруженный метод "addSource"

Пожалуйста, предоставьте входные данные, чтобы где я делаю неправильно.

Также, если есть способ получить универсальный разъем flink-kafka напрямую через build.sbt IntelliJ

1 Ответ

1 голос
/ 27 февраля 2020

То, что вы хотите указать в вашей конфигурации SBT, это:

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.1"

Если вы пропустите "_2.11", тогда оно должно работать. Это указывает, какую scala версию использовать, и SBT обрабатывает scala версию самого себя.

Не уверен, почему ваш код не компилируется. Выглядит хорошо для меня.

...