Получение ошибки: Исключение в потоке "main" java .lang.NoClassDefFoundError: org / apache / spark / SparkConf - PullRequest
0 голосов
/ 01 мая 2020

Я работаю над Kafka Spark Streaming. IDLE не показывает никаких ошибок, и программа также успешно собирается, но я получаю эту ошибку:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/SparkConf
at KafkaSparkStream1$.main(KafkaSparkStream1.scala:13)
at KafkaSparkStream1.main(KafkaSparkStream1.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkConf
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 2 more

Я использую Maven. Я также правильно настроил переменные окружения, так как каждый компонент работает индивидуально. Моя версия spark - 3.0.0-preview2, Scala версия - 2.12. Я экспортировал файл jar spark-streaming-Kafka.

Вот мой файл pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.org.cpg.casestudy</groupId>
<artifactId>Kafka_casestudy</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <spark.version>3.0.0</spark.version>
    <scala.version>2.12</scala.version>
</properties>

<build>
    <plugins>
        <!-- Maven Compiler Plugin-->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<dependencies>
    <!-- Apache Kafka Clients-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- Apache Kafka Streams-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- Apache Log4J2 binding for SLF4J -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.11.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0-preview2</version>
        <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0-preview2</version>
        <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0-preview2</version>
    </dependency>

</dependencies>

Вот мой код (количество слов сообщения, отправленного производителем):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.codehaus.jackson.map.deser.std.StringDeserializer

object KafkaSparkStream {
def main(args: Array[String]): Unit = {

val brokers = "localhost:9092";
val groupid = "GRP1";
val topics = "KafkaTesting";
val SparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming");
val ssc = new StreamingContext(SparkConf,Seconds(10))
val sc = ssc.sparkContext

sc.setLogLevel("off")

val topicSet = topics.split(",").toSet

val kafkaPramas = Map[String , Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
   ConsumerConfig.GROUP_ID_CONFIG -> groupid,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)

val messages = KafkaUtils.createDirectStream[String,String](
  ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topicSet,kafkaPramas)
)

val line=messages.map(_.value)
val words = line.flatMap(_.split(" "))
val wordCount = words.map(x=> (x,1)).reduceByKey(_+_)

wordCount.print()

ssc.start()
ssc.awaitTermination()

}
}

1 Ответ

0 голосов
/ 01 мая 2020

Попробуйте очистить локальный репозиторий mvn или запустите приведенную ниже команду, чтобы переопределить ваши JAR-файлы зависимостей из сети

mvn clean install -U

Ваши искровые зависимости, особенно spark-core_2.12-3.0.0-preview2.jar, не добавляются в путь к классам при выполнении Spark JAR ,

Вы можете сделать это через

spark-submit --jars <path>/spark-core_2.12-3.0.0-preview2.jar
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...