Интеграция Spark Kafka и запуск в сети - PullRequest
0 голосов
/ 19 мая 2019

Я хочу интегрировать потоковую передачу искры с kafka и запускать в intellij

Моя kafka работает локально в порту 9092 и реализует код с помощью spark scala для чтения потоковой передачи в реальном времени. Вот мой код, и исключение даетintellij

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Minutes, Seconds, 
    StreamingContext, kafka}
    import _root_.kafka.serializer.StringDecoder
    object TestKafka {
    def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new 
    SparkConf().setMaster("local[*]").setAppName("Wordcount")
    val ssc = new StreamingContext(conf, Seconds(2))
    //    val kafkaParam = Map("metadata.broker.list"-> "localhost:9092")
    val topics = "testing1"
    val zkQuorum = "localhost:9092"
    val group = "g1"
    //    val Array(zkQuorum, group, topics, numThreads) = args
    ssc.checkpoint("checkpoint")
    val topicMap = topics.split(",").map((_, 1)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.awaitTermination()
   }
    }

Использование стандартного профиля Spark log4j: org / apache / spark / log4j- defaults.properties Исключение в потоке "main" java.lang.NoClassDefFoundError: org / apache / spark / Logging

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...