Я хочу написать набор потоковых данных в Cassandra.Я использую водяной знак в сценарии и использую аналогичный код, используемый
в: https://github.com/polomarcus/Spark-Structured-Streaming-Examples или https://github.com/fhuertas/cassandra-sink-spark-structured-streaming для сбора данных.
Поток сценария: Kafka -> время события водяного знака -> groupBy -> слив к Cassandra
val counts= words
.withWatermark("eventTime", "60 seconds")
.groupBy(
window($"eventTime","60 seconds", "30 seconds"),
$"word")
.count()
val query= counts
.writeStream
.format("streamsinkprovider.CassandraSinkProvider")
.outputMode("append")
.start()
, и я попробовал два подхода к сливу:
import org.apache.spark.sql.cassandra._
df.select("word","count").as[(String,BigInt)]
.write
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.cassandraFormat("test", CassandraLibrary.CASSANDRA_KEYSPACE)
.save()
и
ds.rdd.saveToCassandra(
CassandraLibrary.CASSANDRA_KEYSPACE,
"test", SomeColumns("word","count"), cassandraWriteConf)
В обоих случаях я получаю исключение ниже:
java.lang.AssertionError: сбой утверждения: нет плана для EventTimeWatermark eventTime # 272: отметка времени, интервал1 минута
Кажется, saveToCassandra пытается создать RDD, и это вызывает искру для создания нового плана.
JDBC-приемник (mysql) работает без ошибок для того же кода.
Я пробовал это с Spark версии 2.2.0 и 2.3.2 и datastax.spark-cassandra-connector 2.0.5 и 2.3.2.Я пытался с DSE Cassandra 5.1 и Apache Cassandra 3.11
Есть идеи, как обойти эту проблему?