Running the Kafka read code fails at runtime with the following errors:
1) ERROR StreamExecution: Query [id = c6426655-446f-4306-91ba-d78e68e05c15, runId = 420382c1-8558-45a1-b26d-f6299044fa04] terminated with error java.lang.ExceptionInInitializerError
2) Exception in thread "stream execution thread for [id = c6426655-446f-4306-91ba-d78e68e05c15, runId = 420382c1-8558-45a1-b26d-f6299044fa04]" java.lang.ExceptionInInitializerError
3) Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: null
sbt dependencies:
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.3" % "provided"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka
libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.1.1"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "2.1.1"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-scala
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.1.1"
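The exceptions above usually come from mismatched Kafka client versions on the classpath: spark-sql-kafka-0-10 2.2.3 already pulls in a compatible (0.10.x-era) kafka-clients transitively, and adding kafka-clients 2.1.1 on top (plus the kafka and kafka-streams artifacts, which the Structured Streaming source does not need) can make a Kafka class fail in its static initializer. A trimmed build.sbt that lets the connector manage the client (a sketch; it assumes Spark 2.2.3 on Scala 2.11 and drops the artifacts the code below never uses):

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.3",
  "org.apache.spark" %% "spark-sql" % "2.2.3",
  // Brings in a compatible kafka-clients transitively; do not pin a newer one.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.3"
)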
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
object demo1 {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir","c:\\hadoop\\")
val spark: SparkSession = SparkSession.builder
.appName("My Spark Application")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
.config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
.getOrCreate
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.shuffle.partitions", "2")
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "162.244.80.189:9092")
.option("startingOffsets", "earliest")
.option("group.id","test1")
.option("subscribe", "demo11")
.load()
import spark.implicits._
// Cast the binary key/value columns from Kafka to strings for inspection.
val dsStruc = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp").as[(String, String, Timestamp)]
// Write the string-cast columns to the console. df.show() cannot be called
// on a streaming DataFrame, so the console sink replaces it.
val query = dsStruc.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
}
}
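If the error persists, the "StreamingQueryException: null" message hides the real failure; ExceptionInInitializerError carries it as a cause. A small sketch (wrapping the awaitTermination call above) that walks the cause chain to find which class failed to initialize:

try {
  query.awaitTermination()
} catch {
  case e: org.apache.spark.sql.streaming.StreamingQueryException =>
    // Walk to the innermost cause: for ExceptionInInitializerError this is
    // the exception thrown by the failing static initializer.
    var cause: Throwable = e
    while (cause.getCause != null) cause = cause.getCause
    cause.printStackTrace()
}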