У меня есть следующий код, и я хотел бы записать в cassandra, используя структурированную потоковую передачу spark 2.4 foreachBatch
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> values=df.selectExpr(
"split(value,',')[0] as field1",
"split(value,',')[1] as field2",
"split(value,',')[2] as field3",
"split(value,',')[3] as field4",
"split(value,',')[4] as field5");
//TODO write into cassandra
values.writeStream().foreachBatch(
new VoidFunction2<Dataset<String>, Long> {
public void call(Dataset<String> dataset, Long batchId) {
// Transform and write batchDF
}
).start();