У меня есть потоковое приложение Spark + Kafka, которое отлично работает в локальном режиме, однако, когда я пытаюсь запустить его в режиме пряжи + локальный / кластер, я получаю несколько ошибок, как показано ниже
Первая ошибка, которую я всегда вижу, это
WARN TaskSetManager: Lost task 1.1 in stage 3.0 (TID 9, ip-xxx-24-129-36.ec2.internal, executor 2): java.lang.NoClassDefFoundError: Could not initialize class TestStreaming$
at TestStreaming$$anonfun$main$1$$anonfun$apply$1.apply(TestStreaming.scala:60)
at TestStreaming$$anonfun$main$1$$anonfun$apply$1.apply(TestStreaming.scala:59)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Следующая ошибка, которую я получаю:
ОШИБКА JobScheduler: Ошибка запуска задания потоковой передачи задания 1541786030000 ms.0
с последующим
java.lang.NoClassDefFoundError: Не удалось инициализировать класс
Spark версия 2.1.0
Scala 2.11
Кафка версия 10
Часть моего кода при запуске загружает конфигурацию в main. Я передаю этот файл конфигурации во время выполнения с -conf ПОСЛЕ jar (см. Ниже). Я не совсем уверен, но должен ли я передать этот конфиг исполнителям?
Я запускаю мое потоковое приложение с помощью команды ниже. Один показывает локальный режим, другой - режим клиента.
runJar = myProgram.jar
loggerPath = / путь / к / log4j.properties
MainClass = TestStreaming
Регистратор = -DPHDTKafkaConsumer.app.log4j = $ loggerPath
ConfFile = application.conf
----------- Локальный режим ----------
SPARK_KAFKA_VERSION = 0.10 nohup spark2-submit --driver-java-options
"$ logger" --conf "spark.executor.extraJavaOptions = $ logger" --class
$ mainClass - master local [4] $ runJar -conf $ confFile &
----------- Режим клиента ----------
SPARK_KAFKA_VERSION = 0.10 nohup spark2-submit --master yarn --conf> "spark.executor.extraJavaOptions = $ logger" --conf> "spark.driver.extraJavaOptions = $ logger" --class $ mainClass $ runJar -conf> $ confFile &
Вот мой код ниже. Сражаюсь с этим уже больше недели.
import Util.UtilFunctions
import UtilFunctions.config
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.log4j.Logger
object TestStreaming extends Serializable {
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
def main(args: Array[String]) {
logger.info("Starting app")
UtilFunctions.loadConfig(args)
UtilFunctions.loadLogger()
val props: Map[String, String] = setKafkaProperties()
val topic = Set(config.getString("config.TOPIC_NAME"))
val conf = new SparkConf()
.setAppName(config.getString("config.SPARK_APP_NAME"))
.set("spark.streaming.backpressure.enabled", "true")
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
ssc.sparkContext.setLogLevel("INFO")
ssc.checkpoint(config.getString("config.SPARK_CHECKPOINT_NAME"))
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topic, props))
val distRecordsStream = kafkaStream.map(record => (record.key(), record.value()))
distRecordsStream.window(Seconds(10), Seconds(10))
distRecordsStream.foreachRDD(rdd => {
if(!rdd.isEmpty()) {
rdd.foreach(record => {
println(record._2) //value from kafka
})
}
})
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
def setKafkaProperties(): Map[String, String] = {
val deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
val zookeeper = config.getString("config.ZOOKEEPER")
val offsetReset = config.getString("config.OFFSET_RESET")
val brokers = config.getString("config.BROKERS")
val groupID = config.getString("config.GROUP_ID")
val autoCommit = config.getString("config.AUTO_COMMIT")
val maxPollRecords = config.getString("config.MAX_POLL_RECORDS")
val maxPollIntervalms = config.getString("config.MAX_POLL_INTERVAL_MS")
val props = Map(
"bootstrap.servers" -> brokers,
"zookeeper.connect" -> zookeeper,
"group.id" -> groupID,
"key.deserializer" -> deserializer,
"value.deserializer" -> deserializer,
"enable.auto.commit" -> autoCommit,
"auto.offset.reset" -> offsetReset,
"max.poll.records" -> maxPollRecords,
"max.poll.interval.ms" -> maxPollIntervalms)
props
}
}