ОБНОВЛЕНИЕ: Исправлена ошибка.Была проблема с версиями.Нужно поменять spark-core и другие до версии 2.2.0.
Я новичок в Apache Spark.Я пытаюсь отправить логи через Kafka и обработать их с помощью Spark Streaming, прежде чем передать их в ElasticSearch.Однако я получаю следующую ошибку.
Исключение в потоке "main" java.lang.NoClassDefFoundError: org / apache / spark / sql / execute / streaming / Source $ class в org.apache.spark.sql.kafka010.0.scala: 241) в org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 3.apply (TreeNode.scala: 279) в org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ 3.apply (TreeNode.scala: 279) в org.apache.spark.sql.catalyst.trees.CurrentOrigin $ .withOrigin (TreeNode.scala: 69) в org.apache.spark.sql.catalyst.trees.TreeNode.transformDown (TreeNode.scala: 278) в org.apache.spark.sql.catalyst.trees.TreeNode.transform (TreeNode.scala: 268) в org.apache.spark.sql.streaming.StreamingQueryManager.startQuery (StreamingQueryManager.scala: 241) в org.apache.spark.sql.streaming.DataStreamWriter.start (DataStreamWriter.scala: 287) в com.fork.SreamingApp.App.main (App.java:78). Причина: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.streaming.Source $класс в java.base / jdk.internal.loader.BuiltinClassLoader.loadClass (BuiltinClassLoader.java:582) в java.base / jdk.internal.loader.ClassLoaders $ AppClassLoader.loadClass (ClassLoaders.java:190) в java.base /java.lang.ClassLoader.loadClass (ClassLoader.java:499) ... еще 13
Вот мои файлы кода
App.java
public static void main(String[] args) throws NotSerializableException, InterruptedException {
App streamingApp = new App();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL data sources example")
.master("local")
.getOrCreate();
Dataset<Row> ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:32770")
.option("subscribe", "context3")
.load();
StreamingQuery query = ds1.writeStream()
.format("console")
.start();
query.awaitTermination();
System.out.println("DONE");
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fork</groupId>
<artifactId>SreamingApp</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>SreamingApp</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version> </dependency> -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>
Как это исправить?Пожалуйста, помогите мне.