Я получаю следующее сообщение об ошибке:
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:40)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:40)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:40)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:157)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:65)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:126)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:149)
at org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.spark.kafka.JavaDirectKafkaWordCount.main(JavaDirectKafkaWordCount.java:50)
18/05/29 18:05:43 INFO SparkContext: Invoking stop() from shutdown hook
18/05/29 18:05:43 INFO SparkUI: Stopped Spark web UI at
Фрагмент кода:
Я пишу простой код потоковой передачи kafka - spark в eclipse, чтобы получать сообщения от брокера kafka с использованием потоковой передачи искры.Ниже приведен код
Я пытаюсь выполнить соответствующий код, он работает нормально до JavaInputDStream, после этого я получаю ошибку.Иви импорт в порядке.Может ли кто-нибудь помочь в этом
package com.spark.kafka;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.Durations;
public final class JavaDirectKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
String brokers = "localhost:9092";
String topics = "sparktestone";
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams));
JavaDStream<String> lines = messages.map(ConsumerRecord::value);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
В Pom я добавил соответствующие зависимости.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
<version>2.0.0</version>
</dependency>