Этот ответ предназначен для записи данных в Cassandra, а не в DSE (, который поддерживает структурированную потоковую передачу для хранения данных )
Для Spark 2.4.0 и более поздних версий вы можете использовать метод foreachBatch,который позволяет использовать средство записи пакетных данных Cassandra, предоставляемое Spark Cassandra Connector, для записи результатов каждой микропакета потокового запроса в Cassandra:
import org.apache.spark.sql.cassandra._
df.writeStream
.foreachBatch { (batchDF, _) =>
batchDF
.write
.cassandraFormat("tableName", "keyspace")
.mode("append")
.save
}.start
Для версий Spark ниже 2.4.0,вам нужно реализовать приемник foreach.
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.Statement
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
class CassandraSink(sparkConf: SparkConf) extends ForeachWriter[Row] {
def open(partitionId: Long, version: Long): Boolean = true
def process(row: Row) = {
def buildStatement: Statement =
QueryBuilder.insertInto("keyspace", "tableName")
.value("key", row.getAs[String]("value"))
CassandraConnector(sparkConf).withSessionDo { session =>
session.execute(buildStatement)
}
}
def close(errorOrNull: Throwable) = Unit
}
И тогда вы можете использовать приемник foreach следующим образом:
df.writeStream
.foreach(new CassandraSink(spark.sparkContext.getConf))
.start