Я хочу интегрировать потоковую передачу искры с kafka и запускать в intellij
Моя kafka работает локально в порту 9092 и реализует код с помощью spark scala для чтения потоковой передачи в реальном времени. Вот мой код, и исключение даетintellij
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds,
StreamingContext, kafka}
import _root_.kafka.serializer.StringDecoder
object TestKafka {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new
SparkConf().setMaster("local[*]").setAppName("Wordcount")
val ssc = new StreamingContext(conf, Seconds(2))
// val kafkaParam = Map("metadata.broker.list"-> "localhost:9092")
val topics = "testing1"
val zkQuorum = "localhost:9092"
val group = "g1"
// val Array(zkQuorum, group, topics, numThreads) = args
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, 1)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
ssc.awaitTermination()
}
}
Использование стандартного профиля Spark log4j: org / apache / spark / log4j- defaults.properties Исключение в потоке "main" java.lang.NoClassDefFoundError: org / apache / spark / Logging