Я пытаюсь прочитать kafka, используя spark, но, похоже, столкнулся с некоторой библиотечной проблемой.
Я помещаю какое-то событие в темы kafka, которые я могу прочитать через потребителя консоли kafka, но не могу прочитать через spark. Я использую библиотеку spark-sql-kafka, и проект написан на maven. Версия Scala - 2.11.12, версия spark - 2.4.3.
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.3</version>
<scope>provided</scope>
</dependency>
Мой код Java ниже: -
SparkSession spark = SparkSession.builder()
.appName("kafka-tutorials")
.master("local[*]")
.getOrCreate();
Dataset<Row> rows = spark.readStream().
format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "meetup-trending-topics")
.option("startingOffsets", "latest")
.load();
rows.writeStream()
.outputMode("append")
.format("console")
.start();
spark.streams().awaitAnyTermination();
spark.stop();
Ниже приведено сообщение об ошибке, которое я получаю: -
Исключение в потоке "main" org.apache.spark.sql.AnalysisException: Не удалось найти источник данных: kafka. Пожалуйста, разверните приложение согласно разделу «Руководство по интеграции структурированной потоковой передачи + Kafka» .;
в org.apache.spark.sql.execution.datasources.DataSource $ .lookupDataSource (DataSource.scala: 652)
в org.apache.spark.sql.streaming.DataStreamReader.load (DataStreamReader.scala: 161)
Решение: -
Любой из обоих: 1) создать uber jar или ii) --packages org.apache.spark: spark-sql-kafka-0-10_2.11: 2.4.3
Ранее я давал --packages org.apache.spark: spark-sql-kafka-0-10_2.11: 2.4.3 после основного класса.