Я передаю данные из Кафки со структурированным потоком
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("enable.auto.commit", false)
.option("auto.offset.reset", "earliest")
.option("group.id", UUID.randomUUID().toString)
.option("subscribe", "test")
.load()
, а затем попытайтесь присоединиться к нему за столом Кассандры
val d = df.select(from_json(col("value").cast("string"), schema).cast("string").alias("url"))
.rdd.joinWithCassandraTable[(String, String, String)]("analytics", "nlp2", SomeColumns("url", "ner", "sentiment"), SomeColumns("url"))
.toDS()
.writeStream
.format("console") // <-- use ConsoleSink
.option("truncate", false)
.option("numRows", 10)
.trigger(Trigger.ProcessingTime(5 seconds))
.queryName("rate-console")
.start
.awaitTermination()
но я получаю, когда я пытаюсь преобразовать фрейм данных в rdd, есть идеи, почему?
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)