Ошибка при подключении искровой структурированной потоковой передачи + кафка - PullRequest
0 голосов
/ 03 мая 2020

я пытаюсь подключить мою структурированную потоковую искру 2.4.5 с kafka, но все время, когда я пытаюсь использовать этот источник данных, возникает ошибка. Следуйте моему scala коду и моей сборке sbt:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object streaming_app_demo {
  def main(args: Array[String]): Unit = {

    println("Spark Structured Streaming with Kafka Demo Application Started ...")

    val KAFKA_TOPIC_NAME_CONS = "test"
    val KAFKA_OUTPUT_TOPIC_NAME_CONS = "test"
    val KAFKA_BOOTSTRAP_SERVERS_CONS = "localhost:9092"


    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Spark Structured Streaming with Kafka Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .load()

    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test2")
      .start()
  }
}

И ошибка:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at streaming_app_demo$.main(teste.scala:29)
    at streaming_app_demo.main(teste.scala)

И мой sbt.build:

name := "scala_212"

version := "0.1"

scalaVersion := "2.12.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" % "provided"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0"

Спасибо!

1 Ответ

0 голосов
/ 03 мая 2020

Для spark structured streaming + kafka требуется эта библиотека spark- sql -kafka-0-10.

Вы получаете это исключение org.apache.spark.sql.AnalysisException: Failed to find data source: kafka, поскольку библиотека spark-sql-kafka недоступна в вашем пути к классам & It не удается найти org.apache.spark.sql.sources.DataSourceRegister в папке META-INF / services.

DataSourceRegister path inside jar file

/ org / apache / spark / spark- sql -kafka-0-10_2.11 /2.2.0/spark-sql-kafka-0-10_2.11-2.2.0.jar!/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Update

Если вы используете SBT, попробуйте добавить ниже блок кода. Это будет включать в себя файл org.apache.spark.sql.sources.DataSourceRegister в вашей окончательной банке.

// META-INF discarding
assemblyMergeStrategy in assembly := {
  case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF",xs @ _*) => MergeStrategy.discard
  case "application.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...