Написание структуры Spark Потоковая передача данных в Cassandra не выполняется из-за сбоя в утверждении - PullRequest
0 голосов
/ 04 октября 2018

Я хочу написать набор потоковых данных в Cassandra.Я использую водяной знак в сценарии и использую аналогичный код, используемый
в: https://github.com/polomarcus/Spark-Structured-Streaming-Examples или https://github.com/fhuertas/cassandra-sink-spark-structured-streaming для сбора данных.

Поток сценария: Kafka -> время события водяного знака -> groupBy -> слив к Cassandra

    val counts= words
        .withWatermark("eventTime", "60 seconds")
        .groupBy(
            window($"eventTime","60 seconds", "30 seconds"),
            $"word")
        .count()

    val query= counts
        .writeStream
        .format("streamsinkprovider.CassandraSinkProvider")
        .outputMode("append")
        .start() 

, и я попробовал два подхода к сливу:

import org.apache.spark.sql.cassandra._

df.select("word","count").as[(String,BigInt)]
    .write
    .format("org.apache.spark.sql.cassandra")
    .mode(SaveMode.Append)
    .cassandraFormat("test", CassandraLibrary.CASSANDRA_KEYSPACE)
    .save()  

и

 ds.rdd.saveToCassandra(
        CassandraLibrary.CASSANDRA_KEYSPACE, 
        "test", SomeColumns("word","count"), cassandraWriteConf)

В обоих случаях я получаю исключение ниже:

java.lang.AssertionError: сбой утверждения: нет плана для EventTimeWatermark eventTime # 272: отметка времени, интервал1 минута

Кажется, saveToCassandra пытается создать RDD, и это вызывает искру для создания нового плана.

JDBC-приемник (mysql) работает без ошибок для того же кода.

Я пробовал это с Spark версии 2.2.0 и 2.3.2 и datastax.spark-cassandra-connector 2.0.5 и 2.3.2.Я пытался с DSE Cassandra 5.1 и Apache Cassandra 3.11

Есть идеи, как обойти эту проблему?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...