Spark Streaming работает в локальном режиме, но «этапы не выполняются» с «не удалось инициализировать класс» в режиме клиент / кластер - PullRequest
0 голосов
/ 09 ноября 2018

У меня есть потоковое приложение Spark + Kafka, которое отлично работает в локальном режиме, однако, когда я пытаюсь запустить его в режиме пряжи + локальный / кластер, я получаю несколько ошибок, как показано ниже

Первая ошибка, которую я всегда вижу, это

WARN TaskSetManager: Lost task 1.1 in stage 3.0 (TID 9, ip-xxx-24-129-36.ec2.internal, executor 2): java.lang.NoClassDefFoundError: Could not initialize class TestStreaming$
        at TestStreaming$$anonfun$main$1$$anonfun$apply$1.apply(TestStreaming.scala:60)
        at TestStreaming$$anonfun$main$1$$anonfun$apply$1.apply(TestStreaming.scala:59)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Следующая ошибка, которую я получаю:

ОШИБКА JobScheduler: Ошибка запуска задания потоковой передачи задания 1541786030000 ms.0

с последующим

java.lang.NoClassDefFoundError: Не удалось инициализировать класс

Spark версия 2.1.0 Scala 2.11 Кафка версия 10

Часть моего кода при запуске загружает конфигурацию в main. Я передаю этот файл конфигурации во время выполнения с -conf ПОСЛЕ jar (см. Ниже). Я не совсем уверен, но должен ли я передать этот конфиг исполнителям?

Я запускаю мое потоковое приложение с помощью команды ниже. Один показывает локальный режим, другой - режим клиента.

runJar = myProgram.jar loggerPath = / путь / к / log4j.properties

MainClass = TestStreaming

Регистратор = -DPHDTKafkaConsumer.app.log4j = $ loggerPath

ConfFile = application.conf

----------- Локальный режим ---------- SPARK_KAFKA_VERSION = 0.10 nohup spark2-submit --driver-java-options "$ logger" --conf "spark.executor.extraJavaOptions = $ logger" --class $ mainClass - master local [4] $ runJar -conf $ confFile &

----------- Режим клиента ---------- SPARK_KAFKA_VERSION = 0.10 nohup spark2-submit --master yarn --conf> "spark.executor.extraJavaOptions = $ logger" --conf> "spark.driver.extraJavaOptions = $ logger" --class $ mainClass $ runJar -conf> $ confFile &

Вот мой код ниже. Сражаюсь с этим уже больше недели.

import Util.UtilFunctions
import UtilFunctions.config
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.log4j.Logger


object TestStreaming extends Serializable {

  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

  def main(args: Array[String]) {
    logger.info("Starting app")

    UtilFunctions.loadConfig(args)
    UtilFunctions.loadLogger()

    val props: Map[String, String] = setKafkaProperties()

    val topic = Set(config.getString("config.TOPIC_NAME"))

    val conf = new SparkConf()
      .setAppName(config.getString("config.SPARK_APP_NAME"))
      .set("spark.streaming.backpressure.enabled", "true")

    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    ssc.sparkContext.setLogLevel("INFO")
    ssc.checkpoint(config.getString("config.SPARK_CHECKPOINT_NAME"))

    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topic, props))
    val distRecordsStream = kafkaStream.map(record => (record.key(), record.value()))
    distRecordsStream.window(Seconds(10), Seconds(10))
    distRecordsStream.foreachRDD(rdd => {
      if(!rdd.isEmpty()) {
        rdd.foreach(record => {
          println(record._2) //value from kafka
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }

  def setKafkaProperties(): Map[String, String] = {

    val deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    val zookeeper = config.getString("config.ZOOKEEPER")
    val offsetReset = config.getString("config.OFFSET_RESET")
    val brokers = config.getString("config.BROKERS")
    val groupID = config.getString("config.GROUP_ID")
    val autoCommit = config.getString("config.AUTO_COMMIT")
    val maxPollRecords = config.getString("config.MAX_POLL_RECORDS")
    val maxPollIntervalms = config.getString("config.MAX_POLL_INTERVAL_MS")

    val props = Map(
      "bootstrap.servers" -> brokers,
      "zookeeper.connect" -> zookeeper,
      "group.id" -> groupID,
      "key.deserializer" -> deserializer,
      "value.deserializer" -> deserializer,
      "enable.auto.commit" -> autoCommit,
      "auto.offset.reset" -> offsetReset,
      "max.poll.records" -> maxPollRecords,
      "max.poll.interval.ms" -> maxPollIntervalms)
    props
  }

}
...