Невозможно прочитать кафку с помощью spark sql - PullRequest
0 голосов
/ 21 июня 2019

Я пытаюсь прочитать kafka, используя spark, но, похоже, столкнулся с некоторой библиотечной проблемой.

Я помещаю какое-то событие в темы kafka, которые я могу прочитать через потребителя консоли kafka, но не могу прочитать через spark. Я использую библиотеку spark-sql-kafka, и проект написан на maven. Версия Scala - 2.11.12, версия spark - 2.4.3.

            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>

Мой код Java ниже: -

SparkSession spark = SparkSession.builder()
                .appName("kafka-tutorials")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> rows = spark.readStream().
                format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "meetup-trending-topics")
                .option("startingOffsets", "latest")
                .load();

        rows.writeStream()
                .outputMode("append")
                .format("console")
                .start();

        spark.streams().awaitAnyTermination();
        spark.stop();

Ниже приведено сообщение об ошибке, которое я получаю: -

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: Не удалось найти источник данных: kafka. Пожалуйста, разверните приложение согласно разделу «Руководство по интеграции структурированной потоковой передачи + Kafka» .; в org.apache.spark.sql.execution.datasources.DataSource $ .lookupDataSource (DataSource.scala: 652) в org.apache.spark.sql.streaming.DataStreamReader.load (DataStreamReader.scala: 161)

Решение: - Любой из обоих: 1) создать uber jar или ii) --packages org.apache.spark: spark-sql-kafka-0-10_2.11: 2.4.3 Ранее я давал --packages org.apache.spark: spark-sql-kafka-0-10_2.11: 2.4.3 после основного класса.

1 Ответ

2 голосов
/ 21 июня 2019

Это:

<scope>provided</scope>

означает, что вы несете ответственность за предоставление соответствующей банки.Я (и многие другие) предпочитаю избегать использования этой области и вместо этого создать Uberjar для развертывания.

...