Я использую структурированную потоковую передачу Spark вместе с Cassandra в качестве приемника. Фрагмент ниже:
override def start(): StreamingQuery = {
sparkContext.getSparkSession()
.readStream
.option("header", "false")
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServer)
.option("failOnDataLoss","false")
.option("subscribe", topicName)
.load()
.writeStream
.option("checkpointLocation",checkpointLocation)
.foreachBatch(forEachFunction.arceusForEachFunction(_,_))
.start()
И я использую нижеследующее, чтобы написать Кассандре внутри foreach:
RDD.saveToCassandra(keyspace, tableName)
Пока я делал это, мне было интересно, как справляться с такими проблемами, как Кассандра, спускаясь и т.д. c. Предположим, что из 3М загружаемых данных было написано 2М до возникновения проблемы. Теперь мне нужно либо отменить 2М, либо обработать только 1М. Я не уверен, что произойдет, если такой сценарий ios.
Об этом как-то заботятся? Или я должен что-то написать, чтобы позаботиться об этом?
Я также посмотрел на spark docs , а для "foreach batch" написано "зависит от реализации "
Любая помощь приветствуется. Спасибо