Я пытаюсь создать структурированный поток из Кафки в Spark, который представляет собой строку json.Теперь нужно проанализировать json в определенном столбце, а затем сохранить кадр данных в таблицу cassandra с оптимальной скоростью.Используя Spark 2.4 и cassandra 2.11 (Apache), а не DSE.
Я попытался создать Direct Stream, который дает DStream класса case, который я сохранял в Cassandra, используя foreachRDD на DStream, но это зависало после каждых 6-7 дней.Таким образом, была попытка выполнить потоковую передачу, которая напрямую выдает фрейм данных и может быть сохранена в Cassandra.
val conf = new SparkConf()
.setMaster("local[3]")
.setAppName("Fleet Live Data")
.set("spark.cassandra.connection.host", "ip")
.set("spark.cassandra.connection.keep_alive_ms", "20000")
.set("spark.cassandra.auth.username", "user")
.set("spark.cassandra.auth.password", "pass")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("spark.executor.memory", "2g")
.set("spark.driver.memory", "2g")
.set("spark.submit.deployMode", "cluster")
.set("spark.executor.instances", "4")
.set("spark.executor.cores", "2")
.set("spark.cores.max", "9")
.set("spark.driver.cores", "9")
.set("spark.speculation", "true")
.set("spark.locality.wait", "2s")
val spark = SparkSession
.builder
.appName("Fleet Live Data")
.config(conf)
.getOrCreate()
println("Spark Session Config Done")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
val topics = Map("livefleet" -> 1)
import spark.implicits._
implicit val formats = DefaultFormats
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "brokerIP:port")
.option("subscribe", "livefleet")
.load()
val collection = df.selectExpr("CAST(value AS STRING)").map(f => parse(f.toString()).extract[liveevent])
val query = collection.writeStream
.option("checkpointLocation", "/tmp/check_point/")
.format("kafka")
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "trackfleet_db")
.option("table", "locationinfotemp1")
.outputMode(OutputMode.Update)
.start()
query.awaitTermination()
Ожидается, что фрейм данных будет сохранен в cassandra.Но получая эту ошибку: -
Исключение в потоке "main" org.apache.spark.sql.AnalysisException: Запросы с потоковыми источниками должны выполняться с помощью writeStream.start ()